Skip to content

Commit

Permalink
[ISSUE#219] Separating DLedgerProxy from the DLedger as a new moudle (#…
Browse files Browse the repository at this point in the history
…220)

* fix(main): fix incompatible with old version

1. fix incompatible with old version

* style(global): separating proxy from dledger

1. separating proxy from dledger
  • Loading branch information
TheR1sing3un authored Sep 8, 2022
1 parent 8d195a7 commit 1504848
Show file tree
Hide file tree
Showing 38 changed files with 1,927 additions and 703 deletions.
6 changes: 5 additions & 1 deletion command/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@
<dependencies>
<dependency>
<groupId>io.openmessaging.storage</groupId>
<artifactId>dledger</artifactId>
<artifactId>proxy</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import com.alibaba.fastjson.JSON;
import com.beust.jcommander.JCommander;
import io.openmessaging.storage.dledger.cmdline.ConfigCommand;
import io.openmessaging.storage.dledger.dledger.DLedgerProxy;
import io.openmessaging.storage.dledger.dledger.DLedgerProxyConfig;
import io.openmessaging.storage.dledger.utils.ConfigUtils;
import io.openmessaging.storage.dledger.proxy.DLedgerProxy;
import io.openmessaging.storage.dledger.proxy.DLedgerProxyConfig;
import io.openmessaging.storage.dledger.proxy.util.ConfigUtils;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.beust.jcommander.Parameters;
import io.openmessaging.storage.dledger.DLedger;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.dledger.DLedgerProxyConfig;
import io.openmessaging.storage.dledger.utils.ConfigUtils;
import io.openmessaging.storage.dledger.proxy.DLedgerProxyConfig;
import io.openmessaging.storage.dledger.proxy.util.ConfigUtils;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import com.beust.jcommander.JCommander;
import io.openmessaging.storage.dledger.cmdline.ConfigCommand;
import io.openmessaging.storage.dledger.utils.ConfigUtils;
import io.openmessaging.storage.dledger.proxy.DLedgerProxyConfig;
import io.openmessaging.storage.dledger.proxy.util.ConfigUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down
8 changes: 0 additions & 8 deletions dledger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package io.openmessaging.storage.dledger.dledger;
package io.openmessaging.storage.dledger;

import io.openmessaging.storage.dledger.protocol.DLedgerProtocolHandler;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.openmessaging.storage.dledger;

import com.beust.jcommander.Parameter;
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;

