Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #305] Support fast advance commit index by appending no-op entry #306

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public class DLedgerConfig {
// max interval in ms for each append request
private int maxBatchAppendIntervalMs = 1000;

/**
* When node change from candidate to leader, it maybe always keep the old commit index although this index's entry has been
* replicated to more than half of the nodes (it will keep until a new entry is appended in current term).
* The reason why this scenario happens is that leader can't commit the entries which are belong to the previous term.
*/
private boolean enableFastAdvanceCommitIndex = false;

public String getDefaultPath() {
return storeBaseDir + File.separator + "dledger-" + selfId;
}
Expand Down Expand Up @@ -544,4 +551,12 @@ public int getResetSnapshotEntriesButKeepLastEntriesNum() {
public void setResetSnapshotEntriesButKeepLastEntriesNum(int resetSnapshotEntriesButKeepLastEntriesNum) {
this.resetSnapshotEntriesButKeepLastEntriesNum = resetSnapshotEntriesButKeepLastEntriesNum;
}

public boolean isEnableFastAdvanceCommitIndex() {
return enableFastAdvanceCommitIndex;
}

public void setEnableFastAdvanceCommitIndex(boolean enableFastAdvanceCommitIndex) {
this.enableFastAdvanceCommitIndex = enableFastAdvanceCommitIndex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ private void resetBatchAppendEntryRequest() {
batchAppendEntryRequest.setGroup(memberState.getGroup());
batchAppendEntryRequest.setRemoteId(peerId);
batchAppendEntryRequest.setLeaderId(leaderId);
batchAppendEntryRequest.setLocalId(memberState.getSelfId());
batchAppendEntryRequest.setLocalId(selfId);
batchAppendEntryRequest.setTerm(term);
batchAppendEntryRequest.setType(PushEntryRequest.Type.APPEND);
batchAppendEntryRequest.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class DLedgerServer extends AbstractDLedgerServer {

private final ScheduledExecutorService executorService;

private FastAdvanceCommitIndexService fastAdvanceCommitIndexService;

private StateMachineCaller fsmCaller;

private volatile boolean isStarted = false;
Expand Down Expand Up @@ -133,6 +135,10 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, NettyServerConfig nettyServerC
stateMachine = new NoOpStatemachine();
}
this.fsmCaller = new StateMachineCaller(this.dLedgerStore, stateMachine, this.dLedgerEntryPusher);
if (this.dLedgerConfig.isEnableFastAdvanceCommitIndex()) {
this.fastAdvanceCommitIndexService = new FastAdvanceCommitIndexService();
this.dLedgerLeaderElector.addRoleChangeHandler(this.fastAdvanceCommitIndexService);
}
}

/**
Expand All @@ -156,6 +162,10 @@ public DLedgerServer(DLedgerConfig dLedgerConfig, DLedgerRpcService dLedgerRpcSe
return t;
});
this.fsmCaller = new StateMachineCaller(this.dLedgerStore, new NoOpStatemachine(), this.dLedgerEntryPusher);
if (this.dLedgerConfig.isEnableFastAdvanceCommitIndex()) {
this.fastAdvanceCommitIndexService = new FastAdvanceCommitIndexService();
this.dLedgerLeaderElector.addRoleChangeHandler(this.fastAdvanceCommitIndexService);
}
}

/**
Expand Down Expand Up @@ -491,7 +501,8 @@ public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request)
}

@Override
public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(InstallSnapshotRequest request) throws Exception {
public CompletableFuture<InstallSnapshotResponse> handleInstallSnapshot(
InstallSnapshotRequest request) throws Exception {
try {
PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());
PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());
Expand Down Expand Up @@ -709,4 +720,33 @@ enum RpcServiceMode {
SHARED
}

private class FastAdvanceCommitIndexService implements DLedgerLeaderElector.RoleChangeHandler {

@Override
public void handle(long term, MemberState.Role role) {
if (role == MemberState.Role.LEADER && term == memberState.currTerm() && memberState.getCommittedIndex() < memberState.getLedgerEndIndex()) {
DLedgerServer.this.handleRead(ReadMode.RAFT_LOG_READ, new ReadClosure() {
@Override
public void done(Status status) {
if (status != Status.ok()) {
LOGGER.error("[FastAdvanceCommitIndexService-{}] term: {} advance failed, status={}", term, memberState.getSelfId(), status);
} else {
LOGGER.info("[FastAdvanceCommitIndexService-{}] term: {} advance ok", term, memberState.getSelfId());
}
}
});
}
}

@Override
public void startup() {

}

@Override
public void shutdown() {

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public long getCommittedIndex() {
public boolean leaderUpdateCommittedIndex(long term, long committedIndex) {
if (!this.isLeader()) return false;
// prevent back to an old state
if (term != this.currTerm && committedIndex <= this.committedIndex) {
if (term < this.currTerm || committedIndex <= this.committedIndex) {
return false;
}
logger.debug("[MemberState] leader update committed index from {} to {}", this.committedIndex, committedIndex);
Expand All @@ -291,7 +291,7 @@ public boolean leaderUpdateCommittedIndex(long term, long committedIndex) {
}

public boolean followerUpdateCommittedIndex(long committedIndex) {
if (!this.isFollower()) return false;
if (this.isLeader()) return false;
if (committedIndex <= this.committedIndex) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,10 @@ public static Status error(DLedgerResponseCode code) {
return new Status(code);
}

@Override
public String toString() {
return "Status{" +
"code=" + code +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public int getCompleteAckNums() {
public boolean hasNext() {
while (inner.hasNext()) {
ApplyEntry applyEntry = inner.next();
if (filter.test(applyEntry.getEntry())) {
if (applyEntry != null && filter.test(applyEntry.getEntry())) {
nextTask = applyEntry;
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class AppendAndGetTest extends ServerTestHarness {
public void testSingleServerInMemory() throws Exception {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:10001";
String peers = String.format("n0-localhost:%d", nextPort());
launchServer(group, peers, selfId, selfId, DLedgerConfig.MEMORY);
DLedgerClient dLedgerClient = launchClient(group, peers);
for (long i = 0; i < 10; i++) {
Expand All @@ -58,7 +58,7 @@ public void testSingleServerInMemory() throws Exception {
public void testSingleServerInFile() throws Exception {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:10002";
String peers = String.format("n0-localhost:%d", nextPort());
launchServer(group, peers, selfId, selfId, DLedgerConfig.FILE);
DLedgerClient dLedgerClient = launchClient(group, peers);
long expectedPos = 0L;
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testThreeServerInMemory() throws Exception {
@Test
public void testThreeServerInFile() throws Exception {
String group = UUID.randomUUID().toString();
String peers = "n0-localhost:10006;n1-localhost:10007;n2-localhost:10008";
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer1 = launchServer(group, peers, "n1", "n1", DLedgerConfig.FILE);
DLedgerServer dLedgerServer2 = launchServer(group, peers, "n2", "n1", DLedgerConfig.FILE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected String getBaseDir() {
public void testSingleServerInMemory() {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:" + nextPort();
String peers = String.format("n0-localhost:%d", nextPort());
DLedgerServer dLedgerServer = launchServerWithStateMachineDisableSnapshot(group, peers, selfId, selfId, DLedgerConfig.MEMORY,
102400, new RegisterStateMachine());
dLedgerServer.registerUserDefineProcessors(Collections.singletonList(new RegisterReadProcessor(dLedgerServer)));
Expand All @@ -55,7 +55,7 @@ public void testSingleServerInMemory() {
public void testSingleServerInFile() {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:" + nextPort();
String peers = String.format("n0-localhost:%d", nextPort());
DLedgerServer dLedgerServer = launchServerWithStateMachineDisableSnapshot(group, peers, selfId, selfId, DLedgerConfig.FILE,
102400, new RegisterStateMachine());
dLedgerServer.registerUserDefineProcessors(Collections.singletonList(new RegisterReadProcessor(dLedgerServer)));
Expand Down
Loading