From 315cf811bb2571088fa23b08ce5cac630c305c28 Mon Sep 17 00:00:00 2001 From: wangwei Date: Wed, 17 Jun 2020 12:45:07 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#2856][module:core=E3=80=81consistency?= =?UTF-8?q?]Adjust=20the=20use=20of=20thread=20pools?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change-Id: I793770ca0f287157dc88e3ecf1147ea308e4a547 --- .../common/executor/ExecutorFactory.java | 149 +++++++++++++++--- .../nacos/consistency/ProtocolMetaData.java | 7 +- .../StartingSpringApplicationRunListener.java | 6 +- .../core/distributed/ProtocolExecutor.java | 9 +- .../distributed/raft/utils/RaftExecutor.java | 21 +-- .../nacos/core/file/WatchFileCenter.java | 8 +- .../nacos/core/utils/GlobalExecutor.java | 5 +- 7 files changed, 155 insertions(+), 50 deletions(-) diff --git a/common/src/main/java/com/alibaba/nacos/common/executor/ExecutorFactory.java b/common/src/main/java/com/alibaba/nacos/common/executor/ExecutorFactory.java index 301c4c2f00f..ce59d270c9a 100644 --- a/common/src/main/java/com/alibaba/nacos/common/executor/ExecutorFactory.java +++ b/common/src/main/java/com/alibaba/nacos/common/executor/ExecutorFactory.java @@ -27,70 +27,169 @@ /** * Unified thread pool creation factory, and actively create thread * pool resources by ThreadPoolManager for unified life cycle management + * {@link ExecutorFactory.Managed} + * + * Unified thread pool creation factory without life cycle management + * {@link ExecutorFactory} * * @author liaochuntao */ @SuppressWarnings("PMD.ThreadPoolCreationRule") public final class ExecutorFactory { - private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance(); + public static ExecutorService newSingleExecutorService() { + ExecutorService executorService = Executors.newFixedThreadPool(1); + return executorService; + } + + public static ExecutorService newSingleExecutorService(final ThreadFactory threadFactory) { + ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory); + return executorService; + } - public static final String DEFAULT_NAMESPACE = "nacos"; + public static ExecutorService newFixExecutorService(final int nThreads) { + ExecutorService executorService = Executors.newFixedThreadPool(nThreads); + return executorService; + } + public static ExecutorService newFixExecutorService(final int nThreads, + final ThreadFactory threadFactory) { + ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory); + return executorService; + } + + public static ScheduledExecutorService newSingleScheduledExecutorService(final ThreadFactory threadFactory) { + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory); + return executorService; + } + + public static ScheduledExecutorService newScheduledExecutorService( final int nThreads, + final ThreadFactory threadFactory) { + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory); + return executorService; + } + + public static ThreadPoolExecutor newCustomerThreadExecutor( + final int coreThreads, + final int maxThreads, + final long keepAliveTimeMs, + final ThreadFactory threadFactory) { + ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads, + keepAliveTimeMs, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + return executor; + } + //TODO remove Deprecated function after replace all module + @Deprecated public static ExecutorService newSingleExecutorService(final String group) { ExecutorService executorService = Executors.newFixedThreadPool(1); - THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); return executorService; } - + @Deprecated public static ExecutorService newSingleExecutorService(final String group, - final ThreadFactory threadFactory) { + final ThreadFactory threadFactory) { ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory); - THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); return executorService; } - + @Deprecated public static ExecutorService newFixExecutorService(final String group, - final int nThreads) { + final int nThreads) { ExecutorService executorService = Executors.newFixedThreadPool(nThreads); - THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); return executorService; } - + @Deprecated public static ExecutorService newFixExecutorService(final String group, - final int nThreads, - final ThreadFactory threadFactory) { + final int nThreads, + final ThreadFactory threadFactory) { ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory); - THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); return executorService; } - + @Deprecated public static ScheduledExecutorService newSingleScheduledExecutorService(final String group, - final ThreadFactory threadFactory) { + final ThreadFactory threadFactory) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory); - THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); return executorService; } - + @Deprecated public static ScheduledExecutorService newScheduledExecutorService(final String group, - final int nThreads, - final ThreadFactory threadFactory) { + final int nThreads, + final ThreadFactory threadFactory) { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory); - THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); return executorService; } - + @Deprecated public static ThreadPoolExecutor newCustomerThreadExecutor(final String group, + final int coreThreads, + final int maxThreads, + final long keepAliveTimeMs, + final ThreadFactory threadFactory) { + ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads, + keepAliveTimeMs, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + return executor; + } + + public final static class Managed{ + public static final String DEFAULT_NAMESPACE = "nacos"; + + private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance(); + + public static ExecutorService newSingleExecutorService(final String group) { + ExecutorService executorService = Executors.newFixedThreadPool(1); + THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); + return executorService; + } + + public static ExecutorService newSingleExecutorService(final String group, + final ThreadFactory threadFactory) { + ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory); + THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); + return executorService; + } + + public static ExecutorService newFixExecutorService(final String group, + final int nThreads) { + ExecutorService executorService = Executors.newFixedThreadPool(nThreads); + THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); + return executorService; + } + + public static ExecutorService newFixExecutorService(final String group, + final int nThreads, + final ThreadFactory threadFactory) { + ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory); + THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); + return executorService; + } + + public static ScheduledExecutorService newSingleScheduledExecutorService(final String group, + final ThreadFactory threadFactory) { + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory); + THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); + return executorService; + } + + public static ScheduledExecutorService newScheduledExecutorService(final String group, + final int nThreads, + final ThreadFactory threadFactory) { + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory); + THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService); + return executorService; + } + + public static ThreadPoolExecutor newCustomerThreadExecutor(final String group, final int coreThreads, final int maxThreads, final long keepAliveTimeMs, final ThreadFactory threadFactory) { - ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads, + ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTimeMs, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); - THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executor); - return executor; + THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executor); + return executor; + } } - } diff --git a/consistency/src/main/java/com/alibaba/nacos/consistency/ProtocolMetaData.java b/consistency/src/main/java/com/alibaba/nacos/consistency/ProtocolMetaData.java index 18ff691f3cc..9632e53dc8f 100644 --- a/consistency/src/main/java/com/alibaba/nacos/consistency/ProtocolMetaData.java +++ b/consistency/src/main/java/com/alibaba/nacos/consistency/ProtocolMetaData.java @@ -40,7 +40,10 @@ @SuppressWarnings("PMD.Rule:CollectionInitShouldAssignCapacityRule") public final class ProtocolMetaData { - private static final Executor EXECUTOR = ExecutorFactory.newFixExecutorService(ProtocolMetaData.class.getCanonicalName(), 4, new NameThreadFactory("nacos.consistency.protocol.metadata")); + private static final Executor EXECUTOR = ExecutorFactory.Managed.newFixExecutorService( + ProtocolMetaData.class.getCanonicalName(), + 4, + new NameThreadFactory("com.alibaba.nacos.consistency.protocol.metadata")); private Map metaDataMap = new ConcurrentHashMap<>(4); @@ -181,4 +184,4 @@ void setData(Object data) { } } } -} \ No newline at end of file +} diff --git a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java index 6d213586719..c9208287495 100644 --- a/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java +++ b/core/src/main/java/com/alibaba/nacos/core/code/StartingSpringApplicationRunListener.java @@ -211,9 +211,9 @@ private void closeExecutor() { private void logStarting() { if (!ApplicationUtils.getStandaloneMode()) { - scheduledExecutorService = ExecutorFactory - .newSingleScheduledExecutorService(getClass().getCanonicalName(), - new NameThreadFactory("nacos-starting")); + scheduledExecutorService = ExecutorFactory + .newSingleScheduledExecutorService( + new NameThreadFactory("com.alibaba.nacos.core.nacos-starting")); scheduledExecutorService.scheduleWithFixedDelay(() -> { if (starting) { diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolExecutor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolExecutor.java index e72cded8b83..02c829b8201 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolExecutor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/ProtocolExecutor.java @@ -18,6 +18,7 @@ import com.alibaba.nacos.common.executor.ExecutorFactory; +import com.alibaba.nacos.core.utils.ClassUtils; import java.util.concurrent.ExecutorService; /** @@ -25,11 +26,11 @@ */ public final class ProtocolExecutor { - private static final ExecutorService CP_MEMBER_CHANGE_EXECUTOR = ExecutorFactory.newSingleExecutorService( - ProtocolManager.class.getName()); + private static final ExecutorService CP_MEMBER_CHANGE_EXECUTOR = ExecutorFactory.Managed.newSingleExecutorService( + ClassUtils.getCanonicalName(ProtocolManager.class)); - private static final ExecutorService AP_MEMBER_CHANGE_EXECUTOR = ExecutorFactory.newSingleExecutorService( - ProtocolManager.class.getName()); + private static final ExecutorService AP_MEMBER_CHANGE_EXECUTOR = ExecutorFactory.Managed.newSingleExecutorService( + ClassUtils.getCanonicalName(ProtocolManager.class)); public static void cpMemberChange(Runnable runnable) { CP_MEMBER_CHANGE_EXECUTOR.execute(runnable); diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/RaftExecutor.java b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/RaftExecutor.java index 07aec35e9f8..fe542df3673 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/RaftExecutor.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/raft/utils/RaftExecutor.java @@ -22,6 +22,7 @@ import com.alibaba.nacos.core.distributed.raft.RaftConfig; import com.alibaba.nacos.core.distributed.raft.RaftSysConstants; +import com.alibaba.nacos.core.utils.ClassUtils; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -36,7 +37,7 @@ public final class RaftExecutor { private static ScheduledExecutorService raftCommonExecutor; private static ExecutorService raftSnapshotExecutor; - private static final String OWNER = JRaftServer.class.getName(); + private static final String OWNER = ClassUtils.getCanonicalName(JRaftServer.class); private RaftExecutor() { } @@ -49,22 +50,24 @@ public static void init(RaftConfig config) { config.getValOfDefault(RaftSysConstants.RAFT_CLI_SERVICE_THREAD_NUM, "4")); - raftCoreExecutor = ExecutorFactory.newFixExecutorService(OWNER, raftCoreThreadNum, - new NameThreadFactory("com.alibaba.naocs.core.raft-core")); + raftCoreExecutor = ExecutorFactory.Managed + .newFixExecutorService(OWNER, raftCoreThreadNum, + new NameThreadFactory("com.alibaba.naocs.core.raft-core")); - raftCliServiceExecutor = ExecutorFactory + raftCliServiceExecutor = ExecutorFactory.Managed .newFixExecutorService(OWNER, raftCliServiceThreadNum, new NameThreadFactory("com.alibaba.naocs.core.raft-cli-service")); - raftCommonExecutor = ExecutorFactory.newScheduledExecutorService(OWNER, 8, - new NameThreadFactory( - "com.alibaba.nacos.core.protocol.raft-common")); + raftCommonExecutor = ExecutorFactory.Managed + .newScheduledExecutorService(OWNER, 8, + new NameThreadFactory("com.alibaba.nacos.core.protocol.raft-common")); int snapshotNum = raftCoreThreadNum / 2; snapshotNum = snapshotNum == 0 ? raftCoreThreadNum : snapshotNum; - raftSnapshotExecutor = ExecutorFactory.newFixExecutorService(OWNER, snapshotNum, - new NameThreadFactory("com.alibaba.naocs.core.raft-snapshot")); + raftSnapshotExecutor = ExecutorFactory.Managed. + newFixExecutorService(OWNER, snapshotNum, + new NameThreadFactory("com.alibaba.naocs.core.raft-snapshot")); } diff --git a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java index 4a7d7e1180c..564405ab8ba 100644 --- a/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java +++ b/core/src/main/java/com/alibaba/nacos/core/file/WatchFileCenter.java @@ -154,9 +154,8 @@ public WatchDirJob(String paths) throws NacosException { } this.callBackExecutor = ExecutorFactory - .newFixExecutorService(WatchFileCenter.class.getCanonicalName(), - 1, - new NameThreadFactory("com.alibaba.nacos.file.watch-" + paths)); + .newSingleExecutorService( + new NameThreadFactory("com.alibaba.nacos.core.file.watch-" + paths)); try { WatchService service = FILE_SYSTEM.newWatchService(); @@ -176,7 +175,8 @@ void addSubscribe(final FileWatcher watcher) { } void shutdown() { - watch = false; + watch = false; + ThreadUtils.shutdownThreadPool(this.callBackExecutor); } @Override diff --git a/core/src/main/java/com/alibaba/nacos/core/utils/GlobalExecutor.java b/core/src/main/java/com/alibaba/nacos/core/utils/GlobalExecutor.java index 5c314a6102d..0fde48550d1 100644 --- a/core/src/main/java/com/alibaba/nacos/core/utils/GlobalExecutor.java +++ b/core/src/main/java/com/alibaba/nacos/core/utils/GlobalExecutor.java @@ -16,7 +16,6 @@ package com.alibaba.nacos.core.utils; -import com.alibaba.nacos.core.cluster.ServerMemberManager; import com.alibaba.nacos.common.executor.ExecutorFactory; import com.alibaba.nacos.common.executor.NameThreadFactory; import java.util.concurrent.ScheduledExecutorService; @@ -27,8 +26,8 @@ */ public class GlobalExecutor { - private static final ScheduledExecutorService COMMON_EXECUTOR = ExecutorFactory.newScheduledExecutorService( - GlobalExecutor.class.getCanonicalName(), + private static final ScheduledExecutorService COMMON_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService( + ClassUtils.getCanonicalName(GlobalExecutor.class), 4, new NameThreadFactory("com.alibaba.nacos.core.common") );