Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #9393] 设置线程为守护线程,保证在main线程推出时应用程序能过正常关闭。 #9396

Merged
merged 3 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.alibaba.nacos.api.remote;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/**
* rpc scheduler executor .
Expand All @@ -33,7 +35,16 @@ public class RpcScheduledExecutor extends ScheduledThreadPoolExecutor {
"com.alibaba.nacos.remote.ServerCommonScheduler");

public RpcScheduledExecutor(int corePoolSize, final String threadName) {
super(corePoolSize, r -> new Thread(r, threadName));
super(corePoolSize, new ThreadFactory() {
private AtomicLong index = new AtomicLong();

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, threadName + "." + index.getAndIncrement());
thread.setDaemon(true);
return thread;
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.alibaba.nacos.common.http;

import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.http.client.NacosAsyncRestTemplate;
import com.alibaba.nacos.common.http.client.NacosRestTemplate;
import com.alibaba.nacos.common.http.client.request.DefaultAsyncHttpClientRequest;
Expand All @@ -24,7 +25,9 @@
import com.alibaba.nacos.common.tls.TlsFileWatcher;
import com.alibaba.nacos.common.tls.TlsHelper;
import com.alibaba.nacos.common.tls.TlsSystemConfig;

import java.util.function.BiConsumer;

import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
Expand Down Expand Up @@ -58,6 +61,10 @@
*/
public abstract class AbstractHttpClientFactory implements HttpClientFactory {

private static final String ASYNC_THREAD_NAME = "nacos-http-async-client";

private static final String AYNC_IO_REACTOR_NAME = ASYNC_THREAD_NAME + "#I/O Reactor";

@Override
public NacosRestTemplate createNacosRestTemplate() {
HttpClientConfig httpClientConfig = buildHttpClientConfig();
Expand All @@ -75,68 +82,67 @@ public NacosRestTemplate createNacosRestTemplate() {
@Override
public NacosAsyncRestTemplate createNacosAsyncRestTemplate() {
final HttpClientConfig originalRequestConfig = buildHttpClientConfig();
final DefaultConnectingIOReactor ioreactor = getIoReactor();
final DefaultConnectingIOReactor ioreactor = getIoReactor(AYNC_IO_REACTOR_NAME);
final RequestConfig defaultConfig = getRequestConfig();
return new NacosAsyncRestTemplate(assignLogger(), new DefaultAsyncHttpClientRequest(
HttpAsyncClients.custom()
.addInterceptorLast(new RequestContent(true))
.setDefaultIOReactorConfig(getIoReactorConfig())
.setDefaultRequestConfig(defaultConfig)
HttpAsyncClients.custom().addInterceptorLast(new RequestContent(true))
.setThreadFactory(new NameThreadFactory(ASYNC_THREAD_NAME))
.setDefaultIOReactorConfig(getIoReactorConfig()).setDefaultRequestConfig(defaultConfig)
.setMaxConnTotal(originalRequestConfig.getMaxConnTotal())
.setMaxConnPerRoute(originalRequestConfig.getMaxConnPerRoute())
.setUserAgent(originalRequestConfig.getUserAgent())
.setConnectionManager(getConnectionManager(originalRequestConfig, ioreactor))
.build(), ioreactor, defaultConfig));
.setConnectionManager(getConnectionManager(originalRequestConfig, ioreactor)).build(),
ioreactor, defaultConfig));
}

private DefaultConnectingIOReactor getIoReactor() {
private DefaultConnectingIOReactor getIoReactor(String threadName) {
final DefaultConnectingIOReactor ioreactor;
try {
ioreactor = new DefaultConnectingIOReactor(getIoReactorConfig());
ioreactor = new DefaultConnectingIOReactor(getIoReactorConfig(), new NameThreadFactory(threadName));
} catch (IOReactorException e) {
assignLogger().error("[NHttpClientConnectionManager] Create DefaultConnectingIOReactor failed", e);
throw new IllegalStateException();
}

// if the handle return true, then the exception thrown by IOReactor will be ignore, and will not finish the IOReactor.
ioreactor.setExceptionHandler(new IOReactorExceptionHandler() {

@Override
public boolean handle(IOException ex) {
assignLogger().warn("[NHttpClientConnectionManager] handle IOException, ignore it.", ex);
return true;
}

@Override
public boolean handle(RuntimeException ex) {
assignLogger().warn("[NHttpClientConnectionManager] handle RuntimeException, ignore it.", ex);
return true;
}
});

return ioreactor;
}

/**
* create the {@link NHttpClientConnectionManager}, the code mainly from {@link HttpAsyncClientBuilder#build()}.
* we add the {@link IOReactorExceptionHandler} to handle the {@link IOException} and {@link RuntimeException}
* thrown by the {@link org.apache.http.impl.nio.reactor.BaseIOReactor} when process the event of Network.
* Using this way to avoid the {@link DefaultConnectingIOReactor} killed by unknown error of network.
* create the {@link NHttpClientConnectionManager}, the code mainly from {@link HttpAsyncClientBuilder#build()}. we
* add the {@link IOReactorExceptionHandler} to handle the {@link IOException} and {@link RuntimeException} thrown
* by the {@link org.apache.http.impl.nio.reactor.BaseIOReactor} when process the event of Network. Using this way
* to avoid the {@link DefaultConnectingIOReactor} killed by unknown error of network.
*
* @param originalRequestConfig request config.
* @param ioreactor I/O reactor.
* @param ioreactor I/O reactor.
* @return {@link NHttpClientConnectionManager}.
*/
private NHttpClientConnectionManager getConnectionManager(HttpClientConfig originalRequestConfig, DefaultConnectingIOReactor ioreactor) {
private NHttpClientConnectionManager getConnectionManager(HttpClientConfig originalRequestConfig,
DefaultConnectingIOReactor ioreactor) {
SSLContext sslcontext = SSLContexts.createDefault();
HostnameVerifier hostnameVerifier = new DefaultHostnameVerifier();
SchemeIOSessionStrategy sslStrategy = new SSLIOSessionStrategy(sslcontext, null, null, hostnameVerifier);

Registry<SchemeIOSessionStrategy> registry = RegistryBuilder.<SchemeIOSessionStrategy>create()
.register("http", NoopIOSessionStrategy.INSTANCE)
.register("https", sslStrategy)
.build();
final PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(ioreactor, registry);
.register("http", NoopIOSessionStrategy.INSTANCE).register("https", sslStrategy).build();
final PoolingNHttpClientConnectionManager poolingmgr = new PoolingNHttpClientConnectionManager(ioreactor,
registry);

int maxTotal = originalRequestConfig.getMaxConnTotal();
if (maxTotal > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ public final class HttpClientBeanHolder {

private static final Map<String, NacosRestTemplate> SINGLETON_REST = new HashMap<>(10);

private static final Map<String, NacosAsyncRestTemplate> SINGLETON_ASYNC_REST = new HashMap<>(
10);
private static final Map<String, NacosAsyncRestTemplate> SINGLETON_ASYNC_REST = new HashMap<>(10);

private static final AtomicBoolean ALREADY_SHUTDOWN = new AtomicBoolean(false);

Expand Down Expand Up @@ -101,11 +100,14 @@ private static void shutdown() {
return;
}
LOGGER.warn("[HttpClientBeanHolder] Start destroying common HttpClient");

try {
shutdown(DefaultHttpClientFactory.class.getName());
} catch (Exception ex) {
LOGGER.error("An exception occurred when the common HTTP client was closed : {}", ExceptionUtil.getStackTrace(ex));
LOGGER.error("An exception occurred when the common HTTP client was closed : {}",
ExceptionUtil.getStackTrace(ex));
}

LOGGER.warn("[HttpClientBeanHolder] Destruction of the end");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {

private final AtomicBoolean closed;

private final InnerWorker realWorker;

public TaskExecuteWorker(final String name, final int mod, final int total) {
this(name, mod, total, null);
}
Expand All @@ -57,7 +59,8 @@ public TaskExecuteWorker(final String name, final int mod, final int total, fina
this.queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
this.closed = new AtomicBoolean(false);
this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
new InnerWorker(name).start();
realWorker = new InnerWorker(this.name);
realWorker.start();
}

public String getName() {
Expand Down Expand Up @@ -95,6 +98,7 @@ public String status() {
public void shutdown() throws NacosException {
queue.clear();
closed.compareAndSet(false, true);
realWorker.interrupt();
}

/**
Expand All @@ -119,7 +123,7 @@ public void run() {
log.warn("task {} takes {}ms", task, duration);
}
} catch (Throwable e) {
log.error("[TASK-FAILED] " + e.toString(), e);
log.error("[TASK-FAILED] " + e, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteTaskExecuteEngine;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

/**
Expand All @@ -29,7 +30,7 @@
* @author xiweng.yy
*/
@Component
public class DistroTaskEngineHolder {
public class DistroTaskEngineHolder implements DisposableBean {

private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine();

Expand All @@ -51,4 +52,10 @@ public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() {
public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) {
this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor);
}

@Override
public void destroy() throws Exception {
this.delayTaskExecuteEngine.shutdown();
this.executeWorkersManager.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,17 @@
*/
@Component
public class EventProcessor implements ApplicationListener<ContextRefreshedEvent> {

private static final int MAX_WAIT_EVENT_TIME = 100;

private NacosMcpService nacosMcpService;

private NacosXdsService nacosXdsService;

private NacosResourceManager resourceManager;

private final BlockingQueue<Event> events;

public EventProcessor() {
events = new ArrayBlockingQueue<>(20);
}
Expand All @@ -68,25 +68,27 @@ public void notify(Event event) {
Thread.currentThread().interrupt();
}
}

public void handleEvents() {
new Consumer("handle events").start();

private void handleEvents() {
Consumer handleEvents = new Consumer("handle events");
handleEvents.setDaemon(true);
handleEvents.start();
}

@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
if (contextRefreshedEvent.getApplicationContext().getParent() == null) {
checkDependenceReady();
handleEvents();
}
}

private class Consumer extends Thread {

Consumer(String name) {
setName(name);
}

@Override
@SuppressWarnings("InfiniteLoopStatement")
public void run() {
Expand Down Expand Up @@ -115,23 +117,23 @@ public void run() {
}
}
}

private boolean hasClientConnection() {
return nacosMcpService.hasClientConnection() || nacosXdsService.hasClientConnection();
}

private boolean needNewTask(boolean hasNewEvent, Future<Void> task) {
return hasNewEvent && (task == null || task.isDone());
}

private class EventHandleTask implements Callable<Void> {

private final Event event;

EventHandleTask(Event event) {
this.event = event;
}

@Override
public Void call() throws Exception {
ResourceSnapshot snapshot = resourceManager.createResourceSnapshot();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 1999-2022 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.naming.misc;

import com.alibaba.nacos.core.listener.NacosApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;

/**
* graceful shutdown listenner.
* @author Weizhan▪Yun
* @date 2022/11/2 14:40
*/
public class GracefulShutdownListener implements NacosApplicationListener {

@Override
public void starting() {

}

@Override
public void environmentPrepared(ConfigurableEnvironment environment) {

}

@Override
public void contextPrepared(ConfigurableApplicationContext context) {

}

@Override
public void contextLoaded(ConfigurableApplicationContext context) {

}

@Override
public void started(ConfigurableApplicationContext context) {

}

@Override
public void running(ConfigurableApplicationContext context) {

}

@Override
public void failed(ConfigurableApplicationContext context, Throwable exception) {
try {
NamingExecuteTaskDispatcher.getInstance().destroy();
} catch (Exception ignore) {
}
}
}
Loading