Skip to content

Commit

Permalink
设置线程为守护线程,保证在main线程退出时应用程序能过正常关闭。
Browse files Browse the repository at this point in the history
  • Loading branch information
YunWZ committed Oct 27, 2022
1 parent 308d772 commit f2e5300
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.alibaba.nacos.api.remote;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
* rpc scheduler executor .
Expand All @@ -33,7 +35,16 @@ public class RpcScheduledExecutor extends ScheduledThreadPoolExecutor {
"com.alibaba.nacos.remote.ServerCommonScheduler");

public RpcScheduledExecutor(int corePoolSize, final String threadName) {
super(corePoolSize, r -> new Thread(r, threadName));
super(corePoolSize, new ThreadFactory() {
private AtomicLong index = new AtomicLong();

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, threadName + "." + index.getAndIncrement());
thread.setDaemon(true);
return thread;
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.common.http;

import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.http.client.request.DefaultAsyncHttpClientRequest;
Expand All @@ -24,7 +25,9 @@
import com.alibaba.nacos.common.tls.TlsFileWatcher;
import com.alibaba.nacos.common.tls.TlsHelper;
import com.alibaba.nacos.common.tls.TlsSystemConfig;

import java.util.function.BiConsumer;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
Expand Down Expand Up @@ -58,6 +61,10 @@
*/
public abstract class AbstractHttpClientFactory implements HttpClientFactory {

private static final String ASYNC_THREAD_NAME = "nacos-http-async-client";

private static final String AYNC_IO_REACTOR_NAME = ASYNC_THREAD_NAME + "#I/O Reactor";

@Override
public NacosRestTemplate createNacosRestTemplate() {
HttpClientConfig httpClientConfig = buildHttpClientConfig();
Expand All @@ -75,68 +82,67 @@ public NacosRestTemplate createNacosRestTemplate() {
@Override
public NacosAsyncRestTemplate createNacosAsyncRestTemplate() {
final HttpClientConfig originalRequestConfig = buildHttpClientConfig();
final DefaultConnectingIOReactor ioreactor = getIoReactor();
final DefaultConnectingIOReactor ioreactor = getIoReactor(AYNC_IO_REACTOR_NAME);
final RequestConfig defaultConfig = getRequestConfig();
return new NacosAsyncRestTemplate(assignLogger(), new DefaultAsyncHttpClientRequest(
HttpAsyncClients.custom()
.addInterceptorLast(new RequestContent(true))
.setDefaultIOReactorConfig(getIoReactorConfig())
.setDefaultRequestConfig(defaultConfig)
HttpAsyncClients.custom().addInterceptorLast(new RequestContent(true))
.setThreadFactory(new NameThreadFactory(ASYNC_THREAD_NAME))
.setDefaultIOReactorConfig(getIoReactorConfig()).setDefaultRequestConfig(defaultConfig)
.setMaxConnTotal(originalRequestConfig.getMaxConnTotal())
.setMaxConnPerRoute(originalRequestConfig.getMaxConnPerRoute())
.setUserAgent(originalRequestConfig.getUserAgent())
.setConnectionManager(getConnectionManager(originalRequestConfig, ioreactor))
.build(), ioreactor, defaultConfig));
.setConnectionManager(getConnectionManager(originalRequestConfig, ioreactor)).build(),
ioreactor, defaultConfig));
}

private DefaultConnectingIOReactor getIoReactor() {
private DefaultConnectingIOReactor getIoReactor(String threadName) {
final DefaultConnectingIOReactor ioreactor;
try {
ioreactor = new DefaultConnectingIOReactor(getIoReactorConfig());
ioreactor = new DefaultConnectingIOReactor(getIoReactorConfig(), new NameThreadFactory(threadName));
} catch (IOReactorException e) {
assignLogger().error("[NHttpClientConnectionManager] Create DefaultConnectingIOReactor failed", e);
throw new IllegalStateException();
}

// if the handle return true, then the exception thrown by IOReactor will be ignore, and will not finish the IOReactor.
ioreactor.setExceptionHandler(new IOReactorExceptionHandler() {

@Override
public boolean handle(IOException ex) {
assignLogger().warn("[NHttpClientConnectionManager] handle IOException, ignore it.", ex);
return true;
}

@Override
public boolean handle(RuntimeException ex) {
assignLogger().warn("[NHttpClientConnectionManager] handle RuntimeException, ignore it.", ex);
return true;
}
});

return ioreactor;
}

/**
* create the {@link NHttpClientConnectionManager}, the code mainly from {@link HttpAsyncClientBuilder#build()}.
* we add the {@link IOReactorExceptionHandler} to handle the {@link IOException} and {@link RuntimeException}
* thrown by the {@link org.apache.http.impl.nio.reactor.BaseIOReactor} when process the event of Network.
* Using this way to avoid the {@link DefaultConnectingIOReactor} killed by unknown error of network.
* create the {@link NHttpClientConnectionManager}, the code mainly from {@link HttpAsyncClientBuilder#build()}. we
* add the {@link IOReactorExceptionHandler} to handle the {@link IOException} and {@link RuntimeException} thrown
* by the {@link org.apache.http.impl.nio.reactor.BaseIOReactor} when process the event of Network. Using this way
* to avoid the {@link DefaultConnectingIOReactor} killed by unknown error of network.
*
* @param originalRequestConfig request config.
* @param ioreactor I/O reactor.
* @param ioreactor I/O reactor.
* @return {@link NHttpClientConnectionManager}.
*/
private NHttpClientConnectionManager getConnectionManager(HttpClientConfig originalRequestConfig, DefaultConnectingIOReactor ioreactor) {
private NHttpClientConnectionManager getConnectionManager(HttpClientConfig originalRequestConfig,
DefaultConnectingIOReactor ioreactor) {
SSLContext sslcontext = SSLContexts.createDefault();
HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier();
SchemeIOSessionStrategy sslStrategy = new SSLIOSessionStrategy(sslcontext, null, null, hostnameVerifier);

Registry<SchemeIOSessionStrategy> registry = RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", sslStrategy)
.build();
final PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(ioreactor, registry);
.register("http", NoopIOSessionStrategy.INSTANCE).register("https", sslStrategy).build();
final PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(ioreactor,
registry);

int maxTotal = originalRequestConfig.getMaxConnTotal();
if (maxTotal > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public final class HttpClientBeanHolder {

private static final Map<String, NacosRestTemplate> SINGLETON_REST = new HashMap<>(10);

private static final Map<String, NacosAsyncRestTemplate> SINGLETON_ASYNC_REST = new HashMap<>(
10);
private static final Map<String, NacosAsyncRestTemplate> SINGLETON_ASYNC_REST = new HashMap<>(10);

private static final AtomicBoolean ALREADY_SHUTDOWN = new AtomicBoolean(false);

Expand Down Expand Up @@ -101,11 +100,14 @@ private static void shutdown() {
return;
}
LOGGER.warn("[HttpClientBeanHolder] Start destroying common HttpClient");

try {
shutdown(DefaultHttpClientFactory.class.getName());
} catch (Exception ex) {
LOGGER.error("An exception occurred when the common HTTP client was closed : {}", ExceptionUtil.getStackTrace(ex));
LOGGER.error("An exception occurred when the common HTTP client was closed : {}",
ExceptionUtil.getStackTrace(ex));
}

LOGGER.warn("[HttpClientBeanHolder] Destruction of the end");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {

private final AtomicBoolean closed;

private final InnerWorker realWorker;

public TaskExecuteWorker(final String name, final int mod, final int total) {
this(name, mod, total, null);
}
Expand All @@ -57,7 +59,8 @@ public TaskExecuteWorker(final String name, final int mod, final int total, fina
this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
this.closed = new AtomicBoolean(false);
this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
new InnerWorker(name).start();
realWorker = new InnerWorker(this.name);
realWorker.start();
}

public String getName() {
Expand Down Expand Up @@ -95,6 +98,7 @@ public String status() {
public void shutdown() throws NacosException {
queue.clear();
closed.compareAndSet(false, true);
realWorker.interrupt();
}

/**
Expand All @@ -119,7 +123,7 @@ public void run() {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
log.error("[TASK-FAILED] " + e, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteTaskExecuteEngine;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

/**
Expand All @@ -29,7 +30,7 @@
* @author xiweng.yy
*/
@Component
public class DistroTaskEngineHolder {
public class DistroTaskEngineHolder implements DisposableBean {

private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();

Expand All @@ -51,4 +52,10 @@ public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
}

@Override
public void destroy() throws Exception {
this.delayTaskExecuteEngine.shutdown();
this.executeWorkersManager.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@
*/
@Component
public class EventProcessor implements ApplicationListener<ContextRefreshedEvent> {

private static final int MAX_WAIT_EVENT_TIME = 100;

private NacosMcpService nacosMcpService;

private NacosXdsService nacosXdsService;

private NacosResourceManager resourceManager;

private final BlockingQueue<Event> events;

public EventProcessor() {
events = new ArrayBlockingQueue<>(20);
}
Expand All @@ -68,25 +68,27 @@ public void notify(Event event) {
Thread.currentThread().interrupt();
}
}

public void handleEvents() {
new Consumer("handle events").start();

private void handleEvents() {
Consumer handleEvents = new Consumer("handle events");
handleEvents.setDaemon(true);
handleEvents.start();
}

@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
checkDependenceReady();
handleEvents();
}
}

private class Consumer extends Thread {

Consumer(String name) {
setName(name);
}

@Override
@SuppressWarnings("InfiniteLoopStatement")
public void run() {
Expand Down Expand Up @@ -115,23 +117,23 @@ public void run() {
}
}
}

private boolean hasClientConnection() {
return nacosMcpService.hasClientConnection() || nacosXdsService.hasClientConnection();
}

private boolean needNewTask(boolean hasNewEvent, Future<Void> task) {
return hasNewEvent && (task == null || task.isDone());
}

private class EventHandleTask implements Callable<Void> {

private final Event event;

EventHandleTask(Event event) {
this.event = event;
}

@Override
public Void call() throws Exception {
ResourceSnapshot snapshot = resourceManager.createResourceSnapshot();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.alibaba.nacos.naming.misc;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.listener.NacosApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;

/**
* Graceful shutdown.
*
* @author Weizhan▪Yun
* @date 2022/10/27 16:44
*/
public class GracefulShutdownListener implements NacosApplicationListener {

@Override
public void starting() {

}

@Override
public void environmentPrepared(ConfigurableEnvironment environment) {

}

@Override
public void contextPrepared(ConfigurableApplicationContext context) {

}

@Override
public void contextLoaded(ConfigurableApplicationContext context) {

}

@Override
public void started(ConfigurableApplicationContext context) {

}

@Override
public void running(ConfigurableApplicationContext context) {

}

@Override
public void failed(ConfigurableApplicationContext context, Throwable exception) {
try {
NamingExecuteTaskDispatcher.getInstance().getExecuteEngine().shutdown();
} catch (NacosException ignore) {

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task)
public String workersStatus() {
return executeEngine.workersStatus();
}

NacosExecuteTaskExecuteEngine getExecuteEngine() {
return executeEngine;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.alibaba.nacos.naming.misc.GracefulShutdownListener
Loading

0 comments on commit f2e5300

Please sign in to comment.