diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index d62dbdc2cc173..081f8150d8c8e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -38,8 +38,8 @@ public interface ClusterStateTaskExecutor { * already have become master and updated the state in a way that would be inconsistent with the response that {@code N} sends back to * clients. * - * @return The resulting cluster state after executing all the tasks. If {code batchExecutionContext.initialState()} is returned then no - * update is published. + * @return The resulting cluster state after executing all the tasks. If {@code batchExecutionContext.initialState()} is returned then + * no update is published. */ ClusterState execute(BatchExecutionContext batchExecutionContext) throws Exception; diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java index bb0ffe0ee1c8d..91280c4da40b6 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayService.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayService.java @@ -31,9 +31,11 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,9 +82,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste private final TimeValue recoverAfterTime; private final int recoverAfterDataNodes; private final int expectedDataNodes; - - private final AtomicBoolean recoveryInProgress = new AtomicBoolean(); - private final AtomicBoolean scheduledRecovery = new AtomicBoolean(); + volatile PendingStateRecovery currentPendingStateRecovery; @Inject public GatewayService( @@ -131,8 +131,9 @@ public void clusterChanged(final ClusterChangedEvent event) { } final ClusterState state = event.state(); + final DiscoveryNodes nodes = state.nodes(); - if (state.nodes().isLocalNodeElectedMaster() == false) { + if (nodes.isLocalNodeElectedMaster() == false) { // not our job to recover return; } @@ -141,83 +142,153 @@ public void clusterChanged(final ClusterChangedEvent event) { return; } - final DiscoveryNodes nodes = state.nodes(); - if (state.nodes().getMasterNodeId() == null) { - logger.debug("not recovering from gateway, no master elected yet"); - } else if (recoverAfterDataNodes != -1 && nodes.getDataNodes().size() < recoverAfterDataNodes) { - logger.debug( - "not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]", - nodes.getDataNodes().size(), - recoverAfterDataNodes - ); - } else { - boolean enforceRecoverAfterTime; - String reason; - if (expectedDataNodes == -1) { - // no expected is set, honor recover_after_data_nodes - enforceRecoverAfterTime = true; - reason = "recover_after_time was set to [" + recoverAfterTime + "]"; - } else if (expectedDataNodes <= nodes.getDataNodes().size()) { - // expected is set and satisfied so recover immediately - enforceRecoverAfterTime 
= false; - reason = ""; + // At this point, we know the state is not recovered and this node is qualified for state recovery + // But we still need to check whether a previous one is running already + final long currentTerm = state.term(); + final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery; + + // Always start a new state recovery if the master term changes + // If there is a previous one still waiting, both will probably run but at most one of them will + // actually make changes to cluster state because either: + // 1. The previous recovers the cluster state and the current one will be skipped + // 2. The previous one sees a new cluster term and skips its own execution + if (existingPendingStateRecovery == null || existingPendingStateRecovery.expectedTerm < currentTerm) { + currentPendingStateRecovery = new PendingStateRecovery(currentTerm); + } + currentPendingStateRecovery.onDataNodeSize(nodes.getDataNodes().size()); + } + + /** + * This class manages the cluster state recovery behaviours. It has two major scenarios depending + * on whether {@code recoverAfterDataNodes} is configured. + * + *
<p> When {@code recoverAfterDataNodes} is configured:
+     * <ol>
+     *     <li>Nothing can happen until it is reached
+     *     <li>When {@code recoverAfterDataNodes} is reached, the cluster either:
+     *     <ul>
+     *         <li>Recovers immediately when {@code expectedDataNodes} is reached or
+     *         both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
+     *         <li>Or schedules a recovery with a delay of {@code recoverAfterTime}
+     *     </ul>
+     *     <li>The scheduled recovery can be cancelled if the data node count drops below {@code recoverAfterDataNodes}
+     *     before the recovery can happen. When this happens, the process goes back to the beginning (step 1).
+     *     <li>The recovery is scheduled only once each time the data node count crosses {@code recoverAfterDataNodes}
+     * </ol>
+     *
+     * <p> When {@code recoverAfterDataNodes} is not configured, the cluster either:
+     * <ul>
+     *     <li>Recovers immediately when {@code expectedDataNodes} is reached or
+     *     both {@code expectedDataNodes} and {@code recoverAfterTime} are not configured
+     *     <li>Or schedules a recovery with a delay of {@code recoverAfterTime}
+     * </ul>
+ */ + class PendingStateRecovery { + private final long expectedTerm; + @Nullable + private Scheduler.ScheduledCancellable scheduledRecovery; + private final AtomicBoolean taskSubmitted = new AtomicBoolean(); + + PendingStateRecovery(long expectedTerm) { + this.expectedTerm = expectedTerm; + } + + void onDataNodeSize(int currentDataNodeSize) { + if (recoverAfterDataNodes != -1 && currentDataNodeSize < recoverAfterDataNodes) { + logger.debug( + "not recovering from gateway, nodes_size (data) [{}] < recover_after_data_nodes [{}]", + currentDataNodeSize, + recoverAfterDataNodes + ); + cancelScheduledRecovery(); } else { - // expected is set but not satisfied so wait until it is satisfied or times out - enforceRecoverAfterTime = true; - reason = "expecting [" + expectedDataNodes + "] data nodes, but only have [" + nodes.getDataNodes().size() + "]"; + maybePerformOrScheduleRecovery(currentDataNodeSize); } - performStateRecovery(enforceRecoverAfterTime, reason); } - } - private void performStateRecovery(final boolean enforceRecoverAfterTime, final String reason) { - if (enforceRecoverAfterTime && recoverAfterTime != null) { - if (scheduledRecovery.compareAndSet(false, true)) { - logger.info("delaying initial state recovery for [{}]. {}", recoverAfterTime, reason); - threadPool.schedule(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - logger.warn("delayed state recovery failed", e); - resetRecoveredFlags(); - } - - @Override - protected void doRun() { - if (recoveryInProgress.compareAndSet(false, true)) { - logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime); - runRecovery(); + void maybePerformOrScheduleRecovery(int currentDataNodeSize) { + if (expectedDataNodes != -1 && expectedDataNodes <= currentDataNodeSize) { + logger.debug( + "performing state recovery of term [{}], expected data nodes [{}] is reached", + expectedTerm, + expectedDataNodes + ); + cancelScheduledRecovery(); + runRecoveryImmediately(); + } else if (recoverAfterTime == null) { + logger.debug("performing state recovery of term [{}], no delay time is configured", expectedTerm); + cancelScheduledRecovery(); + runRecoveryImmediately(); + } else { + if (scheduledRecovery == null) { + logger.info( + "delaying initial state recovery for [{}] of term [{}]. 
expecting [{}] data nodes, but only have [{}]", + recoverAfterTime, + expectedTerm, + expectedDataNodes, + currentDataNodeSize + ); + scheduledRecovery = threadPool.schedule(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("delayed state recovery of term [" + expectedTerm + "] failed", e); } - } - }, recoverAfterTime, threadPool.generic()); - } - } else { - if (recoveryInProgress.compareAndSet(false, true)) { - try { - logger.debug("performing state recovery..."); - runRecovery(); - } catch (Exception e) { - logger.warn("state recovery failed", e); - resetRecoveredFlags(); + + @Override + protected void doRun() { + final PendingStateRecovery existingPendingStateRecovery = currentPendingStateRecovery; + if (PendingStateRecovery.this == existingPendingStateRecovery) { + runRecoveryImmediately(); + } else { + logger.debug( + "skip scheduled state recovery since a new one of term [{}] has started", + existingPendingStateRecovery.expectedTerm + ); + } + } + }, recoverAfterTime, threadPool.generic()); + } else { + logger.debug("state recovery is in already scheduled for term [{}]", expectedTerm); } } } - } - private void resetRecoveredFlags() { - recoveryInProgress.set(false); - scheduledRecovery.set(false); + void runRecoveryImmediately() { + if (taskSubmitted.compareAndSet(false, true)) { + submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask(expectedTerm)); + } else { + logger.debug("state recovery task is already submitted"); + } + } + + void cancelScheduledRecovery() { + if (scheduledRecovery != null) { + scheduledRecovery.cancel(); + scheduledRecovery = null; + } + } } private static final String TASK_SOURCE = "local-gateway-elected-state"; class RecoverStateUpdateTask extends ClusterStateUpdateTask { + private final long expectedTerm; + + RecoverStateUpdateTask(long expectedTerm) { + this.expectedTerm = expectedTerm; + } + @Override public ClusterState execute(final ClusterState currentState) { if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { logger.debug("cluster is already recovered"); return currentState; } + if (expectedTerm != currentState.term()) { + logger.debug("skip state recovery since current term [{}] != expected term [{}]", currentState.term(), expectedTerm); + return currentState; + } return ClusterStateUpdaters.removeStateNotRecoveredBlock( ClusterStateUpdaters.updateRoutingTable(currentState, shardRoutingRoleStrategy) ); @@ -228,7 +299,6 @@ public void clusterStateProcessed(final ClusterState oldState, final ClusterStat logger.info("recovered [{}] indices into cluster_state", newState.metadata().indices().size()); // reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a // not-recovered state, that we again do another state recovery. 
- resetRecoveredFlags(); rerouteService.reroute("state recovered", Priority.NORMAL, ActionListener.noop()); } @@ -239,7 +309,6 @@ public void onFailure(final Exception e) { () -> "unexpected failure during [" + TASK_SOURCE + "]", e ); - resetRecoveredFlags(); } } @@ -248,10 +317,6 @@ TimeValue recoverAfterTime() { return recoverAfterTime; } - private void runRecovery() { - submitUnbatchedTask(TASK_SOURCE, new RecoverStateUpdateTask()); - } - @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) { clusterService.submitUnbatchedStateUpdateTask(source, task); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java index e17522cd1efef..b4296ae684840 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayServiceTests.java @@ -8,77 +8,138 @@ package org.elasticsearch.gateway; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.coordination.CoordinationMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.cluster.routing.ShardRoutingRoleStrategy; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.core.Tuple; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matchers; +import org.junit.Before; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings; +import static org.elasticsearch.gateway.GatewayService.EXPECTED_DATA_NODES_SETTING; +import static org.elasticsearch.gateway.GatewayService.RECOVER_AFTER_DATA_NODES_SETTING; +import static org.elasticsearch.gateway.GatewayService.RECOVER_AFTER_TIME_SETTING; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; -import static org.elasticsearch.test.NodeRoles.masterNode; +import static org.hamcrest.CoreMatchers.equalTo; +import 
static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class GatewayServiceTests extends ESTestCase { - private GatewayService createService(final Settings.Builder settings) { - final ClusterService clusterService = new ClusterService( - Settings.builder().put("cluster.name", "GatewayServiceTests").build(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - null, - (TaskManager) null - ); - return new GatewayService( - settings.build(), - (reason, priority, listener) -> fail("should not reroute"), + private DeterministicTaskQueue deterministicTaskQueue; + private AtomicInteger rerouteCount; + private String dataNodeIdPrefix; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + deterministicTaskQueue = new DeterministicTaskQueue(); + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(0L)); + rerouteCount = new AtomicInteger(); + dataNodeIdPrefix = randomAlphaOfLength(10) + "-"; + } + + private GatewayService createGatewayService(final Settings.Builder settingsBuilder, final ClusterState initialState) { + return createGatewayService(createClusterService(settingsBuilder, initialState)); + } + + private GatewayService createGatewayService(final ClusterService clusterService) { + final RerouteService rerouteService = (reason, priority, listener) -> { + rerouteCount.incrementAndGet(); + listener.onResponse(null); + }; + + final var gatewayService = new GatewayService( + clusterService.getSettings(), + rerouteService, clusterService, - TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY, - null + ShardRoutingRoleStrategy.NO_SHARD_CREATION, + clusterService.threadPool() + ); + + gatewayService.start(); + return gatewayService; + } + + private ClusterService createClusterService(final Settings.Builder settingsBuilder, final ClusterState initialState) { + final var threadPool = deterministicTaskQueue.getThreadPool(); + final var settings = settingsBuilder.build(); + final var clusterSettings = createBuiltInClusterSettings(settings); + + final var clusterService = new ClusterService( + settings, + clusterSettings, + new FakeThreadPoolMasterService(initialState.nodes().getLocalNodeId(), threadPool, deterministicTaskQueue::scheduleNow), + new ClusterApplierService(initialState.nodes().getLocalNodeId(), settings, clusterSettings, threadPool) { + @Override + protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { + return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor(); + } + } ); + + clusterService.getClusterApplierService().setInitialState(initialState); + clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService()); + clusterService.getMasterService() + .setClusterStatePublisher(ClusterServiceUtils.createClusterStatePublisher(clusterService.getClusterApplierService())); + clusterService.getMasterService().setClusterStateSupplier(clusterService.getClusterApplierService()::state); + clusterService.start(); + return clusterService; } public void testDefaultRecoverAfterTime() { // check that the default is not set - GatewayService service = createService(Settings.builder()); + final 
ClusterState initialState = buildClusterState(1, 1); + GatewayService service = createGatewayService(Settings.builder(), initialState); assertNull(service.recoverAfterTime()); // ensure default is set when setting expected_data_nodes - service = createService(Settings.builder().put("gateway.expected_data_nodes", 1)); - assertThat(service.recoverAfterTime(), Matchers.equalTo(GatewayService.DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET)); + service = createGatewayService(Settings.builder().put("gateway.expected_data_nodes", 1), initialState); + assertThat(service.recoverAfterTime(), equalTo(GatewayService.DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET)); // ensure settings override default final TimeValue timeValue = TimeValue.timeValueHours(3); // ensure default is set when setting expected_nodes - service = createService(Settings.builder().put("gateway.recover_after_time", timeValue.toString())); - assertThat(service.recoverAfterTime().millis(), Matchers.equalTo(timeValue.millis())); + service = createGatewayService(Settings.builder().put("gateway.recover_after_time", timeValue.toString()), initialState); + assertThat(service.recoverAfterTime().millis(), equalTo(timeValue.millis())); } public void testRecoverStateUpdateTask() throws Exception { - GatewayService service = createService(Settings.builder()); - ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(); - String nodeId = randomAlphaOfLength(10); - DiscoveryNode masterNode = DiscoveryNodeUtils.builder(nodeId) - .applySettings(settings(IndexVersion.current()).put(masterNode()).build()) - .address(new TransportAddress(TransportAddress.META_ADDRESS, 9300)) - .build(); - ClusterState stateWithBlock = ClusterState.builder(ClusterName.DEFAULT) - .nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build()) - .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).build()) - .build(); + final long expectedTerm = randomLongBetween(1, 42); + ClusterState stateWithBlock = buildClusterState(1, expectedTerm); + GatewayService service = createGatewayService(Settings.builder(), stateWithBlock); + ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(expectedTerm); ClusterState recoveredState = clusterStateUpdateTask.execute(stateWithBlock); assertNotEquals(recoveredState, stateWithBlock); @@ -88,4 +149,363 @@ public void testRecoverStateUpdateTask() throws Exception { assertSame(recoveredState, clusterState); } + public void testRecoveryWillAbortIfExpectedTermDoesNotMatch() throws Exception { + final long expectedTerm = randomLongBetween(1, 42); + final ClusterState stateWithBlock = buildClusterState(1, randomLongBetween(43, 99)); + final GatewayService service = createGatewayService(Settings.builder(), stateWithBlock); + final ClusterStateUpdateTask clusterStateUpdateTask = service.new RecoverStateUpdateTask(expectedTerm); + + final ClusterState recoveredState = clusterStateUpdateTask.execute(stateWithBlock); + assertSame(recoveredState, stateWithBlock); + } + + public void testNoActionWhenNodeIsNotMaster() { + final String localNodeId = dataNodeIdPrefix + "0"; + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder() + .localNodeId(localNodeId) + .add(DiscoveryNodeUtils.create(localNodeId)); + if (randomBoolean()) { + final String masterNodeId = dataNodeIdPrefix + "1"; + nodesBuilder.masterNodeId(masterNodeId).add(DiscoveryNodeUtils.create(masterNodeId)); + } + + final ClusterState initialState = 
ClusterState.builder(ClusterName.DEFAULT) + .nodes(nodesBuilder.build()) + .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).build()) + .build(); + + final ClusterChangedEvent clusterChangedEvent = mock(ClusterChangedEvent.class); + when(clusterChangedEvent.state()).thenReturn(initialState); + + final GatewayService gatewayService = createGatewayService(Settings.builder(), initialState); + gatewayService.clusterChanged(clusterChangedEvent); + assertThat(deterministicTaskQueue.hasAnyTasks(), is(false)); + assertThat(gatewayService.currentPendingStateRecovery, nullValue()); + } + + public void testNoActionWhenStateIsAlreadyRecovered() { + final ClusterService clusterService = createClusterService( + Settings.builder() + .put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), 2) + .put(GatewayService.EXPECTED_DATA_NODES_SETTING.getKey(), 4) + .put(GatewayService.RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)), + ClusterState.builder(buildClusterState(2, randomIntBetween(1, 42))).blocks(ClusterBlocks.builder()).build() + ); + final GatewayService gatewayService = createGatewayService(clusterService); + assertClusterStateBlocks(clusterService, false); + assertThat(rerouteCount.get(), equalTo(0)); + + final var taskQueue = createSetDataNodeCountTaskQueue(clusterService); + final int newDataNodeCount = randomIntBetween(1, 5); + taskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(newDataNodeCount), null); + deterministicTaskQueue.runAllTasksInTimeOrder(); + + assertClusterStateBlocks(clusterService, false); + assertThat(rerouteCount.get(), equalTo(0)); + assertThat(gatewayService.currentPendingStateRecovery, nullValue()); + assertThat(clusterService.state().nodes().getDataNodes().size(), equalTo(newDataNodeCount)); + } + + public void testImmediateRecovery() { + final Settings.Builder settingsBuilder = Settings.builder(); + final int expectedNumberOfDataNodes = randomIntBetween(1, 3); + // The cluster recover immediately because it either has the required expectedDataNodes + // or both expectedDataNodes and recoverAfterTime are not configured + if (randomBoolean()) { + settingsBuilder.put(EXPECTED_DATA_NODES_SETTING.getKey(), expectedNumberOfDataNodes); + } + + final ClusterState initialState = buildClusterState(expectedNumberOfDataNodes, 0); + final ClusterService clusterService = createClusterService(settingsBuilder, initialState); + final GatewayService gatewayService = createGatewayService(clusterService); + assertClusterStateBlocks(clusterService, true); + assertThat(rerouteCount.get(), equalTo(0)); + + // Recover immediately + final var setClusterStateTaskQueue = createSetClusterStateTaskQueue(clusterService); + final ClusterState clusterStateOfTerm1 = incrementTerm(initialState); + setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm1), null); + assertThat(deterministicTaskQueue.hasRunnableTasks(), is(true)); + deterministicTaskQueue.runAllRunnableTasks(); + assertClusterStateBlocks(clusterService, false); + assertThat(rerouteCount.get(), equalTo(1)); + final var pendingStateRecoveryOfTerm1 = gatewayService.currentPendingStateRecovery; + assertThat(pendingStateRecoveryOfTerm1, notNullValue()); + + // Will *not* run recover again for the same term + setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm1), null); + deterministicTaskQueue.runAllRunnableTasks(); + assertThat(deterministicTaskQueue.hasAnyTasks(), is(false)); + 
assertThat(rerouteCount.get(), equalTo(1)); + assertClusterStateBlocks(clusterService, true); + assertThat(gatewayService.currentPendingStateRecovery, sameInstance(pendingStateRecoveryOfTerm1)); + + // Will run recover again for a newer term + final ClusterState clusterStateOfTerm2 = ClusterState.builder(initialState) + .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(2).build()).build()) + .build(); + setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm2), null); + assertThat(deterministicTaskQueue.hasRunnableTasks(), is(true)); + deterministicTaskQueue.runAllRunnableTasks(); + assertClusterStateBlocks(clusterService, false); + assertThat(rerouteCount.get(), equalTo(2)); + assertThat(gatewayService.currentPendingStateRecovery, not(sameInstance(pendingStateRecoveryOfTerm1))); + + // Never ran any scheduled task since recovery is immediate + assertThat(deterministicTaskQueue.hasDeferredTasks(), is(false)); + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(0L)); + } + + public void testScheduledRecovery() { + final var hasRecoverAfterTime = randomBoolean(); + final ClusterService clusterService = createServicesTupleForScheduledRecovery(randomIntBetween(2, 5), hasRecoverAfterTime).v1(); + + // Recover when the scheduled recovery is ready to run + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertClusterStateBlocks(clusterService, false); + assertThat(rerouteCount.get(), equalTo(1)); + assertTimeElapsed(TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis()); + } + + public void testScheduledRecoveryCancelledWhenClusterCanRecoverImmediately() { + final var expectedNumberOfDataNodes = randomIntBetween(2, 5); + final boolean hasRecoverAfterTime = randomBoolean(); + final var servicesTuple = createServicesTupleForScheduledRecovery(expectedNumberOfDataNodes, hasRecoverAfterTime); + final ClusterService clusterService = servicesTuple.v1(); + final GatewayService gatewayService = servicesTuple.v2(); + final var pendingStateRecoveryOfTerm1 = gatewayService.currentPendingStateRecovery; + + // The 1st schedule is cancelled when the cluster has enough nodes + final var setDataNodeCountTaskQueue = createSetDataNodeCountTaskQueue(clusterService); + setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(expectedNumberOfDataNodes), null); + deterministicTaskQueue.runAllRunnableTasks(); + assertClusterStateBlocks(clusterService, false); + assertThat(rerouteCount.get(), equalTo(1)); + assertThat(gatewayService.currentPendingStateRecovery, sameInstance(pendingStateRecoveryOfTerm1)); + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(0L)); + // Cancelled scheduled recovery is a no-op + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(rerouteCount.get(), equalTo(1)); + assertTimeElapsed(TimeValue.timeValueMinutes(hasRecoverAfterTime ? 
10 : 5).millis()); + } + + public void testScheduledRecoveryNoOpWhenNewTermBegins() { + final var hasRecoverAfterTime = randomBoolean(); + final var servicesTuple = createServicesTupleForScheduledRecovery(randomIntBetween(2, 5), hasRecoverAfterTime); + final ClusterService clusterService = servicesTuple.v1(); + final GatewayService gatewayService = servicesTuple.v2(); + final var setClusterStateTaskQueue = createSetClusterStateTaskQueue(clusterService); + final var pendingStateRecoveryOfTerm1 = gatewayService.currentPendingStateRecovery; + + // The 1st schedule is effectively cancelled if a new term begins + final TimeValue elapsed = TimeValue.timeValueMinutes(1); + final ClusterState clusterStateOfTerm2 = incrementTerm(clusterService.state()); + deterministicTaskQueue.scheduleAt( + elapsed.millis(), + () -> setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm2), null) + ); + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + assertThat(gatewayService.currentPendingStateRecovery, not(sameInstance(pendingStateRecoveryOfTerm1))); + // The 1st scheduled recovery is now a no-op + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + assertThat( + deterministicTaskQueue.getCurrentTimeMillis(), + equalTo(TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis()) + ); + assertClusterStateBlocks(clusterService, true); + assertThat(rerouteCount.get(), equalTo(0)); + // The 2nd schedule will perform the recovery + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertClusterStateBlocks(clusterService, false); + assertThat(rerouteCount.get(), equalTo(1)); + assertTimeElapsed(elapsed.millis() + TimeValue.timeValueMinutes(hasRecoverAfterTime ? 
10 : 5).millis()); + } + + private Tuple createServicesTupleForScheduledRecovery( + int expectedNumberOfDataNodes, + boolean hasRecoverAfterTime + ) { + final Settings.Builder settingsBuilder = Settings.builder(); + settingsBuilder.put(EXPECTED_DATA_NODES_SETTING.getKey(), expectedNumberOfDataNodes); + if (hasRecoverAfterTime) { + settingsBuilder.put(RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)); + } + final ClusterState initialState = buildClusterState(1, 0); + final ClusterService clusterService = createClusterService(settingsBuilder, initialState); + final GatewayService gatewayService = createGatewayService(clusterService); + assertClusterStateBlocks(clusterService, true); + + final ClusterState clusterStateOfTerm1 = incrementTerm(initialState); + final var setClusterStateTaskQueue = createSetClusterStateTaskQueue(clusterService); + setClusterStateTaskQueue.submitTask(randomAlphaOfLength(5), new SetClusterStateTask(clusterStateOfTerm1), null); + deterministicTaskQueue.runAllRunnableTasks(); // publish cluster state term change + // recovery is scheduled but has not run yet + assertThat(deterministicTaskQueue.hasDeferredTasks(), is(true)); + assertClusterStateBlocks(clusterService, true); + assertThat(rerouteCount.get(), equalTo(0)); + final GatewayService.PendingStateRecovery pendingStateRecoveryOfInitialTerm = gatewayService.currentPendingStateRecovery; + assertThat(pendingStateRecoveryOfInitialTerm, notNullValue()); + return new Tuple<>(clusterService, gatewayService); + } + + public void testScheduledRecoveryWithRecoverAfterNodes() { + final Settings.Builder settingsBuilder = Settings.builder(); + final int expectedNumberOfDataNodes = randomIntBetween(4, 6); + final boolean hasRecoverAfterTime = randomBoolean(); + if (hasRecoverAfterTime) { + settingsBuilder.put(RECOVER_AFTER_TIME_SETTING.getKey(), TimeValue.timeValueMinutes(10)); + } else { + settingsBuilder.put(EXPECTED_DATA_NODES_SETTING.getKey(), expectedNumberOfDataNodes); + } + final int recoverAfterNodes = expectedNumberOfDataNodes - 1; + settingsBuilder.put(RECOVER_AFTER_DATA_NODES_SETTING.getKey(), recoverAfterNodes); + + final ClusterState initialState = buildClusterState(1, 1); + final ClusterService clusterService = createClusterService(settingsBuilder, initialState); + final GatewayService gatewayService = createGatewayService(clusterService); + assertClusterStateBlocks(clusterService, true); + + // Not recover because recoverAfterDataNodes not met + final var setDataNodeCountTaskQueue = createSetDataNodeCountTaskQueue(clusterService); + setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes - 1), null); + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(0L)); + assertClusterStateBlocks(clusterService, true); + final var pendingStateRecoveryOfInitialTerm = gatewayService.currentPendingStateRecovery; + assertThat(pendingStateRecoveryOfInitialTerm, notNullValue()); + + // The 1st scheduled recovery when recoverAfterDataNodes is met + setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes), null); + deterministicTaskQueue.runAllRunnableTasks(); + assertThat(deterministicTaskQueue.hasDeferredTasks(), is(true)); + assertThat(gatewayService.currentPendingStateRecovery, sameInstance(pendingStateRecoveryOfInitialTerm)); + + // The 1st schedule is cancelled when data nodes drop below recoverAfterDataNodes + final TimeValue elapsed = 
TimeValue.timeValueMinutes(1); + deterministicTaskQueue.scheduleAt( + elapsed.millis(), + () -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes - 1), null) + ); + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + + // The 2nd scheduled recovery when data nodes are above recoverAfterDataNodes again + deterministicTaskQueue.scheduleAt( + elapsed.millis() * 2, + () -> setDataNodeCountTaskQueue.submitTask(randomAlphaOfLength(5), new SetDataNodeCountTask(recoverAfterNodes), null) + ); + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + + assertThat(gatewayService.currentPendingStateRecovery, sameInstance(pendingStateRecoveryOfInitialTerm)); + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(elapsed.millis() * 2)); + + // The 1st scheduled recovery is now a no-op since it is cancelled + deterministicTaskQueue.advanceTime(); + deterministicTaskQueue.runAllRunnableTasks(); + assertThat( + deterministicTaskQueue.getCurrentTimeMillis(), + equalTo(TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis()) + ); + assertClusterStateBlocks(clusterService, true); + assertThat(rerouteCount.get(), equalTo(0)); + + // The 2nd scheduled recovery will recover the state + deterministicTaskQueue.runAllTasksInTimeOrder(); + assertClusterStateBlocks(clusterService, false); + assertThat(rerouteCount.get(), equalTo(1)); + assertTimeElapsed(elapsed.millis() * 2 + TimeValue.timeValueMinutes(hasRecoverAfterTime ? 10 : 5).millis()); + } + + private void assertClusterStateBlocks(ClusterService clusterService, boolean isBlocked) { + assertThat(clusterService.state().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), is(isBlocked)); + } + + private void assertTimeElapsed(long elapsedInMillis) { + assertThat(deterministicTaskQueue.getCurrentTimeMillis(), equalTo(elapsedInMillis)); + } + + private ClusterState buildClusterState(int numberOfNodes, long term) { + assert numberOfNodes >= 1; + final String localNodeId = dataNodeIdPrefix + "0"; + final DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder() + .localNodeId(localNodeId) + .masterNodeId(localNodeId) + .add(DiscoveryNodeUtils.create(localNodeId)); + for (int i = 1; i < numberOfNodes; i++) { + discoveryNodesBuilder.add(DiscoveryNodeUtils.create(dataNodeIdPrefix + i)); + } + + final ClusterState stateWithBlock = ClusterState.builder(ClusterName.DEFAULT) + .nodes(discoveryNodesBuilder.build()) + .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(term).build()).build()) + .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK).build()) + .build(); + return stateWithBlock; + } + + private static ClusterState incrementTerm(ClusterState initialState) { + return ClusterState.builder(initialState) + .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(initialState.term() + 1).build()).build()) + .build(); + } + + record SetDataNodeCountTask(int dataNodeCount) implements ClusterStateTaskListener { + SetDataNodeCountTask(int dataNodeCount) { + assertThat(dataNodeCount, greaterThanOrEqualTo(1)); + this.dataNodeCount = dataNodeCount; + } + + @Override + public void onFailure(Exception e) { + fail(e, "unexpected failure"); + } + } + + private MasterServiceTaskQueue createSetDataNodeCountTaskQueue(ClusterService clusterService) { + return clusterService.createTaskQueue("set-data-node-count", Priority.NORMAL, 
batchExecutionContext -> { + final ClusterState initialState = batchExecutionContext.initialState(); + final DiscoveryNodes initialNodes = initialState.nodes(); + final int initialDataNodeCount = initialNodes.getDataNodes().size(); + int targetDataNodeCount = initialDataNodeCount; + for (var taskContext : batchExecutionContext.taskContexts()) { + targetDataNodeCount = taskContext.getTask().dataNodeCount(); + taskContext.success(() -> {}); + } + if (targetDataNodeCount == initialDataNodeCount) { + return initialState; + } + + final DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(initialNodes); + for (int i = initialDataNodeCount; i < targetDataNodeCount; i++) { + nodesBuilder.add(DiscoveryNodeUtils.create(dataNodeIdPrefix + i)); + } + for (int i = targetDataNodeCount; i < initialDataNodeCount; i++) { + nodesBuilder.remove(dataNodeIdPrefix + i); + } + final DiscoveryNodes targetNodes = nodesBuilder.build(); + assertThat(targetNodes.getDataNodes().size(), equalTo(targetDataNodeCount)); + return ClusterState.builder(initialState).nodes(targetNodes).build(); + }); + } + + record SetClusterStateTask(ClusterState clusterState) implements ClusterStateTaskListener { + @Override + public void onFailure(Exception e) { + fail(e, "unexpected failure"); + } + } + + private MasterServiceTaskQueue createSetClusterStateTaskQueue(ClusterService clusterService) { + return clusterService.createTaskQueue("set-cluster-state", Priority.NORMAL, batchExecutionContext -> { + ClusterState targetState = batchExecutionContext.initialState(); + for (var taskContext : batchExecutionContext.taskContexts()) { + targetState = taskContext.getTask().clusterState(); + taskContext.success(() -> {}); + } + return targetState; + }); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java index 96b3510ed6afe..81a419508dbee 100644 --- a/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java +++ b/test/framework/src/main/java/org/elasticsearch/common/util/concurrent/DeterministicTaskQueue.java @@ -82,7 +82,7 @@ public void runAllRunnableTasks() { } public void runAllTasks() { - while (hasDeferredTasks() || hasRunnableTasks()) { + while (hasAnyTasks()) { if (hasDeferredTasks() && random.nextBoolean()) { advanceTime(); } else if (hasRunnableTasks()) { @@ -92,7 +92,7 @@ public void runAllTasks() { } public void runAllTasksInTimeOrder() { - while (hasDeferredTasks() || hasRunnableTasks()) { + while (hasAnyTasks()) { if (hasRunnableTasks()) { runRandomTask(); } else { @@ -115,6 +115,13 @@ public boolean hasDeferredTasks() { return deferredTasks.isEmpty() == false; } + /** + * @return whether there are any runnable or deferred tasks + */ + public boolean hasAnyTasks() { + return hasDeferredTasks() || hasRunnableTasks(); + } + /** * @return the current (simulated) time, in milliseconds. */
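
Reviewer note: the new PendingStateRecovery javadoc boils down to a small decision table over gateway.recover_after_data_nodes, gateway.expected_data_nodes and gateway.recover_after_time. The following is a minimal standalone sketch of that table, not Elasticsearch code: the class, enum and method names are illustrative, the -1 "not configured" sentinel mirrors the gateway settings, and it deliberately does not model the default recover_after_time that kicks in when only expected_data_nodes is set (DEFAULT_RECOVER_AFTER_TIME_IF_EXPECTED_NODES_IS_SET, seen in the tests as the `hasRecoverAfterTime ? 10 : 5` minute assertions).

import java.time.Duration;

/**
 * Illustrative sketch only (not Elasticsearch code): the decision that
 * PendingStateRecovery applies each time the data node count changes.
 */
public class RecoveryDecisionSketch {

    enum Decision { WAIT_FOR_DATA_NODES, RECOVER_IMMEDIATELY, SCHEDULE_DELAYED_RECOVERY }

    /** -1 mirrors the "not configured" sentinel used by the gateway settings. */
    static Decision decide(int currentDataNodes, int recoverAfterDataNodes, int expectedDataNodes, Duration recoverAfterTime) {
        if (recoverAfterDataNodes != -1 && currentDataNodes < recoverAfterDataNodes) {
            // Below the floor: nothing can happen; any scheduled recovery is cancelled and the process starts over.
            return Decision.WAIT_FOR_DATA_NODES;
        }
        if (expectedDataNodes != -1 && currentDataNodes >= expectedDataNodes) {
            // Expected data node count reached: recover right away.
            return Decision.RECOVER_IMMEDIATELY;
        }
        if (recoverAfterTime == null) {
            // No delay configured and expected_data_nodes not satisfied (or not set): recover right away.
            return Decision.RECOVER_IMMEDIATELY;
        }
        // Otherwise wait out recover_after_time; the real service schedules this at most once per crossing of the floor.
        return Decision.SCHEDULE_DELAYED_RECOVERY;
    }

    public static void main(String[] args) {
        System.out.println(decide(1, 3, 5, Duration.ofMinutes(10))); // WAIT_FOR_DATA_NODES
        System.out.println(decide(3, 3, 5, Duration.ofMinutes(10))); // SCHEDULE_DELAYED_RECOVERY
        System.out.println(decide(5, 3, 5, Duration.ofMinutes(10))); // RECOVER_IMMEDIATELY
        System.out.println(decide(2, -1, -1, null));                 // RECOVER_IMMEDIATELY
    }
}

In the real service, two further concerns sit outside this decision function: a scheduled recovery is cancelled and re-armed via onDataNodeSize/cancelScheduledRecovery when the floor is re-crossed, and a recovery scheduled under an older master term is skipped, both by the currentPendingStateRecovery identity check in the scheduled task and by the expectedTerm check in RecoverStateUpdateTask.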