Skip to content

Commit

Permalink
Cluster health call to throw decommissioned exception for local decom…
Browse files Browse the repository at this point in the history
…missioned node (opensearch-project#6008)

* Cluster health call to throw decommissioned exception for local decommissioned nodes

Signed-off-by: Rishab Nahata <[email protected]>
  • Loading branch information
imRishN committed Jan 29, 2023
1 parent 348852f commit c504b51
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https:/opensearch-project/OpenSearch/pull/4725))
- [Refactor] Use local opensearch.common.SetOnce instead of lucene's utility class ([#5947](https:/opensearch-project/OpenSearch/pull/5947))
- Cluster health call to throw decommissioned exception for local decommissioned node([#6008](https:/opensearch-project/OpenSearch/pull/6008))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@
"awareness_attribute":{
"type":"string",
"description":"The awareness attribute for which the health is required"
},
"ensure_node_commissioned":{
"type":"boolean",
"description": "Checks whether local node is commissioned or not. If set to true on a local call it will throw exception if node is decommissioned (default: false)"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,36 @@ public boolean innerMatch(LogEvent event) {
Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, decommissionedNode);
assertFalse(coordinator.localNodeCommissioned());

// Check cluster health API for decommissioned and active node
ClusterHealthResponse activeNodeLocalHealth = client(activeNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.execute()
.actionGet();
assertFalse(activeNodeLocalHealth.isTimedOut());

ClusterHealthResponse decommissionedNodeLocalHealth = client(decommissionedNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.execute()
.actionGet();
assertFalse(decommissionedNodeLocalHealth.isTimedOut());

NodeDecommissionedException ex = expectThrows(
NodeDecommissionedException.class,
() -> client(decommissionedNode).admin()
.cluster()
.prepareHealth()
.setLocal(true)
.setEnsureNodeCommissioned(true)
.execute()
.actionGet()
);
assertTrue(ex.getMessage().contains("local node is decommissioned"));

// Recommissioning the zone back to gracefully succeed the test once above tests succeeds
DeleteDecommissionStateResponse deleteDecommissionStateResponse = client(activeNode).execute(
DeleteDecommissionStateAction.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class ClusterHealthRequest extends ClusterManagerNodeReadRequest<ClusterH
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
private String waitForNodes = "";
private Priority waitForEvents = null;
private boolean ensureNodeCommissioned = false;
/**
* Only used by the high-level REST Client. Controls the details level of the health information returned.
* The default value is 'cluster'.
Expand Down Expand Up @@ -103,6 +104,9 @@ public ClusterHealthRequest(StreamInput in) throws IOException {
awarenessAttribute = in.readOptionalString();
level = in.readEnum(Level.class);
}
if (in.getVersion().onOrAfter(Version.V_2_6_0)) {
ensureNodeCommissioned = in.readBoolean();
}
}

@Override
Expand Down Expand Up @@ -137,6 +141,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(awarenessAttribute);
out.writeEnum(level);
}
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeBoolean(ensureNodeCommissioned);
}
}

@Override
Expand Down Expand Up @@ -321,13 +328,30 @@ public String getAwarenessAttribute() {
return awarenessAttribute;
}

public final ClusterHealthRequest ensureNodeCommissioned(boolean ensureNodeCommissioned) {
this.ensureNodeCommissioned = ensureNodeCommissioned;
return this;
}

/**
* For a given local request, checks if the local node is commissioned or not (default: false).
* @return <code>true</code> if local information is to be returned only when local node is also commissioned
* <code>false</code> to not check local node if commissioned or not for a local request
*/
public final boolean ensureNodeCommissioned() {
return ensureNodeCommissioned;
}

@Override
public ActionRequestValidationException validate() {
if (level.equals(Level.AWARENESS_ATTRIBUTES) && indices.length > 0) {
return addValidationError("awareness_attribute is not a supported parameter with index health", null);
} else if (!level.equals(Level.AWARENESS_ATTRIBUTES) && awarenessAttribute != null) {
return addValidationError("level=awareness_attributes is required with awareness_attribute parameter", null);
}
if (ensureNodeCommissioned && local == false) {
return addValidationError("not a local request to ensure local node commissioned", null);
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,12 @@ public ClusterHealthRequestBuilder setLevel(String level) {
request.setLevel(level);
return this;
}

/**
* Specifies if the local request should ensure that the local node is commissioned
*/
public final ClusterHealthRequestBuilder setEnsureNodeCommissioned(boolean ensureNodeCommissioned) {
request.ensureNodeCommissioned(ensureNodeCommissioned);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.opensearch.cluster.LocalClusterUpdateTask;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -57,6 +59,7 @@
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.CollectionUtils;
import org.opensearch.discovery.Discovery;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
Expand All @@ -77,6 +80,7 @@ public class TransportClusterHealthAction extends TransportClusterManagerNodeRea
private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);

private final AllocationService allocationService;
private final Discovery discovery;

@Inject
public TransportClusterHealthAction(
Expand All @@ -85,7 +89,8 @@ public TransportClusterHealthAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
AllocationService allocationService
AllocationService allocationService,
Discovery discovery
) {
super(
ClusterHealthAction.NAME,
Expand All @@ -98,6 +103,7 @@ public TransportClusterHealthAction(
indexNameExpressionResolver
);
this.allocationService = allocationService;
this.discovery = discovery;
}

@Override
Expand Down Expand Up @@ -134,7 +140,12 @@ protected void clusterManagerOperation(
final ClusterState unusedState,
final ActionListener<ClusterHealthResponse> listener
) {

if (request.ensureNodeCommissioned()
&& discovery instanceof Coordinator
&& ((Coordinator) discovery).localNodeCommissioned() == false) {
listener.onFailure(new NodeDecommissionedException("local node is decommissioned"));
return;
}
final int waitCount = getWaitCount(request);

if (request.waitForEvents() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1438,8 +1438,7 @@ synchronized void onNodeCommissionStatusChange(boolean localNodeCommissioned) {
peerFinder.onNodeCommissionStatusChange(localNodeCommissioned);
}

// package-visible for testing
boolean localNodeCommissioned() {
public boolean localNodeCommissioned() {
return localNodeCommissioned;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

import java.io.IOException;

Expand All @@ -28,4 +29,9 @@ public NodeDecommissionedException(String msg, Object... args) {
public NodeDecommissionedException(StreamInput in) throws IOException {
super(in);
}

@Override
public RestStatus status() {
return RestStatus.FAILED_DEPENDENCY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public static ClusterHealthRequest fromRequest(final RestRequest request) {
final ClusterHealthRequest clusterHealthRequest = clusterHealthRequest(Strings.splitStringByCommaToArray(request.param("index")));
clusterHealthRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterHealthRequest.indicesOptions()));
clusterHealthRequest.local(request.paramAsBoolean("local", clusterHealthRequest.local()));
clusterHealthRequest.ensureNodeCommissioned(
request.paramAsBoolean("ensure_node_commissioned", clusterHealthRequest.ensureNodeCommissioned())
);
clusterHealthRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", clusterHealthRequest.clusterManagerNodeTimeout())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.admin.cluster.health;

import org.opensearch.LegacyESVersion;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -152,6 +153,23 @@ public void testBwcSerialization() throws Exception {
}
}

public void testValidation() {
ClusterHealthRequest clusterHealthRequest = randomRequest();
{
clusterHealthRequest.local(false);
clusterHealthRequest.ensureNodeCommissioned(true);
ActionRequestValidationException e = clusterHealthRequest.validate();
assertNotNull(e);
assertTrue(e.getMessage().contains("not a local request to ensure local node commissioned"));
}
{
clusterHealthRequest.local(true);
clusterHealthRequest.ensureNodeCommissioned(false);
ActionRequestValidationException e = clusterHealthRequest.validate();
assertNull(e);
}
}

private ClusterHealthRequest randomRequest() {
ClusterHealthRequest request = new ClusterHealthRequest();
request.waitForStatus(randomFrom(ClusterHealthStatus.values()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
threadPool,
new ActionFilters(new HashSet<>()),
indexNameExpressionResolver,
new AllocationService(null, new TestGatewayAllocator(), null, null, null)
new AllocationService(null, new TestGatewayAllocator(), null, null, null),
null
);
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ public void testFromRequest() {
Map<String, String> params = new HashMap<>();
String index = "index";
boolean local = randomBoolean();
boolean ensureLocalNodeCommissioned = false;
if (local) {
ensureLocalNodeCommissioned = randomBoolean();
}
String clusterManagerTimeout = randomTimeValue();
String timeout = randomTimeValue();
ClusterHealthStatus waitForStatus = randomFrom(ClusterHealthStatus.values());
Expand All @@ -63,6 +67,7 @@ public void testFromRequest() {

params.put("index", index);
params.put("local", String.valueOf(local));
params.put("ensure_node_commissioned", String.valueOf(ensureLocalNodeCommissioned));
params.put("cluster_manager_timeout", clusterManagerTimeout);
params.put("timeout", timeout);
params.put("wait_for_status", waitForStatus.name());
Expand All @@ -81,6 +86,7 @@ public void testFromRequest() {
assertThat(clusterHealthRequest.indices().length, equalTo(1));
assertThat(clusterHealthRequest.indices()[0], equalTo(index));
assertThat(clusterHealthRequest.local(), equalTo(local));
assertThat(clusterHealthRequest.ensureNodeCommissioned(), equalTo(ensureLocalNodeCommissioned));
assertThat(clusterHealthRequest.clusterManagerNodeTimeout(), equalTo(TimeValue.parseTimeValue(clusterManagerTimeout, "test")));
assertThat(clusterHealthRequest.timeout(), equalTo(TimeValue.parseTimeValue(timeout, "test")));
assertThat(clusterHealthRequest.waitForStatus(), equalTo(waitForStatus));
Expand Down

0 comments on commit c504b51

Please sign in to comment.