Skip to content

Commit

Permalink
setting the commit state before apply of listeners and using it for t…
Browse files Browse the repository at this point in the history
…erm-version
  • Loading branch information
rajiv-kv committed Aug 28, 2024
1 parent d87c895 commit f54847d
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) {
if (termVersion.equals(new ClusterStateTermVersion(appliedState))) {
return appliedState;
}
ClusterState publishState = clusterService.publishState();
ClusterState publishState = clusterService.commitState();
if (publishState != null && termVersion.equals(new ClusterStateTermVersion(publishState))) {
return publishState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public TransportGetTermVersionAction(
GetTermVersionRequest::new,
indexNameExpressionResolver
);

}

@Override
Expand All @@ -76,7 +77,7 @@ protected void clusterManagerOperation(
ClusterState state,
ActionListener<GetTermVersionResponse> listener
) throws Exception {
ActionListener.completeWith(listener, () -> buildResponse(request, state));
ActionListener.completeWith(listener, () -> buildResponse(request, clusterService.commitState()));
}

private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,8 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList
coordinationState.get().handleCommit(applyCommitRequest);
final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(committedState) : committedState;
setStateOnApplier(applierState);

if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// cluster-manager node applies the committed state at the end of the publication process, not here.
applyListener.onResponse(null);
Expand Down Expand Up @@ -464,8 +466,6 @@ && getCurrentTerm() == ZEN1_BWC_TERM
ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term());
final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest);

setStateOnApplier(coordinationState.get().getLastAcceptedState());

if (sourceNode.equals(getLocalNode())) {
preVoteCollector.update(getPreVoteResponse(), getLocalNode());
} else {
Expand All @@ -480,11 +480,7 @@ && getCurrentTerm() == ZEN1_BWC_TERM
}

private void setStateOnApplier(ClusterState clusterState) {
ClusterState publishState = hideStateIfNotRecovered(clusterState);
final ClusterState publishClusterState = mode == Mode.CANDIDATE
? clusterStateWithNoClusterManagerBlock(publishState)
: publishState;
clusterApplier.setPublishState(publishClusterState);
clusterApplier.setCommitState(clusterState);
}

private static Optional<Join> joinWithDestination(Optional<Join> lastJoin, DiscoveryNode leader, long term) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public interface ClusterApplier {
void setInitialState(ClusterState initialState);

/**
* Sets the publish state for the applier
* @param clusterState state published by cluster-manager
* Sets the committed state for the applier.
* @param clusterState state that has been committed by cluster-manager
*/
void setPublishState(ClusterState clusterState);
void setCommitState(ClusterState clusterState);

/**
* Method to invoke when a new cluster state is available to be applied
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private final Collection<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<>();
private final Map<TimeoutClusterStateListener, NotifyTimeout> timeoutClusterStateListeners = new ConcurrentHashMap<>();
private final AtomicReference<ClusterState> publishState = new AtomicReference<>(); // last published state
private final AtomicReference<ClusterState> preApplyState = new AtomicReference<>(); // last committed state which is yet to be applied
private final AtomicReference<ClusterState> state; // last applied state

private final String nodeName;
Expand Down Expand Up @@ -170,8 +170,8 @@ public void setInitialState(ClusterState initialState) {
}

@Override
public void setPublishState(ClusterState clusterState) {
publishState.set(clusterState);
public void setCommitState(ClusterState clusterState) {
preApplyState.set(clusterState);
}

@Override
Expand Down Expand Up @@ -238,8 +238,8 @@ public ClusterState state() {
return clusterState;
}

public ClusterState publishState() {
return publishState.get();
public ClusterState commitState() {
return preApplyState.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ public ClusterState state() {
return clusterApplierService.state();
}

public ClusterState publishState() {
return clusterApplierService.publishState();
public ClusterState commitState() {
return clusterApplierService.commitState();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,17 @@
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand All @@ -22,6 +30,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -100,12 +109,92 @@ public void testDatanodeOutOfSync() throws Exception {
assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length));
}

public void testDatanodeWithSlowClusterApplier() throws Exception {
List<String> masters = internalCluster().startClusterManagerOnlyNodes(3);
List<String> datas = internalCluster().startDataOnlyNodes(3);

Map<String, AtomicInteger> callCounters = Map.ofEntries(
Map.entry(ClusterStateAction.NAME, new AtomicInteger()),
Map.entry(GetTermVersionAction.NAME, new AtomicInteger())
);
ensureGreen();

String master = internalCluster().getClusterManagerName();

// stubClusterTermResponse(master);
addCallCountInterceptor(master, callCounters);

AtomicBoolean latch = new AtomicBoolean();
ClusterService cmClsSerive = internalCluster().getInstance(ClusterService.class, datas.get(0));
cmClsSerive.addStateApplier(new ClusterStateApplier() {
@Override
public void applyClusterState(ClusterChangedEvent event) {

if (!latch.get()) {
return;
}
try {
System.out.println("sleep start");
Thread.sleep(60000);
System.out.println("sleep end");

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});

String index = "index_1";
latch.set(true);
prepareCreate(index).setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
.build()
)
.get();

Thread.sleep(5000);
latch.set(false);

{
ClusterStateResponse stateResponseM = internalCluster().getInstance(Client.class, master)
.admin()
.cluster()
.state(new ClusterStateRequest())
.actionGet();

System.out.println("master is still lagging behind");

System.out.println(stateResponseM.getState().term());
System.out.println(stateResponseM.getState().version());
}

ClusterStateResponse stateResponseD = internalCluster().getInstance(Client.class, datas.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest())
.actionGet();
System.out.println("data has latest");
System.out.println(stateResponseD.getState().term());
System.out.println(stateResponseD.getState().version());

AtomicInteger clusterStateCallsOnMaster = callCounters.get(ClusterStateAction.NAME);
AtomicInteger termCallsOnMaster = callCounters.get(GetTermVersionAction.NAME);

assertThat(clusterStateCallsOnMaster.get(), is(0));
assertThat(termCallsOnMaster.get(), is(1));

}

private void addCallCountInterceptor(String nodeName, Map<String, AtomicInteger> callCounters) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeName);
for (var ctrEnty : callCounters.entrySet()) {
primaryService.addRequestHandlingBehavior(ctrEnty.getKey(), (handler, request, channel, task) -> {
ctrEnty.getValue().incrementAndGet();
logger.info("--> {} response redirect", ClusterStateAction.NAME);
logger.info("--> {} response redirect", ctrEnty.getKey());
handler.messageReceived(request, channel, task);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void setInitialState(ClusterState initialState) {
}

@Override
public void setPublishState(ClusterState clusterState) {
public void setCommitState(ClusterState clusterState) {

}

Expand Down

0 comments on commit f54847d

Please sign in to comment.