Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Atomically update cluster state with decommission status and corresponding action #5093

Merged
merged 29 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
236940f
Abstract helpers from TransportVotingConfigExclusion
imRishN Nov 5, 2022
b634138
Update INIT and exclusion in one call
imRishN Nov 5, 2022
36d981d
use max voting config exclusion count
imRishN Nov 5, 2022
29a1d04
Fix spotless check
imRishN Nov 5, 2022
39f058f
fix
imRishN Nov 5, 2022
e855f70
Remove transport call for exclusion
imRishN Nov 5, 2022
11e3c81
Abstract clear exclusion
imRishN Nov 5, 2022
fe59288
Use controller for status update and account for terminal status
imRishN Nov 5, 2022
a8d4fd6
atomic recommission
imRishN Nov 5, 2022
14b5c38
Remove white spaces
imRishN Nov 5, 2022
ca97114
Fix
imRishN Nov 5, 2022
adb7a34
Fix tests
imRishN Nov 5, 2022
1c7cf93
Fix spotless check
imRishN Nov 5, 2022
0bc3ea7
Fix integ tests
imRishN Nov 5, 2022
577c77c
Fix spotless check
imRishN Nov 5, 2022
738a99b
Resolve terminal status during update
imRishN Nov 8, 2022
5dae896
Add java doc
imRishN Nov 8, 2022
97a506a
Fix spotless check
imRishN Nov 8, 2022
e227c0b
Resolve PR comments
imRishN Nov 11, 2022
ecfdffd
fixes
imRishN Nov 15, 2022
023a048
Fix spotless check
imRishN Nov 15, 2022
703821c
Merge remote-tracking branch 'upstream/main' into decommission/init-m…
imRishN Nov 15, 2022
f3e2c57
Empty-Commit
imRishN Nov 15, 2022
948a28d
Merge remote-tracking branch 'upstream/main' into decommission/init-m…
imRishN Nov 30, 2022
0a8e37a
Add UT for helper
imRishN Nov 30, 2022
87bf89c
Fix spotless check
imRishN Nov 30, 2022
4d46a29
Add UT for decommission helper
imRishN Dec 12, 2022
a159da6
Merge remote-tracking branch 'upstream/main' into decommission/init-m…
imRishN Dec 12, 2022
6ce6628
Update timeout
imRishN Dec 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.decommission.DecommissionAttribute;
import org.opensearch.cluster.decommission.DecommissionAttributeMetadata;
import org.opensearch.cluster.decommission.DecommissionService;
import org.opensearch.cluster.decommission.DecommissionStatus;
import org.opensearch.cluster.decommission.DecommissioningFailedException;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
Expand Down Expand Up @@ -824,24 +823,11 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception {
// and hence due to which the leader won't get abdicated and decommission request should eventually fail.
// And in this case, to ensure decommission request doesn't leave mutating change in the cluster, we ensure
// that no exclusion is set to the cluster and state for decommission is marked as FAILED
Logger clusterLogger = LogManager.getLogger(DecommissionService.class);
MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(clusterLogger);
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test",
DecommissionService.class.getCanonicalName(),
Level.ERROR,
"failure in removing to-be-decommissioned cluster manager eligible nodes"
)
OpenSearchTimeoutException ex = expectThrows(
OpenSearchTimeoutException.class,
() -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet()
);

assertBusy(() -> {
OpenSearchTimeoutException ex = expectThrows(
OpenSearchTimeoutException.class,
() -> client().execute(DecommissionAction.INSTANCE, decommissionRequest).actionGet()
);
assertTrue(ex.getMessage().contains("timed out waiting for voting config exclusions"));
});
assertTrue(ex.getMessage().contains("while removing to-be-decommissioned cluster manager eligible nodes"));

