Skip to content

Commit

Permalink
Add gprc support-> client reconnection optimize (#3367)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiyiyue1102 authored Jul 17, 2020
1 parent 688013b commit 4a702f8
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ public boolean isWaitInited() {
return this.rpcClientStatus.get() == RpcClientStatus.WAIT_INIT;
}

/**
* check is this client is running.
*
* @return
*/
public boolean isRunning() {
return this.rpcClientStatus.get() == RpcClientStatus.RUNNING;
}

/**
* check is this client is in init status,have not start th client.
*
* @return
*/
public boolean isInitStatus() {
return this.rpcClientStatus.get() == RpcClientStatus.INITED;
}

/**
* check is this client is in starting process.
*
* @return
*/
public boolean isStarting() {
return this.rpcClientStatus.get() == RpcClientStatus.STARTING;
}

/**
* listener called where connect status changed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -97,35 +95,42 @@ public GrpcClient(ServerListFactory serverListFactory) {
}

/**
* tryConnectServer. 1.if in start stage, this method will return true after success to connect to th server or
* return false after timeout . 2.if in running stage ,this method will start a thread to reconnect the server
* asynchronous,and will return true directly.
* tryConnectServer. this method will start a thread to connect the server asynchronous
*
* @return
*/
private boolean tryConnectServer() {
LOGGER.error("tryConnectServer.....clientStarus={},currentServer={}", rpcClientStatus.get(),
private void tryConnectServer() {

LOGGER.info("try connect server.....clientStarus={},currentServer={}", rpcClientStatus.get(),
getServerListFactory().getCurrentServer());
//当前状态未运行中,说明是运行期异常,并且没有其他线程启动重联
if (rpcClientStatus.get() == RpcClientStatus.RUNNING) {
boolean updateSucess = rpcClientStatus
.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.RE_CONNECTING);
if (isRunning() || isInitStatus()) {
final RpcClientStatus prevStatus = rpcClientStatus.get();
boolean updateSucess = false;
if (isRunning()) {
updateSucess = rpcClientStatus.compareAndSet(prevStatus, RpcClientStatus.RE_CONNECTING);
} else {
updateSucess = rpcClientStatus.compareAndSet(prevStatus, RpcClientStatus.STARTING);
}

if (updateSucess) {
executorService.schedule(new Runnable() {
@Override
public void run() {
while (rpcClientStatus.get() != RpcClientStatus.RUNNING) {


// loop until start client success.
while (!isRunning()) {

buildClientAtFirstTime();
boolean sucess = serverCheck();
if (sucess) {
System.out.println("Service check success ....");
notifyReConnected();
rpcClientStatus.compareAndSet(RpcClientStatus.RE_CONNECTING, RpcClientStatus.RUNNING);
if (rpcClientStatus.get() == RpcClientStatus.RE_CONNECTING) {
notifyReConnected();
}
System.out.println("Current Server..." + getServerListFactory().getCurrentServer());
rpcClientStatus.compareAndSet(rpcClientStatus.get(), RpcClientStatus.RUNNING);
reConnectTimesLeft.set(MAX_RECONNECT_TIMES);

} else {

int leftRetryTimes = reConnectTimesLeft.decrementAndGet();
if (leftRetryTimes <= 0) {
getServerListFactory().genNextServer();
Expand All @@ -149,74 +154,13 @@ public void run() {
}, 0L, TimeUnit.MILLISECONDS);
}

return true;

} else if (rpcClientStatus.get() == RpcClientStatus.RE_CONNECTING
|| rpcClientStatus.get() == RpcClientStatus.STARTING) {
// Direct return if current client is in reconnting status...
return true;
} else if (rpcClientStatus.get() == RpcClientStatus.INITED) {
//First time to start ....

boolean updateStatusSucess = rpcClientStatus
.compareAndSet(RpcClientStatus.INITED, RpcClientStatus.STARTING);
if (!updateStatusSucess) {
return true;
}

try {
buildClient();
} catch (NacosException e) {
LOGGER.error("Fail to build client firt time in start. ", e);
}
ScheduledFuture<Boolean> future = executorService.schedule(new Callable<Boolean>() {

@Override
public Boolean call() throws Exception {

while (rpcClientStatus.get() != RpcClientStatus.RUNNING) {
boolean sucess = serverCheck();
if (sucess) {
rpcClientStatus.compareAndSet(RpcClientStatus.RE_CONNECTING, RpcClientStatus.RUNNING);
reConnectTimesLeft.set(MAX_RECONNECT_TIMES);
return true;
} else {

int leftRetryTimes = reConnectTimesLeft.decrementAndGet();
if (leftRetryTimes <= 0) {
getServerListFactory().genNextServer();
reConnectTimesLeft.set(MAX_RECONNECT_TIMES);
try {
reBuildClient();
} catch (NacosException e) {
LOGGER.error("Fail to build client in start. ", e);
}
}
}

try {
Thread.sleep(200L);
} catch (InterruptedException e) {
//do nothing
}
}
return true;
}
}, 0L, TimeUnit.MILLISECONDS);

try {
Boolean aBoolean = future.get(10000L, TimeUnit.MILLISECONDS);
return aBoolean.booleanValue();
} catch (Exception e) {
LOGGER.error("Fail to start RpcCLient . ", e);
return false;
}

// Do nothing if current client is in reconnting and starting status...
}
return true;

}


@Override
public void start() throws NacosException {

Expand All @@ -235,8 +179,8 @@ public void start() throws NacosException {
public void run() {
sendBeat();
}
}, 5000, 10000, TimeUnit.MILLISECONDS);

}, 5000, 2000, TimeUnit.MILLISECONDS);
rpcClientStatus.compareAndSet(RpcClientStatus.STARTING, RpcClientStatus.RUNNING);

super.registerServerPushResponseHandler(new ServerPushResponseHandler() {
Expand All @@ -246,8 +190,6 @@ public void responseReply(Response response) {
try {
buildClient();
} catch (Exception e) {
System.out.println(e);
e.printStackTrace();
LOGGER.error("rebuildClient error ", e);
}

Expand All @@ -262,11 +204,11 @@ public void responseReply(Response response) {
*/
public void sendBeat() {
try {

if (this.rpcClientStatus.get() == RpcClientStatus.RE_CONNECTING) {
return;
}

GrpcMetadata meta = GrpcMetadata.newBuilder().setConnectionId(connectionId).setClientIp(NetUtils.localIP())
.build();
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
Expand All @@ -275,12 +217,7 @@ public void sendBeat() {
.build()).build();
GrpcResponse response = grpcServiceStub.request(streamRequest);
} catch (Exception e) {

System.out.println(e);
e.printStackTrace(System.out);
// 心跳失败
LOGGER.error("Send heart beat error ", e);

LOGGER.error("Send heart beat error,will tring to reconnet to server ", e);
tryConnectServer();
}
}
Expand All @@ -302,12 +239,22 @@ private boolean serverCheck() {

private void reBuildClient() throws NacosException {
if (this.channel != null && !this.channel.isShutdown()) {
System.out.println("Shutdown curent channel...");
this.channel.shutdown();
this.channel.shutdownNow();
}
buildClient();
}

private void buildClientAtFirstTime() {
if (this.channel == null) {
try {
LOGGER.info("trying to init build client");
buildClient();
} catch (NacosException e) {
LOGGER.error("init build client fail", e);
}
}
}

private void buildClient() throws NacosException {

String serverAddress = getServerListFactory().getCurrentServer();
Expand All @@ -322,8 +269,6 @@ private void buildClient() throws NacosException {
serverIp = serverAddress.split(":")[0];
serverPort += Integer.valueOf(serverAddress.split(":")[1]);
}

System.out.println("Build client... " + getServerListFactory().getCurrentServer());

LOGGER.info("GrpcClient start to connect to rpc server, serverIp={},port={}", serverIp, serverPort);

Expand All @@ -338,7 +283,6 @@ private void buildClient() throws NacosException {
GrpcRequest streamRequest = GrpcRequest.newBuilder().setMetadata(meta).build();

LOGGER.info("GrpcClient send stream request grpc server,streamRequest:{}", streamRequest);

grpcStreamServiceStub.requestStream(streamRequest, new StreamObserver<GrpcResponse>() {
@Override
public void onNext(GrpcResponse grpcResponse) {
Expand Down Expand Up @@ -368,12 +312,10 @@ public void accept(ServerPushResponseHandler serverPushResponseHandler) {

@Override
public void onError(Throwable throwable) {

}

@Override
public void onCompleted() {

}
});

Expand Down Expand Up @@ -401,6 +343,7 @@ public Response request(Request request) throws NacosException {
return (PlainBodyResponse) myresponse;
}
} catch (Exception e) {
e.printStackTrace(System.out);
throw new NacosException(NacosException.SERVER_ERROR, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ public class ConfigTest {
@Before
public void before() throws Exception {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848");
//properties.setProperty(PropertyKeyConst.SERVER_ADDR, "127.0.0.1:28848");
properties.setProperty(PropertyKeyConst.SERVER_ADDR,
"11.239.114.187:8848,11.239.113.204:8848,11.239.112.161:8848");

configService = NacosFactory.createConfigService(properties);
}

Expand Down

0 comments on commit 4a702f8

Please sign in to comment.