diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index ea78dd4a2873d..4208480be48f1 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -419,7 +419,7 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) { if (termVersion.equals(new ClusterStateTermVersion(appliedState))) { return appliedState; } - ClusterState publishState = clusterService.publishState(); + ClusterState publishState = clusterService.commitState(); if (publishState != null && termVersion.equals(new ClusterStateTermVersion(publishState))) { return publishState; } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/term/TransportGetTermVersionAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/term/TransportGetTermVersionAction.java index 4752a99c910e4..114ddf16ffc98 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/term/TransportGetTermVersionAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/term/TransportGetTermVersionAction.java @@ -52,6 +52,7 @@ public TransportGetTermVersionAction( GetTermVersionRequest::new, indexNameExpressionResolver ); + } @Override @@ -76,7 +77,7 @@ protected void clusterManagerOperation( ClusterState state, ActionListener listener ) throws Exception { - ActionListener.completeWith(listener, () -> buildResponse(request, state)); + ActionListener.completeWith(listener, () -> buildResponse(request, clusterService.commitState())); } private GetTermVersionResponse buildResponse(GetTermVersionRequest request, ClusterState state) { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 52db55b784996..73850df3bbac0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -386,6 +386,8 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList coordinationState.get().handleCommit(applyCommitRequest); final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState()); applierState = mode == Mode.CANDIDATE ? clusterStateWithNoClusterManagerBlock(committedState) : committedState; + setStateOnApplier(applierState); + if (applyCommitRequest.getSourceNode().equals(getLocalNode())) { // cluster-manager node applies the committed state at the end of the publication process, not here. applyListener.onResponse(null); @@ -464,8 +466,6 @@ && getCurrentTerm() == ZEN1_BWC_TERM ensureTermAtLeast(sourceNode, publishRequest.getAcceptedState().term()); final PublishResponse publishResponse = coordinationState.get().handlePublishRequest(publishRequest); - setStateOnApplier(coordinationState.get().getLastAcceptedState()); - if (sourceNode.equals(getLocalNode())) { preVoteCollector.update(getPreVoteResponse(), getLocalNode()); } else { @@ -480,11 +480,7 @@ && getCurrentTerm() == ZEN1_BWC_TERM } private void setStateOnApplier(ClusterState clusterState) { - ClusterState publishState = hideStateIfNotRecovered(clusterState); - final ClusterState publishClusterState = mode == Mode.CANDIDATE - ? clusterStateWithNoClusterManagerBlock(publishState) - : publishState; - clusterApplier.setPublishState(publishClusterState); + clusterApplier.setCommitState(clusterState); } private static Optional joinWithDestination(Optional lastJoin, DiscoveryNode leader, long term) { diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java index 2d95811b8c4da..62a38086fd194 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java @@ -50,10 +50,10 @@ public interface ClusterApplier { void setInitialState(ClusterState initialState); /** - * Sets the publish state for the applier - * @param clusterState state published by cluster-manager + * Sets the committed state for the applier. + * @param clusterState state that has been committed by cluster-manager */ - void setPublishState(ClusterState clusterState); + void setCommitState(ClusterState clusterState); /** * Method to invoke when a new cluster state is available to be applied diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java index b12501599e26c..745336926c747 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java @@ -119,7 +119,7 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements private final Collection clusterStateListeners = new CopyOnWriteArrayList<>(); private final Map timeoutClusterStateListeners = new ConcurrentHashMap<>(); - private final AtomicReference publishState = new AtomicReference<>(); // last published state + private final AtomicReference preApplyState = new AtomicReference<>(); // last committed state which is yet to be applied private final AtomicReference state; // last applied state private final String nodeName; @@ -170,8 +170,8 @@ public void setInitialState(ClusterState initialState) { } @Override - public void setPublishState(ClusterState clusterState) { - publishState.set(clusterState); + public void setCommitState(ClusterState clusterState) { + preApplyState.set(clusterState); } @Override @@ -238,8 +238,8 @@ public ClusterState state() { return clusterState; } - public ClusterState publishState() { - return publishState.get(); + public ClusterState commitState() { + return preApplyState.get(); } /** diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index 31145b5a8fc21..127c217154485 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -183,8 +183,8 @@ public ClusterState state() { return clusterApplierService.state(); } - public ClusterState publishState() { - return clusterApplierService.publishState(); + public ClusterState commitState() { + return clusterApplierService.commitState(); } /** diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/term/ClusterTermVersionIT.java b/server/src/test/java/org/opensearch/action/support/clustermanager/term/ClusterTermVersionIT.java index 7b783e025a575..6ac83c195d858 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/term/ClusterTermVersionIT.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/term/ClusterTermVersionIT.java @@ -11,9 +11,17 @@ import org.opensearch.action.admin.cluster.state.ClusterStateAction; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.support.ActiveShardCount; +import org.opensearch.client.Client; +import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.coordination.ClusterStateTermVersion; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.mapper.MapperService; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.transport.MockTransportService; @@ -22,6 +30,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.Matchers.is; @@ -100,12 +109,92 @@ public void testDatanodeOutOfSync() throws Exception { assertThat(stateResponse.getState().nodes().getSize(), is(internalCluster().getNodeNames().length)); } + public void testDatanodeWithSlowClusterApplier() throws Exception { + List masters = internalCluster().startClusterManagerOnlyNodes(3); + List datas = internalCluster().startDataOnlyNodes(3); + + Map callCounters = Map.ofEntries( + Map.entry(ClusterStateAction.NAME, new AtomicInteger()), + Map.entry(GetTermVersionAction.NAME, new AtomicInteger()) + ); + ensureGreen(); + + String master = internalCluster().getClusterManagerName(); + + // stubClusterTermResponse(master); + addCallCountInterceptor(master, callCounters); + + AtomicBoolean latch = new AtomicBoolean(); + ClusterService cmClsSerive = internalCluster().getInstance(ClusterService.class, datas.get(0)); + cmClsSerive.addStateApplier(new ClusterStateApplier() { + @Override + public void applyClusterState(ClusterChangedEvent event) { + + if (!latch.get()) { + return; + } + try { + System.out.println("sleep start"); + Thread.sleep(60000); + System.out.println("sleep end"); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + + String index = "index_1"; + latch.set(true); + prepareCreate(index).setWaitForActiveShards(ActiveShardCount.NONE) + .setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE) + .build() + ) + .get(); + + Thread.sleep(5000); + latch.set(false); + + { + ClusterStateResponse stateResponseM = internalCluster().getInstance(Client.class, master) + .admin() + .cluster() + .state(new ClusterStateRequest()) + .actionGet(); + + System.out.println("master is still lagging behind"); + + System.out.println(stateResponseM.getState().term()); + System.out.println(stateResponseM.getState().version()); + } + + ClusterStateResponse stateResponseD = internalCluster().getInstance(Client.class, datas.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest()) + .actionGet(); + System.out.println("data has latest"); + System.out.println(stateResponseD.getState().term()); + System.out.println(stateResponseD.getState().version()); + + AtomicInteger clusterStateCallsOnMaster = callCounters.get(ClusterStateAction.NAME); + AtomicInteger termCallsOnMaster = callCounters.get(GetTermVersionAction.NAME); + + assertThat(clusterStateCallsOnMaster.get(), is(0)); + assertThat(termCallsOnMaster.get(), is(1)); + + } + private void addCallCountInterceptor(String nodeName, Map callCounters) { MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeName); for (var ctrEnty : callCounters.entrySet()) { primaryService.addRequestHandlingBehavior(ctrEnty.getKey(), (handler, request, channel, task) -> { ctrEnty.getValue().incrementAndGet(); - logger.info("--> {} response redirect", ClusterStateAction.NAME); + logger.info("--> {} response redirect", ctrEnty.getKey()); handler.messageReceived(request, channel, task); }); } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java b/server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java index e327cb19bdcab..0d3196d53dfcf 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/NoOpClusterApplier.java @@ -43,7 +43,7 @@ public void setInitialState(ClusterState initialState) { } @Override - public void setPublishState(ClusterState clusterState) { + public void setCommitState(ClusterState clusterState) { }