ClusterService leaderClusterService = internalCluster().getInstance(
ClusterService.class,
Expand Down Expand Up @@ -877,7 +863,6 @@ public void testDecommissionFailedWithOnlyOneAttributeValue() throws Exception {

// if the below condition is passed, then we are sure current decommission status is marked FAILED
assertTrue(expectedStateLatch.await(30, TimeUnit.SECONDS));
mockLogAppender.assertAllExpectationsMatched();

// ensure all nodes are part of cluster
ensureStableCluster(6, TimeValue.timeValueMinutes(2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
Expand All @@ -66,6 +64,9 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.resolveVotingConfigExclusionsAndCheckMaximum;
import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.addExclusionAndGetState;

/**
* Transport endpoint action for adding exclusions to voting config
*
Expand Down Expand Up @@ -144,13 +145,7 @@ public ClusterState execute(ClusterState currentState) {
assert resolvedExclusions == null : resolvedExclusions;
final int finalMaxVotingConfigExclusions = TransportAddVotingConfigExclusionsAction.this.maxVotingConfigExclusions;
resolvedExclusions = resolveVotingConfigExclusionsAndCheckMaximum(request, currentState, finalMaxVotingConfigExclusions);

final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata());
resolvedExclusions.forEach(builder::addVotingConfigExclusion);
final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(builder.build()).build();
final ClusterState newState = ClusterState.builder(currentState).metadata(newMetadata).build();
assert newState.getVotingConfigExclusions().size() <= finalMaxVotingConfigExclusions;
return newState;
return addExclusionAndGetState(currentState, resolvedExclusions, finalMaxVotingConfigExclusions);
}

@Override
Expand Down Expand Up @@ -213,18 +208,6 @@ public void onTimeout(TimeValue timeout) {
});
}

private static Set<VotingConfigExclusion> resolveVotingConfigExclusionsAndCheckMaximum(
AddVotingConfigExclusionsRequest request,
ClusterState state,
int maxVotingConfigExclusions
) {
return request.resolveVotingConfigExclusionsAndCheckMaximum(
state,
maxVotingConfigExclusions,
MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey()
);
}

@Override
protected ClusterBlockException checkBlock(AddVotingConfigExclusionsRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
Expand All @@ -60,6 +58,8 @@
import java.io.IOException;
import java.util.function.Predicate;

import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState;

/**
* Transport endpoint action for clearing exclusions to voting config
*
Expand Down Expand Up @@ -166,13 +166,7 @@ private void submitClearVotingConfigExclusionsTask(
clusterService.submitStateUpdateTask("clear-voting-config-exclusions", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(ClusterState currentState) {
final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata())
.clearVotingConfigExclusions()
.build();
final Metadata newMetadata = Metadata.builder(currentState.metadata())
.coordinationMetadata(newCoordinationMetadata)
.build();
return ClusterState.builder(currentState).metadata(newMetadata).build();
return clearExclusionsAndGetState(currentState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.configuration;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
import org.opensearch.cluster.metadata.Metadata;

import java.util.Set;

import static org.opensearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction.MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING;

/**
* Static helper utilities for voting config exclusions cluster state updates
*
* @opensearch.internal
*/
public class VotingConfigExclusionsHelper {

/**
* Static helper to update current state with given resolved exclusions
*
* @param currentState current cluster state
* @param resolvedExclusions resolved exclusions from the request
* @param finalMaxVotingConfigExclusions max exclusions that be added
* @return newly formed cluster state
*/
public static ClusterState addExclusionAndGetState(
ClusterState currentState,
Set<VotingConfigExclusion> resolvedExclusions,
int finalMaxVotingConfigExclusions
) {
final CoordinationMetadata.Builder builder = CoordinationMetadata.builder(currentState.coordinationMetadata());
resolvedExclusions.forEach(builder::addVotingConfigExclusion);
final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(builder.build()).build();
final ClusterState newState = ClusterState.builder(currentState).metadata(newMetadata).build();
assert newState.getVotingConfigExclusions().size() <= finalMaxVotingConfigExclusions;
return newState;
}

/**
* Resolves the exclusion from the request and throws IAE if no nodes matched or maximum exceeded
*
* @param request AddVotingConfigExclusionsRequest request
* @param state current cluster state
* @param maxVotingConfigExclusions max number of exclusion acceptable
* @return set of VotingConfigExclusion
*/
public static Set<VotingConfigExclusion> resolveVotingConfigExclusionsAndCheckMaximum(
AddVotingConfigExclusionsRequest request,
ClusterState state,
int maxVotingConfigExclusions
) {
return request.resolveVotingConfigExclusionsAndCheckMaximum(
state,
maxVotingConfigExclusions,
MAXIMUM_VOTING_CONFIG_EXCLUSIONS_SETTING.getKey()
);
}

/**
* Clears Voting config exclusion from the given cluster state
*
* @param currentState current cluster state
* @return newly formed cluster state after clearing voting config exclusions
*/
public static ClusterState clearExclusionsAndGetState(ClusterState currentState) {
final CoordinationMetadata newCoordinationMetadata = CoordinationMetadata.builder(currentState.coordinationMetadata())
.clearVotingConfigExclusions()
.build();
final Metadata newMetadata = Metadata.builder(currentState.metadata()).coordinationMetadata(newCoordinationMetadata).build();
return ClusterState.builder(currentState).metadata(newMetadata).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@
import java.util.stream.StreamSupport;

import static org.opensearch.cluster.coordination.NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID;
import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.opensearch.cluster.decommission.DecommissionService.nodeCommissioned;
import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.AddVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsAction;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsRequest;
import org.opensearch.action.admin.cluster.configuration.ClearVotingConfigExclusionsResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsAction;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
Expand All @@ -33,7 +27,6 @@
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.http.HttpStats;
Expand All @@ -52,6 +45,8 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static org.opensearch.action.admin.cluster.configuration.VotingConfigExclusionsHelper.clearExclusionsAndGetState;

/**
* Helper controller class to remove list of nodes from the cluster and update status
*
Expand Down Expand Up @@ -79,83 +74,6 @@ public class DecommissionController {
this.threadPool = threadPool;
}

/**
* Transport call to add nodes to voting config exclusion
*
* @param nodes set of nodes Ids to be added to voting config exclusion list
* @param listener callback for response or failure
*/
public void excludeDecommissionedNodesFromVotingConfig(Set<String> nodes, ActionListener<Void> listener) {
transportService.sendRequest(
transportService.getLocalNode(),
AddVotingConfigExclusionsAction.NAME,
new AddVotingConfigExclusionsRequest(
Strings.EMPTY_ARRAY,
nodes.toArray(String[]::new),
Strings.EMPTY_ARRAY,
TimeValue.timeValueSeconds(120) // giving a larger timeout of 120 sec as cluster might already be in stress when
// decommission is triggered
),
new TransportResponseHandler<AddVotingConfigExclusionsResponse>() {
@Override
public void handleResponse(AddVotingConfigExclusionsResponse response) {
listener.onResponse(null);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public AddVotingConfigExclusionsResponse read(StreamInput in) throws IOException {
return new AddVotingConfigExclusionsResponse(in);
}
}
);
}

/**
* Transport call to clear voting config exclusion
*
* @param listener callback for response or failure
*/
public void clearVotingConfigExclusion(ActionListener<Void> listener, boolean waitForRemoval) {
final ClearVotingConfigExclusionsRequest clearVotingConfigExclusionsRequest = new ClearVotingConfigExclusionsRequest();
clearVotingConfigExclusionsRequest.setWaitForRemoval(waitForRemoval);
transportService.sendRequest(
transportService.getLocalNode(),
ClearVotingConfigExclusionsAction.NAME,
clearVotingConfigExclusionsRequest,
new TransportResponseHandler<ClearVotingConfigExclusionsResponse>() {
@Override
public void handleResponse(ClearVotingConfigExclusionsResponse response) {
listener.onResponse(null);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public ClearVotingConfigExclusionsResponse read(StreamInput in) throws IOException {
return new ClearVotingConfigExclusionsResponse(in);
}
}
);
}

/**
* This method triggers batch of tasks for nodes to be decommissioned using executor {@link NodeRemovalClusterStateTaskExecutor}
* Once the tasks are submitted, it waits for an expected cluster state to guarantee
Expand Down Expand Up @@ -259,9 +177,15 @@ public ClusterState execute(ClusterState currentState) {
decommissionAttributeMetadata.decommissionAttribute(),
decommissionStatus
);
return ClusterState.builder(currentState)
ClusterState newState = ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.metadata()).decommissionAttributeMetadata(decommissionAttributeMetadata))
.build();

// For terminal status we will go ahead and clear any exclusion that was added as part of decommission action
if (decommissionStatus.equals(DecommissionStatus.SUCCESSFUL) || decommissionStatus.equals(DecommissionStatus.FAILED)) {
newState = clearExclusionsAndGetState(newState);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets ensure we call this out in the documentation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

}
return newState;
}

@Override
Expand Down
Loading