diff --git a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeNotifyResponse.java b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeNotifyResponse.java
index b873500c406..732b4f6bb06 100644
--- a/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeNotifyResponse.java
+++ b/api/src/main/java/com/alibaba/nacos/api/config/remote/response/ConfigChangeNotifyResponse.java
@@ -105,4 +105,10 @@ public String getTenant() {
public void setTenant(String tenant) {
this.tenant = tenant;
}
+
+ @Override
+ public String toString() {
+ return "ConfigChangeNotifyResponse{" + "dataId='" + dataId + '\'' + ", group='" + group + '\'' + ", tenant='"
+ + tenant + '\'' + '}';
+ }
}
diff --git a/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java b/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java
index 63faaece655..fd2c22822de 100644
--- a/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java
+++ b/api/src/main/java/com/alibaba/nacos/api/remote/response/PushCallBack.java
@@ -26,7 +26,7 @@ public interface PushCallBack {
public void onSuccess();
- public void onFail();
+ public void onFail(Exception e);
public void onTimeout();
diff --git a/client/src/main/resources/nacos-log4j2.xml b/client/src/main/resources/nacos-log4j2.xml
index eaa626f81fb..f6d491d6d47 100644
--- a/client/src/main/resources/nacos-log4j2.xml
+++ b/client/src/main/resources/nacos-log4j2.xml
@@ -30,9 +30,23 @@
-
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n
+
+
+
+
+
+
+
+
+
+
+ filePattern="${sys:nacos.logging.path}/grpc.log.%d{yyyy-MM-dd}.%i">
%d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n
@@ -66,7 +80,13 @@
additivity="false">
-
+
+
+
+
+
+
diff --git a/client/src/main/resources/nacos-logback.xml b/client/src/main/resources/nacos-logback.xml
index eb55896327f..a9687925e55 100644
--- a/client/src/main/resources/nacos-logback.xml
+++ b/client/src/main/resources/nacos-logback.xml
@@ -52,12 +52,36 @@
+
+ ${nacos.logging.path}/rpc.log
+
+
+ ${nacos.logging.path}/rpc.log.%i
+ ${JM.LOG.RETAIN.COUNT:-7}
+
+
+
+ ${JM.LOG.FILE.SIZE:-10MB}
+
+
+
+ %d{yyyy-MM-dd HH:mm:ss.SSS} %p [%-5t:%c{2}] %m%n
+
+
+
+
+
+
+
+
+
diff --git a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java
index d59c305afa2..cba82d39c57 100644
--- a/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java
+++ b/client/src/test/java/com/alibaba/nacos/client/ConfigTest.java
@@ -27,6 +27,9 @@
import org.junit.Ignore;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Scanner;
@@ -42,7 +45,7 @@ public void before() throws Exception {
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848");
- //"11.239.114.187:8848,11.239.113.204:8848,11.239.112.161:8848");
+ //"11.239.114.187:8848,,11.239.113.204:8848,11.239.112.161:8848");
//"11.239.114.187:8848");
configService = NacosFactory.createConfigService(properties);
//Thread.sleep(2000L);
@@ -55,6 +58,44 @@ public void cleanup() throws Exception {
@Test
public void test2() throws Exception {
+ Properties properties = new Properties();
+ properties.setProperty(PropertyKeyConst.SERVER_ADDR, "11.160.144.148:8848");
+ //"
+ List configServiceList = new ArrayList();
+ for (int i = 0; i < 200; i++) {
+
+ ConfigService configService = NacosFactory.createConfigService(properties);
+ configService.addListener("test", "test", new AbstractListener() {
+
+ @Override
+ public void receiveConfigInfo(String configInfo) {
+ }
+ });
+ configServiceList.add(configService);
+ }
+
+ Thread th = new Thread(new Runnable() {
+ @Override
+ public void run() {
+
+ Random random = new Random();
+ int times = 10000;
+ while (times > 0) {
+ try {
+ boolean result = configService
+ .publishConfig("test", "test", "value" + System.currentTimeMillis());
+
+ times--;
+ Thread.sleep(10000L);
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+ }
+ }
+
+ });
+ th.start();
Thread.sleep(1000000L);
}
@@ -62,31 +103,58 @@ public void test2() throws Exception {
@Test
public void test() throws Exception {
- final String dataId = "lessspring";
- final String group = "lessspring";
+ Random random = new Random();
+ final String dataId = "xiaochun.xxc";
+ final String group = "xiaochun.xxc";
final String content = "lessspring-" + System.currentTimeMillis();
- Random random = new Random();
- int times = 10000;
- while (times > 0) {
-
- boolean result = configService.publishConfig(dataId, group, "value" + System.currentTimeMillis());
- times--;
- Thread.sleep(2000L);
-
- }
+ Thread th = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ long start = System.currentTimeMillis();
+ Random random = new Random();
+ int times = 1000;
+ while (times > 0) {
+ try {
+
+ for (int i = 0; i < 20; i++) {
+ configService.publishConfig(dataId + random.nextInt(20), group,
+ "value" + System.currentTimeMillis());
+ }
+ times--;
+ Thread.sleep(500L);
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+ }
+
+ System.out.println(times);
+ System.out.println("Write Done");
+ }
- boolean result = configService.publishConfig(dataId, group, content);
- Assert.assertTrue(result);
+ });
+
+ th.start();
Listener listener = new AbstractListener() {
@Override
public void receiveConfigInfo(String configInfo) {
- System.out.println("receiveConfigInfo1 :" + configInfo);
+ System.out.println(new Date() + "receiveConfigInfo1 :" + configInfo);
}
};
- configService.getConfigAndSignListener(dataId, group, 5000, listener);
+ for (int i = 0; i < 20; i++) {
+ configService.addListener(dataId + i, group, listener);
+ }
+
+ //configService.getConfigAndSignListener(dataId, group, 5000, listener);
+
+ boolean result = configService.publishConfig(dataId, group, content);
+ Assert.assertTrue(result);
+
+ // configService.getConfigAndSignListener(dataId, group, 5000, listener);
+
//configService.removeListener(dataId, group, listener);
//configService.removeConfig(dataId, group);
@@ -129,16 +197,34 @@ public void receiveConfigInfo(String configInfo) {
};
configService.getConfigAndSignListener(dataId, group, 5000, listener);
- System.out.println("");
- Thread.sleep(10000L);
- System.out.println("Remove config..");
- configService.removeListener(dataId, group, listener);
- Thread.sleep(10000L);
System.out.println("Add Listen config..");
- configService.getConfigAndSignListener(dataId, group, 5000, listener);
+ Thread th = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ long start = System.currentTimeMillis();
+ Random random = new Random();
+ int times = 100;
+ while (times > 0) {
+ try {
+ configService.publishConfig(dataId, group, "value" + System.currentTimeMillis());
+
+ times--;
+ Thread.sleep(5000L);
+ } catch (Exception e) {
+ e.printStackTrace();
+
+ }
+ }
+
+ System.out.println(times);
+ System.out.println("Write Done");
+ }
+
+ });
+ th.start();
Scanner scanner = new Scanner(System.in);
System.out.println("input content");
while (scanner.hasNextLine()) {
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
index e2386c8557d..43d8f8b7dd0 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClient.java
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
@@ -48,32 +49,12 @@ public abstract class RpcClient implements Closeable {
protected String connectionId;
+ protected LinkedBlockingQueue eventLinkedBlockingQueue = new LinkedBlockingQueue();
+
protected AtomicReference rpcClientStatus = new AtomicReference(
RpcClientStatus.WAIT_INIT);
- protected ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("com.alibaba.nacos.client.config.grpc.worker");
- t.setDaemon(true);
- return t;
- }
- });
-
- /**
- * Notify when client re connected.
- */
- protected void notifyReConnected() {
-
- if (!connectionEventListeners.isEmpty()) {
- LoggerUtils.printIfInfoEnabled(LOGGER, "notify connection event listeners.");
- for (ConnectionEventListener listener : connectionEventListeners) {
- listener.onReconnected();
- }
- }
-
- }
+ protected ScheduledExecutorService executorService;
/**
* Notify when client re connected.
@@ -196,7 +177,53 @@ public RpcClient(ServerListFactory serverListFactory) {
/**
* Start this client.
*/
- public abstract void start() throws NacosException;
+ public void start() throws NacosException {
+
+ executorService = new ScheduledThreadPoolExecutor(5, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("com.alibaba.nacos.client.config.grpc.worker");
+ t.setDaemon(true);
+ return t;
+ }
+ });
+
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ ConnectionEvent take = null;
+ try {
+ take = eventLinkedBlockingQueue.take();
+ if (take.isConnected()) {
+ notifyConnected();
+ } else if (take.isDisConnected()) {
+ notifyDisConnected();
+ }
+ } catch (InterruptedException e) {
+ //Do nothing
+ }
+
+ }
+ }
+ });
+ innerStart();
+ }
+
+ /**
+ * start implements for sub rpc client.
+ *
+ * @throws NacosException exception to throw.
+ */
+ public abstract void innerStart() throws NacosException;
+
+ /**
+ * increase offset of the nacos server port for the rpc server port.
+ *
+ * @return rpc port offset
+ */
+ public abstract int rpcPortOffset();
/**
* send request.
@@ -220,7 +247,7 @@ public RpcClient(ServerListFactory serverListFactory) {
* @param connectionEventListener connectionEventListener
*/
public void registerConnectionListener(ConnectionEventListener connectionEventListener) {
-
+
LoggerUtils.printIfInfoEnabled(LOGGER,
"Registry connection listener to current client,connectionId={}, connectionEventListener={}",
this.connectionId, connectionEventListener.getClass().getName());
@@ -236,7 +263,7 @@ public void registerServerPushResponseHandler(ServerPushResponseHandler serverPu
LoggerUtils.printIfInfoEnabled(LOGGER,
" Registry server push response listener to current client,connectionId={}, connectionEventListener={}",
this.connectionId, serverPushResponseHandler.getClass().getName());
-
+
this.serverPushResponseListeners.add(serverPushResponseHandler);
}
@@ -248,4 +275,74 @@ public void registerServerPushResponseHandler(ServerPushResponseHandler serverPu
public ServerListFactory getServerListFactory() {
return serverListFactory;
}
+
+ protected GrpcServerInfo nextServer() {
+ getServerListFactory().genNextServer();
+ String serverAddress = getServerListFactory().getCurrentServer();
+ return resolveServerInfo(serverAddress);
+ }
+
+ protected GrpcServerInfo currentServer() {
+ String serverAddress = getServerListFactory().getCurrentServer();
+ return resolveServerInfo(serverAddress);
+ }
+
+ private GrpcServerInfo resolveServerInfo(String serverAddress) {
+ GrpcServerInfo serverInfo = new GrpcServerInfo();
+ serverInfo.serverPort = rpcPortOffset();
+ if (serverAddress.contains("http")) {
+ serverInfo.serverIp = serverAddress.split(":")[1].replaceAll("//", "");
+ serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[2].replaceAll("//", ""));
+ } else {
+ serverInfo.serverIp = serverAddress.split(":")[0];
+ serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[1]);
+ }
+ return serverInfo;
+ }
+
+ public class GrpcServerInfo {
+
+ protected String serverIp;
+
+ protected int serverPort;
+
+ /**
+ * Getter method for property serverIp.
+ *
+ * @return property value of serverIp
+ */
+ public String getServerIp() {
+ return serverIp;
+ }
+
+ /**
+ * Getter method for property serverPort.
+ *
+ * @return property value of serverPort
+ */
+ public int getServerPort() {
+ return serverPort;
+ }
+ }
+
+ public class ConnectionEvent {
+
+ public static final int CONNECTED = 1;
+
+ public static final int DISCONNECTED = 0;
+
+ int eventType;
+
+ public ConnectionEvent(int eventType) {
+ this.eventType = eventType;
+ }
+
+ public boolean isConnected() {
+ return eventType == CONNECTED;
+ }
+
+ public boolean isDisConnected() {
+ return eventType == DISCONNECTED;
+ }
+ }
}
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java
index e027ee73217..7a2e1c35ee5 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/RpcClientFactory.java
@@ -35,12 +35,10 @@ public static RpcClient getClient(String clientName) {
synchronized (clientMap) {
if (clientMap.get(clientName) == null) {
RpcClient moduleClient = new GrpcClient();
- clientMap.putIfAbsent(clientName, moduleClient);
+ clientMap.put(clientName, moduleClient);
+ return moduleClient;
}
return clientMap.get(clientName);
-
}
-
}
-
}
diff --git a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
index cc5c81ff7ce..deee920216e 100644
--- a/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
+++ b/common/src/main/java/com/alibaba/nacos/common/remote/client/grpc/GrpcClient.java
@@ -30,6 +30,7 @@
import com.alibaba.nacos.api.remote.response.ConnectionUnregisterResponse;
import com.alibaba.nacos.api.remote.response.PlainBodyResponse;
import com.alibaba.nacos.api.remote.response.Response;
+import com.alibaba.nacos.api.remote.response.ResponseCode;
import com.alibaba.nacos.api.remote.response.ResponseTypeConstants;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.remote.client.ResponseRegistry;
@@ -53,7 +54,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@@ -66,36 +66,50 @@
*/
public class GrpcClient extends RpcClient {
- static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);
+ static final Logger LOGGER = LoggerFactory.getLogger("com.alibaba.nacos.common.remote.client");
+ /**
+ * grpc channel.
+ */
protected ManagedChannel channel;
+ /**
+ * stub to send stream request.
+ */
protected RequestStreamGrpc.RequestStreamStub grpcStreamServiceStub;
+ /**
+ * stub to send request.
+ */
protected RequestGrpc.RequestFutureStub grpcFutureServiceStub;
- private ExecutorService aynsRequestExecutor = Executors.newFixedThreadPool(10);
-
- private ExecutorService eventExecutor = Executors.newFixedThreadPool(1);
-
- LinkedBlockingQueue eventLinkedBlockingQueue = new LinkedBlockingQueue();
-
- private ReentrantLock lock = new ReentrantLock();
+ /**
+ * executor to execute future request.
+ */
+ private ExecutorService aynsRequestExecutor;
+ /**
+ * Empty constructor.
+ */
public GrpcClient() {
super();
}
+ /**
+ * constructor with a server liset factory.
+ *
+ * @param serverListFactory serverListFactory.
+ */
public GrpcClient(ServerListFactory serverListFactory) {
super(serverListFactory);
}
/**
- * create a new channel .
+ * create a new channel with specfic server address.
*
* @param serverIp serverIp.
* @param serverPort serverPort.
- * @return if server check success,return stub.
+ * @return if server check success,return a non-null stub.
*/
private RequestGrpc.RequestFutureStub createNewChannelStub(String serverIp, int serverPort) {
@@ -126,43 +140,56 @@ private void shuntDownChannel(ManagedChannel managedChannel) {
}
}
+ /**
+ * try to connect to server ,if fail at first time ,it will retry asynchrous.
+ */
private void connectToServer() {
+ LOGGER.info("starting to connect to server . ");
rpcClientStatus.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING);
-
- GrpcServerInfo serverInfo = nextServer();
- RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp,
- serverInfo.serverPort);
- if (newChannelStubTemp != null) {
- RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
- .newStub(newChannelStubTemp.getChannel());
-
- bindRequestStream(requestStreamStubTemp);
- //switch current channel and stub
- channel = (ManagedChannel) newChannelStubTemp.getChannel();
- grpcStreamServiceStub = requestStreamStubTemp;
- RequestGrpc.RequestFutureStub grpcFutureServiceStubTemp = RequestGrpc
- .newFutureStub(newChannelStubTemp.getChannel());
- grpcFutureServiceStub = grpcFutureServiceStubTemp;
- rpcClientStatus.set(RpcClientStatus.RUNNING);
- eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
- notifyConnected();
- } else {
- switchServer(true);
+ try {
+ GrpcServerInfo serverInfo = nextServer();
+ LOGGER.info("trying to connect to server, " + serverInfo);
+ RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(),
+ serverInfo.getServerPort());
+ if (newChannelStubTemp != null) {
+
+ LOGGER.info("connect to server success !");
+
+ RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
+ .newStub(newChannelStubTemp.getChannel());
+
+ bindRequestStream(requestStreamStubTemp);
+ //switch current channel and stub
+ channel = (ManagedChannel) newChannelStubTemp.getChannel();
+ grpcStreamServiceStub = requestStreamStubTemp;
+ RequestGrpc.RequestFutureStub grpcFutureServiceStubTemp = RequestGrpc
+ .newFutureStub(newChannelStubTemp.getChannel());
+ grpcFutureServiceStub = grpcFutureServiceStubTemp;
+ rpcClientStatus.set(RpcClientStatus.RUNNING);
+ eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
+ return;
+ }
+ } catch (Exception e) {
+ LOGGER.error("fail to connect to server ! ", e);
}
+ switchServer();
+
}
@Override
- public void start() throws NacosException {
-
+ public void innerStart() throws NacosException {
+
if (rpcClientStatus.get() == RpcClientStatus.WAIT_INIT) {
LOGGER.error("RpcClient has not init yet, please check init ServerListFactory...");
- throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "RpcClient not init yet");
+ throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "rpc client not init yet");
}
- if (rpcClientStatus.get() == RpcClientStatus.RUNNING || rpcClientStatus.get() == RpcClientStatus.STARTING) {
+ if (rpcClientStatus.get() != RpcClientStatus.INITED) {
return;
}
+ aynsRequestExecutor = Executors.newFixedThreadPool(10);
+
connectToServer();
executorService.scheduleWithFixedDelay(new Runnable() {
@@ -181,96 +208,79 @@ public void responseReply(Response response) {
if (!isRunning()) {
return;
}
- eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
- switchServer(false);
+ switchServer();
} catch (Exception e) {
- LOGGER.error("rebuildClient error ", e);
+ LOGGER.error("switch server error ", e);
}
}
}
});
- eventExecutor.submit(new Runnable() {
- @Override
- public void run() {
- while (true) {
- try {
- ConnectionEvent event = eventLinkedBlockingQueue.take();
- if (event.isConnected()) {
- notifyConnected();
- } else if (event.isDisConnected()) {
- notifyDisConnected();
- }
- } catch (Exception e) {
- LOGGER.error("connection event process fail ", e);
- }
- }
- }
- });
}
+ @Override
+ public int rpcPortOffset() {
+ return 1000;
+ }
+
+ private final ReentrantLock switchingLock = new ReentrantLock();
+
/**
* switch a new server.
*/
- private void switchServer(final boolean onStarting) {
-
- //try to get operate lock.
- boolean lockResult = lock.tryLock();
- if (!lockResult) {
- return;
- }
-
- if (onStarting) {
- // access on startup fail
- rpcClientStatus.set(RpcClientStatus.SWITCHING_SERVER);
-
- } else {
- // access from running status, sendbeat fail or receive reset message from server.
- boolean changeStatusSuccess = rpcClientStatus
- .compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.SWITCHING_SERVER);
- if (!changeStatusSuccess) {
- return;
- }
- }
+ private void switchServer() {
executorService.schedule(new Runnable() {
@Override
public void run() {
-
- // loop until start client success.
- while (!isRunning()) {
-
- //1.get a new server
- GrpcServerInfo serverInfo = nextServer();
- //2.get a new channel to new server
- RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.serverIp,
- serverInfo.serverPort);
- if (newChannelStubTemp != null) {
- RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
- .newStub(newChannelStubTemp.getChannel());
-
- bindRequestStream(requestStreamStubTemp);
- final ManagedChannel depratedChannel = channel;
- //switch current channel and stub
- channel = (ManagedChannel) newChannelStubTemp.getChannel();
- grpcStreamServiceStub = requestStreamStubTemp;
- grpcFutureServiceStub = newChannelStubTemp;
- rpcClientStatus.getAndSet(RpcClientStatus.RUNNING);
- eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
- shuntDownChannel(depratedChannel);
- continue;
+
+ try {
+ //only one thread can execute switching meantime.
+ boolean innerLock = switchingLock.tryLock();
+ if (!innerLock) {
+ return;
}
- //
- try {
- //sleep 3 second to switch next server.
- Thread.sleep(3000L);
- } catch (InterruptedException e) {
- // Do nothing.
+
+ if (rpcClientStatus.get() == RpcClientStatus.RUNNING) {
+ eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
}
+ rpcClientStatus.set(RpcClientStatus.SWITCHING_SERVER);
+ // loop until start client success.
+ while (!isRunning()) {
+
+ //1.get a new server
+ GrpcServerInfo serverInfo = nextServer();
+ //2.create a new channel to new server
+ RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(
+ serverInfo.getServerIp(), serverInfo.getServerPort());
+ if (newChannelStubTemp != null) {
+ RequestStreamGrpc.RequestStreamStub requestStreamStubTemp = RequestStreamGrpc
+ .newStub(newChannelStubTemp.getChannel());
+
+ bindRequestStream(requestStreamStubTemp);
+ final ManagedChannel depratedChannel = channel;
+ //switch current channel and stub
+ channel = (ManagedChannel) newChannelStubTemp.getChannel();
+ grpcStreamServiceStub = requestStreamStubTemp;
+ grpcFutureServiceStub = newChannelStubTemp;
+ rpcClientStatus.getAndSet(RpcClientStatus.RUNNING);
+ eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
+ shuntDownChannel(depratedChannel);
+ continue;
+ }
+ //
+ try {
+ //sleep 3 second to switch next server.
+ Thread.sleep(3000L);
+ } catch (InterruptedException e) {
+ // Do nothing.
+ }
+ }
+ } finally {
+ switchingLock.unlock();
}
}
}, 0L, TimeUnit.MILLISECONDS);
- lock.unlock();
}
@@ -281,7 +291,7 @@ public void sendBeat() {
int maxRetryTimes = 3;
while (maxRetryTimes > 0) {
-
+
try {
if (!isRunning()) {
return;
@@ -295,8 +305,7 @@ public void sendBeat() {
GrpcResponse response = requestFuture.get();
if (ResponseTypeConstants.CONNECION_UNREGISTER.equals(response.getType())) {
LOGGER.warn(" connection is not register to current server ,trying to switch server ");
- eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
- switchServer(false);
+ switchServer();
}
return;
} catch (Exception e) {
@@ -306,9 +315,8 @@ public void sendBeat() {
}
}
- eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.DISCONNECTED));
- LOGGER.warn("Max retry times for send heart beat fail reached,trying to switch server... ");
- switchServer(false);
+ LOGGER.warn("max retry times for send heart beat fail reached,trying to switch server... ");
+ switchServer();
}
private GrpcMetadata buildMeta() {
@@ -325,7 +333,9 @@ private GrpcMetadata buildMeta() {
*/
private boolean serverCheck(RequestGrpc.RequestFutureStub requestBlockingStub) {
try {
-
+ if (requestBlockingStub == null) {
+ return false;
+ }
ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(buildMeta())
.setType(serverCheckRequest.getType())
@@ -353,7 +363,6 @@ public void onNext(GrpcResponse grpcResponse) {
LOGGER.debug(" stream response receive ,original reponse :{}", grpcResponse);
try {
- sendAckResponse(grpcResponse.getAck(), true);
String message = grpcResponse.getBody().getValue().toStringUtf8();
String type = grpcResponse.getType();
String bodyString = grpcResponse.getBody().getValue().toStringUtf8();
@@ -373,9 +382,10 @@ public void accept(ServerPushResponseHandler serverPushResponseHandler) {
serverPushResponseHandler.responseReply(response);
}
});
-
+ sendAckResponse(grpcResponse.getAck(), true);
} catch (Exception e) {
- e.printStackTrace(System.out);
+ sendAckResponse(grpcResponse.getAck(), false);
+
LOGGER.error("error tp process server push response :{}", grpcResponse);
}
}
@@ -397,7 +407,7 @@ private void sendAckResponse(String ackId, boolean success) {
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request)))).build();
ListenableFuture requestFuture = grpcFutureServiceStub.request(grpcrequest);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.error("error to send ack response,ackId->:{}", ackId);
}
}
@@ -407,7 +417,7 @@ public Response request(Request request) throws NacosException {
int maxRetryTimes = 3;
while (maxRetryTimes > 0) {
try {
-
+
GrpcRequest grpcrequest = GrpcRequest.newBuilder().setMetadata(buildMeta()).setType(request.getType())
.setBody(Any.newBuilder().setValue(ByteString.copyFromUtf8(JacksonUtils.toJson(request))))
.build();
@@ -423,7 +433,7 @@ public Response request(Request request) throws NacosException {
}
}
- LOGGER.warn("Max retry times for request fail reached.");
+ LOGGER.warn("Max retry times for request fail reached !");
throw new NacosException(NacosException.SERVER_ERROR, "Fail to request.");
}
@@ -460,7 +470,9 @@ public void onSuccess(@NullableDecl GrpcResponse grpcResponse) {
if (response != null && response.isSuccess()) {
callback.onSuccess(response);
} else {
- callback.onFailure(new NacosException(response.getErrorCode(), response.getMessage()));
+ callback.onFailure(new NacosException(
+ (response == null) ? ResponseCode.FAIL.getCode() : response.getErrorCode(),
+ (response == null) ? "null" : response.getMessage()));
}
}
@@ -478,58 +490,6 @@ public void shutdown() throws NacosException {
}
}
- private GrpcServerInfo nextServer() {
- getServerListFactory().genNextServer();
- String serverAddress = getServerListFactory().getCurrentServer();
- return resolveServerInfo(serverAddress);
- }
-
- private GrpcServerInfo currentServer() {
- String serverAddress = getServerListFactory().getCurrentServer();
- return resolveServerInfo(serverAddress);
- }
-
- private GrpcServerInfo resolveServerInfo(String serverAddress) {
- GrpcServerInfo serverInfo = new GrpcServerInfo();
- serverInfo.serverPort = 1000;
- if (serverAddress.contains("http")) {
- serverInfo.serverIp = serverAddress.split(":")[1].replaceAll("//", "");
- serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[2].replaceAll("//", ""));
- } else {
- serverInfo.serverIp = serverAddress.split(":")[0];
- serverInfo.serverPort += Integer.valueOf(serverAddress.split(":")[1]);
- }
- return serverInfo;
- }
-
- class GrpcServerInfo {
-
- String serverIp;
-
- int serverPort;
-
- }
-
- class ConnectionEvent {
-
- static final int CONNECTED = 1;
-
- static final int DISCONNECTED = 0;
-
- int eventType;
-
- public ConnectionEvent(int eventType) {
- this.eventType = eventType;
- }
-
- public boolean isConnected() {
- return eventType == CONNECTED;
- }
-
- public boolean isDisConnected() {
- return eventType == DISCONNECTED;
- }
- }
}
diff --git a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeNotifier.java b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeNotifier.java
index 72c4641b7c4..6728bddcf82 100644
--- a/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeNotifier.java
+++ b/config/src/main/java/com/alibaba/nacos/config/server/remote/ConfigChangeNotifier.java
@@ -16,15 +16,19 @@
package com.alibaba.nacos.config.server.remote;
+import com.alibaba.nacos.api.config.remote.response.ConfigChangeNotifyResponse;
import com.alibaba.nacos.api.remote.response.PushCallBack;
-import com.alibaba.nacos.api.remote.response.ServerPushResponse;
import com.alibaba.nacos.common.utils.CollectionUtils;
import com.alibaba.nacos.core.remote.RpcPushService;
+import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* ConfigChangeNotifier.
@@ -35,6 +39,9 @@
@Component
public class ConfigChangeNotifier {
+ private ThreadPoolExecutor retryPushexecutors = new ThreadPoolExecutor(15, 30, 5, TimeUnit.SECONDS,
+ new ArrayBlockingQueue(100000), new ThreadPoolExecutor.AbortPolicy());
+
@Autowired
ConfigChangeListenContext configChangeListenContext;
@@ -47,9 +54,8 @@ public class ConfigChangeNotifier {
* @param groupKey groupKey
* @param notifyResponse notifyResponse
*/
- public void configDataChanged(String groupKey, final ServerPushResponse notifyResponse) {
-
- long start = System.currentTimeMillis();
+ public void configDataChanged(String groupKey, final ConfigChangeNotifyResponse notifyResponse) {
+
Set clients = configChangeListenContext.getListeners(groupKey);
if (!CollectionUtils.isEmpty(clients)) {
@@ -57,77 +63,58 @@ public void configDataChanged(String groupKey, final ServerPushResponse notifyRe
rpcPushService.pushWithCallback(client, notifyResponse, new PushCallBack() {
@Override
public void onSuccess() {
- //System.out.println("推送变更成功:" + connectionId);
+ Loggers.CORE.debug("push callback success.,groupKey={},clientId={}", groupKey, client);
}
-
+
@Override
- public void onFail() {
- //System.out.println("推送变更失败:" + client);
+ public void onFail(Exception e) {
+ Loggers.CORE
+ .warn("push callback fail.will retry push ,groupKey={},clientId={}", groupKey, client);
retryPush(client, notifyResponse, 3);
}
-
+
@Override
public void onTimeout() {
- //System.out.println("推送变更超时:" + client);
+ Loggers.CORE.warn("push callback timeout.will retry push ,groupKey={},clientId={}", groupKey,
+ client);
retryPush(client, notifyResponse, 3);
}
});
-
+
}
}
- long end = System.currentTimeMillis();
-
}
- void retryPush(String clientId, ServerPushResponse notifyResponse, int maxRetyTimes) {
+ void retryPush(final String clientId, final ConfigChangeNotifyResponse notifyResponse, final int maxRetyTimes) {
- int maxTimes = maxRetyTimes;
- final AtomicBoolean success = new AtomicBoolean(false);
- Object lock = new Object();
- while (maxRetyTimes > 0) {
- if (success.get()) {
- return;
- }
- maxRetyTimes--;
- rpcPushService.pushWithCallback(clientId, notifyResponse, new PushCallBack() {
+ try {
+ retryPushexecutors.submit(new Runnable() {
@Override
- public void onSuccess() {
- //System.out.println("推送变更成功:" + connectionId);
- success.set(true);
- synchronized (lock) {
- lock.notify();
+ public void run() {
+ int maxTimes = maxRetyTimes;
+ boolean rePushFlag = false;
+ while (maxTimes > 0 && !rePushFlag) {
+ maxTimes--;
+ boolean push = rpcPushService.push(clientId, notifyResponse, 1000L);
+ if (push) {
+ rePushFlag = true;
+ }
}
- }
-
- @Override
- public void onFail() {
- //System.out.println("推送变更失败:" + client);
- synchronized (lock) {
- lock.notify();
- }
- }
-
- @Override
- public void onTimeout() {
- //System.out.println("推送变更超时:" + client);
- synchronized (lock) {
- lock.notify();
+ if (rePushFlag) {
+ Loggers.CORE.warn("push callback retry success.dataId={},group={},tenant={},clientId={}",
+ notifyResponse.getDataId(), notifyResponse.getGroup(), notifyResponse.getTenant(),
+ clientId);
+ } else {
+ Loggers.CORE.error(String
+ .format("push callback retry fail.dataId={},group={},tenant={},clientId={}",
+ notifyResponse.getDataId(), notifyResponse.getGroup(),
+ notifyResponse.getTenant(), clientId));
}
}
});
-
- synchronized (lock) {
- try {
- lock.wait(500L);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- if (success.get()) {
- //Success
- } else {
- //reTry fails.
+ } catch (RejectedExecutionException e) {
+ Loggers.CORE.warn("retry push callback task overlimit.dataId={},group={},tenant={},clientId={}",
+ notifyResponse.getDataId(), notifyResponse.getGroup(), notifyResponse.getTenant(), clientId);
}
}
diff --git a/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java b/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java
index ef6c9563c8c..d82414d871a 100644
--- a/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java
+++ b/console/src/main/java/com/alibaba/nacos/console/controller/ServerLoaderController.java
@@ -45,11 +45,36 @@ public class ServerLoaderController {
*
* @return state json.
*/
- @GetMapping("/uplimitclients")
- public ResponseEntity updateMaxClients(@RequestParam Integer maxclients) {
+ @GetMapping("/max")
+ public ResponseEntity updateMaxClients(@RequestParam Integer count) {
Map responseMap = new HashMap<>(3);
- grpcServer.setMaxClientCount(maxclients);
+ grpcServer.setMaxClientCount(count);
return ResponseEntity.ok().body("success");
}
+ /**
+ * Get server state of current server.
+ *
+ * @return state json.
+ */
+ @GetMapping("/reload")
+ public ResponseEntity reloadClients(@RequestParam Integer count) {
+ Map responseMap = new HashMap<>(3);
+ grpcServer.reloadClient(count);
+ return ResponseEntity.ok().body("success");
+ }
+
+ /**
+ * Get current clients.
+ *
+ * @return state json.
+ */
+ @GetMapping("/current")
+ public ResponseEntity currentCount() {
+ Map responseMap = new HashMap<>(3);
+ int count = grpcServer.currentClients();
+ return ResponseEntity.ok().body(count);
+ }
+
+
}
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectCoordinator.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectCoordinator.java
deleted file mode 100644
index eaca341c578..00000000000
--- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectCoordinator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 1999-2020 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.nacos.core.remote;
-
-import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
-import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
-import com.alibaba.nacos.core.utils.Loggers;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ConnectCoordinator.
- *
- * @author liuzunfei
- * @version $Id: ConnectCoordinator.java, v 0.1 2020年07月14日 12:01 AM liuzunfei Exp $
- */
-
-@Service
-public class ConnectCoordinator implements ConnectionHeathyChecker {
-
- private int maxClient = -1;
-
- @Autowired
- private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;
-
- @Autowired
- ConnectionManager connectionManager;
-
- private ScheduledExecutorService executors = Executors.newScheduledThreadPool(2);
-
- private static final long EXPIRE_MILLSECOND = 10000L;
-
- /**
- * Start Task:Expel the connection which active Time expire.
- */
- @PostConstruct
- public void start() {
-
- // Start UnHeathy Conection Expel Task.
- executors.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- try {
- long currentStamp = System.currentTimeMillis();
- Set> entries = connectionManager.connetions.entrySet();
-
- int expelCount =
- maxClient < 0 ? maxClient : connectionManager.getCurretConnectionCount() - maxClient;
- List expelClient = new LinkedList();
-
- List expireCLients = new LinkedList();
- for (Map.Entry entry : entries) {
- Connection client = entry.getValue();
- long lastActiveTimestamp = entry.getValue().getLastActiveTimestamp();
- if (currentStamp - lastActiveTimestamp > EXPIRE_MILLSECOND) {
- expireCLients.add(client.getConnectionId());
- expelCount--;
- } else if (expelCount > 0) {
- expelClient.add(client.getConnectionId());
- expelCount--;
- }
- }
-
- for (String expireClient : expireCLients) {
- connectionManager.unregister(expireClient);
- Loggers.GRPC.info("expire connection found ,success expel connectionid = {} ", expireClient);
- }
-
- for (String expeledClient : expelClient) {
- try {
- Connection connection = connectionManager.getConnection(expeledClient);
- if (connection != null) {
- if (connection.isSwitching()) {
- continue;
- }
- connectionManager.getConnection(expeledClient)
- .sendPushNoAck(new ConnectResetResponse());
- Loggers.GRPC.info("expel connection ,send switch server response connectionid = {} ",
- expeledClient);
- }
-
- } catch (ConnectionAlreadyClosedException e) {
- connectionManager.unregister(expeledClient);
- } catch (Exception e) {
- Loggers.GRPC.error("error occurs when expel connetion :", expeledClient, e);
- }
-
- }
-
- } catch (Exception e) {
- Loggers.GRPC.error("error occurs when heathy check... ", e);
- }
- }
- }, 500L, 3000L, TimeUnit.MILLISECONDS);
-
- }
-
- public void coordinateMaxClientsSmoth(int maxClient) {
- this.maxClient = maxClient;
- }
-
-}
-
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java
index 05d5d81605f..1ab986b16ee 100644
--- a/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java
+++ b/core/src/main/java/com/alibaba/nacos/core/remote/ConnectionManager.java
@@ -16,12 +16,21 @@
package com.alibaba.nacos.core.remote;
+import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
+import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.HashMap;
+import javax.annotation.PostConstruct;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* connect manager.
@@ -32,14 +41,25 @@
@Service
public class ConnectionManager {
- @Autowired
- private ClientConnectionEventListenerRegistry connectionEventListenerRegistry;
+ /**
+ * maxLimitClient
+ */
+ private int maxClient = -1;
+
+ /**
+ * current loader adjust count,only effective once,use to rebalance.
+ */
+ private int loadClient = -1;
- Map connetions = new HashMap();
+ private static final long EXPIRE_MILLSECOND = 10000L;
+
+ private ScheduledExecutorService executors = Executors.newScheduledThreadPool(2);
@Autowired
private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;
+ Map connetions = new ConcurrentHashMap();
+
/**
* check connnectionid is valid.
*
@@ -110,4 +130,112 @@ public void refreshActiveTime(String connnectionId) {
}
}
+ /**
+ * Start Task:Expel the connection which active Time expire.
+ */
+ @PostConstruct
+ public void start() {
+
+ // Start UnHeathy Conection Expel Task.
+ executors.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long currentStamp = System.currentTimeMillis();
+ Set> entries = connetions.entrySet();
+ boolean isLoaderClient = loadClient > 0;
+ int currentMaxClient = isLoaderClient ? loadClient : maxClient;
+ int expelCount = currentMaxClient < 0 ? currentMaxClient : entries.size() - currentMaxClient;
+ List expelClient = new LinkedList();
+
+ List expireCLients = new LinkedList();
+ for (Map.Entry entry : entries) {
+ Connection client = entry.getValue();
+ long lastActiveTimestamp = entry.getValue().getLastActiveTimestamp();
+ if (currentStamp - lastActiveTimestamp > EXPIRE_MILLSECOND) {
+ expireCLients.add(client.getConnectionId());
+ expelCount--;
+ } else if (expelCount > 0) {
+ expelClient.add(client.getConnectionId());
+ expelCount--;
+ }
+ }
+
+ for (String expireClient : expireCLients) {
+ unregister(expireClient);
+ Loggers.GRPC.info("expire connection found ,success expel connectionid = {} ", expireClient);
+ }
+
+ for (String expeledClient : expelClient) {
+ try {
+ Connection connection = getConnection(expeledClient);
+ if (connection != null) {
+ if (connection.isSwitching()) {
+ continue;
+ }
+ connection.sendPushNoAck(new ConnectResetResponse());
+ connection.setStatus(Connection.SWITCHING);
+ Loggers.GRPC.info("expel connection ,send switch server response connectionid = {} ",
+ expeledClient);
+ }
+
+ } catch (ConnectionAlreadyClosedException e) {
+ unregister(expeledClient);
+ } catch (Exception e) {
+ Loggers.GRPC.error("error occurs when expel connetion :", expeledClient, e);
+ }
+
+ }
+
+ //reset loader client
+ if (isLoaderClient) {
+ loadClient = -1;
+ }
+
+ } catch (Exception e) {
+ Loggers.GRPC.error("error occurs when heathy check... ", e);
+ }
+ }
+ }, 500L, 3000L, TimeUnit.MILLISECONDS);
+
+ }
+
+ public void coordinateMaxClientsSmoth(int maxClient) {
+ this.maxClient = maxClient;
+ }
+
+ public void loadClientsSmoth(int loadClient) {
+ this.loadClient = loadClient;
+ }
+
+ public int currentClients() {
+ return connetions.size();
+ }
+
+ /**
+ * expel all connections.
+ */
+ public void expelAll() {
+ //reject all new connections.
+ this.maxClient = 0;
+ //send connect reset response to all clients.
+ for (Map.Entry entry : connetions.entrySet()) {
+ Connection client = entry.getValue();
+ try {
+ client.sendPushNoAck(new ConnectResetResponse());
+ } catch (Exception e) {
+ //Do Nothing.
+ }
+ }
+ }
+
+ /**
+ * check if over limit.
+ *
+ * @return
+ */
+ public boolean isOverLimit() {
+ return maxClient > 0 && this.connetions.size() >= maxClient;
+ }
+
}
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java
index 76fa182e2a1..56ac8c7e74b 100644
--- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java
+++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcPushService.java
@@ -55,9 +55,11 @@ public boolean push(String connectionId, ServerPushResponse response, long timeo
} catch (Exception e) {
Loggers.GRPC.error("error to send push response to connectionId ={},push response={}", connectionId,
response, e);
+ return false;
}
+ } else {
+ return true;
}
- return false;
}
/**
@@ -66,7 +68,7 @@ public boolean push(String connectionId, ServerPushResponse response, long timeo
* @param connectionId connectionId.
* @param response response.
*/
- public Future pushWithFuture(String connectionId, ServerPushResponse response, long timeoutMills) {
+ public Future pushWithFuture(String connectionId, ServerPushResponse response) {
Connection connection = connectionManager.getConnection(connectionId);
if (connection != null) {
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java
index 897571228c9..9372d14810c 100644
--- a/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java
+++ b/core/src/main/java/com/alibaba/nacos/core/remote/RpcServer.java
@@ -27,19 +27,34 @@
public abstract class RpcServer {
@Autowired
- private ConnectCoordinator connectCoordinator;
+ private ConnectionManager connectionManager;
/**
* Start sever.
*/
public abstract void start() throws Exception;
+ /**
+ * the increase offset of nacos server port for rpc server port.
+ *
+ * @return
+ */
+ public abstract int rpcPortOffset();
+
/**
* Stop Server.
*/
public abstract void stop() throws Exception;
public void setMaxClientCount(int maxClient) {
- this.connectCoordinator.coordinateMaxClientsSmoth(maxClient);
+ this.connectionManager.coordinateMaxClientsSmoth(maxClient);
+ }
+
+ public void reloadClient(int loadCount) {
+ this.connectionManager.loadClientsSmoth(loadCount);
+ }
+
+ public int currentClients() {
+ return this.connectionManager.currentClients();
}
}
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcAckSynchronizer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcAckSynchronizer.java
index 6451c5bc309..8fe28a4af16 100644
--- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcAckSynchronizer.java
+++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcAckSynchronizer.java
@@ -17,6 +17,7 @@
package com.alibaba.nacos.core.remote.grpc;
import com.alibaba.nacos.api.remote.response.PushCallBack;
+import com.alibaba.nacos.core.utils.Loggers;
import com.alipay.hessian.clhm.ConcurrentLinkedHashMap;
import com.alipay.hessian.clhm.EvictionListener;
@@ -40,19 +41,20 @@ public class GrpcAckSynchronizer {
private static final Map ACK_WAITORS = new HashMap();
- private static final long TIMEOUT = 3000L;
+ private static final long TIMEOUT = 60000L;
static ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- static Map callPool = new ConcurrentLinkedHashMap.Builder()
- .maximumWeightedCapacity(20000).listener(new EvictionListener() {
+ private static final Map CALLBACK_CONTEXT = new ConcurrentLinkedHashMap.Builder()
+ .maximumWeightedCapacity(30000).listener(new EvictionListener() {
@Override
public void onEviction(String s, PushCallBackWraper pushCallBack) {
if (System.currentTimeMillis() - pushCallBack.getTimeStamp() > TIMEOUT && pushCallBack
.tryDeActive()) {
+ Loggers.CORE.warn("time out on eviction:" + pushCallBack.ackId);
pushCallBack.getPushCallBack().onTimeout();
} else {
- pushCallBack.getPushCallBack().onFail();
+ pushCallBack.getPushCallBack().onFail(new RuntimeException("callback pool overlimit"));
}
}
}).build();
@@ -63,14 +65,15 @@ public void onEviction(String s, PushCallBackWraper pushCallBack) {
public void run() {
Set timeOutCalls = new HashSet<>();
long now = System.currentTimeMillis();
- for (Map.Entry enrty : callPool.entrySet()) {
+ for (Map.Entry enrty : CALLBACK_CONTEXT.entrySet()) {
if (now - enrty.getValue().getTimeStamp() > TIMEOUT) {
timeOutCalls.add(enrty.getKey());
}
}
for (String ackId : timeOutCalls) {
- PushCallBackWraper remove = callPool.remove(ackId);
+ PushCallBackWraper remove = CALLBACK_CONTEXT.remove(ackId);
if (remove != null && remove.tryDeActive()) {
+ Loggers.CORE.warn("time out on scheduler:" + ackId);
remove.pushCallBack.onTimeout();
}
}
@@ -84,13 +87,13 @@ public void run() {
* @param ackId ackId.
*/
public static void ackNotify(String ackId, boolean success) {
-
- PushCallBackWraper currentCallback = callPool.remove(ackId);
+
+ PushCallBackWraper currentCallback = CALLBACK_CONTEXT.remove(ackId);
if (currentCallback != null && currentCallback.tryDeActive()) {
if (success) {
currentCallback.pushCallBack.onSuccess();
} else {
- currentCallback.pushCallBack.onFail();
+ currentCallback.pushCallBack.onFail(new RuntimeException("client return fail"));
}
}
@@ -142,7 +145,8 @@ public static boolean waitAck(String ackId, long timeout) throws Exception {
* @param ackId ackId.
*/
public static void syncCallbackOnAck(String ackId, PushCallBack pushCallBack) throws Exception {
- PushCallBackWraper pushCallBackPrev = callPool.putIfAbsent(ackId, new PushCallBackWraper(pushCallBack, ackId));
+ PushCallBackWraper pushCallBackPrev = CALLBACK_CONTEXT
+ .putIfAbsent(ackId, new PushCallBackWraper(pushCallBack, ackId));
if (pushCallBackPrev != null) {
throw new RuntimeException("callback conflict.");
}
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java
index d063e25439c..f8ada0c0413 100644
--- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java
+++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcConnection.java
@@ -21,6 +21,7 @@
import com.alibaba.nacos.common.remote.exception.ConnectionAlreadyClosedException;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
+import com.alibaba.nacos.core.utils.Loggers;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
@@ -39,9 +40,9 @@
public class GrpcConnection extends Connection {
static ThreadPoolExecutor pushWorkers = new ThreadPoolExecutor(10, 50, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(5000));
+ new LinkedBlockingQueue<>(50000));
- private static final long MAX_TIMEOUTS = 500L;
+ private static final long MAX_TIMEOUTS = 5000L;
private StreamObserver streamObserver;
@@ -55,11 +56,13 @@ public boolean sendPush(ServerPushResponse request, long timeout) throws Excepti
try {
String requestId = String.valueOf(PushAckIdGenerator.getNextId());
request.setRequestId(requestId);
+
streamObserver.onNext(GrpcUtils.convert(request, requestId));
try {
- GrpcAckSynchronizer.waitAck(requestId, timeout);
+ return GrpcAckSynchronizer.waitAck(requestId, timeout);
} catch (Exception e) {
- e.printStackTrace();
+ //Do nothing,return fail.
+ return false;
} finally {
GrpcAckSynchronizer.release(requestId);
}
@@ -70,7 +73,6 @@ public boolean sendPush(ServerPushResponse request, long timeout) throws Excepti
}
throw e;
}
- return false;
}
private void sendPushWithCallback(ServerPushResponse request, PushCallBack callBack) {
@@ -78,13 +80,15 @@ private void sendPushWithCallback(ServerPushResponse request, PushCallBack callB
String requestId = String.valueOf(PushAckIdGenerator.getNextId());
request.setRequestId(requestId);
streamObserver.onNext(GrpcUtils.convert(request, requestId));
+ Loggers.CORE.warn("sync callback with ackid:" + requestId);
GrpcAckSynchronizer.syncCallbackOnAck(requestId, callBack);
} catch (Exception e) {
if (e instanceof StatusRuntimeException) {
//return true where client is not active yet.
callBack.onSuccess();
+ return;
}
- callBack.onFail();
+ callBack.onFail(e);
}
}
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java
index 21180bc02c4..fbc987c247d 100644
--- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java
+++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcRequestHandlerReactor.java
@@ -65,7 +65,9 @@ public void request(GrpcRequest grpcRequest, StreamObserver respon
PushAckRequest request = JacksonUtils
.toObj(grpcRequest.getBody().getValue().toStringUtf8(), PushAckRequest.class);
GrpcAckSynchronizer.ackNotify(request.getAckId(), request.isSuccess());
+ responseObserver.onNext(GrpcUtils.convert(new ServerCheckResponse(), ""));
responseObserver.onCompleted();
+ return;
}
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java
index 58e5351d7da..a74fe55bde8 100644
--- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java
+++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcServer.java
@@ -16,6 +16,7 @@
package com.alibaba.nacos.core.remote.grpc;
+import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.RequestHandlerRegistry;
import com.alibaba.nacos.core.remote.RpcServer;
import com.alibaba.nacos.core.utils.ApplicationUtils;
@@ -36,6 +37,8 @@
@Service
public class GrpcServer extends RpcServer {
+ private static final int PORT_OFFSET = 1000;
+
private Server server;
@Autowired
@@ -47,7 +50,10 @@ public class GrpcServer extends RpcServer {
@Autowired
private RequestHandlerRegistry requestHandlerRegistry;
- int grpcServerPort = ApplicationUtils.getPort() + 1000;
+ @Autowired
+ private ConnectionManager connectionManager;
+
+ int grpcServerPort = ApplicationUtils.getPort() + rpcPortOffset();
private void init() {
Loggers.GRPC.info("Nacos gRPC server initiazing Component ...");
@@ -78,9 +84,22 @@ public void run() {
}
+ @Override
+ public int rpcPortOffset() {
+ return PORT_OFFSET;
+ }
+
@Override
public void stop() {
if (server != null) {
+ Loggers.GRPC.info("Expel all clients...");
+ connectionManager.expelAll();
+ try {
+ //wait clients to switch server.
+ Thread.sleep(2000L);
+ } catch (InterruptedException e) {
+ //Do nothing.
+ }
server.shutdown();
}
}
diff --git a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcStreamRequestHanderImpl.java b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcStreamRequestHanderImpl.java
index 8832ee95bf0..24cd6771b89 100644
--- a/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcStreamRequestHanderImpl.java
+++ b/core/src/main/java/com/alibaba/nacos/core/remote/grpc/GrpcStreamRequestHanderImpl.java
@@ -20,6 +20,7 @@
import com.alibaba.nacos.api.grpc.GrpcRequest;
import com.alibaba.nacos.api.grpc.GrpcResponse;
import com.alibaba.nacos.api.grpc.RequestStreamGrpc;
+import com.alibaba.nacos.api.remote.response.ConnectResetResponse;
import com.alibaba.nacos.core.remote.Connection;
import com.alibaba.nacos.core.remote.ConnectionManager;
import com.alibaba.nacos.core.remote.ConnectionMetaInfo;
@@ -49,6 +50,16 @@ public void requestStream(GrpcRequest request, StreamObserver resp
ConnectionMetaInfo metaInfo = new ConnectionMetaInfo(connectionId, clientIp, ConnectionType.GRPC.getType(),
version);
Connection connection = new GrpcConnection(metaInfo, responseObserver);
- connectionManager.register(connectionId, connection);
+ if (connectionManager.isOverLimit()) {
+ //Not register to the connection manager if current server is over limit.
+ try {
+ connection.sendPushNoAck(new ConnectResetResponse());
+ } catch (Exception e) {
+ //Do nothing.
+ }
+ } else {
+ connectionManager.register(connectionId, connection);
+
+ }
}
}