Skip to content

Commit

Permalink
fix: fix bug (#4411)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun authored Dec 4, 2020
1 parent c8edd7d commit 81d4a6d
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public Class<? extends Event> subscribeType() {
NotifyCenter.registerToPublisher(ConfigDumpEvent.class, NotifyCenter.ringBufferSize);
NotifyCenter.registerSubscriber(new DumpConfigHandler());

this.protocol.addLogProcessors(Collections.singletonList(this));
this.protocol.addRequestProcessors(Collections.singletonList(this));
LogUtil.DEFAULT_LOG.info("use DistributedTransactionServicesImpl");
}

Expand Down Expand Up @@ -390,7 +390,7 @@ public CompletableFuture<RestResult<String>> dataImport(File file) {
if (submit) {
List<ModifyRequest> requests = batchUpdate.stream().map(ModifyRequest::new)
.collect(Collectors.toList());
CompletableFuture<Response> future = protocol.submitAsync(WriteRequest.newBuilder().setGroup(group())
CompletableFuture<Response> future = protocol.writeAsync(WriteRequest.newBuilder().setGroup(group())
.setData(ByteString.copyFrom(serializer.serialize(requests)))
.putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build());
futures.add(future);
Expand Down Expand Up @@ -432,14 +432,14 @@ public Boolean update(List<ModifyRequest> sqlContext, BiConsumer<Boolean, Throwa
.putAllExtendInfo(EmbeddedStorageContextUtils.getCurrentExtendInfo())
.setType(sqlContext.getClass().getCanonicalName()).build();
if (Objects.isNull(consumer)) {
Response response = this.protocol.submit(request);
Response response = this.protocol.write(request);
if (response.getSuccess()) {
return true;
}
LogUtil.DEFAULT_LOG.error("execute sql modify operation failed : {}", response.getErrMsg());
return false;
} else {
this.protocol.submitAsync(request).whenComplete((BiConsumer<Response, Throwable>) (response, ex) -> {
this.protocol.writeAsync(request).whenComplete((BiConsumer<Response, Throwable>) (response, ex) -> {
String errMsg = Objects.isNull(ex) ? response.getErrMsg() : ExceptionUtil.getCause(ex).getMessage();
consumer.accept(response.getSuccess(),
StringUtils.isBlank(errMsg) ? null : new NJdbcException(errMsg));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
void init(T config);

/**
* Add a log handler.
* Add a request handler.
*
* @param processors {@link RequestProcessor}
*/
void addLogProcessors(Collection<P> processors);
void addRequestProcessors(Collection<P> processors);

/**
* Copy of metadata information for this consensus protocol.
Expand Down Expand Up @@ -87,7 +87,7 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
* @return submit operation result {@link Response}
* @throws Exception {@link Exception}
*/
Response submit(WriteRequest request) throws Exception;
Response write(WriteRequest request) throws Exception;

/**
* Data submission operation, returning submission results asynchronously.
Expand All @@ -97,7 +97,7 @@ public interface ConsistencyProtocol<T extends Config, P extends RequestProcesso
* @return {@link CompletableFuture} submit result
* @throws Exception when submit throw Exception
*/
CompletableFuture<Response> submitAsync(WriteRequest request);
CompletableFuture<Response> writeAsync(WriteRequest request);

/**
* New member list .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public Class<? extends Event> subscribeType() {
}

@Override
public void addLogProcessors(Collection<RequestProcessor4CP> processors) {
public void addRequestProcessors(Collection<RequestProcessor4CP> processors) {
raftServer.createMultiRaftGroup(processors);
}

Expand All @@ -175,14 +175,14 @@ public CompletableFuture<Response> aGetData(ReadRequest request) {
}

@Override
public Response submit(WriteRequest request) throws Exception {
CompletableFuture<Response> future = submitAsync(request);
public Response write(WriteRequest request) throws Exception {
CompletableFuture<Response> future = writeAsync(request);
// Here you wait for 10 seconds, as long as possible, for the request to complete
return future.get(10_000L, TimeUnit.MILLISECONDS);
}

@Override
public CompletableFuture<Response> submitAsync(WriteRequest request) {
public CompletableFuture<Response> writeAsync(WriteRequest request) {
return raftServer.commit(request.getGroup(), request, new CompletableFuture<>());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
*/
@Deprecated
public class NacosGetRequestProcessor extends AbstractProcessor implements RpcProcessor<GetRequest> {

private static final String INTEREST_NAME = GetRequest.class.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
*/
@Deprecated
public class NacosLogProcessor extends AbstractProcessor implements RpcProcessor<Log> {

private static final String INTEREST_NAME = Log.class.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@

package com.alibaba.nacos.core.distributed.raft.processor;

import com.alibaba.nacos.consistency.ProtoMessageUtil;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;

Expand All @@ -28,17 +30,22 @@
*/
public class NacosReadRequestProcessor extends AbstractProcessor implements RpcProcessor<ReadRequest> {

public NacosReadRequestProcessor(Serializer serializer) {
private static final String INTEREST_NAME = ReadRequest.class.getName();

private final JRaftServer server;

public NacosReadRequestProcessor(JRaftServer server, Serializer serializer) {
super(serializer);
this.server = server;
}

@Override
public void handleRequest(RpcContext rpcCtx, ReadRequest request) {

handleRequest(server, request.getGroup(), rpcCtx, request);
}

@Override
public String interest() {
return null;
return INTEREST_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alipay.sofa.jraft.rpc.RpcContext;
import com.alipay.sofa.jraft.rpc.RpcProcessor;

Expand All @@ -28,17 +29,22 @@
*/
public class NacosWriteRequestProcessor extends AbstractProcessor implements RpcProcessor<WriteRequest> {

public NacosWriteRequestProcessor(Serializer serializer) {
private static final String INTEREST_NAME = WriteRequest.class.getName();

private final JRaftServer server;

public NacosWriteRequestProcessor(JRaftServer server, Serializer serializer) {
super(serializer);
this.server = server;
}

@Override
public void handleRequest(RpcContext rpcCtx, WriteRequest request) {

handleRequest(server, request.getGroup(), rpcCtx, request);
}

@Override
public String interest() {
return null;
return INTEREST_NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.alibaba.nacos.core.distributed.raft.JRaftServer;
import com.alibaba.nacos.core.distributed.raft.processor.NacosGetRequestProcessor;
import com.alibaba.nacos.core.distributed.raft.processor.NacosLogProcessor;
import com.alibaba.nacos.core.distributed.raft.processor.NacosReadRequestProcessor;
import com.alibaba.nacos.core.distributed.raft.processor.NacosWriteRequestProcessor;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.sys.utils.DiskUtils;
Expand Down Expand Up @@ -79,9 +81,14 @@ public static RpcServer initRpcServer(JRaftServer server, PeerId peerId) {
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer, RaftExecutor.getRaftCoreExecutor(),
RaftExecutor.getRaftCliServiceExecutor());

// Deprecated
rpcServer.registerProcessor(new NacosLogProcessor(server, SerializeFactory.getDefault()));
// Deprecated
rpcServer.registerProcessor(new NacosGetRequestProcessor(server, SerializeFactory.getDefault()));

rpcServer.registerProcessor(new NacosWriteRequestProcessor(server, SerializeFactory.getDefault()));
rpcServer.registerProcessor(new NacosReadRequestProcessor(server, SerializeFactory.getDefault()));

return rpcServer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public PersistentServiceProcessor(ProtocolManager protocolManager, ClusterVersio
@Override
public void afterConstruct() {
super.afterConstruct();
this.protocol.addLogProcessors(Collections.singletonList(this));
this.protocol.addRequestProcessors(Collections.singletonList(this));
this.protocol.protocolMetaData()
.subscribe(Constants.NAMING_PERSISTENT_SERVICE_GROUP, MetadataKey.LEADER_META_DATA,
(o, arg) -> hasLeader = StringUtils.isNotBlank(String.valueOf(arg)));
Expand Down Expand Up @@ -95,7 +95,7 @@ public void put(String key, Record value) throws NacosException {
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Write.desc).build();
try {
protocol.submit(request);
protocol.write(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
Expand All @@ -108,7 +108,7 @@ public void remove(String key) throws NacosException {
final WriteRequest request = WriteRequest.newBuilder().setData(ByteString.copyFrom(serializer.serialize(req)))
.setGroup(Constants.NAMING_PERSISTENT_SERVICE_GROUP).setOperation(Op.Delete.desc).build();
try {
protocol.submit(request);
protocol.write(request);
} catch (Exception e) {
throw new NacosException(ErrorCode.ProtoSubmitError.getCode(), e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public InstanceMetadataProcessor(NamingMetadataManager namingMetadataManager, Pr
this.processType = TypeUtils.parameterize(MetadataOperation.class, InstanceMetadata.class);
this.lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
protocolManager.getCpProtocol().addLogProcessors(Collections.singletonList(this));
protocolManager.getCpProtocol().addRequestProcessors(Collections.singletonList(this));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private <T> MetadataOperation<T> buildMetadataOperation(Service service) {

private void submitMetadataOperation(WriteRequest operationLog) {
try {
Response response = cpProtocol.submit(operationLog);
Response response = cpProtocol.write(operationLog);
if (!response.getSuccess()) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR,
"do metadata operation failed " + response.getErrMsg());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public ServiceMetadataProcessor(NamingMetadataManager namingMetadataManager, Pro
this.processType = TypeUtils.parameterize(MetadataOperation.class, ServiceMetadata.class);
this.lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
protocolManager.getCpProtocol().addLogProcessors(Collections.singletonList(this));
protocolManager.getCpProtocol().addRequestProcessors(Collections.singletonList(this));
}

@Override
Expand Down

0 comments on commit 81d4a6d

Please sign in to comment.