Skip to content

Commit

Permalink
Merge branch 'develop' into summer2021_issue#5695
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xiao-shuang authored Jan 7, 2022
2 parents 1de8d8d + 27ed668 commit 964e1b8
Show file tree
Hide file tree
Showing 60 changed files with 2,056 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import static com.alibaba.nacos.api.common.Constants.Naming.NAMING_MODULE;

/**
* Notify subscriber response.
* Notify subscriber request.
*
* @author xiweng.yy
*/
Expand All @@ -44,22 +44,12 @@ public String getModule() {
return NAMING_MODULE;
}

private NotifySubscriberRequest(ServiceInfo serviceInfo, String message) {
private NotifySubscriberRequest(ServiceInfo serviceInfo) {
this.serviceInfo = serviceInfo;
}

public static NotifySubscriberRequest buildSuccessResponse(ServiceInfo serviceInfo) {
return new NotifySubscriberRequest(serviceInfo, "success");
}

/**
* Build fail response.
*
* @param message error message
* @return fail response
*/
public static NotifySubscriberRequest buildFailResponse(String message) {
return new NotifySubscriberRequest();
public static NotifySubscriberRequest buildNotifySubscriberRequest(ServiceInfo serviceInfo) {
return new NotifySubscriberRequest(serviceInfo);
}

public ServiceInfo getServiceInfo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

public class PayloadRegistry {

private static final Map<String, Class> REGISTRY_REQUEST = new HashMap<String, Class>();
private static final Map<String, Class<?>> REGISTRY_REQUEST = new HashMap<>();

static boolean initialized = false;

Expand All @@ -55,7 +55,7 @@ private static synchronized void scan() {
for (String pkg : requestScanPackage) {
Reflections reflections = new Reflections(pkg);
Set<Class<? extends Request>> subTypesRequest = reflections.getSubTypesOf(Request.class);
for (Class clazz : subTypesRequest) {
for (Class<?> clazz : subTypesRequest) {
register(clazz.getSimpleName(), clazz);
}
}
Expand All @@ -67,15 +67,15 @@ private static synchronized void scan() {
for (String pkg : responseScanPackage) {
Reflections reflections = new Reflections(pkg);
Set<Class<? extends Response>> subTypesOfResponse = reflections.getSubTypesOf(Response.class);
for (Class clazz : subTypesOfResponse) {
for (Class<?> clazz : subTypesOfResponse) {
register(clazz.getSimpleName(), clazz);
}
}

initialized = true;
}

static void register(String type, Class clazz) {
static void register(String type, Class<?> clazz) {
if (Modifier.isAbstract(clazz.getModifiers())) {
return;
}
Expand All @@ -88,7 +88,7 @@ static void register(String type, Class clazz) {
REGISTRY_REQUEST.put(type, clazz);
}

public static Class getClassByType(String type) {
public static Class<?> getClassByType(String type) {
return REGISTRY_REQUEST.get(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ public class ClientWorker implements Closeable {
private int taskPenaltyTime;

private boolean enableRemoteSyncConfig = false;


private static final int MIN_THREAD_NUM = 2;

private static final int THREAD_MULTIPLE = 1;

/**
* Add listeners for data.
*
Expand Down Expand Up @@ -459,9 +463,9 @@ public ClientWorker(final ConfigFilterChainManager configFilterChainManager, Ser
init(properties);

agent = new ConfigRpcTransportClient(properties, serverListManager);

int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(ThreadUtils.getSuitableThreadCount(1), r -> {
.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker");
t.setDaemon(true);
Expand Down Expand Up @@ -1028,7 +1032,7 @@ public ConfigResponse queryConfig(String dataId, String group, String tenant, lo
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", this.getName(), dataId,
group, tenant, response);
throw new NacosException(response.getErrorCode(),
"http error, code=" + response.getErrorCode() + ",dataId=" + dataId + ",group=" + group
"http error, code=" + response.getErrorCode() + ",msg=" + response.getMessage() + ",dataId=" + dataId + ",group=" + group
+ ",tenant=" + tenant);

}
Expand All @@ -1047,11 +1051,7 @@ private Response requestProxy(RpcClient rpcClientInner, Request request, long ti
} catch (Exception e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}

Map<String, String> signHeaders = SpasAdapter.getSignHeaders(resourceBuild(request), secretKey);
if (signHeaders != null && !signHeaders.isEmpty()) {
request.putAllHeader(signHeaders);
}
request.putAllHeader(SpasAdapter.getSignHeaders(resourceBuild(request), secretKey));
JsonObject asJsonObjectTemp = new Gson().toJsonTree(request).getAsJsonObject();
asJsonObjectTemp.remove("headers");
asJsonObjectTemp.remove("requestId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.alibaba.nacos.client.constant;

import java.util.concurrent.TimeUnit;

/**
* All the constants.
*
Expand Down Expand Up @@ -56,4 +58,10 @@ public static class Protocols {
public static final String HTTPS = "https://";
}

public static class Security {

public static final long SECURITY_INFO_REFRESH_INTERVAL_MILLS = TimeUnit.SECONDS.toMillis(5);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@
import com.alibaba.nacos.client.naming.utils.InitUtils;
import com.alibaba.nacos.client.security.SecurityProxy;
import com.alibaba.nacos.client.utils.ValidatorUtils;
import com.alibaba.nacos.common.utils.ThreadUtils;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.alibaba.nacos.client.constant.Constants.Security.SECURITY_INFO_REFRESH_INTERVAL_MILLS;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;

/**
* Nacos naming maintain service.
Expand All @@ -48,6 +55,12 @@ public class NacosNamingMaintainService implements NamingMaintainService {

private NamingHttpClientProxy serverProxy;

private ServerListManager serverListManager;

private SecurityProxy securityProxy;

private ScheduledExecutorService executorService;

public NacosNamingMaintainService(String serverList) throws NacosException {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);
Expand All @@ -63,12 +76,26 @@ private void init(Properties properties) throws NacosException {
namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
ServerListManager serverListManager = new ServerListManager(properties, namespace);
SecurityProxy securityProxy = new SecurityProxy(properties,
serverListManager = new ServerListManager(properties, namespace);
securityProxy = new SecurityProxy(properties,
NamingHttpClientManager.getInstance().getNacosRestTemplate());
initSecurityProxy();
serverProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, null);
}

private void initSecurityProxy() {
this.executorService = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.naming.maintainService.security");
t.setDaemon(true);
return t;
});
this.securityProxy.login(serverListManager.getServerList());
this.executorService.scheduleWithFixedDelay(() -> securityProxy.login(serverListManager.getServerList()), 0,
SECURITY_INFO_REFRESH_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

}

@Override
public void updateInstance(String serviceName, Instance instance) throws NacosException {
updateInstance(serviceName, Constants.DEFAULT_GROUP, instance);
Expand Down Expand Up @@ -167,6 +194,11 @@ public void updateService(Service service, AbstractSelector selector) throws Nac

@Override
public void shutDown() throws NacosException {
String className = this.getClass().getName();
NAMING_LOGGER.info("{} do shutdown begin", className);
serverListManager.shutdown();
serverProxy.shutdown();
ThreadUtils.shutdownThreadPool(executorService, NAMING_LOGGER);
NAMING_LOGGER.info("{} do shutdown stop", className);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public List<Instance> getAllInstances(String serviceName, String groupName, List
String clusterString = StringUtils.join(clusters, ",");
if (subscribe) {
serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
if (null == serviceInfo) {
if (null == serviceInfo || !clientProxy.isSubscribed(serviceName, groupName, clusterString)) {
serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ public class ServiceInfoHolder implements Closeable {
public ServiceInfoHolder(String namespace, Properties properties) {
initCacheDir(namespace, properties);
if (isLoadCacheAtStart(properties)) {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
this.serviceInfoMap = new ConcurrentHashMap<>(DiskCache.read(this.cacheDir));
} else {
this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
this.serviceInfoMap = new ConcurrentHashMap<>(16);
}
this.failoverReactor = new FailoverReactor(this, cacheDir);
this.pushEmptyProtection = isPushEmptyProtect(properties);
Expand Down Expand Up @@ -185,6 +185,7 @@ private boolean isChangedServiceInfo(ServiceInfo oldService, ServiceInfo newServ
if (oldService.getLastRefTime() > newService.getLastRefTime()) {
NAMING_LOGGER.warn("out of date data received, old-t: {}, new-t: {}", oldService.getLastRefTime(),
newService.getLastRefTime());
return false;
}
boolean changed = false;
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ ListView<String> getServiceList(int pageNo, int pageSize, String groupName, Abst
*/
void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException;

/**
* Judge whether service has been subscribed.
*
* @param serviceName service name
* @param groupName group name
* @param clusters clusters, current only support subscribe all clusters, maybe deprecated
* @return {@code true} if subscribed, otherwise {@code false}
* @throws NacosException nacos exception
*/
boolean isSubscribed(String serviceName, String groupName, String clusters) throws NacosException;

/**
* Update beat info.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.alibaba.nacos.client.constant.Constants.Security.SECURITY_INFO_REFRESH_INTERVAL_MILLS;
import static com.alibaba.nacos.client.utils.LogUtils.NAMING_LOGGER;

/**
Expand All @@ -48,8 +49,6 @@
*/
public class NamingClientProxyDelegate implements NamingClientProxy {

private final long securityInfoRefreshIntervalMills = TimeUnit.SECONDS.toMillis(5);

private final ServerListManager serverListManager;

private final ServiceInfoUpdateService serviceInfoUpdateService;
Expand Down Expand Up @@ -87,7 +86,7 @@ private void initSecurityProxy() {
});
this.securityProxy.login(serverListManager.getServerList());
this.executorService.scheduleWithFixedDelay(() -> securityProxy.login(serverListManager.getServerList()), 0,
securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
SECURITY_INFO_REFRESH_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -139,11 +138,12 @@ public ListView<String> getServiceList(int pageNo, int pageSize, String groupNam

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
if (null == result) {
if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
}
serviceInfoHolder.processServiceInfo(result);
Expand All @@ -152,10 +152,16 @@ public ServiceInfo subscribe(String serviceName, String groupName, String cluste

@Override
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
NAMING_LOGGER.debug("[UNSUBSCRIBE-SERVICE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
serviceInfoUpdateService.stopUpdateIfContain(serviceName, groupName, clusters);
grpcClientProxy.unsubscribe(serviceName, groupName, clusters);
}

@Override
public boolean isSubscribed(String serviceName, String groupName, String clusters) throws NacosException {
return grpcClientProxy.isSubscribed(serviceName, groupName, clusters);
}

@Override
public void updateBeatInfo(Set<Instance> modifiedInstances) {
httpClientProxy.updateBeatInfo(modifiedInstances);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public ListView<String> getServiceList(int pageNo, int pageSize, String groupNam

@Override
public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
}
redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
return doSubscribe(serviceName, groupName, clusters);
}
Expand All @@ -231,10 +234,18 @@ public ServiceInfo doSubscribe(String serviceName, String groupName, String clus

@Override
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("[GRPC-UNSUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
}
redoService.subscriberDeregister(serviceName, groupName, clusters);
doUnsubscribe(serviceName, groupName, clusters);
}

@Override
public boolean isSubscribed(String serviceName, String groupName, String clusters) throws NacosException {
return redoService.isSubscriberRegistered(serviceName, groupName, clusters);
}

/**
* Execute unsubscribe operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,22 @@ public void subscriberDeregister(String serviceName, String groupName, String cl
}
}

/**
* Judge subscriber has registered to server.
*
* @param serviceName service name
* @param groupName group name
* @param cluster cluster
* @return {@code true} if subscribed, otherwise {@code false}
*/
public boolean isSubscriberRegistered(String serviceName, String groupName, String cluster) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), cluster);
synchronized (subscribes) {
SubscriberRedoData redoData = subscribes.get(key);
return null != redoData && redoData.isRegistered();
}
}

/**
* Remove subscriber for redo.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ public ServiceInfo subscribe(String serviceName, String groupName, String cluste
public void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException {
}

@Override
public boolean isSubscribed(String serviceName, String groupName, String clusters) throws NacosException {
return true;
}

@Override
public void updateBeatInfo(Set<Instance> modifiedInstances) {
for (Instance instance : modifiedInstances) {
Expand Down
Loading

0 comments on commit 964e1b8

Please sign in to comment.