import io.openmessaging.storage.dledger.utils.DLedgerUtils;
Expand All @@ -31,28 +30,20 @@ public class DLedgerConfig {
public static final String FILE = "FILE";
public static final String MULTI_PATH_SPLITTER = System.getProperty("dLedger.multiPath.Splitter", ",");

@Parameter(names = {"--config", "-c"}, description = "Config path of DLedger")
private String configFilePath;

@Parameter(names = {"--group", "-g"}, description = "Group of this server")
private String group = "default";

@Parameter(names = {"--id", "-i"}, description = "Self id of this server")
private String selfId = "n0";

@Parameter(names = {"--peers", "-p"}, description = "Peer info of this server")
private String peers = "n0-localhost:20911";

@Parameter(names = {"--store-base-dir", "-s"}, description = "The base store dir of this server")
private String storeBaseDir = File.separator + "tmp" + File.separator + "dledgerstore";

@Parameter(names = {"--read-only-data-store-dirs"}, description = "The dirs of this server to be read only")
private String readOnlyDataStoreDirs = null;

@Parameter(names = {"--peer-push-throttle-point"}, description = "When the follower is behind the leader more than this value, it will trigger the throttle")
private int peerPushThrottlePoint = 300 * 1024 * 1024;

@Parameter(names = {"--peer-push-quotas"}, description = "The quotas of the pusher")
private int peerPushQuota = 20 * 1024 * 1024;

private String storeType = FILE; //FILE, MEMORY
Expand Down Expand Up @@ -90,7 +81,6 @@ public class DLedgerConfig {

private boolean enablePushToFollower = true;

@Parameter(names = {"--preferred-leader-id"}, description = "Preferred LeaderId")
private String preferredLeaderIds;
private long maxLeadershipTransferWaitIndex = 1000;
private int minTakeLeadershipVoteIntervalMs = 30;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import io.openmessaging.storage.dledger.dledger.AbstractDLedgerServer;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerRequestCode;
Expand Down Expand Up @@ -463,7 +462,7 @@ public void shutdown() {
}

// only for test
public void setdLedgerProxy(AbstractDLedgerServer dLedger) {
public void setDLedger(AbstractDLedgerServer dLedger) {
this.dLedger = dLedger;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.openmessaging.storage.dledger;

import io.openmessaging.storage.dledger.dledger.AbstractDLedgerServer;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
Expand Down Expand Up @@ -57,8 +56,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -71,19 +73,55 @@ public class DLedgerServer extends AbstractDLedgerServer {

private DLedgerStore dLedgerStore;
private DLedgerRpcService dLedgerRpcService;

private final RpcServiceMode rpcServiceMode;
private DLedgerEntryPusher dLedgerEntryPusher;
private DLedgerLeaderElector dLedgerLeaderElector;

private ScheduledExecutorService executorService;
private Optional<StateMachineCaller> fsmCaller;

public DLedgerServer(DLedgerConfig dLedgerConfig) {
this(dLedgerConfig, null, null, null);
}

public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig) {
this(dLedgerConfig, nettyServerConfig, null, null);
}

public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this(dLedgerConfig, nettyServerConfig, nettyClientConfig, null);
}

public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig, ChannelEventListener channelEventListener) {
dLedgerConfig.init();
this.dLedgerConfig = dLedgerConfig;
this.memberState = new MemberState(dLedgerConfig);
this.dLedgerStore = createDLedgerStore(dLedgerConfig.getStoreType(), this.dLedgerConfig, this.memberState);
this.dLedgerRpcService = new DLedgerRpcNettyService(this, nettyServerConfig, nettyClientConfig, channelEventListener);
this.rpcServiceMode = RpcServiceMode.EXCLUSIVE;
this.dLedgerEntryPusher = new DLedgerEntryPusher(dLedgerConfig, memberState, dLedgerStore, dLedgerRpcService);
this.dLedgerLeaderElector = new DLedgerLeaderElector(dLedgerConfig, memberState, dLedgerRpcService);
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(null, "DLedgerServer-ScheduledExecutor", true));
this.fsmCaller = Optional.empty();
}

/**
* Start in proxy mode, use shared DLedgerRpcService
* @param dLedgerConfig DLedgerConfig
* @param dLedgerRpcService Shared DLedgerRpcService
*/
public DLedgerServer(DLedgerConfig dLedgerConfig, DLedgerRpcService dLedgerRpcService) {
this.dLedgerConfig = dLedgerConfig;
this.memberState = new MemberState(dLedgerConfig);
this.dLedgerStore = createDLedgerStore(dLedgerConfig.getStoreType(), this.dLedgerConfig, this.memberState);
dLedgerEntryPusher = new DLedgerEntryPusher(dLedgerConfig, memberState, dLedgerStore);
dLedgerLeaderElector = new DLedgerLeaderElector(dLedgerConfig, memberState);
executorService = Executors.newSingleThreadScheduledExecutor(r -> {
this.dLedgerRpcService = dLedgerRpcService;
this.rpcServiceMode = RpcServiceMode.SHARED;
this.dLedgerEntryPusher = new DLedgerEntryPusher(dLedgerConfig, memberState, dLedgerStore, dLedgerRpcService);
this.dLedgerLeaderElector = new DLedgerLeaderElector(dLedgerConfig, memberState, dLedgerRpcService);
this.executorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("DLedgerServer-ScheduledExecutor");
Expand All @@ -98,16 +136,28 @@ public void registerDLedgerRpcService(DLedgerRpcService dLedgerRpcService) {
this.dLedgerEntryPusher.registerDLedgerRpcService(dLedgerRpcService);
}

/**
* Start up, if the DLedgerRpcService is exclusive for this DLedgerServer, we should also start up it.
*/
public void startup() {
this.dLedgerStore.startup();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
}
this.dLedgerEntryPusher.startup();
this.dLedgerLeaderElector.startup();
executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
}

/**
* Shutdown, if the DLedgerRpcService is exclusive for this DLedgerServer, we should also shut down it.
*/
public void shutdown() {
this.dLedgerLeaderElector.shutdown();
this.dLedgerEntryPusher.shutdown();
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.shutdown();
}
this.dLedgerStore.shutdown();
executorService.shutdown();
this.fsmCaller.ifPresent(StateMachineCaller::shutdown);
Expand Down Expand Up @@ -492,4 +542,12 @@ public boolean isLeader() {
return this.memberState.isLeader();
}

/**
* Rpc service mode, exclusive or shared
*/
enum RpcServiceMode {
EXCLUSIVE,
SHARED
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.openmessaging.storage.dledger;

import io.openmessaging.storage.dledger.client.DLedgerClient;
import io.openmessaging.storage.dledger.dledger.DLedgerProxy;
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
Expand All @@ -37,7 +36,7 @@ public void testSingleServerInMemory() throws Exception {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:10001";
launchDLedgerProxy(group, peers, selfId, selfId, DLedgerConfig.MEMORY);
launchServer(group, peers, selfId, selfId, DLedgerConfig.MEMORY);
DLedgerClient dLedgerClient = launchClient(group, peers);
for (long i = 0; i < 10; i++) {
AppendEntryResponse appendEntryResponse = dLedgerClient.append(("HelloSingleServerInMemory" + i).getBytes());
Expand All @@ -56,7 +55,7 @@ public void testSingleServerInFile() throws Exception {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:10002";
launchDLedgerProxy(group, peers, selfId, selfId, DLedgerConfig.FILE);
launchServer(group, peers, selfId, selfId, DLedgerConfig.FILE);
DLedgerClient dLedgerClient = launchClient(group, peers);
long expectedPos = 0L;
for (long i = 0; i < 10; i++) {
Expand All @@ -78,12 +77,9 @@ public void testSingleServerInFile() throws Exception {
public void testThreeServerInMemory() throws Exception {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
DLedgerProxy dLedgerProxy0 = launchDLedgerProxy(group, peers, "n0", "n1", DLedgerConfig.MEMORY);
DLedgerProxy dLedgerProxy1 = launchDLedgerProxy(group, peers, "n1", "n1", DLedgerConfig.MEMORY);
DLedgerProxy dLedgerProxy2 = launchDLedgerProxy(group, peers, "n2", "n1", DLedgerConfig.MEMORY);
DLedgerServer dLedgerServer0 = dLedgerProxy0.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer1 = dLedgerProxy1.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer2 = dLedgerProxy2.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.MEMORY);
DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.MEMORY);
DLedgerServer dLedgerServer2 = launchServer(group, peers, "n2", "n1", DLedgerConfig.MEMORY);
DLedgerClient dLedgerClient = launchClient(group, peers.split(";")[0]);
for (int i = 0; i < 10; i++) {
AppendEntryResponse appendEntryResponse = dLedgerClient.append(("HelloThreeServerInMemory" + i).getBytes());
Expand All @@ -107,12 +103,9 @@ public void testThreeServerInMemory() throws Exception {
public void testThreeServerInFile() throws Exception {
String group = UUID.randomUUID().toString();
String peers = "n0-localhost:10006;n1-localhost:10007;n2-localhost:10008";
DLedgerProxy dLedgerProxy0 = launchDLedgerProxy(group, peers, "n0", "n1", DLedgerConfig.FILE);
DLedgerProxy dLedgerProxy1 = launchDLedgerProxy(group, peers, "n1", "n1", DLedgerConfig.FILE);
DLedgerProxy dLedgerProxy2 = launchDLedgerProxy(group, peers, "n2", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer0 = dLedgerProxy0.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer1 = dLedgerProxy1.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer2 = dLedgerProxy2.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer2 = launchServer(group, peers, "n2", "n1", DLedgerConfig.FILE);
DLedgerClient dLedgerClient = launchClient(group, peers);
for (int i = 0; i < 10; i++) {
AppendEntryResponse appendEntryResponse = dLedgerClient.append(("HelloThreeServerInFile" + i).getBytes());
Expand All @@ -136,12 +129,9 @@ public void testThreeServerInFile() throws Exception {
public void testThreeServerInFileWithAsyncRequests() throws Exception {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
DLedgerProxy dLedgerProxy0 = launchDLedgerProxy(group, peers, "n0", "n1", DLedgerConfig.FILE);
DLedgerProxy dLedgerProxy1 = launchDLedgerProxy(group, peers, "n1", "n1", DLedgerConfig.FILE);
DLedgerProxy dLedgerProxy2 = launchDLedgerProxy(group, peers, "n2", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer0 = dLedgerProxy0.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer1 = dLedgerProxy1.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer2 = dLedgerProxy2.getDLedgerManager().getDLedgerServers().get(0);
DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer2 = launchServer(group, peers, "n2", "n1", DLedgerConfig.FILE);
List<CompletableFuture<AppendEntryResponse>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
AppendEntryRequest request = new AppendEntryRequest();
Expand Down
Loading

0 comments on commit 1504848

Please sign in to comment.