Skip to content

Commit

Permalink
[ISSUE #305] Support fast advance commit index by appending no-op ent…
Browse files Browse the repository at this point in the history
…ry (#306)

* feat(core): support fast advance commit index by appending no-np entry

1. support fast advance commit index by appending no-np entry
2. fix some
wrong listen port in ut

* fix(core): fix flaky test for too short wait time

1. fix flaky test for too short wait time

* fix(core): avoid NPE in CommittedEntryIterator

1. avoid NPE in CommittedEntryIterator

* rerun

* rerun
  • Loading branch information
TheR1sing3un authored Jul 17, 2023
1 parent dd78f67 commit f0ad185
Show file tree
Hide file tree
Showing 12 changed files with 325 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public class DLedgerConfig {
// max interval in ms for each append request
private int maxBatchAppendIntervalMs = 1000;

/**
* When node change from candidate to leader, it maybe always keep the old commit index although this index's entry has been
* replicated to more than half of the nodes (it will keep until a new entry is appended in current term).
* The reason why this scenario happens is that leader can't commit the entries which are belong to the previous term.
*/
private boolean enableFastAdvanceCommitIndex = false;

public String getDefaultPath() {
return storeBaseDir + File.separator + "dledger-" + selfId;
}
Expand Down Expand Up @@ -544,4 +551,12 @@ public int getResetSnapshotEntriesButKeepLastEntriesNum() {
public void setResetSnapshotEntriesButKeepLastEntriesNum(int resetSnapshotEntriesButKeepLastEntriesNum) {
this.resetSnapshotEntriesButKeepLastEntriesNum = resetSnapshotEntriesButKeepLastEntriesNum;
}

public boolean isEnableFastAdvanceCommitIndex() {
return enableFastAdvanceCommitIndex;
}

public void setEnableFastAdvanceCommitIndex(boolean enableFastAdvanceCommitIndex) {
this.enableFastAdvanceCommitIndex = enableFastAdvanceCommitIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ private void resetBatchAppendEntryRequest() {
batchAppendEntryRequest.setGroup(memberState.getGroup());
batchAppendEntryRequest.setRemoteId(peerId);
batchAppendEntryRequest.setLeaderId(leaderId);
batchAppendEntryRequest.setLocalId(memberState.getSelfId());
batchAppendEntryRequest.setLocalId(selfId);
batchAppendEntryRequest.setTerm(term);
batchAppendEntryRequest.setType(PushEntryRequest.Type.APPEND);
batchAppendEntryRequest.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class DLedgerServer extends AbstractDLedgerServer {

private final ScheduledExecutorService executorService;

private FastAdvanceCommitIndexService fastAdvanceCommitIndexService;

private StateMachineCaller fsmCaller;

private volatile boolean isStarted = false;
Expand Down Expand Up @@ -133,6 +135,10 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerC
stateMachine = new NoOpStatemachine();
}
this.fsmCaller = new StateMachineCaller(this.dLedgerStore, stateMachine, this.dLedgerEntryPusher);
if (this.dLedgerConfig.isEnableFastAdvanceCommitIndex()) {
this.fastAdvanceCommitIndexService = new FastAdvanceCommitIndexService();
this.dLedgerLeaderElector.addRoleChangeHandler(this.fastAdvanceCommitIndexService);
}
}

/**
Expand All @@ -156,6 +162,10 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, DLedgerRpcService dLedgerRpcSe
return t;
});
this.fsmCaller = new StateMachineCaller(this.dLedgerStore, new NoOpStatemachine(), this.dLedgerEntryPusher);
if (this.dLedgerConfig.isEnableFastAdvanceCommitIndex()) {
this.fastAdvanceCommitIndexService = new FastAdvanceCommitIndexService();
this.dLedgerLeaderElector.addRoleChangeHandler(this.fastAdvanceCommitIndexService);
}
}

/**
Expand Down Expand Up @@ -491,7 +501,8 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
}

@Override
public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(InstallSnapshotRequest request) throws Exception {
public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(
InstallSnapshotRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
Expand Down Expand Up @@ -709,4 +720,33 @@ enum RpcServiceMode {
SHARED
}

private class FastAdvanceCommitIndexService implements DLedgerLeaderElector.RoleChangeHandler {

@Override
public void handle(long term, MemberState.Role role) {
if (role == MemberState.Role.LEADER && term == memberState.currTerm() && memberState.getCommittedIndex() < memberState.getLedgerEndIndex()) {
DLedgerServer.this.handleRead(ReadMode.RAFT_LOG_READ, new ReadClosure() {
@Override
public void done(Status status) {
if (status != Status.ok()) {
LOGGER.error("[FastAdvanceCommitIndexService-{}] term: {} advance failed, status={}", term, memberState.getSelfId(), status);
} else {
LOGGER.info("[FastAdvanceCommitIndexService-{}] term: {} advance ok", term, memberState.getSelfId());
}
}
});
}
}

@Override
public void startup() {

}

@Override
public void shutdown() {

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public long getCommittedIndex() {
public boolean leaderUpdateCommittedIndex(long term, long committedIndex) {
if (!this.isLeader()) return false;
// prevent back to an old state
if (term != this.currTerm && committedIndex <= this.committedIndex) {
if (term < this.currTerm || committedIndex <= this.committedIndex) {
return false;
}
logger.debug("[MemberState] leader update committed index from {} to {}", this.committedIndex, committedIndex);
Expand All @@ -291,7 +291,7 @@ public boolean leaderUpdateCommittedIndex(long term, long committedIndex) {
}

public boolean followerUpdateCommittedIndex(long committedIndex) {
if (!this.isFollower()) return false;
if (this.isLeader()) return false;
if (committedIndex <= this.committedIndex) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,10 @@ public static Status error(DLedgerResponseCode code) {
return new Status(code);
}

@Override
public String toString() {
return "Status{" +
"code=" + code +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public int getCompleteAckNums() {
public boolean hasNext() {
while (inner.hasNext()) {
ApplyEntry applyEntry = inner.next();
if (filter.test(applyEntry.getEntry())) {
if (applyEntry != null && filter.test(applyEntry.getEntry())) {
nextTask = applyEntry;
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class AppendAndGetTest extends ServerTestHarness {
public void testSingleServerInMemory() throws Exception {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:10001";
String peers = String.format("n0-localhost:%d", nextPort());
launchServer(group, peers, selfId, selfId, DLedgerConfig.MEMORY);
DLedgerClient dLedgerClient = launchClient(group, peers);
for (long i = 0; i < 10; i++) {
Expand All @@ -58,7 +58,7 @@ public void testSingleServerInMemory() throws Exception {
public void testSingleServerInFile() throws Exception {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:10002";
String peers = String.format("n0-localhost:%d", nextPort());
launchServer(group, peers, selfId, selfId, DLedgerConfig.FILE);
DLedgerClient dLedgerClient = launchClient(group, peers);
long expectedPos = 0L;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testThreeServerInMemory() throws Exception {
@Test
public void testThreeServerInFile() throws Exception {
String group = UUID.randomUUID().toString();
String peers = "n0-localhost:10006;n1-localhost:10007;n2-localhost:10008";
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer2 = launchServer(group, peers, "n2", "n1", DLedgerConfig.FILE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected String getBaseDir() {
public void testSingleServerInMemory() {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:" + nextPort();
String peers = String.format("n0-localhost:%d", nextPort());
DLedgerServer dLedgerServer = launchServerWithStateMachineDisableSnapshot(group, peers, selfId, selfId, DLedgerConfig.MEMORY,
102400, new RegisterStateMachine());
dLedgerServer.registerUserDefineProcessors(Collections.singletonList(new RegisterReadProcessor(dLedgerServer)));
Expand All @@ -55,7 +55,7 @@ public void testSingleServerInMemory() {
public void testSingleServerInFile() {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:" + nextPort();
String peers = String.format("n0-localhost:%d", nextPort());
DLedgerServer dLedgerServer = launchServerWithStateMachineDisableSnapshot(group, peers, selfId, selfId, DLedgerConfig.FILE,
102400, new RegisterStateMachine());
dLedgerServer.registerUserDefineProcessors(Collections.singletonList(new RegisterReadProcessor(dLedgerServer)));
Expand Down
Loading

0 comments on commit f0ad185

Please sign in to comment.