Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE#3658] Move distro sync code to nacos-core module #3750

Merged
merged 2 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency;
package com.alibaba.nacos.consistency;

/**
* Apply action.
*
* @author nkorange
*/
public enum ApplyAction {
public enum DataOperation {
/**
* Data add.
*/
ADD,
/**
* Data changed.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.core.distributed.distro;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
* Distro configuration.
*
* @author xiweng.yy
*/
@Component
public class DistroConfig {

@Value("${nacos.core.protocol.distro.data.sync_delay_ms:1000}")
private long syncDelayMillis = 1000;

@Value("${nacos.core.protocol.distro.data.sync_retry_delay_ms:3000}")
private long syncRetryDelayMillis = 3000;

@Value("${nacos.core.protocol.distro.data.verify_interval_ms:5000}")
private long verifyIntervalMillis = 5000;

@Value("${nacos.core.protocol.distro.data.load_retry_delay_ms:30000}")
private long loadDataRetryDelayMillis = 30000;

public long getSyncDelayMillis() {
return syncDelayMillis;
}

public void setSyncDelayMillis(long syncDelayMillis) {
this.syncDelayMillis = syncDelayMillis;
}

public long getSyncRetryDelayMillis() {
return syncRetryDelayMillis;
}

public void setSyncRetryDelayMillis(long syncRetryDelayMillis) {
this.syncRetryDelayMillis = syncRetryDelayMillis;
}

public long getVerifyIntervalMillis() {
return verifyIntervalMillis;
}

public void setVerifyIntervalMillis(long verifyIntervalMillis) {
this.verifyIntervalMillis = verifyIntervalMillis;
}

public long getLoadDataRetryDelayMillis() {
return loadDataRetryDelayMillis;
}

public void setLoadDataRetryDelayMillis(long loadDataRetryDelayMillis) {
this.loadDataRetryDelayMillis = loadDataRetryDelayMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl;
package com.alibaba.nacos.core.distributed.distro;

import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroCallback;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroComponentHolder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroDataProcessor;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroDataStorage;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroTransportAgent;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroData;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.DistroTaskEngineHolder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.delay.DistroDelayTask;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.load.DistroLoadDataTask;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.verify.DistroVerifyTask;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.core.distributed.distro.component.DistroCallback;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataProcessor;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataStorage;
import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.task.DistroTaskEngineHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTask;
import com.alibaba.nacos.core.distributed.distro.task.load.DistroLoadDataTask;
import com.alibaba.nacos.core.distributed.distro.task.verify.DistroVerifyTask;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import org.springframework.stereotype.Component;

/**
Expand All @@ -48,13 +48,16 @@ public class DistroProtocol {

private final DistroTaskEngineHolder distroTaskEngineHolder;

private final DistroConfig distroConfig;

private volatile boolean loadCompleted = false;

public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder) {
DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroConfig = distroConfig;
startVerifyTask();
}

Expand All @@ -64,28 +67,30 @@ private void startVerifyTask() {
public void onSuccess() {
loadCompleted = true;
}

@Override
public void onFailed(Throwable throwable) {
loadCompleted = false;
}
};
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder));
GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, loadCallback));
GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder),
distroConfig.getVerifyIntervalMillis());
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}

public boolean isLoadCompleted() {
return loadCompleted;
}

/**
* Start to sync immediately.
* Start to sync by configured delay.
*
* @param distroKey distro key of sync data
* @param action the action of data operation
*/
public void sync(DistroKey distroKey, ApplyAction action) {
sync(distroKey, action, 0L);
public void sync(DistroKey distroKey, DataOperation action) {
sync(distroKey, action, distroConfig.getSyncDelayMillis());
}

/**
Expand All @@ -94,7 +99,7 @@ public void sync(DistroKey distroKey, ApplyAction action) {
* @param distroKey distro key of sync data
* @param action the action of data operation
*/
public void sync(DistroKey distroKey, ApplyAction action, long delay) {
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

/**
* Distro callback.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import org.springframework.stereotype.Component;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;

/**
* Distro data processor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroData;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro data storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro failed task handler.
Expand All @@ -32,5 +32,5 @@ public interface DistroFailedTaskHandler {
* @param distroKey distro key of failed task
* @param action action of task
*/
void retry(DistroKey distroKey, ApplyAction action);
void retry(DistroKey distroKey, DataOperation action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component;
package com.alibaba.nacos.core.distributed.distro.component;

import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroData;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro transport agent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity;
package com.alibaba.nacos.core.distributed.distro.entity;

import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.consistency.DataOperation;

/**
* Distro data.
Expand All @@ -27,7 +27,7 @@ public class DistroData {

private DistroKey distroKey;

private ApplyAction type;
private DataOperation type;

private byte[] content;

Expand All @@ -47,11 +47,11 @@ public void setDistroKey(DistroKey distroKey) {
this.distroKey = distroKey;
}

public ApplyAction getType() {
public DataOperation getType() {
return type;
}

public void setType(ApplyAction type) {
public void setType(DataOperation type) {
this.type = type;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity;
package com.alibaba.nacos.core.distributed.distro.entity;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.exception;
package com.alibaba.nacos.core.distributed.distro.exception;

/**
* Distro exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task;
package com.alibaba.nacos.core.distributed.distro.task;

import com.alibaba.nacos.common.task.NacosTaskProcessor;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.component.DistroComponentHolder;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.delay.DistroDelayTaskProcessor;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.execute.DistroExecuteWorkersManager;
import com.alibaba.nacos.core.distributed.distro.component.DistroComponentHolder;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskExecuteEngine;
import com.alibaba.nacos.core.distributed.distro.task.delay.DistroDelayTaskProcessor;
import com.alibaba.nacos.core.distributed.distro.task.execute.DistroExecuteWorkersManager;
import org.springframework.stereotype.Component;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/

package com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.task.delay;
package com.alibaba.nacos.core.distributed.distro.task.delay;

import com.alibaba.nacos.common.task.AbstractDelayTask;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.newimpl.entity.DistroKey;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;

/**
* Distro delay task.
Expand All @@ -29,15 +29,15 @@ public class DistroDelayTask extends AbstractDelayTask {

private final DistroKey distroKey;

private ApplyAction action;
private DataOperation action;

private long createTime;

public DistroDelayTask(DistroKey distroKey, long delayTime) {
this(distroKey, ApplyAction.CHANGE, delayTime);
this(distroKey, DataOperation.CHANGE, delayTime);
}

public DistroDelayTask(DistroKey distroKey, ApplyAction action, long delayTime) {
public DistroDelayTask(DistroKey distroKey, DataOperation action, long delayTime) {
this.distroKey = distroKey;
this.action = action;
this.createTime = System.currentTimeMillis();
Expand All @@ -49,7 +49,7 @@ public DistroKey getDistroKey() {
return distroKey;
}

public ApplyAction getAction() {
public DataOperation getAction() {
return action;
}

Expand Down
Loading