Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster health call to throw decommissioned exception for local decommissioned node #6008

Merged
merged 13 commits into from
Jan 29, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773))
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https://github.com/opensearch-project/OpenSearch/pull/5283))
- Require MediaType in Strings.toString API ([#6009](https://github.com/opensearch-project/OpenSearch/pull/6009))
- Cluster health call to throw decommissioned exception for local decommissioned node ([#6008](https://github.com/opensearch-project/OpenSearch/pull/6008))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this under the "Unreleased 2.x" section.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack


### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@
"awareness_attribute":{
"type":"string",
"description":"The awareness attribute for which the health is required"
},
"ensure_node_commissioned":{
"type":"boolean",
"description": "Checks whether local node is commissioned or not. If set to true on a local call it will throw exception if node is decommissioned (default: false)"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,36 @@ public boolean innerMatch(LogEvent event) {
Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, decommissionedNode);
assertFalse(coordinator.localNodeCommissioned());

// Check cluster health API for decommissioned and active node
ClusterHealthResponse activeNodeLocalHealth = client(activeNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.execute()
.actionGet();
assertFalse(activeNodeLocalHealth.isTimedOut());

ClusterHealthResponse decommissionedNodeLocalHealth = client(decommissionedNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.execute()
.actionGet();
assertFalse(decommissionedNodeLocalHealth.isTimedOut());

NodeDecommissionedException ex = expectThrows(
NodeDecommissionedException.class,
() -> client(decommissionedNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.execute()
.actionGet()
);
assertTrue(ex.getMessage().contains("local node is decommissioned"));

// Recommissioning the zone back to gracefully succeed the test once above tests succeeds
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(activeNode).execute(
DeleteDecommissionStateAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = "";
private Priority waitForEvents = null;
private boolean ensureNodeCommissioned = false;
/**
* Only used by the high-level REST Client. Controls the details level of the health information returned.
* The default value is 'cluster'.
Expand Down Expand Up @@ -98,6 +99,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
awarenessAttribute = in.readOptionalString();
level = in.readEnum(Level.class);
}
if (in.getVersion().onOrAfter(Version.CURRENT)) {
ensureNodeCommissioned = in.readBoolean();
}
imRishN marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down Expand Up @@ -130,6 +134,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(awarenessAttribute);
out.writeEnum(level);
}
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeBoolean(ensureNodeCommissioned);
}
}

@Override
Expand Down Expand Up @@ -314,13 +321,30 @@ public String getAwarenessAttribute() {
return awarenessAttribute;
}

/**
 * Stores whether this request should ensure that the local node is commissioned.
 * {@link #validate()} rejects the flag unless the request is also marked local.
 *
 * @param ensureNodeCommissioned the new value of the flag
 * @return this request, so calls can be chained
 */
public final ClusterHealthRequest ensureNodeCommissioned(boolean ensureNodeCommissioned) {
    this.ensureNodeCommissioned = ensureNodeCommissioned;
    return this;
}

/**
 * Tells whether a local request should ensure that the local node is commissioned (default: false).
 *
 * @return <code>true</code> if local information should be returned only when the local node is commissioned,
 *         <code>false</code> if the commission status of the local node should not be checked
 */
public final boolean ensureNodeCommissioned() {
    return ensureNodeCommissioned;
}

/**
 * Checks this request for unsupported parameter combinations and reports the first one found.
 *
 * @return a validation exception describing the first violated rule, or {@code null} when the request is valid
 */
@Override
public ActionRequestValidationException validate() {
    final boolean awarenessLevel = level.equals(Level.AWARENESS_ATTRIBUTES);

    // Awareness-attribute health cannot be combined with an index filter.
    if (awarenessLevel && indices.length > 0) {
        return addValidationError("awareness_attribute is not a supported parameter with index health", null);
    }
    // An awareness attribute only makes sense at the awareness_attributes level.
    if (awarenessLevel == false && awarenessAttribute != null) {
        return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null);
    }
    // The commission check is only accepted on local requests.
    if (ensureNodeCommissioned && local == false) {
        return addValidationError("not a local request to ensure local node commissioned", null);
    }
    return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,12 @@ public ClusterHealthRequestBuilder setLevel(String level) {
request.setLevel(level);
return this;
}

/**
 * Sets the {@code ensureNodeCommissioned} flag on the underlying request.
 *
 * @param ensureNodeCommissioned whether the local request should ensure that the local node is commissioned
 * @return this builder, so calls can be chained
 */
public final ClusterHealthRequestBuilder setEnsureNodeCommissioned(boolean ensureNodeCommissioned) {
    request.ensureNodeCommissioned(ensureNodeCommissioned);
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.cluster.LocalClusterUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -57,6 +59,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CollectionUtils;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
Expand All @@ -77,6 +80,7 @@ public class TransportClusterHealthAction extends TransportClusterManagerNodeRea
private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);

private final AllocationService allocationService;
private final Discovery discovery;

@Inject
public TransportClusterHealthAction(
Expand All @@ -85,7 +89,8 @@ public TransportClusterHealthAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AllocationService allocationService
AllocationService allocationService,
Discovery discovery
) {
super(
ClusterHealthAction.NAME,
Expand All @@ -98,6 +103,7 @@ public TransportClusterHealthAction(
indexNameExpressionResolver
);
this.allocationService = allocationService;
this.discovery = discovery;
}

@Override
Expand Down Expand Up @@ -134,7 +140,12 @@ protected void clusterManagerOperation(
final ClusterState unusedState,
final ActionListener<ClusterHealthResponse> listener
) {

if (request.ensureNodeCommissioned()
&& discovery instanceof Coordinator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should discovery instanceof Coordinator be an assertion instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserts don't run in prod, and only the Coordinator has this node's commission status info. If a developer uses a different Discovery mechanism, it might break this; hence I'm putting this check in directly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, so tests should fail for a developer; in prod this is expected to be a Coordinator.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be fair to assume that Coordinator is the only Discovery implementation for all use cases? Could there be a plugin which writes its own Discovery model? I see something like this implemented in the Gateway service, where they assume discovery might not be an instance of Coordinator. But I get your point: for this change we can add the assert and also keep the check in the `if`, to be a little cautious.

if (discovery instanceof Coordinator) {
            recoveryRunnable = () -> clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
        } else {
            final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
            recoveryRunnable = () -> gateway.performStateRecovery(new GatewayRecoveryListener());
        }

&& ((Coordinator) discovery).localNodeCommissioned() == false) {
listener.onFailure(new NodeDecommissionedException("local node is decommissioned"));
return;
}
final int waitCount = getWaitCount(request);

if (request.waitForEvents() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1444,8 +1444,7 @@ synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
peerFinder.onNodeCommissionStatusChange(localNodeCommissioned);
}

// package-visible for testing
boolean localNodeCommissioned() {
public boolean localNodeCommissioned() {
return localNodeCommissioned;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

Expand All @@ -28,4 +29,9 @@ public NodeDecommissionedException(String msg, Object... args) {
public NodeDecommissionedException(StreamInput in) throws IOException {
super(in);
}

/**
 * Reports the REST status returned to clients for this exception.
 *
 * @return {@link RestStatus#FAILED_DEPENDENCY}
 */
@Override
public RestStatus status() {
    return RestStatus.FAILED_DEPENDENCY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public static ClusterHealthRequest fromRequest(final RestRequest request) {
final ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
clusterHealthRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterHealthRequest.indicesOptions()));
clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
clusterHealthRequest.ensureNodeCommissioned(
request.paramAsBoolean("ensure_node_commissioned", clusterHealthRequest.ensureNodeCommissioned())
);
clusterHealthRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", clusterHealthRequest.clusterManagerNodeTimeout())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.health;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -70,6 +71,23 @@ public void testRequestReturnsHiddenIndicesByDefault() {
assertTrue(defaultRequest.indicesOptions().expandWildcardsHidden());
}

/**
 * Verifies that {@code ensureNodeCommissioned} is accepted by {@code validate()} only on local requests.
 */
public void testValidation() {
    ClusterHealthRequest clusterHealthRequest = randomRequest();
    {
        // Flag set on a non-local request: must be rejected.
        clusterHealthRequest.local(false);
        clusterHealthRequest.ensureNodeCommissioned(true);
        ActionRequestValidationException e = clusterHealthRequest.validate();
        assertNotNull(e);
        assertTrue(e.getMessage().contains("not a local request to ensure local node commissioned"));
    }
    {
        // Local request without the flag: valid.
        clusterHealthRequest.local(true);
        clusterHealthRequest.ensureNodeCommissioned(false);
        ActionRequestValidationException e = clusterHealthRequest.validate();
        assertNull(e);
    }
    {
        // Local request with the flag: the supported combination, must also be valid.
        clusterHealthRequest.local(true);
        clusterHealthRequest.ensureNodeCommissioned(true);
        ActionRequestValidationException e = clusterHealthRequest.validate();
        assertNull(e);
    }
}

private ClusterHealthRequest randomRequest() {
ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForStatus(randomFrom(ClusterHealthStatus.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
threadPool,
new ActionFilters(new HashSet<>()),
indexNameExpressionResolver,
new AllocationService(null, new TestGatewayAllocator(), null, null, null)
new AllocationService(null, new TestGatewayAllocator(), null, null, null),
null
);
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public void testFromRequest() {
Map<String, String> params = new HashMap<>();
String index = "index";
boolean local = randomBoolean();
boolean ensureLocalNodeCommissioned = false;
if (local) {
ensureLocalNodeCommissioned = randomBoolean();
}
String clusterManagerTimeout = randomTimeValue();
String timeout = randomTimeValue();
ClusterHealthStatus waitForStatus = randomFrom(ClusterHealthStatus.values());
Expand All @@ -63,6 +67,7 @@ public void testFromRequest() {

params.put("index", index);
params.put("local", String.valueOf(local));
params.put("ensure_node_commissioned", String.valueOf(ensureLocalNodeCommissioned));
params.put("cluster_manager_timeout", clusterManagerTimeout);
params.put("timeout", timeout);
params.put("wait_for_status", waitForStatus.name());
Expand All @@ -81,6 +86,7 @@ public void testFromRequest() {
assertThat(clusterHealthRequest.indices().length, equalTo(1));
assertThat(clusterHealthRequest.indices()[0], equalTo(index));
assertThat(clusterHealthRequest.local(), equalTo(local));
assertThat(clusterHealthRequest.ensureNodeCommissioned(), equalTo(ensureLocalNodeCommissioned));
assertThat(clusterHealthRequest.clusterManagerNodeTimeout(), equalTo(TimeValue.parseTimeValue(clusterManagerTimeout, "test")));
assertThat(clusterHealthRequest.timeout(), equalTo(TimeValue.parseTimeValue(timeout, "test")));
assertThat(clusterHealthRequest.waitForStatus(), equalTo(waitForStatus));
Expand Down