Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster health call to throw decommissioned exception for local decommissioned node #6008

Merged
merged 13 commits into from
Jan 29, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Changed http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https:/opensearch-project/OpenSearch/pull/4773))
- Change http code for DecommissioningFailedException from 500 to 400 ([#5283](https:/opensearch-project/OpenSearch/pull/5283))
- Require MediaType in Strings.toString API ([#6009](https:/opensearch-project/OpenSearch/pull/6009))
- Cluster health call to throw decommissioned exception for local decommissioned node([#6008](https:/opensearch-project/OpenSearch/pull/6008))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put this under section unreleased 2.x

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack


### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@
"awareness_attribute":{
"type":"string",
"description":"The awareness attribute for which the health is required"
},
"ensure_local_node_commissioned":{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering we could use ensure_node_commissioned and only support with _local to start with. Later we could extend this to any node_id

Copy link
Member Author

@imRishN imRishN Jan 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could only be two kinds of transport request. One which retrieves information from local cluster state of the node or another which gets it from leader's state. There's no mechanism which says run this transport request on a specific node id. Hence, I feel this would ALWAYS run with local param only

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating it to ensure_node_commissioned

"type":"boolean",
"description": "Checks whether local node is commissioned or not. If set to true on a local call it will throw exception if node is decommissioned (default: false)"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,36 @@ public boolean innerMatch(LogEvent event) {
Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, decommissionedNode);
assertFalse(coordinator.localNodeCommissioned());

// Check cluster health API for decommissioned and active node
ClusterHealthResponse activeNodeLocalHealth = client(activeNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureLocalNodeCommissioned(true)
.execute()
.actionGet();
assertFalse(activeNodeLocalHealth.isTimedOut());

ClusterHealthResponse decommissionedNodeLocalHealth = client(decommissionedNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.execute()
.actionGet();
assertFalse(decommissionedNodeLocalHealth.isTimedOut());

NodeDecommissionedException ex = expectThrows(
NodeDecommissionedException.class,
() -> client(decommissionedNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureLocalNodeCommissioned(true)
.execute()
.actionGet()
);
assertTrue(ex.getMessage().contains("local node is decommissioned"));

// Recommissioning the zone back to gracefully succeed the test once above tests succeeds
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(activeNode).execute(
DeleteDecommissionStateAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ public ActionRequestValidationException validate() {
} else if (!level.equals(Level.AWARENESS_ATTRIBUTES) && awarenessAttribute != null) {
return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null);
}
if (ensureLocalNodeCommissioned && local == false) {
return addValidationError("not a local request to ensure local node commissioned", null);
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.cluster.LocalClusterUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -57,6 +59,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CollectionUtils;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
Expand All @@ -77,6 +80,7 @@ public class TransportClusterHealthAction extends TransportClusterManagerNodeRea
private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);

private final AllocationService allocationService;
private final Discovery discovery;

@Inject
public TransportClusterHealthAction(
Expand All @@ -85,7 +89,8 @@ public TransportClusterHealthAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AllocationService allocationService
AllocationService allocationService,
Discovery discovery
) {
super(
ClusterHealthAction.NAME,
Expand All @@ -98,6 +103,7 @@ public TransportClusterHealthAction(
indexNameExpressionResolver
);
this.allocationService = allocationService;
this.discovery = discovery;
}

@Override
Expand Down Expand Up @@ -134,7 +140,11 @@ protected void clusterManagerOperation(
final ClusterState unusedState,
final ActionListener<ClusterHealthResponse> listener
) {

if (request.ensureLocalNodeCommissioned()
&& discovery instanceof Coordinator
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should discovery instanceof Coordinator be an assertion instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asserts wouldn't run on prod. And only coordinator has this node's commission status info. If a developer uses a different Discovery mechanism it might break this. Hence putting this check directly

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly so tests should fail for a developer, in prod this is expected to be Coordinator

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be fair to assume that Coordinator can only be the discovery for all use cases? Can there a plugin which writes there own Discovery model? I see something like this implemented in Gateway service. Here, they assume discovery might not be instance of Coordinator. But I get your point, for this change we can also add asserts and put in if as well to be a little cautious

if (discovery instanceof Coordinator) {
            recoveryRunnable = () -> clusterService.submitStateUpdateTask("local-gateway-elected-state", new RecoverStateUpdateTask());
        } else {
            final Gateway gateway = new Gateway(settings, clusterService, listGatewayMetaState);
            recoveryRunnable = () -> gateway.performStateRecovery(new GatewayRecoveryListener());
        }

&& ((Coordinator) discovery).localNodeCommissioned() == false) {
listener.onFailure(new NodeDecommissionedException("local node is decommissioned"));
}
final int waitCount = getWaitCount(request);

if (request.waitForEvents() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,13 @@ public final RequestBuilder setLocal(boolean local) {
request.local(local);
return (RequestBuilder) this;
}

/**
* Specifies if the local request should ensure that the local node is commissioned
*/
@SuppressWarnings("unchecked")
public final RequestBuilder setEnsureLocalNodeCommissioned(boolean ensureLocalNodeCommissioned) {
request.ensureLocalNodeCommissioned(ensureLocalNodeCommissioned);
return (RequestBuilder) this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.support.clustermanager;

import org.opensearch.Version;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

Expand All @@ -46,18 +47,25 @@ public abstract class ClusterManagerNodeReadRequest<Request extends ClusterManag
ClusterManagerNodeRequest<Request> {

protected boolean local = false;
protected boolean ensureLocalNodeCommissioned = false;

protected ClusterManagerNodeReadRequest() {}

protected ClusterManagerNodeReadRequest(StreamInput in) throws IOException {
super(in);
local = in.readBoolean();
if (in.getVersion().onOrAfter(Version.CURRENT)) {
ensureLocalNodeCommissioned = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(local);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeBoolean(ensureLocalNodeCommissioned);
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build fails in mixed cluster test if version check is PUT for 2.6 and not current. Please suggest correct way of doing this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will put 2.6 and then backport

}

@SuppressWarnings("unchecked")
Expand All @@ -66,6 +74,12 @@ public final Request local(boolean local) {
return (Request) this;
}

@SuppressWarnings("unchecked")
public final Request ensureLocalNodeCommissioned(boolean ensureLocalNodeCommissioned) {
this.ensureLocalNodeCommissioned = ensureLocalNodeCommissioned;
return (Request) this;
}

/**
* Return local information, do not retrieve the state from cluster-manager node (default: false).
* @return <code>true</code> if local information is to be returned;
Expand All @@ -74,4 +88,13 @@ public final Request local(boolean local) {
public final boolean local() {
return local;
}

/**
* For a given local request, checks if the local node is commissioned or not (default: false).
* @return <code>true</code> if local information is to be returned only when local node is also commissioned
* <code>false</code> to not check local node if commissioned or not for a local request
*/
public final boolean ensureLocalNodeCommissioned() {
return ensureLocalNodeCommissioned;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1444,8 +1444,7 @@ synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
peerFinder.onNodeCommissionStatusChange(localNodeCommissioned);
}

// package-visible for testing
boolean localNodeCommissioned() {
public boolean localNodeCommissioned() {
return localNodeCommissioned;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

Expand All @@ -28,4 +29,9 @@ public NodeDecommissionedException(String msg, Object... args) {
public NodeDecommissionedException(StreamInput in) throws IOException {
super(in);
}

@Override
public RestStatus status() {
return RestStatus.UNPROCESSABLE_ENTITY;
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

424 HTTP Error code seems more appropriate

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, updated

Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public static ClusterHealthRequest fromRequest(final RestRequest request) {
final ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
clusterHealthRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterHealthRequest.indicesOptions()));
clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
clusterHealthRequest.ensureLocalNodeCommissioned(
request.paramAsBoolean("ensure_local_node_commissioned", clusterHealthRequest.ensureLocalNodeCommissioned())
);
clusterHealthRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", clusterHealthRequest.clusterManagerNodeTimeout())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.health;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -70,6 +71,23 @@ public void testRequestReturnsHiddenIndicesByDefault() {
assertTrue(defaultRequest.indicesOptions().expandWildcardsHidden());
}

public void testValidation() {
ClusterHealthRequest clusterHealthRequest = randomRequest();
{
clusterHealthRequest.local(false);
clusterHealthRequest.ensureLocalNodeCommissioned(true);
ActionRequestValidationException e = clusterHealthRequest.validate();
assertNotNull(e);
assertTrue(e.getMessage().contains("not a local request to ensure local node commissioned"));
}
{
clusterHealthRequest.local(true);
clusterHealthRequest.ensureLocalNodeCommissioned(false);
ActionRequestValidationException e = clusterHealthRequest.validate();
assertNull(e);
}
}

private ClusterHealthRequest randomRequest() {
ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForStatus(randomFrom(ClusterHealthStatus.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
threadPool,
new ActionFilters(new HashSet<>()),
indexNameExpressionResolver,
new AllocationService(null, new TestGatewayAllocator(), null, null, null)
new AllocationService(null, new TestGatewayAllocator(), null, null, null),
null
);
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public void testFromRequest() {
Map<String, String> params = new HashMap<>();
String index = "index";
boolean local = randomBoolean();
boolean ensureLocalNodeCommissioned = false;
if (local) {
ensureLocalNodeCommissioned = randomBoolean();
}
String clusterManagerTimeout = randomTimeValue();
String timeout = randomTimeValue();
ClusterHealthStatus waitForStatus = randomFrom(ClusterHealthStatus.values());
Expand All @@ -63,6 +67,7 @@ public void testFromRequest() {

params.put("index", index);
params.put("local", String.valueOf(local));
params.put("ensure_local_node_commissioned", String.valueOf(ensureLocalNodeCommissioned));
params.put("cluster_manager_timeout", clusterManagerTimeout);
params.put("timeout", timeout);
params.put("wait_for_status", waitForStatus.name());
Expand All @@ -81,6 +86,7 @@ public void testFromRequest() {
assertThat(clusterHealthRequest.indices().length, equalTo(1));
assertThat(clusterHealthRequest.indices()[0], equalTo(index));
assertThat(clusterHealthRequest.local(), equalTo(local));
assertThat(clusterHealthRequest.ensureLocalNodeCommissioned(), equalTo(ensureLocalNodeCommissioned));
assertThat(clusterHealthRequest.clusterManagerNodeTimeout(), equalTo(TimeValue.parseTimeValue(clusterManagerTimeout, "test")));
assertThat(clusterHealthRequest.timeout(), equalTo(TimeValue.parseTimeValue(timeout, "test")));
assertThat(clusterHealthRequest.waitForStatus(), equalTo(waitForStatus));
Expand Down