Skip to content

Commit

Permalink
[ISSUE alibaba#2856][module:core、consistency]Adjust the use of thread…
Browse files Browse the repository at this point in the history
… pools

Change-Id: I793770ca0f287157dc88e3ecf1147ea308e4a547
  • Loading branch information
wangweizZZ committed Jun 17, 2020
1 parent 7c4cca4 commit 315cf81
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,70 +27,169 @@
/**
* Unified thread pool creation factory, and actively create thread
* pool resources by ThreadPoolManager for unified life cycle management
* {@link ExecutorFactory.Managed}
*
* Unified thread pool creation factory without life cycle management
* {@link ExecutorFactory}
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
*/
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public final class ExecutorFactory {

private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();
public static ExecutorService newSingleExecutorService() {
ExecutorService executorService = Executors.newFixedThreadPool(1);
return executorService;
}

public static ExecutorService newSingleExecutorService(final ThreadFactory threadFactory) {
ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory);
return executorService;
}

public static final String DEFAULT_NAMESPACE = "nacos";
public static ExecutorService newFixExecutorService(final int nThreads) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
return executorService;
}

public static ExecutorService newFixExecutorService(final int nThreads,
final ThreadFactory threadFactory) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
return executorService;
}

public static ScheduledExecutorService newSingleScheduledExecutorService(final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);
return executorService;
}

public static ScheduledExecutorService newScheduledExecutorService( final int nThreads,
final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory);
return executorService;
}

