diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index aa0f90bc4a6d9..54765650cd202 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -29,7 +29,6 @@ import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; -import org.opensearch.cluster.decommission.DecommissionService; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.decommission.DecommissioningFailedException; import org.opensearch.cluster.decommission.NodeDecommissionedException; @@ -824,24 +823,11 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception { // and hence due to which the leader won't get abdicated and decommission request should eventually fail. 
// And in this case, to ensure decommission request doesn't leave mutating change in the cluster, we ensure // that no exclusion is set to the cluster and state for decommission is marked as FAILED - Logger clusterLogger = LogManager.getLogger(DecommissionService.class); - MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(clusterLogger); - mockLogAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "test", - DecommissionService.class.getCanonicalName(), - Level.ERROR, - "failure in removing to-be-decommissioned cluster manager eligible nodes" - ) + OpenSearchTimeoutException ex = expectThrows( + OpenSearchTimeoutException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() ); - - assertBusy(() -> { - OpenSearchTimeoutException ex = expectThrows( - OpenSearchTimeoutException.class, - () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() - ); - assertTrue(ex.getMessage().contains("timed out waiting for voting config exclusions")); - }); + assertTrue(ex.getMessage().contains("while removing to-be-decommissioned cluster manager eligible nodes")); ClusterService leaderClusterService = internalCluster().getInstance( ClusterService.class, @@ -877,7 +863,6 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception { // if the below condition is passed, then we are sure current decommission status is marked FAILED assertTrue(expectedStateLatch.await(30, TimeUnit.SECONDS)); - mockLogAppender.assertAllExpectationsMatched(); // ensure all nodes are part of cluster ensureStableCluster(6, TimeValue.timeValueMinutes(2)); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java index d0f5e8f198809..ffdb2735ae69f 100644 --- 
a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java @@ -44,10 +44,8 @@ import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; -import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; @@ -66,6 +64,9 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.addExclusionAndGetState; + /** * Transport endpoint action for adding exclusions to voting config * @@ -144,13 +145,7 @@ public ClusterState execute(ClusterState currentState) { assert resolvedExclusions == null : resolvedExclusions; final int finalMaxVotingConfigExclusions = TransportAddVotingConfigExclusionsAction.this.maxVotingConfigExclusions; resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, finalMaxVotingConfigExclusions); - - final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata()); - resolvedExclusions.forEach(builder::addVotingConfigExclusion); - final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(builder.build()).build(); - final ClusterState newState = 
ClusterState.builder(currentState).metadata(newMetadata).build(); - assert newState.getVotingConfigExclusions().size() <= finalMaxVotingConfigExclusions; - return newState; + return addExclusionAndGetState(currentState, resolvedExclusions, finalMaxVotingConfigExclusions); } @Override @@ -213,18 +208,6 @@ public void onTimeout(TimeValue timeout) { }); } - private static Set resolveVotingConfigExclusionsAndCheckMaximum( - AddVotingConfigExclusionsRequest request, - ClusterState state, - int maxVotingConfigExclusions - ) { - return request.resolveVotingConfigExclusionsAndCheckMaximum( - state, - maxVotingConfigExclusions, - MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey() - ); - } - @Override protected ClusterBlockException checkBlock(AddVotingConfigExclusionsRequest request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java index 1fc02db4309b1..b65688dcc30f6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java @@ -44,10 +44,8 @@ import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; -import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import 
org.opensearch.common.inject.Inject; @@ -60,6 +58,8 @@ import java.io.IOException; import java.util.function.Predicate; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; + /** * Transport endpoint action for clearing exclusions to voting config * @@ -166,13 +166,7 @@ private void submitClearVotingConfigExclusionsTask( clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { - final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata()) - .clearVotingConfigExclusions() - .build(); - final Metadata newMetadata = Metadata.builder(currentState.metadata()) - .coordinationMetadata(newCoordinationMetadata) - .build(); - return ClusterState.builder(currentState).metadata(newMetadata).build(); + return clearExclusionsAndGetState(currentState); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java new file mode 100644 index 0000000000000..5cc4bd2f831d7 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.configuration; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; +import org.opensearch.cluster.metadata.Metadata; + +import java.util.Set; + +import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; + +/** + * Static helper utilities for voting config exclusions cluster state updates + * + * @opensearch.internal + */ +public class VotingConfigExclusionsHelper { + + /** + * Static helper to update current state with given resolved exclusions + * + * @param currentState current cluster state + * @param resolvedExclusions resolved exclusions from the request + * @param finalMaxVotingConfigExclusions max exclusions that can be added + * @return newly formed cluster state + */ + public static ClusterState addExclusionAndGetState( + ClusterState currentState, + Set resolvedExclusions, + int finalMaxVotingConfigExclusions + ) { + final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata()); + resolvedExclusions.forEach(builder::addVotingConfigExclusion); + final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(builder.build()).build(); + final ClusterState newState = ClusterState.builder(currentState).metadata(newMetadata).build(); + assert newState.getVotingConfigExclusions().size() <= finalMaxVotingConfigExclusions; + return newState; + } + + /** + * Resolves the exclusion from the request and throws IAE if no nodes matched or maximum exceeded + * + * @param request AddVotingConfigExclusionsRequest request + * @param state current cluster state + * @param maxVotingConfigExclusions max number of exclusions acceptable + * @return set of VotingConfigExclusion + */ + public static Set
resolveVotingConfigExclusionsAndCheckMaximum( + AddVotingConfigExclusionsRequest request, + ClusterState state, + int maxVotingConfigExclusions + ) { + return request.resolveVotingConfigExclusionsAndCheckMaximum( + state, + maxVotingConfigExclusions, + MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey() + ); + } + + /** + * Clears Voting config exclusion from the given cluster state + * + * @param currentState current cluster state + * @return newly formed cluster state after clearing voting config exclusions + */ + public static ClusterState clearExclusionsAndGetState(ClusterState currentState) { + final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata()) + .clearVotingConfigExclusions() + .build(); + final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(newCoordinationMetadata).build(); + return ClusterState.builder(currentState).metadata(newMetadata).build(); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index fbb345ea3a441..fd52f48c7b5f8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -105,7 +105,7 @@ import java.util.stream.StreamSupport; import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID; -import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; diff --git 
a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 02f3828e0e4c5..626e47108cc63 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -59,7 +59,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; -import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; /** diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index ffb20a05b3ef7..1ff2fb52175c7 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -12,12 +12,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsResponse; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; import 
org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; @@ -33,7 +27,6 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; -import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.unit.TimeValue; import org.opensearch.http.HttpStats; @@ -52,6 +45,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; + /** * Helper controller class to remove list of nodes from the cluster and update status * @@ -79,83 +74,6 @@ public class DecommissionController { this.threadPool = threadPool; } - /** - * Transport call to add nodes to voting config exclusion - * - * @param nodes set of nodes Ids to be added to voting config exclusion list - * @param listener callback for response or failure - */ - public void excludeDecommissionedNodesFromVotingConfig(Set nodes, ActionListener listener) { - transportService.sendRequest( - transportService.getLocalNode(), - AddVotingConfigExclusionsAction.NAME, - new AddVotingConfigExclusionsRequest( - Strings.EMPTY_ARRAY, - nodes.toArray(String[]::new), - Strings.EMPTY_ARRAY, - TimeValue.timeValueSeconds(120) // giving a larger timeout of 120 sec as cluster might already be in stress when - // decommission is triggered - ), - new TransportResponseHandler() { - @Override - public void handleResponse(AddVotingConfigExclusionsResponse response) { - listener.onResponse(null); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public AddVotingConfigExclusionsResponse read(StreamInput in) throws IOException { - return new AddVotingConfigExclusionsResponse(in); - } - } - ); - } - - 
/** - * Transport call to clear voting config exclusion - * - * @param listener callback for response or failure - */ - public void clearVotingConfigExclusion(ActionListener listener, boolean waitForRemoval) { - final ClearVotingConfigExclusionsRequest clearVotingConfigExclusionsRequest = new ClearVotingConfigExclusionsRequest(); - clearVotingConfigExclusionsRequest.setWaitForRemoval(waitForRemoval); - transportService.sendRequest( - transportService.getLocalNode(), - ClearVotingConfigExclusionsAction.NAME, - clearVotingConfigExclusionsRequest, - new TransportResponseHandler() { - @Override - public void handleResponse(ClearVotingConfigExclusionsResponse response) { - listener.onResponse(null); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException { - return new ClearVotingConfigExclusionsResponse(in); - } - } - ); - } - /** * This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor} * Once the tasks are submitted, it waits for an expected cluster state to guarantee @@ -259,9 +177,15 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.decommissionAttribute(), decommissionStatus ); - return ClusterState.builder(currentState) + ClusterState newState = ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); + + // For terminal status we will go ahead and clear any exclusion that was added as part of decommission action + if (decommissionStatus.equals(DecommissionStatus.SUCCESSFUL) || decommissionStatus.equals(DecommissionStatus.FAILED)) { + newState = clearExclusionsAndGetState(newState); + } + return newState; } @Override diff --git 
a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java new file mode 100644 index 0000000000000..8305bda545998 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -0,0 +1,124 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.decommission; + +import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Strings; +import org.opensearch.common.unit.TimeValue; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.addExclusionAndGetState; + +/** + * Static helper utilities to execute decommission + * + * @opensearch.internal + */ +public class DecommissionHelper { + + static ClusterState registerDecommissionAttributeInClusterState( + ClusterState currentState, + DecommissionAttribute decommissionAttribute + ) { + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); + return ClusterState.builder(currentState) + .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) + .build(); + } + + static ClusterState 
deleteDecommissionAttributeInClusterState(ClusterState currentState) { + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(metadata); + mdBuilder.removeCustom(DecommissionAttributeMetadata.TYPE); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } + + static ClusterState addVotingConfigExclusionsForNodesToBeDecommissioned( + ClusterState currentState, + Set nodeIdsToBeExcluded, + TimeValue decommissionActionTimeout, + final int maxVotingConfigExclusions + ) { + AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + nodeIdsToBeExcluded.toArray(String[]::new), + Strings.EMPTY_ARRAY, + decommissionActionTimeout + ); + Set resolvedExclusion = resolveVotingConfigExclusionsAndCheckMaximum( + request, + currentState, + maxVotingConfigExclusions + ); + return addExclusionAndGetState(currentState, resolvedExclusion, maxVotingConfigExclusions); + } + + static Set filterNodesWithDecommissionAttribute( + ClusterState clusterState, + DecommissionAttribute decommissionAttribute, + boolean onlyClusterManagerNodes + ) { + Set nodesWithDecommissionAttribute = new HashSet<>(); + Iterator nodesIter = onlyClusterManagerNodes + ? 
clusterState.nodes().getClusterManagerNodes().valuesIt() + : clusterState.nodes().getNodes().valuesIt(); + + while (nodesIter.hasNext()) { + final DiscoveryNode node = nodesIter.next(); + if (nodeHasDecommissionedAttribute(node, decommissionAttribute)) { + nodesWithDecommissionAttribute.add(node); + } + } + return nodesWithDecommissionAttribute; + } + + /** + * Utility method to check if the node has decommissioned attribute + * + * @param discoveryNode node to check on + * @param decommissionAttribute attribute to be checked with + * @return true or false based on whether node has decommissioned attribute + */ + public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) { + String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName()); + return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue()); + } + + /** + * Utility method to check if the node is commissioned or not + * + * @param discoveryNode node to check on + * @param metadata metadata present current which will be used to check the commissioning status of the node + * @return if the node is commissioned or not + */ + public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) { + DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null) { + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + DecommissionStatus status = decommissionAttributeMetadata.status(); + if (decommissionAttribute != null && status != null) { + if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) + && (status.equals(DecommissionStatus.IN_PROGRESS) + || status.equals(DecommissionStatus.SUCCESSFUL) + || status.equals(DecommissionStatus.DRAINING))) { + return false; + } + } + } + return true; + } +} 
diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 85030a1e902db..f36d7b3e06da9 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -18,9 +18,10 @@ import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; +import org.opensearch.cluster.ClusterStateObserver.Listener; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.NotClusterManagerException; -import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRouting; @@ -35,14 +36,19 @@ import org.opensearch.transport.TransportService; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; +import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForNodesToBeDecommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.deleteDecommissionAttributeInClusterState; +import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; +import static 
org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute; +import static org.opensearch.cluster.decommission.DecommissionHelper.registerDecommissionAttributeInClusterState; import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING; import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING; @@ -54,8 +60,7 @@ *
    *
  • Initiates nodes decommissioning by adding custom metadata with the attribute and state as {@link DecommissionStatus#INIT}
  • *
  • Remove to-be-decommissioned cluster-manager eligible nodes from voting config and wait for its abdication if it is active leader
  • - *
  • Triggers weigh away for nodes having given awareness attribute to drain.
  • - *
  • Once weighed away, the service triggers nodes decommission. This marks the decommission status as {@link DecommissionStatus#IN_PROGRESS}
  • + *
  • After the draining timeout, the service triggers nodes decommission. This marks the decommission status as {@link DecommissionStatus#IN_PROGRESS}
  • *
  • Once the decommission is successful, the service clears the voting config and marks the status as {@link DecommissionStatus#SUCCESSFUL}
  • *
  • If service fails at any step, it makes best attempt to mark the status as {@link DecommissionStatus#FAILED} and to clear voting config exclusion
  • *
@@ -72,6 +77,7 @@ public class DecommissionService { private final DecommissionController decommissionController; private volatile List awarenessAttributes; private volatile Map> forcedAwarenessAttributes; + private volatile int maxVotingConfigExclusions; @Inject public DecommissionService( @@ -94,6 +100,8 @@ public DecommissionService( CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes ); + maxVotingConfigExclusions = MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING, this::setMaxVotingConfigExclusions); } private void setAwarenessAttributes(List awarenessAttributes) { @@ -112,6 +120,10 @@ private void setForcedAwarenessAttributes(Settings forceSettings) { this.forcedAwarenessAttributes = forcedAwarenessAttributes; } + private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) { + this.maxVotingConfigExclusions = maxVotingConfigExclusions; + } + /** * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration @@ -126,20 +138,37 @@ public void startDecommissionAction( final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { + private Set nodeIdsToBeExcluded; + @Override public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); DecommissionAttributeMetadata decommissionAttributeMetadata = 
currentState.metadata().decommissionAttributeMetadata(); - // check that request is eligible to proceed + // check that request is eligible to proceed and attribute is weighed away ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); - // ensure attribute is weighed away ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); - decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); - logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString()); - return ClusterState.builder(currentState) - .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) - .build(); + + ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); + // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion + nodeIdsToBeExcluded = filterNodesWithDecommissionAttribute(currentState, decommissionAttribute, true).stream() + .map(DiscoveryNode::getId) + .collect(Collectors.toSet()); + logger.info( + "resolved cluster manager eligible nodes [{}] that should be added to voting config exclusion", + nodeIdsToBeExcluded.toString() + ); + newState = addVotingConfigExclusionsForNodesToBeDecommissioned( + newState, + nodeIdsToBeExcluded, + TimeValue.timeValueSeconds(120), // TODO - update it with request timeout + maxVotingConfigExclusions + ); + logger.debug( + "registering decommission metadata [{}] to execute action", + newState.metadata().decommissionAttributeMetadata().toString() + ); + return newState; } @Override @@ -158,160 +187,111 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); assert 
decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); - logger.info( + assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT); + assert newState.getVotingConfigExclusions() + .stream() + .map(CoordinationMetadata.VotingConfigExclusion::getNodeId) + .collect(Collectors.toSet()) + .containsAll(nodeIdsToBeExcluded); + logger.debug( "registered decommission metadata for attribute [{}] with status [{}]", decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status() ); - decommissionClusterManagerNodes(decommissionRequest, listener); - } - }); - } - - private synchronized void decommissionClusterManagerNodes( - final DecommissionRequest decommissionRequest, - ActionListener listener - ) { - final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); - ClusterState state = clusterService.getClusterApplierService().state(); - // since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further - // join the cluster - // and hence in further request lifecycle we are sure that no new to-be-decommission leader will join the cluster - Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute(state, decommissionAttribute, true); - logger.info( - "resolved cluster manager eligible nodes [{}] that should be removed from Voting Configuration", - clusterManagerNodesToBeDecommissioned.toString() - ); - - // remove all 'to-be-decommissioned' cluster manager eligible nodes from voting config - Set nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream() - .map(DiscoveryNode::getId) - .collect(Collectors.toSet()); - - final Predicate allNodesRemovedAndAbdicated = clusterState -> { - final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); - return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) - && 
nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false - && clusterState.nodes().getClusterManagerNodeId() != null; - }; - - ActionListener exclusionListener = new ActionListener() { - @Override - public void onResponse(Void unused) { - if (clusterService.getClusterApplierService().state().nodes().isLocalNodeElectedClusterManager()) { - if (nodeHasDecommissionedAttribute(clusterService.localNode(), decommissionAttribute)) { - // this is an unexpected state, as after exclusion of nodes having decommission attribute, - // this local node shouldn't have had the decommission attribute. Will send the failure response to the user - String errorMsg = - "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; - logger.error(errorMsg); - // will go ahead and clear the voting config and mark the status as false - clearVotingConfigExclusionAndUpdateStatus(false, false); - // we can send the failure response to the user here - listener.onFailure(new IllegalStateException(errorMsg)); - } else { - logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); - // we are good here to send the response now as the request is processed by an eligible active leader - // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission - // nodes can be part of Voting Config - listener.onResponse(new DecommissionResponse(true)); - drainNodesWithDecommissionedAttribute(decommissionRequest); - } - } else { - // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager - // this will ensures that request is retried until cluster manager times out - logger.info( - "local node is not eligible to process the request, " - + "throwing NotClusterManagerException to attempt a retry on an eligible node" - ); - listener.onFailure( - new NotClusterManagerException( - 
"node [" - + transportService.getLocalNode().toString() - + "] not eligible to execute decommission request. Will retry until timeout." - ) - ); - } - } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - // attempting to mark the status as FAILED - clearVotingConfigExclusionAndUpdateStatus(false, false); - } - }; - - if (allNodesRemovedAndAbdicated.test(state)) { - exclusionListener.onResponse(null); - } else { - logger.debug("sending transport request to remove nodes [{}] from voting config", nodeIdsToBeExcluded.toString()); - // send a transport request to exclude to-be-decommissioned cluster manager eligible nodes from voting config - decommissionController.excludeDecommissionedNodesFromVotingConfig(nodeIdsToBeExcluded, new ActionListener() { - @Override - public void onResponse(Void unused) { - logger.info( - "successfully removed decommissioned cluster manager eligible nodes [{}] from voting config ", - clusterManagerNodesToBeDecommissioned.toString() - ); - final ClusterStateObserver abdicationObserver = new ClusterStateObserver( - clusterService, - TimeValue.timeValueSeconds(60L), - logger, - threadPool.getThreadContext() - ); - final ClusterStateObserver.Listener abdicationListener = new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - logger.debug("to-be-decommissioned node is no more the active leader"); - exclusionListener.onResponse(null); - } - - @Override - public void onClusterServiceClose() { - String errorMsg = "cluster service closed while waiting for abdication of to-be-decommissioned leader"; - logger.warn(errorMsg); - listener.onFailure(new DecommissioningFailedException(decommissionAttribute, errorMsg)); - } + final ClusterStateObserver observer = new ClusterStateObserver( + clusterService, + TimeValue.timeValueSeconds(120), // TODO - update it with request timeout + logger, + threadPool.getThreadContext() + ); - @Override - public void onTimeout(TimeValue timeout) 
{ - logger.info("timed out while waiting for abdication of to-be-decommissioned leader"); - clearVotingConfigExclusionAndUpdateStatus(false, false); + final Predicate allNodesRemovedAndAbdicated = clusterState -> { + final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); + return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) + && clusterState.nodes().getClusterManagerNodeId() != null + && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; + }; + + final Listener clusterStateListener = new Listener() { + @Override + public void onNewClusterState(ClusterState state) { + logger.info( + "successfully removed decommissioned cluster manager eligible nodes [{}] from voting config ", + nodeIdsToBeExcluded.toString() + ); + if (state.nodes().isLocalNodeElectedClusterManager()) { + if (nodeHasDecommissionedAttribute(clusterService.localNode(), decommissionAttribute)) { + // this is an unexpected state, as after exclusion of nodes having decommission attribute, + // this local node shouldn't have had the decommission attribute. 
Will send the failure response to the user + String errorMsg = + "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; + logger.error(errorMsg); + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus( + DecommissionStatus.FAILED, + statusUpdateListener() + ); + listener.onFailure(new IllegalStateException(errorMsg)); + } else { + logger.info("will proceed to drain decommissioned nodes as local node is eligible to process the request"); + // we are good here to send the response now as the request is processed by an eligible active leader + // and to-be-decommissioned cluster manager is no more part of Voting Configuration + listener.onResponse(new DecommissionResponse(true)); + drainNodesWithDecommissionedAttribute(decommissionRequest); + } + } else { + // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not leader + // this will ensures that request is retried until cluster manager times out + logger.info( + "local node is not eligible to process the request, " + + "throwing NotClusterManagerException to attempt a retry on an eligible node" + ); listener.onFailure( - new OpenSearchTimeoutException( - "timed out [{}] while waiting for abdication of to-be-decommissioned leader", - timeout.toString() + new NotClusterManagerException( + "node [" + + transportService.getLocalNode().toString() + + "] not eligible to execute decommission request. Will retry until timeout." 
) ); } - }; - // In case the cluster state is already processed even before this code is executed - // therefore testing first before attaching the listener - ClusterState currentState = clusterService.getClusterApplierService().state(); - if (allNodesRemovedAndAbdicated.test(currentState)) { - abdicationListener.onNewClusterState(currentState); - } else { - logger.debug("waiting to abdicate to-be-decommissioned leader"); - abdicationObserver.waitForNextChange(abdicationListener, allNodesRemovedAndAbdicated); } - } - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "failure in removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", - nodeIdsToBeExcluded.toString() - ), - e - ); - exclusionListener.onFailure(e); + @Override + public void onClusterServiceClose() { + String errorMsg = "cluster service closed while waiting for abdication of to-be-decommissioned leader"; + logger.error(errorMsg); + listener.onFailure(new DecommissioningFailedException(decommissionAttribute, errorMsg)); + } + + @Override + public void onTimeout(TimeValue timeout) { + String errorMsg = "timed out [" + + timeout.toString() + + "] while removing to-be-decommissioned cluster manager eligible nodes [" + + nodeIdsToBeExcluded.toString() + + "] from voting config"; + logger.error(errorMsg); + listener.onFailure(new OpenSearchTimeoutException(errorMsg)); + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); + } + }; + + // In case the cluster state is already processed even before this code is executed + // therefore testing first before attaching the listener + if (allNodesRemovedAndAbdicated.test(newState)) { + clusterStateListener.onNewClusterState(newState); + } else { + logger.debug("waiting to abdicate to-be-decommissioned leader"); + 
observer.waitForNextChange(clusterStateListener, allNodesRemovedAndAbdicated); // TODO add request timeout here } - }); - } + } + }); } + // TODO - after registering the new status check if any node which is not excluded still present in decommissioned zone. If yes, start + // the action again (retry) void drainNodesWithDecommissionedAttribute(DecommissionRequest decommissionRequest) { ClusterState state = clusterService.getClusterApplierService().state(); Set decommissionedNodes = filterNodesWithDecommissionAttribute( @@ -342,8 +322,10 @@ public void onFailure(Exception e) { ), e ); - // since we are not able to update the status, we will clear the voting config exclusion we have set earlier - clearVotingConfigExclusionAndUpdateStatus(false, false); + // This decommission state update call will most likely fail as the state update call to 'DRAINING' + // failed. But attempting it anyways as FAILED update might still pass as it doesn't have dependency on + // the current state + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } }); } @@ -385,12 +367,17 @@ public void onResponse(DecommissionStatus status) { new ActionListener() { @Override public void onResponse(Void unused) { - clearVotingConfigExclusionAndUpdateStatus(true, true); + // will clear the voting config exclusion and mark the status as successful + decommissionController.updateMetadataWithDecommissionStatus( + DecommissionStatus.SUCCESSFUL, + statusUpdateListener() + ); } @Override public void onFailure(Exception e) { - clearVotingConfigExclusionAndUpdateStatus(false, false); + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } } ); @@ -406,51 +393,12 @@ public void onFailure(Exception e) { ), e ); - // since we are not able to update the status, we will clear the voting config exclusion we have set 
earlier - clearVotingConfigExclusionAndUpdateStatus(false, false); - } - }); - } - - private void clearVotingConfigExclusionAndUpdateStatus(boolean decommissionSuccessful, boolean waitForRemoval) { - decommissionController.clearVotingConfigExclusion(new ActionListener() { - @Override - public void onResponse(Void unused) { - logger.info( - "successfully cleared voting config exclusion after completing decommission action, proceeding to update metadata" - ); - DecommissionStatus updateStatusWith = decommissionSuccessful ? DecommissionStatus.SUCCESSFUL : DecommissionStatus.FAILED; - decommissionController.updateMetadataWithDecommissionStatus(updateStatusWith, statusUpdateListener()); - } - - @Override - public void onFailure(Exception e) { - logger.debug( - new ParameterizedMessage("failure in clearing voting config exclusion after processing decommission request"), - e - ); + // This decommission state update call will most likely fail as the state update call to 'DRAINING' + // failed. But attempting it anyways as FAILED update might still pass as it doesn't have dependency on + // the current state decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } - }, waitForRemoval); - } - - private Set filterNodesWithDecommissionAttribute( - ClusterState clusterState, - DecommissionAttribute decommissionAttribute, - boolean onlyClusterManagerNodes - ) { - Set nodesWithDecommissionAttribute = new HashSet<>(); - Iterator nodesIter = onlyClusterManagerNodes - ? 
clusterState.nodes().getClusterManagerNodes().valuesIt() - : clusterState.nodes().getNodes().valuesIt(); - - while (nodesIter.hasNext()) { - final DiscoveryNode node = nodesIter.next(); - if (nodeHasDecommissionedAttribute(node, decommissionAttribute)) { - nodesWithDecommissionAttribute.add(node); - } - } - return nodesWithDecommissionAttribute; + }); } private static void validateAwarenessAttribute( @@ -577,80 +525,28 @@ public void startRecommissionAction(final ActionListener() { - @Override - public void onResponse(Void unused) { - logger.info("successfully cleared voting config exclusion for deleting the decommission."); - deleteDecommissionState(listener); - } - - @Override - public void onFailure(Exception e) { - logger.error("Failure in clearing voting config during delete_decommission request.", e); - listener.onFailure(e); - } - }, false); - } - - void deleteDecommissionState(ActionListener listener) { - clusterService.submitStateUpdateTask("delete_decommission_state", new ClusterStateUpdateTask(Priority.URGENT) { + clusterService.submitStateUpdateTask("delete-decommission-state", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { + ClusterState newState = clearExclusionsAndGetState(currentState); logger.info("Deleting the decommission attribute from the cluster state"); - Metadata metadata = currentState.metadata(); - Metadata.Builder mdBuilder = Metadata.builder(metadata); - mdBuilder.removeCustom(DecommissionAttributeMetadata.TYPE); - return ClusterState.builder(currentState).metadata(mdBuilder).build(); + newState = deleteDecommissionAttributeInClusterState(newState); + return newState; } @Override public void onFailure(String source, Exception e) { - logger.error(() -> new ParameterizedMessage("Failed to clear decommission attribute. 
[{}]", source), e); + logger.error(() -> new ParameterizedMessage("failure during recommission action [{}]", source), e); listener.onFailure(e); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - // Cluster state processed for deleting the decommission attribute. + logger.info("successfully cleared voting config exclusion and decommissioned attribute"); assert newState.metadata().decommissionAttributeMetadata() == null; + assert newState.coordinationMetadata().getVotingConfigExclusions().isEmpty(); listener.onResponse(new DeleteDecommissionStateResponse(true)); } }); } - - /** - * Utility method to check if the node has decommissioned attribute - * - * @param discoveryNode node to check on - * @param decommissionAttribute attribute to be checked with - * @return true or false based on whether node has decommissioned attribute - */ - public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) { - String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName()); - return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue()); - } - - /** - * Utility method to check if the node is commissioned or not - * - * @param discoveryNode node to check on - * @param metadata metadata present current which will be used to check the commissioning status of the node - * @return if the node is commissioned or not - */ - public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) { - DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); - if (decommissionAttributeMetadata != null) { - DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); - DecommissionStatus status = decommissionAttributeMetadata.status(); - if (decommissionAttribute != null && status != null) 
{ - if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) - && (status.equals(DecommissionStatus.IN_PROGRESS) - || status.equals(DecommissionStatus.SUCCESSFUL) - || status.equals(DecommissionStatus.DRAINING))) { - return false; - } - } - } - return true; - } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java new file mode 100644 index 0000000000000..f33781064345d --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.configuration; + +import org.junit.BeforeClass; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.Strings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.addExclusionAndGetState; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; +import static 
org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; + +public class VotingConfigExclusionsHelperTests extends OpenSearchTestCase { + + private static DiscoveryNode localNode, otherNode1, otherNode2, otherDataNode; + private static CoordinationMetadata.VotingConfigExclusion localNodeExclusion, otherNode1Exclusion, otherNode2Exclusion; + private static ClusterState initialClusterState; + + public void testAddExclusionAndGetState() { + ClusterState updatedState = addExclusionAndGetState(initialClusterState, Set.of(localNodeExclusion), 2); + assertTrue(updatedState.coordinationMetadata().getVotingConfigExclusions().contains(localNodeExclusion)); + assertEquals(1, updatedState.coordinationMetadata().getVotingConfigExclusions().size()); + } + + public void testResolveVotingConfigExclusions() { + AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + new String[] { "other1" }, + Strings.EMPTY_ARRAY, + TimeValue.timeValueSeconds(30) + ); + Set votingConfigExclusions = resolveVotingConfigExclusionsAndCheckMaximum( + request, + initialClusterState, + 10 + ); + assertEquals(1, votingConfigExclusions.size()); + assertTrue(votingConfigExclusions.contains(otherNode1Exclusion)); + } + + public void testResolveVotingConfigExclusionFailsWhenLimitExceeded() { + AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + new String[] { "other1", "other2" }, + Strings.EMPTY_ARRAY, + TimeValue.timeValueSeconds(30) + ); + expectThrows(IllegalArgumentException.class, () -> resolveVotingConfigExclusionsAndCheckMaximum(request, initialClusterState, 1)); + } + + public void testClearExclusionAndGetState() { + ClusterState updatedState = addExclusionAndGetState(initialClusterState, Set.of(localNodeExclusion), 2); + assertTrue(updatedState.coordinationMetadata().getVotingConfigExclusions().contains(localNodeExclusion)); + 
updatedState = clearExclusionsAndGetState(updatedState); + assertTrue(updatedState.coordinationMetadata().getVotingConfigExclusions().isEmpty()); + } + + @BeforeClass + public static void createBaseClusterState() { + localNode = makeDiscoveryNode("local"); + localNodeExclusion = new CoordinationMetadata.VotingConfigExclusion(localNode); + otherNode1 = makeDiscoveryNode("other1"); + otherNode1Exclusion = new CoordinationMetadata.VotingConfigExclusion(otherNode1); + otherNode2 = makeDiscoveryNode("other2"); + otherNode2Exclusion = new CoordinationMetadata.VotingConfigExclusion(otherNode2); + otherDataNode = new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final CoordinationMetadata.VotingConfiguration allNodesConfig = CoordinationMetadata.VotingConfiguration.of( + localNode, + otherNode1, + otherNode2 + ); + initialClusterState = ClusterState.builder(new ClusterName("cluster")) + .nodes( + new DiscoveryNodes.Builder().add(localNode) + .add(otherNode1) + .add(otherNode2) + .add(otherDataNode) + .localNodeId(localNode.getId()) + .clusterManagerNodeId(localNode.getId()) + ) + .metadata( + Metadata.builder() + .coordinationMetadata( + CoordinationMetadata.builder() + .lastAcceptedConfiguration(allNodesConfig) + .lastCommittedConfiguration(allNodesConfig) + .build() + ) + ) + .build(); + } + + private static DiscoveryNode makeDiscoveryNode(String name) { + return new DiscoveryNode( + name, + name, + buildNewFakeTransportAddress(), + emptyMap(), + singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 5a76e0d5137fb..cf92130095e12 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -53,7 +53,6 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; @@ -130,47 +129,6 @@ public void shutdownThreadPoolAndClusterService() { threadPool.shutdown(); } - public void testAddNodesToVotingConfigExclusion() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - ClusterStateObserver clusterStateObserver = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); - Set nodesToRemoveFromVotingConfig = Collections.singleton(randomFrom("node1", "node6", "node11")); - decommissionController.excludeDecommissionedNodesFromVotingConfig(nodesToRemoveFromVotingConfig, new ActionListener() { - @Override - public void onResponse(Void unused) { - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail("unexpected failure occurred while removing node from voting config " + e); - } - }); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - clusterService.getClusterApplierService().state().getVotingConfigExclusions().forEach(vce -> { - assertTrue(nodesToRemoveFromVotingConfig.contains(vce.getNodeId())); - assertEquals(nodesToRemoveFromVotingConfig.size(), 1); - }); - } - - public void testClearVotingConfigExclusions() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - decommissionController.clearVotingConfigExclusion(new ActionListener() { - @Override - public void onResponse(Void unused) { - countDownLatch.countDown(); - } - - @Override - public 
void onFailure(Exception e) { - fail("unexpected failure occurred while clearing voting config exclusion" + e); - } - }, false); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), empty()); - } - public void testNodesRemovedForDecommissionRequestSuccessfulResponse() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); Set nodesToBeRemoved = new HashSet<>(); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java new file mode 100644 index 0000000000000..ab2d8218ec97d --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java @@ -0,0 +1,142 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.decommission; + +import org.junit.BeforeClass; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Set; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForNodesToBeDecommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.deleteDecommissionAttributeInClusterState; +import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.registerDecommissionAttributeInClusterState; + +public class DecommissionHelperTests extends OpenSearchTestCase { + + private static DiscoveryNode node1, node2, node3, dataNode; + private static ClusterState initialClusterState; + + public void testRegisterAndDeleteDecommissionAttributeInClusterState() { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone2"); + ClusterState updatedState = registerDecommissionAttributeInClusterState(initialClusterState, decommissionAttribute); + assertEquals(decommissionAttribute, updatedState.metadata().decommissionAttributeMetadata().decommissionAttribute()); + updatedState = deleteDecommissionAttributeInClusterState(updatedState); + 
assertNull(updatedState.metadata().decommissionAttributeMetadata()); + } + + public void testAddVotingConfigExclusionsForNodesToBeDecommissioned() { + Set<String> nodeIdToBeExcluded = Set.of("node2"); + ClusterState updatedState = addVotingConfigExclusionsForNodesToBeDecommissioned( + initialClusterState, + nodeIdToBeExcluded, + TimeValue.timeValueMinutes(1), + 10 + ); + CoordinationMetadata.VotingConfigExclusion v1 = new CoordinationMetadata.VotingConfigExclusion(node2); + assertTrue( + updatedState.coordinationMetadata().getVotingConfigExclusions().contains(v1) + ); + assertEquals(1, updatedState.coordinationMetadata().getVotingConfigExclusions().size()); + } + + public void testFilterNodes() { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1"); + Set<DiscoveryNode> filteredNodes = filterNodesWithDecommissionAttribute(initialClusterState, decommissionAttribute, true); + assertTrue(filteredNodes.contains(node1)); + assertEquals(1, filteredNodes.size()); + filteredNodes = filterNodesWithDecommissionAttribute(initialClusterState, decommissionAttribute, false); + assertTrue(filteredNodes.contains(node1)); + assertTrue(filteredNodes.contains(dataNode)); + assertEquals(2, filteredNodes.size()); + } + + public void testNodeCommissioned() { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1"); + DecommissionStatus decommissionStatus = randomFrom( + DecommissionStatus.IN_PROGRESS, + DecommissionStatus.DRAINING, + DecommissionStatus.SUCCESSFUL + ); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + decommissionStatus + ); + Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); + assertTrue(nodeCommissioned(node2, metadata)); + assertFalse(nodeCommissioned(node1, metadata)); + DecommissionStatus commissionStatus = 
randomFrom(DecommissionStatus.FAILED, DecommissionStatus.INIT); + decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, commissionStatus); + metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); + assertTrue(nodeCommissioned(node2, metadata)); + assertTrue(nodeCommissioned(node1, metadata)); + metadata = Metadata.builder().removeCustom(DecommissionAttributeMetadata.TYPE).build(); + assertTrue(nodeCommissioned(node2, metadata)); + assertTrue(nodeCommissioned(node1, metadata)); + } + + @BeforeClass + public static void createBaseClusterState() { + node1 = makeDiscoveryNode("node1", "zone1"); + node2 = makeDiscoveryNode("node2", "zone2"); + node3 = makeDiscoveryNode("node3", "zone3"); + dataNode = new DiscoveryNode( + "data", + "data", + buildNewFakeTransportAddress(), + singletonMap("zone", "zone1"), + emptySet(), + Version.CURRENT + ); + final CoordinationMetadata.VotingConfiguration allNodesConfig = CoordinationMetadata.VotingConfiguration.of(node1, node2, node3); + initialClusterState = ClusterState.builder(new ClusterName("cluster")) + .nodes( + new DiscoveryNodes.Builder().add(node1) + .add(node2) + .add(node3) + .add(dataNode) + .localNodeId(node1.getId()) + .clusterManagerNodeId(node1.getId()) + ) + .metadata( + Metadata.builder() + .coordinationMetadata( + CoordinationMetadata.builder() + .lastAcceptedConfiguration(allNodesConfig) + .lastCommittedConfiguration(allNodesConfig) + .build() + ) + ) + .build(); + } + + private static DiscoveryNode makeDiscoveryNode(String name, String zone) { + return new DiscoveryNode( + name, + name, + buildNewFakeTransportAddress(), + singletonMap("zone", zone), + singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java 
index 7fe58d75932a1..95980991d22b0 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -11,14 +11,12 @@ import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationMetadata; @@ -39,7 +37,6 @@ import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; import java.util.Collections; @@ -48,6 +45,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; @@ -313,22 +311,35 @@ public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { } - public void testClearClusterDecommissionState() throws InterruptedException { + public void testRecommissionAction() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata 
decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, DecommissionStatus.SUCCESSFUL ); - ClusterState state = ClusterState.builder(new ClusterName("test")) - .metadata(Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build()) - .build(); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata( + Metadata.builder(clusterService.state().metadata()) + .decommissionAttributeMetadata(decommissionAttributeMetadata) + .coordinationMetadata( + CoordinationMetadata.builder() + .addVotingConfigExclusion( + new CoordinationMetadata.VotingConfigExclusion(clusterService.state().nodes().get("node6")) + ) + .build() + ) + .build() + ) + ); + AtomicReference<ClusterState> clusterStateAtomicReference = new AtomicReference<>(); ActionListener listener = new ActionListener<>() { @Override public void onResponse(DeleteDecommissionStateResponse decommissionResponse) { - DecommissionAttributeMetadata metadata = clusterService.state().metadata().custom(DecommissionAttributeMetadata.TYPE); - assertNull(metadata); + clusterStateAtomicReference.set(clusterService.state()); countDownLatch.countDown(); } @@ -338,59 +349,11 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - - this.decommissionService.deleteDecommissionState(listener); - + this.decommissionService.startRecommissionAction(listener); // Decommission Attribute should be removed. 
assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - } - - public void testDeleteDecommissionAttributeClearVotingExclusion() { - TransportService mockTransportService = Mockito.mock(TransportService.class); - Mockito.when(mockTransportService.getLocalNode()).thenReturn(Mockito.mock(DiscoveryNode.class)); - DecommissionService decommissionService = new DecommissionService( - Settings.EMPTY, - clusterSettings, - clusterService, - mockTransportService, - threadPool, - allocationService - ); - decommissionService.startRecommissionAction(Mockito.mock(ActionListener.class)); - - ArgumentCaptor clearVotingConfigExclusionsRequestArgumentCaptor = ArgumentCaptor.forClass( - ClearVotingConfigExclusionsRequest.class - ); - Mockito.verify(mockTransportService) - .sendRequest( - Mockito.any(DiscoveryNode.class), - Mockito.anyString(), - clearVotingConfigExclusionsRequestArgumentCaptor.capture(), - Mockito.any(TransportResponseHandler.class) - ); - - ClearVotingConfigExclusionsRequest request = clearVotingConfigExclusionsRequestArgumentCaptor.getValue(); - assertFalse(request.getWaitForRemoval()); - } - - public void testClusterUpdateTaskForDeletingDecommission() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - ActionListener listener = new ActionListener<>() { - @Override - public void onResponse(DeleteDecommissionStateResponse response) { - assertTrue(response.isAcknowledged()); - assertNull(clusterService.state().metadata().decommissionAttributeMetadata()); - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail("On Failure shouldn't have been called"); - countDownLatch.countDown(); - } - }; - decommissionService.deleteDecommissionState(listener); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertNull(clusterStateAtomicReference.get().metadata().decommissionAttributeMetadata()); + assertEquals(0, 
clusterStateAtomicReference.get().coordinationMetadata().getVotingConfigExclusions().size()); } private void setWeightedRoutingWeights(Map weights) {