Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cache removed when listeneradd delay #9311

Merged
merged 2 commits into from
Oct 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ public class CacheData {
*/
private volatile boolean isSyncWithServer = false;

/**
* if is cache data is discard,need to remove.
*/
private volatile boolean isDiscard = false;

private String type;

public boolean isInitializing() {
Expand Down Expand Up @@ -402,6 +407,14 @@ public void setSyncWithServer(boolean syncWithServer) {
isSyncWithServer = syncWithServer;
}

public boolean isDiscard() {
return isDiscard;
}

public void setDiscard(boolean discard) {
isDiscard = discard;
}

public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group) {
this(configFilterChainManager, name, dataId, group, TenantUtil.getUserTenantForAcm());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ public class ClientWorker implements Closeable {
private int taskPenaltyTime;

private boolean enableRemoteSyncConfig = false;

private static final int MIN_THREAD_NUM = 2;

private static final int THREAD_MULTIPLE = 1;

/**
* Add listeners for data.
*
Expand All @@ -146,6 +146,7 @@ public void addListeners(String dataId, String group, List<? extends Listener> l
for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setDiscard(false);
cache.setSyncWithServer(false);
agent.notifyListenConfig();

Expand All @@ -169,6 +170,7 @@ public void addTenantListeners(String dataId, String group, List<? extends Liste
for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setDiscard(false);
cache.setSyncWithServer(false);
agent.notifyListenConfig();
}
Expand Down Expand Up @@ -196,6 +198,7 @@ public void addTenantListenersWithContent(String dataId, String group, String co
for (Listener listener : listeners) {
cache.addListener(listener);
}
cache.setDiscard(false);
cache.setSyncWithServer(false);
agent.notifyListenConfig();
}
Expand All @@ -217,6 +220,7 @@ public void removeListener(String dataId, String group, Listener listener) {
cache.removeListener(listener);
if (cache.getListeners().isEmpty()) {
cache.setSyncWithServer(false);
cache.setDiscard(true);
agent.removeCache(dataId, group);
}
}
Expand All @@ -240,6 +244,7 @@ public void removeTenantListener(String dataId, String group, Listener listener)
cache.removeListener(listener);
if (cache.getListeners().isEmpty()) {
cache.setSyncWithServer(false);
cache.setDiscard(true);
agent.removeCache(dataId, group);
}
}
Expand Down Expand Up @@ -696,7 +701,7 @@ public void startInternal() {
continue;
}
executeConfigListen();
} catch (Exception e) {
} catch (Throwable e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
}
Expand Down Expand Up @@ -733,7 +738,7 @@ public void executeConfigListen() {
}
}

if (!CollectionUtils.isEmpty(cache.getListeners())) {
if (!cache.isDiscard()) {
//get listen config
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
Expand All @@ -744,7 +749,7 @@ public void executeConfigListen() {
cacheDatas.add(cache);

}
} else if (CollectionUtils.isEmpty(cache.getListeners())) {
} else if (cache.isDiscard()) {

if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
Expand Down Expand Up @@ -807,8 +812,8 @@ public void executeConfigListen() {
if (!cacheData.getListeners().isEmpty()) {

Long previousTimesStamp = timestampMap.get(groupKey);
if (previousTimesStamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimesStamp,
System.currentTimeMillis())) {
if (previousTimesStamp != null && !cacheData.getLastModifiedTs()
.compareAndSet(previousTimesStamp, System.currentTimeMillis())) {
continue;
}
cacheData.setSyncWithServer(true);
Expand Down Expand Up @@ -844,7 +849,7 @@ public void executeConfigListen() {
if (removeSuccess) {
for (CacheData cacheData : removeListenCaches) {
synchronized (cacheData) {
if (cacheData.getListeners().isEmpty()) {
if (cacheData.isDiscard()) {
ClientWorker.this
.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
}
Expand Down Expand Up @@ -979,8 +984,8 @@ public ConfigResponse queryConfig(String dataId, String group, String tenant, lo
LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", this.getName(), dataId,
group, tenant, response);
throw new NacosException(response.getErrorCode(),
"http error, code=" + response.getErrorCode() + ",msg=" + response.getMessage() + ",dataId=" + dataId + ",group=" + group
+ ",tenant=" + tenant);
"http error, code=" + response.getErrorCode() + ",msg=" + response.getMessage() + ",dataId="
+ dataId + ",group=" + group + ",tenant=" + tenant);

}
}
Expand Down