From 236940ff1f16775384e418bb9e8639d6c1aaf26c Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 11:35:52 +0530 Subject: [PATCH 01/26] Abstract helpers from TransportVotingConfigExclusion Signed-off-by: Rishab Nahata --- .../AddVotingConfigExclusionsHelper.java | 47 +++++++++++++++++++ ...nsportAddVotingConfigExclusionsAction.java | 23 ++------- 2 files changed, 51 insertions(+), 19 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java new file mode 100644 index 0000000000000..616c030e622d6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.configuration; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; +import org.opensearch.cluster.metadata.Metadata; + +import java.util.Set; + +import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; + +public class AddVotingConfigExclusionsHelper { + + public static ClusterState updateExclusionAndGetState( + ClusterState currentState, + Set resolvedExclusions, + int finalMaxVotingConfigExclusions + ) { + final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata()); + resolvedExclusions.forEach(builder::addVotingConfigExclusion); + final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(builder.build()).build(); + final ClusterState newState = ClusterState.builder(currentState).metadata(newMetadata).build(); + assert newState.getVotingConfigExclusions().size() <= finalMaxVotingConfigExclusions; + return newState; + } + + // throws IAE if no nodes matched or maximum exceeded + public static Set resolveVotingConfigExclusionsAndCheckMaximum( + AddVotingConfigExclusionsRequest request, + ClusterState state, + int maxVotingConfigExclusions + ) { + return request.resolveVotingConfigExclusionsAndCheckMaximum( + state, + maxVotingConfigExclusions, + MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey() + ); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java index d0f5e8f198809..983ca2104f4ab 100644 --- 
a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java @@ -66,6 +66,9 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; +import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.updateExclusionAndGetState; + /** * Transport endpoint action for adding exclusions to voting config * @@ -144,13 +147,7 @@ public ClusterState execute(ClusterState currentState) { assert resolvedExclusions == null : resolvedExclusions; final int finalMaxVotingConfigExclusions = TransportAddVotingConfigExclusionsAction.this.maxVotingConfigExclusions; resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, finalMaxVotingConfigExclusions); - - final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata()); - resolvedExclusions.forEach(builder::addVotingConfigExclusion); - final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(builder.build()).build(); - final ClusterState newState = ClusterState.builder(currentState).metadata(newMetadata).build(); - assert newState.getVotingConfigExclusions().size() <= finalMaxVotingConfigExclusions; - return newState; + return updateExclusionAndGetState(currentState, resolvedExclusions, finalMaxVotingConfigExclusions); } @Override @@ -213,18 +210,6 @@ public void onTimeout(TimeValue timeout) { }); } - private static Set resolveVotingConfigExclusionsAndCheckMaximum( - AddVotingConfigExclusionsRequest request, - ClusterState state, - int maxVotingConfigExclusions - ) { - return request.resolveVotingConfigExclusionsAndCheckMaximum( - state, - 
maxVotingConfigExclusions, - MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey() - ); - } - @Override protected ClusterBlockException checkBlock(AddVotingConfigExclusionsRequest request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); From b63413887a6963a2416e524d83c14f3aaaf6ad37 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 18:24:10 +0530 Subject: [PATCH 02/26] Update INIT and exclusion in one call Signed-off-by: Rishab Nahata --- .../cluster/coordination/Coordinator.java | 2 +- .../coordination/JoinTaskExecutor.java | 2 +- .../decommission/DecommissionHelper.java | 108 +++++++ .../decommission/DecommissionService.java | 301 ++++++------------ 4 files changed, 214 insertions(+), 199 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index fbb345ea3a441..fd52f48c7b5f8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -105,7 +105,7 @@ import java.util.stream.StreamSupport; import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID; -import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 
4cb6c7b255449..b9c64a66d73b0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -60,7 +60,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; -import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; /** diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java new file mode 100644 index 0000000000000..818c7d5c074f3 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.decommission; + +import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Strings; +import org.opensearch.common.unit.TimeValue; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; +import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.updateExclusionAndGetState; + +/** + * Static helper utilities to execute decommission + */ +public class DecommissionHelper { + + static ClusterState registerDecommissionAttributeInClusterState(ClusterState currentState, DecommissionAttribute decommissionAttribute) { + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); + return ClusterState.builder(currentState) + .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) + .build(); + } + + static ClusterState addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes( + ClusterState currentState, + Set nodeIdsToBeExcluded, + TimeValue decommissionActionTimeout + ) { + AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + nodeIdsToBeExcluded.toArray(String[]::new), + Strings.EMPTY_ARRAY, + decommissionActionTimeout + ); + // TODO - update max count + Set resolvedExclusion = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, 10); + return updateExclusionAndGetState(currentState, resolvedExclusion, 10); + } + + static Set 
filterNodesWithDecommissionAttribute( + ClusterState clusterState, + DecommissionAttribute decommissionAttribute, + boolean onlyClusterManagerNodes + ) { + Set nodesWithDecommissionAttribute = new HashSet<>(); + Iterator nodesIter = onlyClusterManagerNodes + ? clusterState.nodes().getClusterManagerNodes().valuesIt() + : clusterState.nodes().getNodes().valuesIt(); + + while (nodesIter.hasNext()) { + final DiscoveryNode node = nodesIter.next(); + if (nodeHasDecommissionedAttribute(node, decommissionAttribute)) { + nodesWithDecommissionAttribute.add(node); + } + } + return nodesWithDecommissionAttribute; + } + + /** + * Utility method to check if the node has decommissioned attribute + * + * @param discoveryNode node to check on + * @param decommissionAttribute attribute to be checked with + * @return true or false based on whether node has decommissioned attribute + */ + public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) { + String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName()); + return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue()); + } + + /** + * Utility method to check if the node is commissioned or not + * + * @param discoveryNode node to check on + * @param metadata metadata present current which will be used to check the commissioning status of the node + * @return if the node is commissioned or not + */ + public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) { + DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null) { + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + DecommissionStatus status = decommissionAttributeMetadata.status(); + if (decommissionAttribute != null && status != null) { + if 
(nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) + && (status.equals(DecommissionStatus.IN_PROGRESS) + || status.equals(DecommissionStatus.SUCCESSFUL) + || status.equals(DecommissionStatus.DRAINING))) { + return false; + } + } + } + return true; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 85030a1e902db..ed57bb19105aa 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -18,6 +18,7 @@ import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; +import org.opensearch.cluster.ClusterStateObserver.Listener; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.metadata.Metadata; @@ -35,14 +36,16 @@ import org.opensearch.transport.TransportService; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes; +import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute; +import static org.opensearch.cluster.decommission.DecommissionHelper.registerDecommissionAttributeInClusterState; import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING; 
import static org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING; @@ -126,6 +129,9 @@ public void startDecommissionAction( final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { + + private Set nodeIdsToBeExcluded; + @Override public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action @@ -135,11 +141,19 @@ public ClusterState execute(ClusterState currentState) { ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); // ensure attribute is weighed away ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); - decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); - logger.info("registering decommission metadata [{}] to execute action", decommissionAttributeMetadata.toString()); - return ClusterState.builder(currentState) - .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) - .build(); + ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); + Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute( + currentState, decommissionAttribute, true + ); + logger.info( + "resolved cluster manager eligible nodes [{}] that should be added to Voting Config Exclusion", + clusterManagerNodesToBeDecommissioned.toString() + ); + // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion + nodeIdsToBeExcluded = 
clusterManagerNodesToBeDecommissioned.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + newState = addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes(newState, nodeIdsToBeExcluded, TimeValue.timeValueSeconds(30)); + logger.info("registering decommission metadata [{}] to execute action", newState.metadata().decommissionAttributeMetadata().toString()); + return newState; } @Override @@ -163,155 +177,103 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status() ); - decommissionClusterManagerNodes(decommissionRequest, listener); - } - }); - } - - private synchronized void decommissionClusterManagerNodes( - final DecommissionRequest decommissionRequest, - ActionListener listener - ) { - final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); - ClusterState state = clusterService.getClusterApplierService().state(); - // since here metadata is already registered with INIT, we can guarantee that no new node with decommission attribute can further - // join the cluster - // and hence in further request lifecycle we are sure that no new to-be-decommission leader will join the cluster - Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute(state, decommissionAttribute, true); - logger.info( - "resolved cluster manager eligible nodes [{}] that should be removed from Voting Configuration", - clusterManagerNodesToBeDecommissioned.toString() - ); - - // remove all 'to-be-decommissioned' cluster manager eligible nodes from voting config - Set nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream() - .map(DiscoveryNode::getId) - .collect(Collectors.toSet()); - - final Predicate allNodesRemovedAndAbdicated = clusterState -> { - final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); - return 
nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) - && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false - && clusterState.nodes().getClusterManagerNodeId() != null; - }; - ActionListener exclusionListener = new ActionListener() { - @Override - public void onResponse(Void unused) { - if (clusterService.getClusterApplierService().state().nodes().isLocalNodeElectedClusterManager()) { - if (nodeHasDecommissionedAttribute(clusterService.localNode(), decommissionAttribute)) { - // this is an unexpected state, as after exclusion of nodes having decommission attribute, - // this local node shouldn't have had the decommission attribute. Will send the failure response to the user - String errorMsg = - "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; - logger.error(errorMsg); - // will go ahead and clear the voting config and mark the status as false - clearVotingConfigExclusionAndUpdateStatus(false, false); - // we can send the failure response to the user here - listener.onFailure(new IllegalStateException(errorMsg)); - } else { - logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); - // we are good here to send the response now as the request is processed by an eligible active leader - // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission - // nodes can be part of Voting Config - listener.onResponse(new DecommissionResponse(true)); - drainNodesWithDecommissionedAttribute(decommissionRequest); - } - } else { - // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager - // this will ensures that request is retried until cluster manager times out - logger.info( - "local node is not eligible to process the request, " - + "throwing NotClusterManagerException to attempt a retry on an 
eligible node" - ); - listener.onFailure( - new NotClusterManagerException( - "node [" - + transportService.getLocalNode().toString() - + "] not eligible to execute decommission request. Will retry until timeout." - ) - ); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - // attempting to mark the status as FAILED - clearVotingConfigExclusionAndUpdateStatus(false, false); - } - }; + final ClusterStateObserver observer = new ClusterStateObserver( + clusterService, + TimeValue.timeValueSeconds(30), // TODO update + logger, + threadPool.getThreadContext() + ); - if (allNodesRemovedAndAbdicated.test(state)) { - exclusionListener.onResponse(null); - } else { - logger.debug("sending transport request to remove nodes [{}] from voting config", nodeIdsToBeExcluded.toString()); - // send a transport request to exclude to-be-decommissioned cluster manager eligible nodes from voting config - decommissionController.excludeDecommissionedNodesFromVotingConfig(nodeIdsToBeExcluded, new ActionListener() { - @Override - public void onResponse(Void unused) { - logger.info( - "successfully removed decommissioned cluster manager eligible nodes [{}] from voting config ", - clusterManagerNodesToBeDecommissioned.toString() - ); - final ClusterStateObserver abdicationObserver = new ClusterStateObserver( - clusterService, - TimeValue.timeValueSeconds(60L), - logger, - threadPool.getThreadContext() - ); - final ClusterStateObserver.Listener abdicationListener = new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - logger.debug("to-be-decommissioned node is no more the active leader"); - exclusionListener.onResponse(null); - } + final Predicate allNodesRemovedAndAbdicated = clusterState -> { + final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); + return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) // nodes are excluded from voting config + && 
clusterState.nodes().getClusterManagerNodeId() != null // a master is elected + && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; // none of excluded node is elected master - @Override - public void onClusterServiceClose() { - String errorMsg = "cluster service closed while waiting for abdication of to-be-decommissioned leader"; - logger.warn(errorMsg); - listener.onFailure(new DecommissioningFailedException(decommissionAttribute, errorMsg)); - } + }; - @Override - public void onTimeout(TimeValue timeout) { - logger.info("timed out while waiting for abdication of to-be-decommissioned leader"); - clearVotingConfigExclusionAndUpdateStatus(false, false); + final Listener clusterStateListener = new Listener() { + @Override + public void onNewClusterState(ClusterState state) { + logger.info( + "successfully removed decommissioned cluster manager eligible nodes [{}] from voting config ", + nodeIdsToBeExcluded.toString() + ); + if (state.nodes().isLocalNodeElectedClusterManager()) { + if (nodeHasDecommissionedAttribute(clusterService.localNode(), decommissionAttribute)) { + // this is an unexpected state, as after exclusion of nodes having decommission attribute, + // this local node shouldn't have had the decommission attribute. 
Will send the failure response to the user + String errorMsg = + "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; + logger.error(errorMsg); + // will go ahead and clear the voting config and mark the status as false + clearVotingConfigExclusionAndUpdateStatus(false, false); + // we can send the failure response to the user here + listener.onFailure(new IllegalStateException(errorMsg)); + } else { + logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); + // we are good here to send the response now as the request is processed by an eligible active leader + // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission + // nodes can be part of Voting Config + listener.onResponse(new DecommissionResponse(true)); + drainNodesWithDecommissionedAttribute(decommissionRequest); + } + } else { + // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager + // this will ensures that request is retried until cluster manager times out + logger.info( + "local node is not eligible to process the request, " + + "throwing NotClusterManagerException to attempt a retry on an eligible node" + ); listener.onFailure( - new OpenSearchTimeoutException( - "timed out [{}] while waiting for abdication of to-be-decommissioned leader", - timeout.toString() + new NotClusterManagerException( + "node [" + + transportService.getLocalNode().toString() + + "] not eligible to execute decommission request. Will retry until timeout." 
) ); } - }; - // In case the cluster state is already processed even before this code is executed - // therefore testing first before attaching the listener - ClusterState currentState = clusterService.getClusterApplierService().state(); - if (allNodesRemovedAndAbdicated.test(currentState)) { - abdicationListener.onNewClusterState(currentState); - } else { - logger.debug("waiting to abdicate to-be-decommissioned leader"); - abdicationObserver.waitForNextChange(abdicationListener, allNodesRemovedAndAbdicated); } - } - @Override - public void onFailure(Exception e) { - logger.error( - new ParameterizedMessage( - "failure in removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", + @Override + public void onClusterServiceClose() { + String errorMsg = "cluster service closed while waiting for abdication of to-be-decommissioned leader"; + logger.warn(errorMsg); + listener.onFailure(new DecommissioningFailedException(decommissionAttribute, errorMsg)); + } + + @Override + public void onTimeout(TimeValue timeout) { + logger.error( + "timed out [{}] while removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", + timeout.toString(), nodeIdsToBeExcluded.toString() - ), - e - ); - exclusionListener.onFailure(e); + ); + listener.onFailure( + new OpenSearchTimeoutException( + "timed out [{}] while removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", + timeout.toString(), + nodeIdsToBeExcluded.toString() + ) + ); + clearVotingConfigExclusionAndUpdateStatus(false, false); + } + }; + + // In case the cluster state is already processed even before this code is executed + // therefore testing first before attaching the listener + if (allNodesRemovedAndAbdicated.test(newState)) { + clusterStateListener.onNewClusterState(newState); + } else { + logger.debug("waiting to abdicate to-be-decommissioned leader"); + observer.waitForNextChange(clusterStateListener, allNodesRemovedAndAbdicated); } - 
}); - } + } + }); } + // TODO - after registering the new status check if any node which is not excluded still present in decommissioned zone. If yes, start the action again (retry) void drainNodesWithDecommissionedAttribute(DecommissionRequest decommissionRequest) { ClusterState state = clusterService.getClusterApplierService().state(); Set decommissionedNodes = filterNodesWithDecommissionAttribute( @@ -434,25 +396,6 @@ public void onFailure(Exception e) { }, waitForRemoval); } - private Set filterNodesWithDecommissionAttribute( - ClusterState clusterState, - DecommissionAttribute decommissionAttribute, - boolean onlyClusterManagerNodes - ) { - Set nodesWithDecommissionAttribute = new HashSet<>(); - Iterator nodesIter = onlyClusterManagerNodes - ? clusterState.nodes().getClusterManagerNodes().valuesIt() - : clusterState.nodes().getNodes().valuesIt(); - - while (nodesIter.hasNext()) { - final DiscoveryNode node = nodesIter.next(); - if (nodeHasDecommissionedAttribute(node, decommissionAttribute)) { - nodesWithDecommissionAttribute.add(node); - } - } - return nodesWithDecommissionAttribute; - } - private static void validateAwarenessAttribute( final DecommissionAttribute decommissionAttribute, List awarenessAttributes, @@ -617,40 +560,4 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } }); } - - /** - * Utility method to check if the node has decommissioned attribute - * - * @param discoveryNode node to check on - * @param decommissionAttribute attribute to be checked with - * @return true or false based on whether node has decommissioned attribute - */ - public static boolean nodeHasDecommissionedAttribute(DiscoveryNode discoveryNode, DecommissionAttribute decommissionAttribute) { - String nodeAttributeValue = discoveryNode.getAttributes().get(decommissionAttribute.attributeName()); - return nodeAttributeValue != null && nodeAttributeValue.equals(decommissionAttribute.attributeValue()); - } - - /** - * Utility method to check if 
the node is commissioned or not - * - * @param discoveryNode node to check on - * @param metadata metadata present current which will be used to check the commissioning status of the node - * @return if the node is commissioned or not - */ - public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata metadata) { - DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); - if (decommissionAttributeMetadata != null) { - DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); - DecommissionStatus status = decommissionAttributeMetadata.status(); - if (decommissionAttribute != null && status != null) { - if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) - && (status.equals(DecommissionStatus.IN_PROGRESS) - || status.equals(DecommissionStatus.SUCCESSFUL) - || status.equals(DecommissionStatus.DRAINING))) { - return false; - } - } - } - return true; - } } From 36d981d1478b14536b1d08a97ee38e72e4993305 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 18:51:13 +0530 Subject: [PATCH 03/26] use max voting config exclusion count Signed-off-by: Rishab Nahata --- .../decommission/DecommissionHelper.java | 8 +++---- .../decommission/DecommissionService.java | 21 +++++++++++++++---- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index 818c7d5c074f3..a4046ef1001a7 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -38,7 +38,8 @@ static ClusterState registerDecommissionAttributeInClusterState(ClusterState cur static ClusterState addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes( ClusterState currentState, Set 
nodeIdsToBeExcluded, - TimeValue decommissionActionTimeout + TimeValue decommissionActionTimeout, + final int maxVotingConfigExclusions ) { AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( Strings.EMPTY_ARRAY, @@ -46,9 +47,8 @@ static ClusterState addVotingConfigExclusionsForToBeDecommissionedClusterManager Strings.EMPTY_ARRAY, decommissionActionTimeout ); - // TODO - update max count - Set resolvedExclusion = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, 10); - return updateExclusionAndGetState(currentState, resolvedExclusion, 10); + Set resolvedExclusion = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, maxVotingConfigExclusions); + return updateExclusionAndGetState(currentState, resolvedExclusion, maxVotingConfigExclusions); } static Set filterNodesWithDecommissionAttribute( diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index ed57bb19105aa..5ff1b539c7634 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -42,6 +42,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes; import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; import static org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute; @@ -75,6 +76,7 @@ public class DecommissionService { private final DecommissionController decommissionController; private volatile List awarenessAttributes; 
private volatile Map> forcedAwarenessAttributes; + private volatile int maxVotingConfigExclusions; @Inject public DecommissionService( @@ -97,6 +99,8 @@ public DecommissionService( CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, this::setForcedAwarenessAttributes ); + maxVotingConfigExclusions = MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING, this::setMaxVotingConfigExclusions); } private void setAwarenessAttributes(List awarenessAttributes) { @@ -115,6 +119,10 @@ private void setForcedAwarenessAttributes(Settings forceSettings) { this.forcedAwarenessAttributes = forcedAwarenessAttributes; } + private void setMaxVotingConfigExclusions(int maxVotingConfigExclusions) { + this.maxVotingConfigExclusions = maxVotingConfigExclusions; + } + /** * Starts the new decommission request and registers the metadata with status as {@link DecommissionStatus#INIT} * Once the status is updated, it tries to exclude to-be-decommissioned cluster manager eligible nodes from Voting Configuration @@ -136,23 +144,28 @@ public void startDecommissionAction( public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); + DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); + // check that request is eligible to proceed ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); + // ensure attribute is weighed away ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); + ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); + Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute( currentState, 
decommissionAttribute, true ); logger.info( - "resolved cluster manager eligible nodes [{}] that should be added to Voting Config Exclusion", + "resolved cluster manager eligible nodes [{}] that should be added to voting config exclusion", clusterManagerNodesToBeDecommissioned.toString() ); // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - newState = addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes(newState, nodeIdsToBeExcluded, TimeValue.timeValueSeconds(30)); - logger.info("registering decommission metadata [{}] to execute action", newState.metadata().decommissionAttributeMetadata().toString()); + newState = addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes(newState, nodeIdsToBeExcluded, TimeValue.timeValueSeconds(30), maxVotingConfigExclusions); + logger.debug("registering decommission metadata [{}] to execute action", newState.metadata().decommissionAttributeMetadata().toString()); return newState; } @@ -172,7 +185,7 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); - logger.info( + logger.debug( "registered decommission metadata for attribute [{}] with status [{}]", decommissionAttributeMetadata.decommissionAttribute(), decommissionAttributeMetadata.status() From 29a1d046fda22b0dd0df064b4a4113eceaafa629 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 18:58:40 +0530 Subject: [PATCH 04/26] Fix spotless check Signed-off-by: Rishab Nahata --- .../AddVotingConfigExclusionsHelper.java | 6 ++-- ...nsportAddVotingConfigExclusionsAction.java | 2 -- 
.../decommission/DecommissionHelper.java | 15 +++++++--- .../decommission/DecommissionService.java | 30 ++++++++++++++----- 4 files changed, 36 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java index 616c030e622d6..b7366066ed65f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java @@ -19,7 +19,7 @@ public class AddVotingConfigExclusionsHelper { - public static ClusterState updateExclusionAndGetState( + public static ClusterState updateExclusionAndGetState( ClusterState currentState, Set resolvedExclusions, int finalMaxVotingConfigExclusions @@ -32,8 +32,8 @@ public static ClusterState updateExclusionAndGetState( return newState; } - // throws IAE if no nodes matched or maximum exceeded - public static Set resolveVotingConfigExclusionsAndCheckMaximum( + // throws IAE if no nodes matched or maximum exceeded + public static Set resolveVotingConfigExclusionsAndCheckMaximum( AddVotingConfigExclusionsRequest request, ClusterState state, int maxVotingConfigExclusions diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java index 983ca2104f4ab..6ddd95bdc3791 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java @@ -44,10 +44,8 @@ import org.opensearch.cluster.ClusterStateUpdateTask; import 
org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; -import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index a4046ef1001a7..e8143aeb564fc 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -28,7 +28,10 @@ */ public class DecommissionHelper { - static ClusterState registerDecommissionAttributeInClusterState(ClusterState currentState, DecommissionAttribute decommissionAttribute) { + static ClusterState registerDecommissionAttributeInClusterState( + ClusterState currentState, + DecommissionAttribute decommissionAttribute + ) { DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute); return ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) @@ -47,7 +50,11 @@ static ClusterState addVotingConfigExclusionsForToBeDecommissionedClusterManager Strings.EMPTY_ARRAY, decommissionActionTimeout ); - Set resolvedExclusion = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, maxVotingConfigExclusions); + Set resolvedExclusion = resolveVotingConfigExclusionsAndCheckMaximum( + request, + currentState, + maxVotingConfigExclusions + ); return updateExclusionAndGetState(currentState, resolvedExclusion, 
maxVotingConfigExclusions); } @@ -97,8 +104,8 @@ public static boolean nodeCommissioned(DiscoveryNode discoveryNode, Metadata met if (decommissionAttribute != null && status != null) { if (nodeHasDecommissionedAttribute(discoveryNode, decommissionAttribute) && (status.equals(DecommissionStatus.IN_PROGRESS) - || status.equals(DecommissionStatus.SUCCESSFUL) - || status.equals(DecommissionStatus.DRAINING))) { + || status.equals(DecommissionStatus.SUCCESSFUL) + || status.equals(DecommissionStatus.DRAINING))) { return false; } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 5ff1b539c7634..8e1a016ae6ab3 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -153,10 +153,12 @@ public ClusterState execute(ClusterState currentState) { // ensure attribute is weighed away ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); - ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); + ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute( - currentState, decommissionAttribute, true + currentState, + decommissionAttribute, + true ); logger.info( "resolved cluster manager eligible nodes [{}] that should be added to voting config exclusion", @@ -164,8 +166,16 @@ public ClusterState execute(ClusterState currentState) { ); // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - newState = 
addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes(newState, nodeIdsToBeExcluded, TimeValue.timeValueSeconds(30), maxVotingConfigExclusions); - logger.debug("registering decommission metadata [{}] to execute action", newState.metadata().decommissionAttributeMetadata().toString()); + newState = addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes( + newState, + nodeIdsToBeExcluded, + TimeValue.timeValueSeconds(30), + maxVotingConfigExclusions + ); + logger.debug( + "registering decommission metadata [{}] to execute action", + newState.metadata().decommissionAttributeMetadata().toString() + ); return newState; } @@ -202,7 +212,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) // nodes are excluded from voting config && clusterState.nodes().getClusterManagerNodeId() != null // a master is elected - && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; // none of excluded node is elected master + && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; // none of excluded node + // is elected master }; @@ -227,13 +238,15 @@ public void onNewClusterState(ClusterState state) { } else { logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); // we are good here to send the response now as the request is processed by an eligible active leader - // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more to-be-decommission + // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more + // to-be-decommission // nodes can be part of Voting Config listener.onResponse(new DecommissionResponse(true)); drainNodesWithDecommissionedAttribute(decommissionRequest); } 
} else { - // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster manager + // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster + // manager // this will ensures that request is retried until cluster manager times out logger.info( "local node is not eligible to process the request, " @@ -286,7 +299,8 @@ public void onTimeout(TimeValue timeout) { }); } - // TODO - after registering the new status check if any node which is not excluded still present in decommissioned zone. If yes, start the action again (retry) + // TODO - after registering the new status check if any node which is not excluded still present in decommissioned zone. If yes, start + // the action again (retry) void drainNodesWithDecommissionedAttribute(DecommissionRequest decommissionRequest) { ClusterState state = clusterService.getClusterApplierService().state(); Set decommissionedNodes = filterNodesWithDecommissionAttribute( From 39f058fb8da994900a280951db9582e6611a2699 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 19:17:35 +0530 Subject: [PATCH 05/26] fix Signed-off-by: Rishab Nahata --- .../decommission/DecommissionService.java | 38 ++++++------------- 1 file changed, 12 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 8e1a016ae6ab3..11a0ef5d91a24 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -195,6 +195,7 @@ public void onFailure(String source, Exception e) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { DecommissionAttributeMetadata decommissionAttributeMetadata = 
newState.metadata().decommissionAttributeMetadata(); assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); + assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT); logger.debug( "registered decommission metadata for attribute [{}] with status [{}]", decommissionAttributeMetadata.decommissionAttribute(), @@ -203,18 +204,16 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS final ClusterStateObserver observer = new ClusterStateObserver( clusterService, - TimeValue.timeValueSeconds(30), // TODO update + TimeValue.timeValueSeconds(30), // TODO update and name timeout to requestTimeout logger, threadPool.getThreadContext() ); final Predicate allNodesRemovedAndAbdicated = clusterState -> { final Set votingConfigNodeIds = clusterState.getLastCommittedConfiguration().getNodeIds(); - return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) // nodes are excluded from voting config - && clusterState.nodes().getClusterManagerNodeId() != null // a master is elected - && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; // none of excluded node - // is elected master - + return nodeIdsToBeExcluded.stream().noneMatch(votingConfigNodeIds::contains) + && clusterState.nodes().getClusterManagerNodeId() != null + && nodeIdsToBeExcluded.contains(clusterState.nodes().getClusterManagerNodeId()) == false; }; final Listener clusterStateListener = new Listener() { @@ -231,22 +230,18 @@ public void onNewClusterState(ClusterState state) { String errorMsg = "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; logger.error(errorMsg); - // will go ahead and clear the voting config and mark the status as false + // will go ahead and clear the voting config and mark the status as failed clearVotingConfigExclusionAndUpdateStatus(false, false); - // we can send the failure response to the 
user here listener.onFailure(new IllegalStateException(errorMsg)); } else { logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); // we are good here to send the response now as the request is processed by an eligible active leader - // and to-be-decommissioned cluster manager is no more part of Voting Configuration and no more - // to-be-decommission - // nodes can be part of Voting Config + // and to-be-decommissioned cluster manager is no more part of Voting Configuration listener.onResponse(new DecommissionResponse(true)); drainNodesWithDecommissionedAttribute(decommissionRequest); } } else { - // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not the cluster - // manager + // explicitly calling listener.onFailure with NotClusterManagerException as the local node is not leader // this will ensures that request is retried until cluster manager times out logger.info( "local node is not eligible to process the request, " @@ -265,24 +260,15 @@ public void onNewClusterState(ClusterState state) { @Override public void onClusterServiceClose() { String errorMsg = "cluster service closed while waiting for abdication of to-be-decommissioned leader"; - logger.warn(errorMsg); + logger.error(errorMsg); listener.onFailure(new DecommissioningFailedException(decommissionAttribute, errorMsg)); } @Override public void onTimeout(TimeValue timeout) { - logger.error( - "timed out [{}] while removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", - timeout.toString(), - nodeIdsToBeExcluded.toString() - ); - listener.onFailure( - new OpenSearchTimeoutException( - "timed out [{}] while removing to-be-decommissioned cluster manager eligible nodes [{}] from voting config", - timeout.toString(), - nodeIdsToBeExcluded.toString() - ) - ); + String errorMsg = "timed out [" + timeout.toString() + "while removing to-be-decommissioned cluster manager eligible nodes [" 
+ nodeIdsToBeExcluded.toString() + "] from voting config"; + logger.error(errorMsg); + listener.onFailure(new OpenSearchTimeoutException(errorMsg)); clearVotingConfigExclusionAndUpdateStatus(false, false); } }; From e855f705ce4c514712f3d2f7c77a5729adb30987 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 19:20:10 +0530 Subject: [PATCH 06/26] Remove transport call for exclusion Signed-off-by: Rishab Nahata --- .../decommission/DecommissionController.java | 41 ------------------- .../DecommissionControllerTests.java | 24 +---------- 2 files changed, 1 insertion(+), 64 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index ffb20a05b3ef7..788d9fb702254 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -79,47 +79,6 @@ public class DecommissionController { this.threadPool = threadPool; } - /** - * Transport call to add nodes to voting config exclusion - * - * @param nodes set of nodes Ids to be added to voting config exclusion list - * @param listener callback for response or failure - */ - public void excludeDecommissionedNodesFromVotingConfig(Set nodes, ActionListener listener) { - transportService.sendRequest( - transportService.getLocalNode(), - AddVotingConfigExclusionsAction.NAME, - new AddVotingConfigExclusionsRequest( - Strings.EMPTY_ARRAY, - nodes.toArray(String[]::new), - Strings.EMPTY_ARRAY, - TimeValue.timeValueSeconds(120) // giving a larger timeout of 120 sec as cluster might already be in stress when - // decommission is triggered - ), - new TransportResponseHandler() { - @Override - public void handleResponse(AddVotingConfigExclusionsResponse response) { - listener.onResponse(null); - } - - @Override - public void handleException(TransportException 
exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public AddVotingConfigExclusionsResponse read(StreamInput in) throws IOException { - return new AddVotingConfigExclusionsResponse(in); - } - } - ); - } - /** * Transport call to clear voting config exclusion * diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 5a76e0d5137fb..10a4c477b73f1 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -130,29 +130,7 @@ public void shutdownThreadPoolAndClusterService() { threadPool.shutdown(); } - public void testAddNodesToVotingConfigExclusion() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(2); - - ClusterStateObserver clusterStateObserver = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - clusterStateObserver.waitForNextChange(new AdjustConfigurationForExclusions(countDownLatch)); - Set nodesToRemoveFromVotingConfig = Collections.singleton(randomFrom("node1", "node6", "node11")); - decommissionController.excludeDecommissionedNodesFromVotingConfig(nodesToRemoveFromVotingConfig, new ActionListener() { - @Override - public void onResponse(Void unused) { - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail("unexpected failure occurred while removing node from voting config " + e); - } - }); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - clusterService.getClusterApplierService().state().getVotingConfigExclusions().forEach(vce -> { - assertTrue(nodesToRemoveFromVotingConfig.contains(vce.getNodeId())); - assertEquals(nodesToRemoveFromVotingConfig.size(), 1); - }); - } + // 
TODO - Add test for custom exclusion public void testClearVotingConfigExclusions() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); From 11e3c81cba9a7c726fc9738a07a24a7e63f7288a Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 19:33:16 +0530 Subject: [PATCH 07/26] Abstract clear exclusion Signed-off-by: Rishab Nahata --- .../TransportAddVotingConfigExclusionsAction.java | 4 ++-- .../TransportClearVotingConfigExclusionsAction.java | 10 +++------- ...Helper.java => VotingConfigExclusionsHelper.java} | 12 +++++++++++- .../cluster/decommission/DecommissionHelper.java | 4 ++-- 4 files changed, 18 insertions(+), 12 deletions(-) rename server/src/main/java/org/opensearch/action/admin/cluster/configuration/{AddVotingConfigExclusionsHelper.java => VotingConfigExclusionsHelper.java} (76%) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java index 6ddd95bdc3791..cdf7009777b68 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java @@ -64,8 +64,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; -import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.updateExclusionAndGetState; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; +import static 
org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.updateExclusionAndGetState; /** * Transport endpoint action for adding exclusions to voting config diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java index 1fc02db4309b1..d9c85b4fc3dce 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java @@ -60,6 +60,8 @@ import java.io.IOException; import java.util.function.Predicate; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; + /** * Transport endpoint action for clearing exclusions to voting config * @@ -166,13 +168,7 @@ private void submitClearVotingConfigExclusionsTask( clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { - final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata()) - .clearVotingConfigExclusions() - .build(); - final Metadata newMetadata = Metadata.builder(currentState.metadata()) - .coordinationMetadata(newCoordinationMetadata) - .build(); - return ClusterState.builder(currentState).metadata(newMetadata).build(); + return clearExclusionsAndGetState(currentState); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java similarity index 76% rename from 
server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java rename to server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java index b7366066ed65f..1708dc8c1f132 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/AddVotingConfigExclusionsHelper.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java @@ -17,7 +17,7 @@ import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; -public class AddVotingConfigExclusionsHelper { +public class VotingConfigExclusionsHelper { public static ClusterState updateExclusionAndGetState( ClusterState currentState, @@ -44,4 +44,14 @@ public static Set resolveVotingConfigExclusionsAndCheckMa MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey() ); } + + public static ClusterState clearExclusionsAndGetState(ClusterState currentState) { + final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata()) + .clearVotingConfigExclusions() + .build(); + final Metadata newMetadata = Metadata.builder(currentState.metadata()) + .coordinationMetadata(newCoordinationMetadata) + .build(); + return ClusterState.builder(currentState).metadata(newMetadata).build(); + } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index e8143aeb564fc..73f32e3ab18ad 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -20,8 +20,8 @@ import java.util.Iterator; import java.util.Set; -import static 
org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; -import static org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsHelper.updateExclusionAndGetState; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.updateExclusionAndGetState; /** * Static helper utilities to execute decommission From fe59288ced675768d656eaa0015de369ca83c727 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 19:49:55 +0530 Subject: [PATCH 08/26] Use controller for status update and account for terminal status Signed-off-by: Rishab Nahata --- .../decommission/DecommissionController.java | 47 +++---------- .../decommission/DecommissionService.java | 67 ++++++++----------- .../DecommissionControllerTests.java | 20 +----- 3 files changed, 40 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 788d9fb702254..0f1641463ede2 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -52,6 +52,8 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; + /** * Helper controller class to remove list of nodes from the cluster and update status * @@ -79,42 +81,6 @@ public class DecommissionController { this.threadPool = threadPool; } - /** - * Transport call to clear voting config exclusion - * - * @param listener callback for response or failure - */ - public void 
clearVotingConfigExclusion(ActionListener listener, boolean waitForRemoval) { - final ClearVotingConfigExclusionsRequest clearVotingConfigExclusionsRequest = new ClearVotingConfigExclusionsRequest(); - clearVotingConfigExclusionsRequest.setWaitForRemoval(waitForRemoval); - transportService.sendRequest( - transportService.getLocalNode(), - ClearVotingConfigExclusionsAction.NAME, - clearVotingConfigExclusionsRequest, - new TransportResponseHandler() { - @Override - public void handleResponse(ClearVotingConfigExclusionsResponse response) { - listener.onResponse(null); - } - - @Override - public void handleException(TransportException exp) { - listener.onFailure(exp); - } - - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - - @Override - public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException { - return new ClearVotingConfigExclusionsResponse(in); - } - } - ); - } - /** * This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor} * Once the tasks are submitted, it waits for an expected cluster state to guarantee @@ -201,7 +167,7 @@ public void onTimeout(TimeValue timeout) { * @param decommissionStatus status to update decommission metadata with * @param listener listener for response and failure */ - public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener listener) { + public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener listener, boolean isTerminalStatus) { clusterService.submitStateUpdateTask("update-decommission-status", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { @@ -218,9 +184,14 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.decommissionAttribute(), decommissionStatus ); - return ClusterState.builder(currentState) + ClusterState newState 
= ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); + + if (isTerminalStatus) { + newState = clearExclusionsAndGetState(newState); + } + return newState; } @Override diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 11a0ef5d91a24..5f82f1f3d98cc 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; @@ -43,6 +44,7 @@ import java.util.stream.Collectors; import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes; import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; import static org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute; @@ -231,7 +233,7 @@ public void onNewClusterState(ClusterState state) { "unexpected state encountered 
[local node is to-be-decommissioned leader] while executing decommission request"; logger.error(errorMsg); // will go ahead and clear the voting config and mark the status as failed - clearVotingConfigExclusionAndUpdateStatus(false, false); + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); listener.onFailure(new IllegalStateException(errorMsg)); } else { logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); @@ -269,7 +271,8 @@ public void onTimeout(TimeValue timeout) { String errorMsg = "timed out [" + timeout.toString() + "while removing to-be-decommissioned cluster manager eligible nodes [" + nodeIdsToBeExcluded.toString() + "] from voting config"; logger.error(errorMsg); listener.onFailure(new OpenSearchTimeoutException(errorMsg)); - clearVotingConfigExclusionAndUpdateStatus(false, false); + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); } }; @@ -317,10 +320,11 @@ public void onFailure(Exception e) { ), e ); - // since we are not able to update the status, we will clear the voting config exclusion we have set earlier - clearVotingConfigExclusionAndUpdateStatus(false, false); + // TODO - this might not be needed + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); } - }); + }, false); } } @@ -360,12 +364,14 @@ public void onResponse(DecommissionStatus status) { new ActionListener() { @Override public void onResponse(Void unused) { - clearVotingConfigExclusionAndUpdateStatus(true, true); + // will clear the voting config exclusion and mark the status as successful + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.SUCCESSFUL, 
statusUpdateListener(), true); } @Override public void onFailure(Exception e) { - clearVotingConfigExclusionAndUpdateStatus(false, false); + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); } } ); @@ -381,32 +387,11 @@ public void onFailure(Exception e) { ), e ); - // since we are not able to update the status, we will clear the voting config exclusion we have set earlier - clearVotingConfigExclusionAndUpdateStatus(false, false); + // TODO - this might not be needed + // will go ahead and clear the voting config and mark the status as failed + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); } - }); - } - - private void clearVotingConfigExclusionAndUpdateStatus(boolean decommissionSuccessful, boolean waitForRemoval) { - decommissionController.clearVotingConfigExclusion(new ActionListener() { - @Override - public void onResponse(Void unused) { - logger.info( - "successfully cleared voting config exclusion after completing decommission action, proceeding to update metadata" - ); - DecommissionStatus updateStatusWith = decommissionSuccessful ? 
DecommissionStatus.SUCCESSFUL : DecommissionStatus.FAILED; - decommissionController.updateMetadataWithDecommissionStatus(updateStatusWith, statusUpdateListener()); - } - - @Override - public void onFailure(Exception e) { - logger.debug( - new ParameterizedMessage("failure in clearing voting config exclusion after processing decommission request"), - e - ); - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); - } - }, waitForRemoval); + }, false); } private static void validateAwarenessAttribute( @@ -533,21 +518,27 @@ public void startRecommissionAction(final ActionListener() { + clusterService.submitStateUpdateTask("clear-voting-config-exclusions-during-recommission", new ClusterStateUpdateTask(Priority.URGENT) { @Override - public void onResponse(Void unused) { - logger.info("successfully cleared voting config exclusion for deleting the decommission."); - deleteDecommissionState(listener); + public ClusterState execute(ClusterState currentState) { + return clearExclusionsAndGetState(currentState); } @Override - public void onFailure(Exception e) { + public void onFailure(String source, Exception e) { logger.error("Failure in clearing voting config during delete_decommission request.", e); listener.onFailure(e); } - }, false); + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("successfully cleared voting config exclusion for deleting the decommission."); + deleteDecommissionState(listener); + } + }); } + // TODO - merge this state update with above call void deleteDecommissionState(ActionListener listener) { clusterService.submitStateUpdateTask("delete_decommission_state", new ClusterStateUpdateTask(Priority.URGENT) { @Override diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 
10a4c477b73f1..ace7a136a3dd4 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -131,23 +131,7 @@ public void shutdownThreadPoolAndClusterService() { } // TODO - Add test for custom exclusion - - public void testClearVotingConfigExclusions() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - decommissionController.clearVotingConfigExclusion(new ActionListener() { - @Override - public void onResponse(Void unused) { - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail("unexpected failure occurred while clearing voting config exclusion" + e); - } - }, false); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - assertThat(clusterService.getClusterApplierService().state().getVotingConfigExclusions(), empty()); - } + // TODO - Add test for clear exclusion public void testNodesRemovedForDecommissionRequestSuccessfulResponse() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); @@ -278,7 +262,7 @@ public void onFailure(Exception e) { fail("decommission status update failed"); countDownLatch.countDown(); } - }); + }, false); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); ClusterState newState = clusterService.getClusterApplierService().state(); DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); From a8d4fd629ee9522a6ad2ea877b19eb2b9d00843d Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 19:59:01 +0530 Subject: [PATCH 09/26] atomic recommission Signed-off-by: Rishab Nahata --- .../decommission/DecommissionHelper.java | 7 ++++ .../decommission/DecommissionService.java | 37 ++++--------------- 2 files changed, 15 insertions(+), 29 deletions(-) diff --git 
a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index 73f32e3ab18ad..0d4dd03f92a10 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -38,6 +38,13 @@ static ClusterState registerDecommissionAttributeInClusterState( .build(); } + static ClusterState deleteDecommissionAttributeInClusterState(ClusterState currentState) { + Metadata metadata = currentState.metadata(); + Metadata.Builder mdBuilder = Metadata.builder(metadata); + mdBuilder.removeCustom(DecommissionAttributeMetadata.TYPE); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } + static ClusterState addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes( ClusterState currentState, Set nodeIdsToBeExcluded, diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 5f82f1f3d98cc..22b05d14f2596 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -46,6 +46,7 @@ import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes; +import static org.opensearch.cluster.decommission.DecommissionHelper.deleteDecommissionAttributeInClusterState; import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; 
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute; import static org.opensearch.cluster.decommission.DecommissionHelper.registerDecommissionAttributeInClusterState; @@ -518,48 +519,26 @@ public void startRecommissionAction(final ActionListener listener) { - clusterService.submitStateUpdateTask("delete_decommission_state", new ClusterStateUpdateTask(Priority.URGENT) { + clusterService.submitStateUpdateTask("delete-decommission-state", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { + ClusterState newState = clearExclusionsAndGetState(currentState); logger.info("Deleting the decommission attribute from the cluster state"); - Metadata metadata = currentState.metadata(); - Metadata.Builder mdBuilder = Metadata.builder(metadata); - mdBuilder.removeCustom(DecommissionAttributeMetadata.TYPE); - return ClusterState.builder(currentState).metadata(mdBuilder).build(); + newState = deleteDecommissionAttributeInClusterState(newState); + return newState; } @Override public void onFailure(String source, Exception e) { - logger.error(() -> new ParameterizedMessage("Failed to clear decommission attribute. [{}]", source), e); + logger.error(() -> new ParameterizedMessage("failure during recommission action [{}]", source), e); listener.onFailure(e); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - // Cluster state processed for deleting the decommission attribute. 
+ logger.info("successfully cleared voting config exclusion and decommissioned attribute"); assert newState.metadata().decommissionAttributeMetadata() == null; + assert newState.coordinationMetadata().getVotingConfigExclusions().isEmpty(); listener.onResponse(new DeleteDecommissionStateResponse(true)); } }); From 14b5c389c3fb9613c5dba3fdac9fb05b157c9c32 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 20:07:04 +0530 Subject: [PATCH 10/26] Remove white spaces Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionService.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 22b05d14f2596..5b58d368e4ee0 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -140,24 +140,17 @@ public void startDecommissionAction( final DecommissionAttribute decommissionAttribute = decommissionRequest.getDecommissionAttribute(); // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { - private Set nodeIdsToBeExcluded; - @Override public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); - DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); - // check that request is eligible to proceed ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); - // ensure attribute is weighed away 
ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); - ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); - Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute( currentState, decommissionAttribute, From ca97114aa7409a5dba32f6baa6e1a28e03615f34 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 21:34:23 +0530 Subject: [PATCH 11/26] Fix Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionService.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 5b58d368e4ee0..3626072ac042b 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; @@ -22,7 +21,6 @@ import org.opensearch.cluster.ClusterStateObserver.Listener; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.NotClusterManagerException; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRouting; @@ -61,8 +59,7 @@ * <ul>
  * <li>Initiates nodes decommissioning by adding custom metadata with the attribute and state as {@link DecommissionStatus#INIT}</li>
  * <li>Remove to-be-decommissioned cluster-manager eligible nodes from voting config and wait for its abdication if it is active leader</li>
- * <li>Triggers weigh away for nodes having given awareness attribute to drain.</li>
- * <li>Once weighed away, the service triggers nodes decommission. This marks the decommission status as {@link DecommissionStatus#IN_PROGRESS}</li>
+ * <li>After the draining timeout, the service triggers nodes decommission. This marks the decommission status as {@link DecommissionStatus#IN_PROGRESS}</li>
  * <li>Once the decommission is successful, the service clears the voting config and marks the status as {@link DecommissionStatus#SUCCESSFUL}</li>
  * <li>If service fails at any step, it makes best attempt to mark the status as {@link DecommissionStatus#FAILED} and to clear voting config exclusion</li>
  * </ul>
@@ -146,10 +143,10 @@ public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action validateAwarenessAttribute(decommissionAttribute, awarenessAttributes, forcedAwarenessAttributes); DecommissionAttributeMetadata decommissionAttributeMetadata = currentState.metadata().decommissionAttributeMetadata(); - // check that request is eligible to proceed + // check that request is eligible to proceed and attribute is weighed away ensureEligibleRequest(decommissionAttributeMetadata, decommissionAttribute); - // ensure attribute is weighed away ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); + ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute( currentState, From adb7a346ecaef4fe28fa765463fbd5b397ce7ad1 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 21:54:22 +0530 Subject: [PATCH 12/26] Fix tests Signed-off-by: Rishab Nahata --- .../DecommissionControllerTests.java | 3 - .../DecommissionServiceTests.java | 77 +++++-------------- 2 files changed, 20 insertions(+), 60 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index ace7a136a3dd4..3d483afc53a29 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -130,9 +130,6 @@ public void shutdownThreadPoolAndClusterService() { threadPool.shutdown(); } - // TODO - Add test for custom exclusion - // TODO - Add test for clear exclusion - public void testNodesRemovedForDecommissionRequestSuccessfulResponse() throws InterruptedException { 
final CountDownLatch countDownLatch = new CountDownLatch(1); Set nodesToBeRemoved = new HashSet<>(); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 7fe58d75932a1..4a3cc3169a640 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -48,6 +48,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; @@ -313,22 +314,32 @@ public void testDrainNodesWithDecommissionedAttributeWithNoDelay() { } - public void testClearClusterDecommissionState() throws InterruptedException { + public void testRecommissionAction() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-2"); DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( decommissionAttribute, DecommissionStatus.SUCCESSFUL ); - ClusterState state = ClusterState.builder(new ClusterName("test")) - .metadata(Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build()) - .build(); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata(Metadata.builder(clusterService.state().metadata()) + .decommissionAttributeMetadata(decommissionAttributeMetadata) + .coordinationMetadata( + CoordinationMetadata.builder() + .addVotingConfigExclusion(new CoordinationMetadata.VotingConfigExclusion(clusterService.state().nodes().get("node6"))) + .build() + ) + .build() + ) + ); + 
AtomicReference clusterStateAtomicReference = new AtomicReference<>(); ActionListener listener = new ActionListener<>() { @Override public void onResponse(DeleteDecommissionStateResponse decommissionResponse) { - DecommissionAttributeMetadata metadata = clusterService.state().metadata().custom(DecommissionAttributeMetadata.TYPE); - assertNull(metadata); + clusterStateAtomicReference.set(clusterService.state()); countDownLatch.countDown(); } @@ -338,59 +349,11 @@ public void onFailure(Exception e) { countDownLatch.countDown(); } }; - - this.decommissionService.deleteDecommissionState(listener); - + this.decommissionService.startRecommissionAction(listener); // Decommission Attribute should be removed. assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); - } - - public void testDeleteDecommissionAttributeClearVotingExclusion() { - TransportService mockTransportService = Mockito.mock(TransportService.class); - Mockito.when(mockTransportService.getLocalNode()).thenReturn(Mockito.mock(DiscoveryNode.class)); - DecommissionService decommissionService = new DecommissionService( - Settings.EMPTY, - clusterSettings, - clusterService, - mockTransportService, - threadPool, - allocationService - ); - decommissionService.startRecommissionAction(Mockito.mock(ActionListener.class)); - - ArgumentCaptor clearVotingConfigExclusionsRequestArgumentCaptor = ArgumentCaptor.forClass( - ClearVotingConfigExclusionsRequest.class - ); - Mockito.verify(mockTransportService) - .sendRequest( - Mockito.any(DiscoveryNode.class), - Mockito.anyString(), - clearVotingConfigExclusionsRequestArgumentCaptor.capture(), - Mockito.any(TransportResponseHandler.class) - ); - - ClearVotingConfigExclusionsRequest request = clearVotingConfigExclusionsRequestArgumentCaptor.getValue(); - assertFalse(request.getWaitForRemoval()); - } - - public void testClusterUpdateTaskForDeletingDecommission() throws InterruptedException { - final CountDownLatch countDownLatch = new CountDownLatch(1); - ActionListener 
listener = new ActionListener<>() { - @Override - public void onResponse(DeleteDecommissionStateResponse response) { - assertTrue(response.isAcknowledged()); - assertNull(clusterService.state().metadata().decommissionAttributeMetadata()); - countDownLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - fail("On Failure shouldn't have been called"); - countDownLatch.countDown(); - } - }; - decommissionService.deleteDecommissionState(listener); - assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + assertNull(clusterStateAtomicReference.get().metadata().decommissionAttributeMetadata()); + assertEquals(0, clusterStateAtomicReference.get().coordinationMetadata().getVotingConfigExclusions().size()); } private void setWeightedRoutingWeights(Map weights) { From 1c7cf93a5b1db09899220d6f837c11ec67cacaa6 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 21:56:30 +0530 Subject: [PATCH 13/26] Fix spotless check Signed-off-by: Rishab Nahata --- ...portClearVotingConfigExclusionsAction.java | 2 -- .../VotingConfigExclusionsHelper.java | 4 +-- .../decommission/DecommissionController.java | 15 ++++----- .../decommission/DecommissionService.java | 31 ++++++++++++++++--- .../DecommissionControllerTests.java | 1 - .../DecommissionServiceTests.java | 22 ++++++------- 6 files changed, 44 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java index d9c85b4fc3dce..b65688dcc30f6 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportClearVotingConfigExclusionsAction.java @@ -44,10 +44,8 @@ import org.opensearch.cluster.ClusterStateUpdateTask; import 
org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; -import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java index 1708dc8c1f132..048c76687f312 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java @@ -49,9 +49,7 @@ public static ClusterState clearExclusionsAndGetState(ClusterState currentState) final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata()) .clearVotingConfigExclusions() .build(); - final Metadata newMetadata = Metadata.builder(currentState.metadata()) - .coordinationMetadata(newCoordinationMetadata) - .build(); + final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(newCoordinationMetadata).build(); return ClusterState.builder(currentState).metadata(newMetadata).build(); } } diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index 0f1641463ede2..a4ae0c2db62b4 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ 
b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -12,12 +12,6 @@ import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest; -import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsResponse; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse; import org.opensearch.action.admin.cluster.node.stats.NodeStats; import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction; import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; @@ -33,7 +27,6 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; -import org.opensearch.common.Strings; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.unit.TimeValue; import org.opensearch.http.HttpStats; @@ -167,7 +160,11 @@ public void onTimeout(TimeValue timeout) { * @param decommissionStatus status to update decommission metadata with * @param listener listener for response and failure */ - public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener listener, boolean isTerminalStatus) { + public void updateMetadataWithDecommissionStatus( + DecommissionStatus decommissionStatus, + ActionListener listener, + boolean isTerminalStatus + ) { clusterService.submitStateUpdateTask("update-decommission-status", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState 
execute(ClusterState currentState) { @@ -184,7 +181,7 @@ public ClusterState execute(ClusterState currentState) { decommissionAttributeMetadata.decommissionAttribute(), decommissionStatus ); - ClusterState newState = ClusterState.builder(currentState) + ClusterState newState = ClusterState.builder(currentState) .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 3626072ac042b..2c187994754c3 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -138,6 +138,7 @@ public void startDecommissionAction( // register the metadata with status as INIT as first step clusterService.submitStateUpdateTask("decommission [" + decommissionAttribute + "]", new ClusterStateUpdateTask(Priority.URGENT) { private Set nodeIdsToBeExcluded; + @Override public ClusterState execute(ClusterState currentState) { // validates if correct awareness attributes and forced awareness attribute set to the cluster before starting action @@ -224,7 +225,11 @@ public void onNewClusterState(ClusterState state) { "unexpected state encountered [local node is to-be-decommissioned leader] while executing decommission request"; logger.error(errorMsg); // will go ahead and clear the voting config and mark the status as failed - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); + decommissionController.updateMetadataWithDecommissionStatus( + DecommissionStatus.FAILED, + statusUpdateListener(), + true + ); listener.onFailure(new IllegalStateException(errorMsg)); } else { logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the 
request"); @@ -259,11 +264,19 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - String errorMsg = "timed out [" + timeout.toString() + "while removing to-be-decommissioned cluster manager eligible nodes [" + nodeIdsToBeExcluded.toString() + "] from voting config"; + String errorMsg = "timed out [" + + timeout.toString() + + "while removing to-be-decommissioned cluster manager eligible nodes [" + + nodeIdsToBeExcluded.toString() + + "] from voting config"; logger.error(errorMsg); listener.onFailure(new OpenSearchTimeoutException(errorMsg)); // will go ahead and clear the voting config and mark the status as failed - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); + decommissionController.updateMetadataWithDecommissionStatus( + DecommissionStatus.FAILED, + statusUpdateListener(), + true + ); } }; @@ -356,13 +369,21 @@ public void onResponse(DecommissionStatus status) { @Override public void onResponse(Void unused) { // will clear the voting config exclusion and mark the status as successful - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.SUCCESSFUL, statusUpdateListener(), true); + decommissionController.updateMetadataWithDecommissionStatus( + DecommissionStatus.SUCCESSFUL, + statusUpdateListener(), + true + ); } @Override public void onFailure(Exception e) { // will go ahead and clear the voting config and mark the status as failed - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); + decommissionController.updateMetadataWithDecommissionStatus( + DecommissionStatus.FAILED, + statusUpdateListener(), + true + ); } } ); diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index 3d483afc53a29..f5f5e12e27d85 100644 --- 
a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -53,7 +53,6 @@ import static java.util.Collections.emptySet; import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java index 4a3cc3169a640..95980991d22b0 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -11,14 +11,12 @@ import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; -import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.cluster.decommission.awareness.delete.DeleteDecommissionStateResponse; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionRequest; import org.opensearch.action.admin.cluster.decommission.awareness.put.DecommissionResponse; -import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.coordination.CoordinationMetadata; @@ -39,7 +37,6 @@ import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportResponseHandler; import 
org.opensearch.transport.TransportService; import java.util.Collections; @@ -324,14 +321,17 @@ public void testRecommissionAction() throws InterruptedException { final ClusterState.Builder builder = builder(clusterService.state()); setState( clusterService, - builder.metadata(Metadata.builder(clusterService.state().metadata()) - .decommissionAttributeMetadata(decommissionAttributeMetadata) - .coordinationMetadata( - CoordinationMetadata.builder() - .addVotingConfigExclusion(new CoordinationMetadata.VotingConfigExclusion(clusterService.state().nodes().get("node6"))) - .build() - ) - .build() + builder.metadata( + Metadata.builder(clusterService.state().metadata()) + .decommissionAttributeMetadata(decommissionAttributeMetadata) + .coordinationMetadata( + CoordinationMetadata.builder() + .addVotingConfigExclusion( + new CoordinationMetadata.VotingConfigExclusion(clusterService.state().nodes().get("node6")) + ) + .build() + ) + .build() ) ); AtomicReference clusterStateAtomicReference = new AtomicReference<>(); From 0bc3ea7a780433088fafff1b289cf3dda445d407 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 22:48:07 +0530 Subject: [PATCH 14/26] Fix integ tests Signed-off-by: Rishab Nahata --- .../AwarenessAttributeDecommissionIT.java | 22 ++++--------------- .../decommission/DecommissionService.java | 2 +- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 067b127a667b4..9a4ac79f27317 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -760,24 +760,11 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception 
{ // and hence due to which the leader won't get abdicated and decommission request should eventually fail. // And in this case, to ensure decommission request doesn't leave mutating change in the cluster, we ensure // that no exclusion is set to the cluster and state for decommission is marked as FAILED - Logger clusterLogger = LogManager.getLogger(DecommissionService.class); - MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(clusterLogger); - mockLogAppender.addExpectation( - new MockLogAppender.SeenEventExpectation( - "test", - DecommissionService.class.getCanonicalName(), - Level.ERROR, - "failure in removing to-be-decommissioned cluster manager eligible nodes" - ) + OpenSearchTimeoutException ex = expectThrows( + OpenSearchTimeoutException.class, + () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() ); - - assertBusy(() -> { - OpenSearchTimeoutException ex = expectThrows( - OpenSearchTimeoutException.class, - () -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet() - ); - assertTrue(ex.getMessage().contains("timed out waiting for voting config exclusions")); - }); + assertTrue(ex.getMessage().contains("while removing to-be-decommissioned cluster manager eligible nodes")); ClusterService leaderClusterService = internalCluster().getInstance( ClusterService.class, @@ -813,7 +800,6 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception { // if the below condition is passed, then we are sure current decommission status is marked FAILED assertTrue(expectedStateLatch.await(30, TimeUnit.SECONDS)); - mockLogAppender.assertAllExpectationsMatched(); // ensure all nodes are part of cluster ensureStableCluster(6, TimeValue.timeValueMinutes(2)); diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 2c187994754c3..50de6f1a36764 100644 --- 
a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -266,7 +266,7 @@ public void onClusterServiceClose() { public void onTimeout(TimeValue timeout) { String errorMsg = "timed out [" + timeout.toString() - + "while removing to-be-decommissioned cluster manager eligible nodes [" + + "] while removing to-be-decommissioned cluster manager eligible nodes [" + nodeIdsToBeExcluded.toString() + "] from voting config"; logger.error(errorMsg); From 577c77ce51f986f91e768fa92a1d3f011385c2ae Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sat, 5 Nov 2022 22:49:36 +0530 Subject: [PATCH 15/26] Fix spotless check Signed-off-by: Rishab Nahata --- .../cluster/coordination/AwarenessAttributeDecommissionIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java index 9a4ac79f27317..464569b1c60bf 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/coordination/AwarenessAttributeDecommissionIT.java @@ -29,7 +29,6 @@ import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.decommission.DecommissionAttribute; import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; -import org.opensearch.cluster.decommission.DecommissionService; import org.opensearch.cluster.decommission.DecommissionStatus; import org.opensearch.cluster.decommission.DecommissioningFailedException; import org.opensearch.cluster.decommission.NodeDecommissionedException; From 738a99b878c464e5ada32b18a5f9d00438e4f3db Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 8 Nov 2022 19:20:07 +0530 Subject: 
[PATCH 16/26] Resolve terminal status during update Signed-off-by: Rishab Nahata --- .../decommission/DecommissionController.java | 6 +++--- .../decommission/DecommissionService.java | 20 ++++++++----------- .../DecommissionControllerTests.java | 2 +- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index a4ae0c2db62b4..abe5382973711 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -162,8 +162,7 @@ public void onTimeout(TimeValue timeout) { */ public void updateMetadataWithDecommissionStatus( DecommissionStatus decommissionStatus, - ActionListener listener, - boolean isTerminalStatus + ActionListener listener ) { clusterService.submitStateUpdateTask("update-decommission-status", new ClusterStateUpdateTask(Priority.URGENT) { @Override @@ -185,7 +184,8 @@ public ClusterState execute(ClusterState currentState) { .metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata)) .build(); - if (isTerminalStatus) { + // For terminal status we will go ahead and clear any exclusion that was added as part of decommission action + if (decommissionStatus.equals(DecommissionStatus.SUCCESSFUL) || decommissionStatus.equals(DecommissionStatus.FAILED)) { newState = clearExclusionsAndGetState(newState); } return newState; diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 50de6f1a36764..dba2098c7dd7c 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java 
@@ -227,8 +227,7 @@ public void onNewClusterState(ClusterState state) { // will go ahead and clear the voting config and mark the status as failed decommissionController.updateMetadataWithDecommissionStatus( DecommissionStatus.FAILED, - statusUpdateListener(), - true + statusUpdateListener() ); listener.onFailure(new IllegalStateException(errorMsg)); } else { @@ -274,8 +273,7 @@ public void onTimeout(TimeValue timeout) { // will go ahead and clear the voting config and mark the status as failed decommissionController.updateMetadataWithDecommissionStatus( DecommissionStatus.FAILED, - statusUpdateListener(), - true + statusUpdateListener() ); } }; @@ -326,9 +324,9 @@ public void onFailure(Exception e) { ); // TODO - this might not be needed // will go ahead and clear the voting config and mark the status as failed - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } - }, false); + }); } } @@ -371,8 +369,7 @@ public void onResponse(Void unused) { // will clear the voting config exclusion and mark the status as successful decommissionController.updateMetadataWithDecommissionStatus( DecommissionStatus.SUCCESSFUL, - statusUpdateListener(), - true + statusUpdateListener() ); } @@ -381,8 +378,7 @@ public void onFailure(Exception e) { // will go ahead and clear the voting config and mark the status as failed decommissionController.updateMetadataWithDecommissionStatus( DecommissionStatus.FAILED, - statusUpdateListener(), - true + statusUpdateListener() ); } } @@ -401,9 +397,9 @@ public void onFailure(Exception e) { ); // TODO - this might not be needed // will go ahead and clear the voting config and mark the status as failed - decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener(), true); + 
decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } - }, false); + }); } private static void validateAwarenessAttribute( diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java index f5f5e12e27d85..cf92130095e12 100644 --- a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionControllerTests.java @@ -258,7 +258,7 @@ public void onFailure(Exception e) { fail("decommission status update failed"); countDownLatch.countDown(); } - }, false); + }); assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); ClusterState newState = clusterService.getClusterApplierService().state(); DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); From 5dae89621082047b2add7da41b40ca7ba56e6ffa Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 8 Nov 2022 19:23:17 +0530 Subject: [PATCH 17/26] Add java doc Signed-off-by: Rishab Nahata --- .../configuration/VotingConfigExclusionsHelper.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java index 048c76687f312..06cf15274ec39 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java @@ -17,8 +17,21 @@ import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; +/** + * Static helper utilities for 
voting config exclusions cluster state updates + * + * @opensearch.internal + */ public class VotingConfigExclusionsHelper { + /** + * Static helper to update current state with given resolved exclusions + * + * @param currentState current cluster state + * @param resolvedExclusions resolved exclusions from the request + * @param finalMaxVotingConfigExclusions max exclusions that be added + * @return newly formed cluster state + */ public static ClusterState updateExclusionAndGetState( ClusterState currentState, Set resolvedExclusions, From 97a506a5326846c6cf1ec7f7cd3fec563b2d181c Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 8 Nov 2022 19:27:05 +0530 Subject: [PATCH 18/26] Fix spotless check Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionController.java | 5 +---- .../cluster/decommission/DecommissionService.java | 10 ++-------- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java index abe5382973711..1ff2fb52175c7 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionController.java @@ -160,10 +160,7 @@ public void onTimeout(TimeValue timeout) { * @param decommissionStatus status to update decommission metadata with * @param listener listener for response and failure */ - public void updateMetadataWithDecommissionStatus( - DecommissionStatus decommissionStatus, - ActionListener listener - ) { + public void updateMetadataWithDecommissionStatus(DecommissionStatus decommissionStatus, ActionListener listener) { clusterService.submitStateUpdateTask("update-decommission-status", new ClusterStateUpdateTask(Priority.URGENT) { @Override public ClusterState execute(ClusterState currentState) { diff --git 
a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index dba2098c7dd7c..7721d5202fe52 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -271,10 +271,7 @@ public void onTimeout(TimeValue timeout) { logger.error(errorMsg); listener.onFailure(new OpenSearchTimeoutException(errorMsg)); // will go ahead and clear the voting config and mark the status as failed - decommissionController.updateMetadataWithDecommissionStatus( - DecommissionStatus.FAILED, - statusUpdateListener() - ); + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } }; @@ -376,10 +373,7 @@ public void onResponse(Void unused) { @Override public void onFailure(Exception e) { // will go ahead and clear the voting config and mark the status as failed - decommissionController.updateMetadataWithDecommissionStatus( - DecommissionStatus.FAILED, - statusUpdateListener() - ); + decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } } ); From e227c0b62639cc6dbc93615f4aa5ed5812e4392e Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Fri, 11 Nov 2022 13:15:48 +0530 Subject: [PATCH 19/26] Resolve PR comments Signed-off-by: Rishab Nahata --- .../TransportAddVotingConfigExclusionsAction.java | 4 ++-- .../cluster/configuration/VotingConfigExclusionsHelper.java | 2 +- .../opensearch/cluster/decommission/DecommissionHelper.java | 6 +++--- .../cluster/decommission/DecommissionService.java | 4 ++-- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java 
b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java index cdf7009777b68..ffdb2735ae69f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/TransportAddVotingConfigExclusionsAction.java @@ -65,7 +65,7 @@ import java.util.stream.Collectors; import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; -import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.updateExclusionAndGetState; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.addExclusionAndGetState; /** * Transport endpoint action for adding exclusions to voting config @@ -145,7 +145,7 @@ public ClusterState execute(ClusterState currentState) { assert resolvedExclusions == null : resolvedExclusions; final int finalMaxVotingConfigExclusions = TransportAddVotingConfigExclusionsAction.this.maxVotingConfigExclusions; resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, finalMaxVotingConfigExclusions); - return updateExclusionAndGetState(currentState, resolvedExclusions, finalMaxVotingConfigExclusions); + return addExclusionAndGetState(currentState, resolvedExclusions, finalMaxVotingConfigExclusions); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java index 06cf15274ec39..3bfb774ea1d85 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java @@ -32,7 
+32,7 @@ public class VotingConfigExclusionsHelper { * @param finalMaxVotingConfigExclusions max exclusions that be added * @return newly formed cluster state */ - public static ClusterState updateExclusionAndGetState( + public static ClusterState addExclusionAndGetState( ClusterState currentState, Set resolvedExclusions, int finalMaxVotingConfigExclusions diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index 0d4dd03f92a10..ee18b2aa9b5f3 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -21,7 +21,7 @@ import java.util.Set; import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; -import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.updateExclusionAndGetState; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.addExclusionAndGetState; /** * Static helper utilities to execute decommission @@ -45,7 +45,7 @@ static ClusterState deleteDecommissionAttributeInClusterState(ClusterState curre return ClusterState.builder(currentState).metadata(mdBuilder).build(); } - static ClusterState addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes( + static ClusterState addVotingConfigExclusionsForNodesToBeDecommissioned( ClusterState currentState, Set nodeIdsToBeExcluded, TimeValue decommissionActionTimeout, @@ -62,7 +62,7 @@ static ClusterState addVotingConfigExclusionsForToBeDecommissionedClusterManager currentState, maxVotingConfigExclusions ); - return updateExclusionAndGetState(currentState, resolvedExclusion, maxVotingConfigExclusions); + return addExclusionAndGetState(currentState, resolvedExclusion, maxVotingConfigExclusions); } static Set 
filterNodesWithDecommissionAttribute( diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 7721d5202fe52..2d850fa3f560a 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -43,7 +43,7 @@ import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING; import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; -import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes; +import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForNodesToBeDecommissioned; import static org.opensearch.cluster.decommission.DecommissionHelper.deleteDecommissionAttributeInClusterState; import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; import static org.opensearch.cluster.decommission.DecommissionHelper.nodeHasDecommissionedAttribute; @@ -160,7 +160,7 @@ public ClusterState execute(ClusterState currentState) { ); // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); - newState = addVotingConfigExclusionsForToBeDecommissionedClusterManagerNodes( + newState = addVotingConfigExclusionsForNodesToBeDecommissioned( newState, nodeIdsToBeExcluded, TimeValue.timeValueSeconds(30), From ecfdffd81012ee96c9910302151de849032c4bdd Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 15 Nov 2022 15:26:00 +0530 Subject: [PATCH 20/26] fixes Signed-off-by: Rishab Nahata --- 
.../VotingConfigExclusionsHelper.java | 15 ++++++++- .../decommission/DecommissionHelper.java | 2 ++ .../decommission/DecommissionService.java | 33 ++++++++++--------- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java index 3bfb774ea1d85..5cc4bd2f831d7 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelper.java @@ -45,7 +45,14 @@ public static ClusterState addExclusionAndGetState( return newState; } - // throws IAE if no nodes matched or maximum exceeded + /** + * Resolves the exclusion from the request and throws IAE if no nodes matched or maximum exceeded + * + * @param request AddVotingConfigExclusionsRequest request + * @param state current cluster state + * @param maxVotingConfigExclusions max number of exclusion acceptable + * @return set of VotingConfigExclusion + */ public static Set resolveVotingConfigExclusionsAndCheckMaximum( AddVotingConfigExclusionsRequest request, ClusterState state, @@ -58,6 +65,12 @@ public static Set resolveVotingConfigExclusionsAndCheckMa ); } + /** + * Clears Voting config exclusion from the given cluster state + * + * @param currentState current cluster state + * @return newly formed cluster state after clearing voting config exclusions + */ public static ClusterState clearExclusionsAndGetState(ClusterState currentState) { final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata()) .clearVotingConfigExclusions() diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java 
b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java index ee18b2aa9b5f3..8305bda545998 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionHelper.java @@ -25,6 +25,8 @@ /** * Static helper utilities to execute decommission + * + * @opensearch.internal */ public class DecommissionHelper { diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 2d850fa3f560a..89bdbf3040b0b 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -21,6 +21,7 @@ import org.opensearch.cluster.ClusterStateObserver.Listener; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.NotClusterManagerException; +import org.opensearch.cluster.coordination.CoordinationMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRouting; @@ -149,21 +150,16 @@ public ClusterState execute(ClusterState currentState) { ensureToBeDecommissionedAttributeWeighedAway(currentState, decommissionAttribute); ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); - Set clusterManagerNodesToBeDecommissioned = filterNodesWithDecommissionAttribute( - currentState, - decommissionAttribute, - true - ); + // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion + nodeIdsToBeExcluded = filterNodesWithDecommissionAttribute(currentState, decommissionAttribute, true).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); logger.info( "resolved cluster manager eligible nodes [{}] that should 
be added to voting config exclusion", - clusterManagerNodesToBeDecommissioned.toString() + nodeIdsToBeExcluded.toString() ); - // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion - nodeIdsToBeExcluded = clusterManagerNodesToBeDecommissioned.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); newState = addVotingConfigExclusionsForNodesToBeDecommissioned( newState, nodeIdsToBeExcluded, - TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(60), // TODO - update it with request timeout maxVotingConfigExclusions ); logger.debug( @@ -190,6 +186,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); assert decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT); + assert newState.getVotingConfigExclusions().stream().map(CoordinationMetadata.VotingConfigExclusion::getNodeId) + .collect(Collectors.toSet()) + .containsAll(nodeIdsToBeExcluded); logger.debug( "registered decommission metadata for attribute [{}] with status [{}]", decommissionAttributeMetadata.decommissionAttribute(), @@ -198,7 +197,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS final ClusterStateObserver observer = new ClusterStateObserver( clusterService, - TimeValue.timeValueSeconds(30), // TODO update and name timeout to requestTimeout + TimeValue.timeValueSeconds(60), // TODO - update it with request timeout logger, threadPool.getThreadContext() ); @@ -231,7 +230,7 @@ public void onNewClusterState(ClusterState state) { ); listener.onFailure(new IllegalStateException(errorMsg)); } else { - logger.info("will attempt to fail decommissioned nodes as local node is eligible to process the request"); + logger.info("will proceed to drain decommissioned nodes as local 
node is eligible to process the request"); // we are good here to send the response now as the request is processed by an eligible active leader // and to-be-decommissioned cluster manager is no more part of Voting Configuration listener.onResponse(new DecommissionResponse(true)); @@ -281,7 +280,7 @@ public void onTimeout(TimeValue timeout) { clusterStateListener.onNewClusterState(newState); } else { logger.debug("waiting to abdicate to-be-decommissioned leader"); - observer.waitForNextChange(clusterStateListener, allNodesRemovedAndAbdicated); + observer.waitForNextChange(clusterStateListener, allNodesRemovedAndAbdicated); // TODO add request timeout here } } }); @@ -319,8 +318,9 @@ public void onFailure(Exception e) { ), e ); - // TODO - this might not be needed - // will go ahead and clear the voting config and mark the status as failed + // This decommission state update call will most likely fail as the state update call to 'DRAINING' + // failed. But attempting it anyways as FAILED update might still pass as it doesn't have dependency on + // the current state decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } }); @@ -389,8 +389,9 @@ public void onFailure(Exception e) { ), e ); - // TODO - this might not be needed - // will go ahead and clear the voting config and mark the status as failed + // This decommission state update call will most likely fail as the state update call to 'DRAINING' + // failed. 
But attempting it anyways as FAILED update might still pass as it doesn't have dependency on + // the current state decommissionController.updateMetadataWithDecommissionStatus(DecommissionStatus.FAILED, statusUpdateListener()); } }); From 023a048c0af1f560a98ac98befa1a22d859d8f23 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 15 Nov 2022 15:32:59 +0530 Subject: [PATCH 21/26] Fix spotless check Signed-off-by: Rishab Nahata --- .../cluster/decommission/DecommissionService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 89bdbf3040b0b..73fb584c71504 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -151,7 +151,9 @@ public ClusterState execute(ClusterState currentState) { ClusterState newState = registerDecommissionAttributeInClusterState(currentState, decommissionAttribute); // add all 'to-be-decommissioned' cluster manager eligible nodes to voting config exclusion - nodeIdsToBeExcluded = filterNodesWithDecommissionAttribute(currentState, decommissionAttribute, true).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + nodeIdsToBeExcluded = filterNodesWithDecommissionAttribute(currentState, decommissionAttribute, true).stream() + .map(DiscoveryNode::getId) + .collect(Collectors.toSet()); logger.info( "resolved cluster manager eligible nodes [{}] that should be added to voting config exclusion", nodeIdsToBeExcluded.toString() @@ -186,7 +188,9 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS DecommissionAttributeMetadata decommissionAttributeMetadata = newState.metadata().decommissionAttributeMetadata(); assert 
decommissionAttribute.equals(decommissionAttributeMetadata.decommissionAttribute()); assert decommissionAttributeMetadata.status().equals(DecommissionStatus.INIT); - assert newState.getVotingConfigExclusions().stream().map(CoordinationMetadata.VotingConfigExclusion::getNodeId) + assert newState.getVotingConfigExclusions() + .stream() + .map(CoordinationMetadata.VotingConfigExclusion::getNodeId) .collect(Collectors.toSet()) .containsAll(nodeIdsToBeExcluded); logger.debug( From f3e2c57a51cfda186a72852b01ef74b8b61a879c Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 15 Nov 2022 17:23:23 +0530 Subject: [PATCH 22/26] Empty-Commit Signed-off-by: Rishab Nahata From 0a8e37aef2774a86808b23de707923c3fe856e9f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 30 Nov 2022 17:44:43 +0530 Subject: [PATCH 23/26] Add UT for helper Signed-off-by: Rishab Nahata --- .../VotingConfigExclusionsHelperTests.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java new file mode 100644 index 0000000000000..65d11d36e3ced --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.action.admin.cluster.configuration; + +import org.junit.BeforeClass; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.Strings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Set; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.addExclusionAndGetState; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState; +import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum; + +public class VotingConfigExclusionsHelperTests extends OpenSearchTestCase { + + private static DiscoveryNode localNode, otherNode1, otherNode2, otherDataNode; + private static CoordinationMetadata.VotingConfigExclusion localNodeExclusion, otherNode1Exclusion, otherNode2Exclusion; + private static ClusterState initialClusterState; + + public void testAddExclusionAndGetState() { + ClusterState updatedState = addExclusionAndGetState(initialClusterState, Set.of(localNodeExclusion), 2); + assertTrue(updatedState.coordinationMetadata().getVotingConfigExclusions().contains(localNodeExclusion)); + assertEquals(1, updatedState.coordinationMetadata().getVotingConfigExclusions().size()); + } + + public void testResolveVotingConfigExclusions() { + AddVotingConfigExclusionsRequest request = new 
AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + new String[]{"other1"}, + Strings.EMPTY_ARRAY, + TimeValue.timeValueSeconds(30) + ); + Set votingConfigExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, initialClusterState, 10); + assertEquals(1, votingConfigExclusions.size()); + assertTrue(votingConfigExclusions.contains(otherNode1Exclusion)); + } + + public void testResolveVotingConfigExclusionFailsWhenLimitExceeded() { + AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( + Strings.EMPTY_ARRAY, + new String[]{"other1", "other2"}, + Strings.EMPTY_ARRAY, + TimeValue.timeValueSeconds(30) + ); + expectThrows( + IllegalArgumentException.class, + () -> resolveVotingConfigExclusionsAndCheckMaximum(request, initialClusterState, 1) + ); + } + + public void testClearExclusionAndGetState() { + ClusterState updatedState = addExclusionAndGetState(initialClusterState, Set.of(localNodeExclusion), 2); + assertTrue(updatedState.coordinationMetadata().getVotingConfigExclusions().contains(localNodeExclusion)); + updatedState = clearExclusionsAndGetState(updatedState); + assertTrue(updatedState.coordinationMetadata().getVotingConfigExclusions().isEmpty()); + } + + @BeforeClass + public static void createBaseClusterState() { + localNode = makeDiscoveryNode("local"); + localNodeExclusion = new CoordinationMetadata.VotingConfigExclusion(localNode); + otherNode1 = makeDiscoveryNode("other1"); + otherNode1Exclusion = new CoordinationMetadata.VotingConfigExclusion(otherNode1); + otherNode2 = makeDiscoveryNode("other2"); + otherNode2Exclusion = new CoordinationMetadata.VotingConfigExclusion(otherNode2); + otherDataNode = new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final CoordinationMetadata.VotingConfiguration allNodesConfig = CoordinationMetadata.VotingConfiguration.of(localNode, otherNode1, otherNode2); + initialClusterState = ClusterState.builder(new 
ClusterName("cluster")) + .nodes(new DiscoveryNodes.Builder().add(localNode).add(otherNode1).add(otherNode2).add(otherDataNode).localNodeId(localNode.getId()).clusterManagerNodeId(localNode.getId())) + .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().lastAcceptedConfiguration(allNodesConfig).lastCommittedConfiguration(allNodesConfig).build())) + .build(); + } + + private static DiscoveryNode makeDiscoveryNode(String name) { + return new DiscoveryNode(name, name, buildNewFakeTransportAddress(), emptyMap(), singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), Version.CURRENT); + } +} From 87bf89ccb24ccc06cfffa40f1958601b3dc7dc1d Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 30 Nov 2022 17:52:14 +0530 Subject: [PATCH 24/26] Fix spotless check Signed-off-by: Rishab Nahata --- .../VotingConfigExclusionsHelperTests.java | 49 ++++++++++++++----- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java index 65d11d36e3ced..f33781064345d 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/configuration/VotingConfigExclusionsHelperTests.java @@ -45,11 +45,15 @@ public void testAddExclusionAndGetState() { public void testResolveVotingConfigExclusions() { AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( Strings.EMPTY_ARRAY, - new String[]{"other1"}, + new String[] { "other1" }, Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30) ); - Set votingConfigExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, initialClusterState, 10); + Set votingConfigExclusions = resolveVotingConfigExclusionsAndCheckMaximum( + request, + initialClusterState, + 
10 + ); assertEquals(1, votingConfigExclusions.size()); assertTrue(votingConfigExclusions.contains(otherNode1Exclusion)); } @@ -57,14 +61,11 @@ public void testResolveVotingConfigExclusions() { public void testResolveVotingConfigExclusionFailsWhenLimitExceeded() { AddVotingConfigExclusionsRequest request = new AddVotingConfigExclusionsRequest( Strings.EMPTY_ARRAY, - new String[]{"other1", "other2"}, + new String[] { "other1", "other2" }, Strings.EMPTY_ARRAY, TimeValue.timeValueSeconds(30) ); - expectThrows( - IllegalArgumentException.class, - () -> resolveVotingConfigExclusionsAndCheckMaximum(request, initialClusterState, 1) - ); + expectThrows(IllegalArgumentException.class, () -> resolveVotingConfigExclusionsAndCheckMaximum(request, initialClusterState, 1)); } public void testClearExclusionAndGetState() { @@ -83,14 +84,40 @@ public static void createBaseClusterState() { otherNode2 = makeDiscoveryNode("other2"); otherNode2Exclusion = new CoordinationMetadata.VotingConfigExclusion(otherNode2); otherDataNode = new DiscoveryNode("data", "data", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - final CoordinationMetadata.VotingConfiguration allNodesConfig = CoordinationMetadata.VotingConfiguration.of(localNode, otherNode1, otherNode2); + final CoordinationMetadata.VotingConfiguration allNodesConfig = CoordinationMetadata.VotingConfiguration.of( + localNode, + otherNode1, + otherNode2 + ); initialClusterState = ClusterState.builder(new ClusterName("cluster")) - .nodes(new DiscoveryNodes.Builder().add(localNode).add(otherNode1).add(otherNode2).add(otherDataNode).localNodeId(localNode.getId()).clusterManagerNodeId(localNode.getId())) - .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().lastAcceptedConfiguration(allNodesConfig).lastCommittedConfiguration(allNodesConfig).build())) + .nodes( + new DiscoveryNodes.Builder().add(localNode) + .add(otherNode1) + .add(otherNode2) + .add(otherDataNode) + 
.localNodeId(localNode.getId()) + .clusterManagerNodeId(localNode.getId()) + ) + .metadata( + Metadata.builder() + .coordinationMetadata( + CoordinationMetadata.builder() + .lastAcceptedConfiguration(allNodesConfig) + .lastCommittedConfiguration(allNodesConfig) + .build() + ) + ) .build(); } private static DiscoveryNode makeDiscoveryNode(String name) { - return new DiscoveryNode(name, name, buildNewFakeTransportAddress(), emptyMap(), singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), Version.CURRENT); + return new DiscoveryNode( + name, + name, + buildNewFakeTransportAddress(), + emptyMap(), + singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); } } From 4d46a2924d51c8d72fbf4e15187190fa7cd37e14 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Mon, 12 Dec 2022 13:12:58 +0530 Subject: [PATCH 25/26] Add UT for decommission helper Signed-off-by: Rishab Nahata --- .../decommission/DecommissionHelperTests.java | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java new file mode 100644 index 0000000000000..ab2d8218ec97d --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionHelperTests.java @@ -0,0 +1,142 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.cluster.decommission; + +import org.junit.BeforeClass; +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Set; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.decommission.DecommissionHelper.addVotingConfigExclusionsForNodesToBeDecommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.deleteDecommissionAttributeInClusterState; +import static org.opensearch.cluster.decommission.DecommissionHelper.filterNodesWithDecommissionAttribute; +import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned; +import static org.opensearch.cluster.decommission.DecommissionHelper.registerDecommissionAttributeInClusterState; + +public class DecommissionHelperTests extends OpenSearchTestCase { + + private static DiscoveryNode node1, node2, node3, dataNode; + private static ClusterState initialClusterState; + + public void testRegisterAndDeleteDecommissionAttributeInClusterState() { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone2"); + ClusterState updatedState = registerDecommissionAttributeInClusterState(initialClusterState, decommissionAttribute); + assertEquals(decommissionAttribute, updatedState.metadata().decommissionAttributeMetadata().decommissionAttribute()); + updatedState = deleteDecommissionAttributeInClusterState(updatedState); + 
assertNull(updatedState.metadata().decommissionAttributeMetadata()); + } + + public void testAddVotingConfigExclusionsForNodesToBeDecommissioned() { + Set nodeIdToBeExcluded = Set.of("node2"); + ClusterState updatedState = addVotingConfigExclusionsForNodesToBeDecommissioned( + initialClusterState, + nodeIdToBeExcluded, + TimeValue.timeValueMinutes(1), + 10 + ); + CoordinationMetadata.VotingConfigExclusion v1 = new CoordinationMetadata.VotingConfigExclusion(node2); + assertTrue( + updatedState.coordinationMetadata().getVotingConfigExclusions().contains(new CoordinationMetadata.VotingConfigExclusion(node2)) + ); + assertEquals(1, updatedState.coordinationMetadata().getVotingConfigExclusions().size()); + } + + public void testFilterNodes() { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1"); + Set filteredNodes = filterNodesWithDecommissionAttribute(initialClusterState, decommissionAttribute, true); + assertTrue(filteredNodes.contains(node1)); + assertEquals(1, filteredNodes.size()); + filteredNodes = filterNodesWithDecommissionAttribute(initialClusterState, decommissionAttribute, false); + assertTrue(filteredNodes.contains(node1)); + assertTrue(filteredNodes.contains(dataNode)); + assertEquals(2, filteredNodes.size()); + } + + public void testNodeCommissioned() { + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone1"); + DecommissionStatus decommissionStatus = randomFrom( + DecommissionStatus.IN_PROGRESS, + DecommissionStatus.DRAINING, + DecommissionStatus.SUCCESSFUL + ); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + decommissionStatus + ); + Metadata metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); + assertTrue(nodeCommissioned(node2, metadata)); + assertFalse(nodeCommissioned(node1, metadata)); + DecommissionStatus commissionStatus = 
randomFrom(DecommissionStatus.FAILED, DecommissionStatus.INIT); + decommissionAttributeMetadata = new DecommissionAttributeMetadata(decommissionAttribute, commissionStatus); + metadata = Metadata.builder().putCustom(DecommissionAttributeMetadata.TYPE, decommissionAttributeMetadata).build(); + assertTrue(nodeCommissioned(node2, metadata)); + assertTrue(nodeCommissioned(node1, metadata)); + metadata = Metadata.builder().removeCustom(DecommissionAttributeMetadata.TYPE).build(); + assertTrue(nodeCommissioned(node2, metadata)); + assertTrue(nodeCommissioned(node1, metadata)); + } + + @BeforeClass + public static void createBaseClusterState() { + node1 = makeDiscoveryNode("node1", "zone1"); + node2 = makeDiscoveryNode("node2", "zone2"); + node3 = makeDiscoveryNode("node3", "zone3"); + dataNode = new DiscoveryNode( + "data", + "data", + buildNewFakeTransportAddress(), + singletonMap("zone", "zone1"), + emptySet(), + Version.CURRENT + ); + final CoordinationMetadata.VotingConfiguration allNodesConfig = CoordinationMetadata.VotingConfiguration.of(node1, node2, node3); + initialClusterState = ClusterState.builder(new ClusterName("cluster")) + .nodes( + new DiscoveryNodes.Builder().add(node1) + .add(node2) + .add(node3) + .add(dataNode) + .localNodeId(node1.getId()) + .clusterManagerNodeId(node1.getId()) + ) + .metadata( + Metadata.builder() + .coordinationMetadata( + CoordinationMetadata.builder() + .lastAcceptedConfiguration(allNodesConfig) + .lastCommittedConfiguration(allNodesConfig) + .build() + ) + ) + .build(); + } + + private static DiscoveryNode makeDiscoveryNode(String name, String zone) { + return new DiscoveryNode( + name, + name, + buildNewFakeTransportAddress(), + singletonMap("zone", zone), + singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + } +} From 6ce66281d84ad8cc76219fa56ceb6187f1e1bb2d Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Tue, 13 Dec 2022 17:45:54 +0530 Subject: [PATCH 26/26] Update timeout Signed-off-by: Rishab 
Nahata --- .../opensearch/cluster/decommission/DecommissionService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java index 73fb584c71504..f36d7b3e06da9 100644 --- a/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java +++ b/server/src/main/java/org/opensearch/cluster/decommission/DecommissionService.java @@ -161,7 +161,7 @@ public ClusterState execute(ClusterState currentState) { newState = addVotingConfigExclusionsForNodesToBeDecommissioned( newState, nodeIdsToBeExcluded, - TimeValue.timeValueSeconds(60), // TODO - update it with request timeout + TimeValue.timeValueSeconds(120), // TODO - update it with request timeout maxVotingConfigExclusions ); logger.debug( @@ -201,7 +201,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS final ClusterStateObserver observer = new ClusterStateObserver( clusterService, - TimeValue.timeValueSeconds(60), // TODO - update it with request timeout + TimeValue.timeValueSeconds(120), // TODO - update it with request timeout logger, threadPool.getThreadContext() );