Skip to content

Commit

Permalink
[ISSUE#3658] Move distro sync code to nacos-core module (#3750)
Browse files Browse the repository at this point in the history
* Move distro sync code to nacos-core module

* Update unit test
  • Loading branch information
KomachiSion authored Sep 3, 2020
1 parent 043a504 commit 37bb909
Show file tree
Hide file tree
Showing 40 changed files with 306 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency;
package com.alibaba.nacos.consistency;

/**
* Apply action.
*
* @author nkorange
*/
public enum ApplyAction {
public enum DataOperation {
/**
* Data add.
*/
ADD,
/**
* Data changed.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.core.distributed.distro;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
* Distro configuration.
*
* @author xiweng.yy
*/
@Component
public class DistroConfig {

@Value("${nacos.core.protocol.distro.data.sync_delay_ms:1000}")
private long syncDelayMillis = 1000;

@Value("${nacos.core.protocol.distro.data.sync_retry_delay_ms:3000}")
private long syncRetryDelayMillis = 3000;

@Value("${nacos.core.protocol.distro.data.verify_interval_ms:5000}")
private long verifyIntervalMillis = 5000;

@Value("${nacos.core.protocol.distro.data.load_retry_delay_ms:30000}")
private long loadDataRetryDelayMillis = 30000;

public long getSyncDelayMillis() {
return syncDelayMillis;
}

public void setSyncDelayMillis(long syncDelayMillis) {
this.syncDelayMillis = syncDelayMillis;
}

public long getSyncRetryDelayMillis() {
return syncRetryDelayMillis;
}

public void setSyncRetryDelayMillis(long syncRetryDelayMillis) {
this.syncRetryDelayMillis = syncRetryDelayMillis;
}

public long getVerifyIntervalMillis() {
return verifyIntervalMillis;
}

public void setVerifyIntervalMillis(long verifyIntervalMillis) {
this.verifyIntervalMillis = verifyIntervalMillis;
}

public long getLoadDataRetryDelayMillis() {
return loadDataRetryDelayMillis;
}

public void setLoadDataRetryDelayMillis(long loadDataRetryDelayMillis) {
this.loadDataRetryDelayMillis = loadDataRetryDelayMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl;
package com.alibaba.nacos.core.distributed.distro;

import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroCallback;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroComponentHolder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroDataProcessor;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroDataStorage;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroTransportAgent;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroData;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.DistroTaskEngineHolder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.delay.DistroDelayTask;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.load.DistroLoadDataTask;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.verify.DistroVerifyTask;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.core.distributed.distro.component.DistroCallback;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataProcessor;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataStorage;
import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask;
import com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask;
import com.alibaba.nacos.core.distributed.distro.task.verify.DistroVerifyTask;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.stereotype.Component;

/**
Expand All @@ -48,13 +48,16 @@ public class DistroProtocol {

private final DistroTaskEngineHolder distroTaskEngineHolder;

private final DistroConfig distroConfig;

private volatile boolean loadCompleted = false;

public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder) {
DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroConfig = distroConfig;
startVerifyTask();
}

Expand All @@ -64,28 +67,30 @@ private void startVerifyTask() {
public void onSuccess() {
loadCompleted = true;
}

@Override
public void onFailed(Throwable throwable) {
loadCompleted = false;
}
};
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder));
GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, loadCallback));
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder),
distroConfig.getVerifyIntervalMillis());
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}

public boolean isLoadCompleted() {
return loadCompleted;
}

/**
* Start to sync immediately.
* Start to sync by configured delay.
*
* @param distroKey distro key of sync data
* @param action the action of data operation
*/
public void sync(DistroKey distroKey, ApplyAction action) {
sync(distroKey, action, 0L);
public void sync(DistroKey distroKey, DataOperation action) {
sync(distroKey, action, distroConfig.getSyncDelayMillis());
}

/**
Expand All @@ -94,7 +99,7 @@ public void sync(DistroKey distroKey, ApplyAction action) {
* @param distroKey distro key of sync data
* @param action the action of data operation
*/
public void sync(DistroKey distroKey, ApplyAction action, long delay) {
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

/**
* Distro callback.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import org.springframework.stereotype.Component;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;

/**
* Distro data processor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroData;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro data storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro failed task handler.
Expand All @@ -32,5 +32,5 @@ public interface DistroFailedTaskHandler {
* @param distroKey distro key of failed task
* @param action action of task
*/
void retry(DistroKey distroKey, ApplyAction action);
void retry(DistroKey distroKey, DataOperation action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroData;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro transport agent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity;
package com.alibaba.nacos.core.distributed.distro.entity;

import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.consistency.DataOperation;

/**
* Distro data.
Expand All @@ -27,7 +27,7 @@ public class DistroData {

private DistroKey distroKey;

private ApplyAction type;
private DataOperation type;

private byte[] content;

Expand All @@ -47,11 +47,11 @@ public void setDistroKey(DistroKey distroKey) {
this.distroKey = distroKey;
}

public ApplyAction getType() {
public DataOperation getType() {
return type;
}

public void setType(ApplyAction type) {
public void setType(DataOperation type) {
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity;
package com.alibaba.nacos.core.distributed.distro.entity;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.exception;
package com.alibaba.nacos.core.distributed.distro.exception;

/**
* Distro exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task;
package com.alibaba.nacos.core.distributed.distro.task;

import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroComponentHolder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.delay.DistroDelayTaskProcessor;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.execute.DistroExecuteWorkersManager;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteWorkersManager;
import org.springframework.stereotype.Component;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.delay;
package com.alibaba.nacos.core.distributed.distro.task.delay;

import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro delay task.
Expand All @@ -29,15 +29,15 @@ public class DistroDelayTask extends AbstractDelayTask {

private final DistroKey distroKey;

private ApplyAction action;
private DataOperation action;

private long createTime;

public DistroDelayTask(DistroKey distroKey, long delayTime) {
this(distroKey, ApplyAction.CHANGE, delayTime);
this(distroKey, DataOperation.CHANGE, delayTime);
}

public DistroDelayTask(DistroKey distroKey, ApplyAction action, long delayTime) {
public DistroDelayTask(DistroKey distroKey, DataOperation action, long delayTime) {
this.distroKey = distroKey;
this.action = action;
this.createTime = System.currentTimeMillis();
Expand All @@ -49,7 +49,7 @@ public DistroKey getDistroKey() {
return distroKey;
}

public ApplyAction getAction() {
public DataOperation getAction() {
return action;
}

Expand Down
Loading

0 comments on commit 37bb909

Please sign in to comment.