public static ThreadPoolExecutor newCustomerThreadExecutor(
final int coreThreads,
final int maxThreads,
final long keepAliveTimeMs,
final ThreadFactory threadFactory) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads,
keepAliveTimeMs, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
return executor;
}
//TODO remove Deprecated function after replace all module
@Deprecated
public static ExecutorService newSingleExecutorService(final String group) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ExecutorService newSingleExecutorService(final String group,
final ThreadFactory threadFactory) {
final ThreadFactory threadFactory) {
ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ExecutorService newFixExecutorService(final String group,
final int nThreads) {
final int nThreads) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ExecutorService newFixExecutorService(final String group,
final int nThreads,
final ThreadFactory threadFactory) {
final int nThreads,
final ThreadFactory threadFactory) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ScheduledExecutorService newSingleScheduledExecutorService(final String group,
final ThreadFactory threadFactory) {
final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ScheduledExecutorService newScheduledExecutorService(final String group,
final int nThreads,
final ThreadFactory threadFactory) {
final int nThreads,
final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

@Deprecated
public static ThreadPoolExecutor newCustomerThreadExecutor(final String group,
final int coreThreads,
final int maxThreads,
final long keepAliveTimeMs,
final ThreadFactory threadFactory) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads,
keepAliveTimeMs, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
return executor;
}

public final static class Managed{
public static final String DEFAULT_NAMESPACE = "nacos";

private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();

public static ExecutorService newSingleExecutorService(final String group) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

public static ExecutorService newSingleExecutorService(final String group,
final ThreadFactory threadFactory) {
ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

public static ExecutorService newFixExecutorService(final String group,
final int nThreads) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

public static ExecutorService newFixExecutorService(final String group,
final int nThreads,
final ThreadFactory threadFactory) {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

public static ScheduledExecutorService newSingleScheduledExecutorService(final String group,
final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

public static ScheduledExecutorService newScheduledExecutorService(final String group,
final int nThreads,
final ThreadFactory threadFactory) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
return executorService;
}

public static ThreadPoolExecutor newCustomerThreadExecutor(final String group,
final int coreThreads,
final int maxThreads,
final long keepAliveTimeMs,
final ThreadFactory threadFactory) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads,
ThreadPoolExecutor executor = new ThreadPoolExecutor(coreThreads, maxThreads,
keepAliveTimeMs, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executor);
return executor;
THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executor);
return executor;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
@SuppressWarnings("PMD.Rule:CollectionInitShouldAssignCapacityRule")
public final class ProtocolMetaData {

private static final Executor EXECUTOR = ExecutorFactory.newFixExecutorService(ProtocolMetaData.class.getCanonicalName(), 4, new NameThreadFactory("nacos.consistency.protocol.metadata"));
private static final Executor EXECUTOR = ExecutorFactory.Managed.newFixExecutorService(
ProtocolMetaData.class.getCanonicalName(),
4,
new NameThreadFactory("com.alibaba.nacos.consistency.protocol.metadata"));

private Map<String, MetaData> metaDataMap = new ConcurrentHashMap<>(4);

Expand Down Expand Up @@ -181,4 +184,4 @@ void setData(Object data) {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ private void closeExecutor() {
private void logStarting() {
if (!ApplicationUtils.getStandaloneMode()) {

scheduledExecutorService = ExecutorFactory
.newSingleScheduledExecutorService(getClass().getCanonicalName(),
new NameThreadFactory("nacos-starting"));
scheduledExecutorService = ExecutorFactory
.newSingleScheduledExecutorService(
new NameThreadFactory("com.alibaba.nacos.core.nacos-starting"));

scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (starting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@

import com.alibaba.nacos.common.executor.ExecutorFactory;

import com.alibaba.nacos.core.utils.ClassUtils;
import java.util.concurrent.ExecutorService;

/**
* @author <a href="mailto:[email protected]">liaochuntao</a>
*/
public final class ProtocolExecutor {

private static final ExecutorService CP_MEMBER_CHANGE_EXECUTOR = ExecutorFactory.newSingleExecutorService(
ProtocolManager.class.getName());
private static final ExecutorService CP_MEMBER_CHANGE_EXECUTOR = ExecutorFactory.Managed.newSingleExecutorService(
ClassUtils.getCanonicalName(ProtocolManager.class));

private static final ExecutorService AP_MEMBER_CHANGE_EXECUTOR = ExecutorFactory.newSingleExecutorService(
ProtocolManager.class.getName());
private static final ExecutorService AP_MEMBER_CHANGE_EXECUTOR = ExecutorFactory.Managed.newSingleExecutorService(
ClassUtils.getCanonicalName(ProtocolManager.class));

public static void cpMemberChange(Runnable runnable) {
CP_MEMBER_CHANGE_EXECUTOR.execute(runnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.alibaba.nacos.core.distributed.raft.RaftConfig;
import com.alibaba.nacos.core.distributed.raft.RaftSysConstants;

import com.alibaba.nacos.core.utils.ClassUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,7 +37,7 @@ public final class RaftExecutor {
private static ScheduledExecutorService raftCommonExecutor;
private static ExecutorService raftSnapshotExecutor;

private static final String OWNER = JRaftServer.class.getName();
private static final String OWNER = ClassUtils.getCanonicalName(JRaftServer.class);

private RaftExecutor() {
}
Expand All @@ -49,22 +50,24 @@ public static void init(RaftConfig config) {
config.getValOfDefault(RaftSysConstants.RAFT_CLI_SERVICE_THREAD_NUM,
"4"));

raftCoreExecutor = ExecutorFactory.newFixExecutorService(OWNER, raftCoreThreadNum,
new NameThreadFactory("com.alibaba.naocs.core.raft-core"));
raftCoreExecutor = ExecutorFactory.Managed
.newFixExecutorService(OWNER, raftCoreThreadNum,
new NameThreadFactory("com.alibaba.naocs.core.raft-core"));

raftCliServiceExecutor = ExecutorFactory
raftCliServiceExecutor = ExecutorFactory.Managed
.newFixExecutorService(OWNER, raftCliServiceThreadNum,
new NameThreadFactory("com.alibaba.naocs.core.raft-cli-service"));

raftCommonExecutor = ExecutorFactory.newScheduledExecutorService(OWNER, 8,
new NameThreadFactory(
"com.alibaba.nacos.core.protocol.raft-common"));
raftCommonExecutor = ExecutorFactory.Managed
.newScheduledExecutorService(OWNER, 8,
new NameThreadFactory("com.alibaba.nacos.core.protocol.raft-common"));

int snapshotNum = raftCoreThreadNum / 2;
snapshotNum = snapshotNum == 0 ? raftCoreThreadNum : snapshotNum;

raftSnapshotExecutor = ExecutorFactory.newFixExecutorService(OWNER, snapshotNum,
new NameThreadFactory("com.alibaba.naocs.core.raft-snapshot"));
raftSnapshotExecutor = ExecutorFactory.Managed.
newFixExecutorService(OWNER, snapshotNum,
new NameThreadFactory("com.alibaba.naocs.core.raft-snapshot"));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,8 @@ public WatchDirJob(String paths) throws NacosException {
}

this.callBackExecutor = ExecutorFactory
.newFixExecutorService(WatchFileCenter.class.getCanonicalName(),
1,
new NameThreadFactory("com.alibaba.nacos.file.watch-" + paths));
.newSingleExecutorService(
new NameThreadFactory("com.alibaba.nacos.core.file.watch-" + paths));

try {
WatchService service = FILE_SYSTEM.newWatchService();
Expand All @@ -176,7 +175,8 @@ void addSubscribe(final FileWatcher watcher) {
}

void shutdown() {
watch = false;
watch = false;
ThreadUtils.shutdownThreadPool(this.callBackExecutor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.alibaba.nacos.core.utils;

import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -27,8 +26,8 @@
*/
public class GlobalExecutor {

private static final ScheduledExecutorService COMMON_EXECUTOR = ExecutorFactory.newScheduledExecutorService(
GlobalExecutor.class.getCanonicalName(),
private static final ScheduledExecutorService COMMON_EXECUTOR = ExecutorFactory.Managed.newScheduledExecutorService(
ClassUtils.getCanonicalName(GlobalExecutor.class),
4,
new NameThreadFactory("com.alibaba.nacos.core.common")
);
Expand Down

0 comments on commit 315cf81

Please sign in to comment.