From 5274f07b56a8418ed5be531a3a3652a6a7a4ffc0 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Sun, 25 Sep 2022 12:37:39 +0530 Subject: [PATCH] Add DecommissionService and helper to execute awareness attribute decommissioning (#4084) * Add Executor to decommission node attribute * Decommission service implementation with cluster metadata * Master abdication changes to decommission local awareness leader * Update join validator changes to validate decommissioned node join request Signed-off-by: Rishab Nahata Signed-off-by: pranikum <109206473+pranikum@users.noreply.github.com> --- CHANGELOG.md | 2 + .../org/opensearch/OpenSearchException.java | 12 + .../org/opensearch/cluster/ClusterModule.java | 14 + .../coordination/JoinTaskExecutor.java | 37 ++- .../NodeDecommissionedException.java | 31 +++ .../cluster/decommission/package-info.java | 12 + .../ExceptionSerializationTests.java | 4 + .../coordination/JoinTaskExecutorTests.java | 70 +++++ .../DecommissionServiceTests.java | 262 ++++++++++++++++++ ...onAttributeMetadataSerializationTests.java | 83 ++++++ .../DecommissionAttributeMetadataTests.java | 52 ++++ ...missionAttributeMetadataXContentTests.java | 38 +++ 12 files changed, 614 insertions(+), 3 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/decommission/NodeDecommissionedException.java create mode 100644 server/src/main/java/org/opensearch/cluster/decommission/package-info.java create mode 100644 server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index cf57552ff5c81..acb8ded704a7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948)) - [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318)) - Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580)) +- Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084)) + ### Deprecated diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index 87efc03734d26..34d7509c7afb2 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -1608,6 +1608,18 @@ private enum OpenSearchExceptionHandle { org.opensearch.index.shard.PrimaryShardClosedException::new, 162, V_3_0_0 + ), + DECOMMISSIONING_FAILED_EXCEPTION( + org.opensearch.cluster.decommission.DecommissioningFailedException.class, + org.opensearch.cluster.decommission.DecommissioningFailedException::new, + 163, + V_3_0_0 + ), + NODE_DECOMMISSIONED_EXCEPTION( + org.opensearch.cluster.decommission.NodeDecommissionedException.class, + org.opensearch.cluster.decommission.NodeDecommissionedException::new, + 164, + V_3_0_0 ); final Class exceptionClass; diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 46552bb5d6a03..3ac3a08d4848a 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -35,6 +35,7 @@ import org.opensearch.cluster.action.index.MappingUpdatedAction; import org.opensearch.cluster.action.index.NodeMappingRefreshAction; import org.opensearch.cluster.action.shard.ShardStateAction; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; import org.opensearch.cluster.metadata.ComponentTemplateMetadata; import org.opensearch.cluster.metadata.ComposableIndexTemplateMetadata; import org.opensearch.cluster.metadata.DataStreamMetadata; @@ -193,6 +194,12 @@ public static List getNamedWriteables() { ); registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom); registerMetadataCustom(entries, WeightedRoutingMetadata.TYPE, WeightedRoutingMetadata::new, WeightedRoutingMetadata::readDiffFrom); + registerMetadataCustom( + entries, + DecommissionAttributeMetadata.TYPE, + DecommissionAttributeMetadata::new, + DecommissionAttributeMetadata::readDiffFrom + ); // Task Status (not Diffable) entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new)); return entries; @@ -283,6 +290,13 @@ public static List getNamedXWriteables() { WeightedRoutingMetadata::fromXContent ) ); + entries.add( + new NamedXContentRegistry.Entry( + Metadata.Custom.class, + new ParseField(DecommissionAttributeMetadata.TYPE), + DecommissionAttributeMetadata::fromXContent + ) + ); return entries; } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 5afdb5b12db23..78b09dc020dfc 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -39,6 +39,10 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -358,6 +362,7 @@ public boolean runOnlyOnClusterManager() { /** * a task indicates that the current node should become master + * * @deprecated As of 2.0, because supporting inclusive language, replaced by {@link #newBecomeClusterManagerTask()} */ @Deprecated @@ -384,8 +389,9 @@ public static Task newFinishElectionTask() { * Ensures that all indices are compatible with the given node version. This will ensure that all indices in the given metadata * will not be created with a newer version of opensearch as well as that all indices are newer or equal to the minimum index * compatibility version. - * @see Version#minimumIndexCompatibilityVersion() + * * @throws IllegalStateException if any index is incompatible with the given version + * @see Version#minimumIndexCompatibilityVersion() */ public static void ensureIndexCompatibility(final Version nodeVersion, Metadata metadata) { Version supportedIndexVersion = nodeVersion.minimumIndexCompatibilityVersion(); @@ -415,14 +421,18 @@ public static void ensureIndexCompatibility(final Version nodeVersion, Metadata } } - /** ensures that the joining node has a version that's compatible with all current nodes*/ + /** + * ensures that the joining node has a version that's compatible with all current nodes + */ public static void ensureNodesCompatibility(final Version joiningNodeVersion, DiscoveryNodes currentNodes) { final Version minNodeVersion = currentNodes.getMinNodeVersion(); final Version maxNodeVersion = currentNodes.getMaxNodeVersion(); ensureNodesCompatibility(joiningNodeVersion, minNodeVersion, maxNodeVersion); } - /** ensures that the joining node has a version that's compatible with a given version range */ + /** + * ensures that the joining node has a version that's compatible with a given version range + */ public static void ensureNodesCompatibility(Version joiningNodeVersion, Version minClusterNodeVersion, Version maxClusterNodeVersion) { assert minClusterNodeVersion.onOrBefore(maxClusterNodeVersion) : minClusterNodeVersion + " > " + maxClusterNodeVersion; if (joiningNodeVersion.isCompatible(maxClusterNodeVersion) == false) { @@ -466,6 +476,26 @@ public static void ensureMajorVersionBarrier(Version joiningNodeVersion, Version } } + public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata) { + DecommissionAttributeMetadata decommissionAttributeMetadata = metadata.decommissionAttributeMetadata(); + if (decommissionAttributeMetadata != null) { + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + DecommissionStatus status = decommissionAttributeMetadata.status(); + if (decommissionAttribute != null && status != null) { + // We will let the node join the cluster if the current status is in FAILED state + if (node.getAttributes().get(decommissionAttribute.attributeName()).equals(decommissionAttribute.attributeValue()) + && status.equals(DecommissionStatus.FAILED) == false) { + throw new NodeDecommissionedException( + "node [{}] has decommissioned attribute [{}] with current status of decommissioning [{}]", + node.toString(), + decommissionAttribute.toString(), + status.status() + ); + } + } + } + } + public static Collection> addBuiltInJoinValidators( Collection> onJoinValidators ) { @@ -473,6 +503,7 @@ public static Collection> addBuiltInJoin validators.add((node, state) -> { ensureNodesCompatibility(node.getVersion(), state.getNodes()); ensureIndexCompatibility(node.getVersion(), state.getMetadata()); + ensureNodeCommissioned(node, state.getMetadata()); }); validators.addAll(onJoinValidators); return Collections.unmodifiableCollection(validators); diff --git a/server/src/main/java/org/opensearch/cluster/decommission/NodeDecommissionedException.java b/server/src/main/java/org/opensearch/cluster/decommission/NodeDecommissionedException.java new file mode 100644 index 0000000000000..847d5a527b017 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/decommission/NodeDecommissionedException.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.decommission; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; + +import java.io.IOException; + +/** + * This exception is thrown if the node is decommissioned by @{@link DecommissionService} + * and this nodes needs to be removed from the cluster + * + * @opensearch.internal + */ +public class NodeDecommissionedException extends OpenSearchException { + + public NodeDecommissionedException(String msg, Object... args) { + super(msg, args); + } + + public NodeDecommissionedException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/decommission/package-info.java b/server/src/main/java/org/opensearch/cluster/decommission/package-info.java new file mode 100644 index 0000000000000..256c2f22253cc --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/decommission/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Decommission lifecycle classes + */ +package org.opensearch.cluster.decommission; diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index 26b0ce7e9e20c..ff2bb77531486 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -49,6 +49,8 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.coordination.CoordinationStateRejectedException; import org.opensearch.cluster.coordination.NoClusterManagerBlockService; +import org.opensearch.cluster.decommission.DecommissioningFailedException; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.IllegalShardRoutingStateException; import org.opensearch.cluster.routing.ShardRouting; @@ -860,6 +862,8 @@ public void testIds() { ids.put(160, NoSeedNodeLeftException.class); ids.put(161, ReplicationFailedException.class); ids.put(162, PrimaryShardClosedException.class); + ids.put(163, DecommissioningFailedException.class); + ids.put(164, NodeDecommissionedException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index 02e502e762561..28f1a92cc1bdc 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -36,9 +36,14 @@ import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateTaskExecutor; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.cluster.decommission.NodeDecommissionedException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.allocation.AllocationService; @@ -48,7 +53,9 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; import static org.hamcrest.Matchers.is; import static org.opensearch.test.VersionUtils.allVersions; @@ -216,4 +223,67 @@ public void testIsBecomeClusterManagerTask() { JoinTaskExecutor.Task joinTaskOfClusterManager = JoinTaskExecutor.newBecomeClusterManagerTask(); assertThat(joinTaskOfClusterManager.isBecomeClusterManagerTask(), is(true)); } + + public void testJoinClusterWithNoDecommission() { + Settings.builder().build(); + Metadata.Builder metaBuilder = Metadata.builder(); + Metadata metadata = metaBuilder.build(); + DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-2")); + JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata); + } + + public void testPreventJoinClusterWithDecommission() { + Settings.builder().build(); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); + DecommissionStatus decommissionStatus = randomFrom( + DecommissionStatus.INIT, + DecommissionStatus.IN_PROGRESS, + DecommissionStatus.SUCCESSFUL + ); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + decommissionStatus + ); + Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); + DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-1")); + expectThrows(NodeDecommissionedException.class, () -> JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata)); + } + + public void testJoinClusterWithDifferentDecommission() { + Settings.builder().build(); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); + DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + decommissionStatus + ); + Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); + + DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-2")); + JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata); + } + + public void testJoinClusterWithDecommissionFailed() { + Settings.builder().build(); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "zone-1"); + DecommissionAttributeMetadata decommissionAttributeMetadata = new DecommissionAttributeMetadata( + decommissionAttribute, + DecommissionStatus.FAILED + ); + Metadata metadata = Metadata.builder().decommissionAttributeMetadata(decommissionAttributeMetadata).build(); + + DiscoveryNode discoveryNode = newDiscoveryNode(Collections.singletonMap("zone", "zone-1")); + JoinTaskExecutor.ensureNodeCommissioned(discoveryNode, metadata); + } + + private DiscoveryNode newDiscoveryNode(Map attributes) { + return new DiscoveryNode( + randomAlphaOfLength(10), + randomAlphaOfLength(10), + buildNewFakeTransportAddress(), + attributes, + Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), + Version.CURRENT + ); + } } diff --git a/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java new file mode 100644 index 0000000000000..71ee61ffec275 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/decommission/DecommissionServiceTests.java @@ -0,0 +1,262 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.decommission; + +import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ack.ClusterStateUpdateResponse; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonMap; +import static org.opensearch.cluster.ClusterState.builder; +import static org.opensearch.cluster.OpenSearchAllocationTestCase.createAllocationService; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; +import static org.opensearch.test.ClusterServiceUtils.setState; + +public class DecommissionServiceTests extends OpenSearchTestCase { + + private ThreadPool threadPool; + private ClusterService clusterService; + private TransportService transportService; + private AllocationService allocationService; + private DecommissionService decommissionService; + private ClusterSettings clusterSettings; + + @Before + public void setUpService() { + threadPool = new TestThreadPool("test", Settings.EMPTY); + clusterService = createClusterService(threadPool); + allocationService = createAllocationService(); + ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); + logger.info("--> adding cluster manager node on zone_1"); + clusterState = addClusterManagerNodes(clusterState, "zone_1", "node1"); + logger.info("--> adding cluster manager node on zone_2"); + clusterState = addClusterManagerNodes(clusterState, "zone_2", "node6"); + logger.info("--> adding cluster manager node on zone_3"); + clusterState = addClusterManagerNodes(clusterState, "zone_3", "node11"); + logger.info("--> adding four data nodes on zone_1"); + clusterState = addDataNodes(clusterState, "zone_1", "node2", "node3", "node4", "node5"); + logger.info("--> adding four data nodes on zone_2"); + clusterState = addDataNodes(clusterState, "zone_2", "node7", "node8", "node9", "node10"); + logger.info("--> adding four data nodes on zone_3"); + clusterState = addDataNodes(clusterState, "zone_3", "node12", "node13", "node14", "node15"); + clusterState = setLocalNodeAsClusterManagerNode(clusterState, "node1"); + clusterState = setNodesInVotingConfig( + clusterState, + clusterState.nodes().get("node1"), + clusterState.nodes().get("node6"), + clusterState.nodes().get("node11") + ); + final ClusterState.Builder builder = builder(clusterState); + setState(clusterService, builder); + final MockTransport transport = new MockTransport(); + transportService = transport.createTransportService( + Settings.EMPTY, + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundTransportAddress -> clusterService.state().nodes().get("node1"), + null, + emptySet() + ); + + final Settings.Builder nodeSettingsBuilder = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "zone_1,zone_2,zone_3"); + + clusterSettings = new ClusterSettings(nodeSettingsBuilder.build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + transportService.start(); + transportService.acceptIncomingRequests(); + + this.decommissionService = new DecommissionService( + nodeSettingsBuilder.build(), + clusterSettings, + clusterService, + transportService, + threadPool, + allocationService + ); + } + + @After + public void shutdownThreadPoolAndClusterService() { + clusterService.stop(); + threadPool.shutdown(); + } + + @SuppressWarnings("unchecked") + public void testDecommissioningNotStartedForInvalidAttributeName() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("rack", "rack-a"); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + assertThat(e.getMessage(), Matchers.endsWith("invalid awareness attribute requested for decommissioning")); + countDownLatch.countDown(); + } + }; + decommissionService.startDecommissionAction(decommissionAttribute, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + + @SuppressWarnings("unchecked") + public void testDecommissioningNotStartedForInvalidAttributeValue() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute("zone", "rack-a"); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + assertThat( + e.getMessage(), + Matchers.endsWith( + "invalid awareness attribute value requested for decommissioning. " + + "Set forced awareness values before to decommission" + ) + ); + countDownLatch.countDown(); + } + }; + decommissionService.startDecommissionAction(decommissionAttribute, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + + @SuppressWarnings("unchecked") + public void testDecommissioningFailedWhenAnotherAttributeDecommissioningSuccessful() throws InterruptedException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + DecommissionStatus oldStatus = randomFrom(DecommissionStatus.SUCCESSFUL, DecommissionStatus.IN_PROGRESS, DecommissionStatus.INIT); + DecommissionAttributeMetadata oldMetadata = new DecommissionAttributeMetadata( + new DecommissionAttribute("zone", "zone_1"), + oldStatus + ); + final ClusterState.Builder builder = builder(clusterService.state()); + setState( + clusterService, + builder.metadata(Metadata.builder(clusterService.state().metadata()).decommissionAttributeMetadata(oldMetadata).build()) + ); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse clusterStateUpdateResponse) { + fail("on response shouldn't have been called"); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof DecommissioningFailedException); + if (oldStatus.equals(DecommissionStatus.SUCCESSFUL)) { + assertThat( + e.getMessage(), + Matchers.endsWith("already successfully decommissioned, recommission before triggering another decommission") + ); + } else { + assertThat(e.getMessage(), Matchers.endsWith("is in progress, cannot process this request")); + } + countDownLatch.countDown(); + } + }; + decommissionService.startDecommissionAction(new DecommissionAttribute("zone", "zone_2"), listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + + private ClusterState addDataNodes(ClusterState clusterState, String zone, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds).forEach(nodeId -> nodeBuilder.add(newDataNode(nodeId, singletonMap("zone", zone)))); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState addClusterManagerNodes(ClusterState clusterState, String zone, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + org.opensearch.common.collect.List.of(nodeIds) + .forEach(nodeId -> nodeBuilder.add(newClusterManagerNode(nodeId, singletonMap("zone", zone)))); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState setLocalNodeAsClusterManagerNode(ClusterState clusterState, String nodeId) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.nodes()); + nodeBuilder.localNodeId(nodeId); + nodeBuilder.clusterManagerNodeId(nodeId); + clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build(); + return clusterState; + } + + private ClusterState setNodesInVotingConfig(ClusterState clusterState, DiscoveryNode... nodes) { + final CoordinationMetadata.VotingConfiguration votingConfiguration = CoordinationMetadata.VotingConfiguration.of(nodes); + + Metadata.Builder builder = Metadata.builder() + .coordinationMetadata( + CoordinationMetadata.builder() + .lastAcceptedConfiguration(votingConfiguration) + .lastCommittedConfiguration(votingConfiguration) + .build() + ); + clusterState = ClusterState.builder(clusterState).metadata(builder).build(); + return clusterState; + } + + private static DiscoveryNode newDataNode(String nodeId, Map attributes) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), attributes, DATA_ROLE, Version.CURRENT); + } + + private static DiscoveryNode newClusterManagerNode(String nodeId, Map attributes) { + return new DiscoveryNode(nodeId, buildNewFakeTransportAddress(), attributes, CLUSTER_MANAGER_ROLE, Version.CURRENT); + } + + final private static Set CLUSTER_MANAGER_ROLE = Collections.unmodifiableSet( + new HashSet<>(Collections.singletonList(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE)) + ); + + final private static Set DATA_ROLE = Collections.unmodifiableSet( + new HashSet<>(Collections.singletonList(DiscoveryNodeRole.DATA_ROLE)) + ); + + private ClusterState removeNodes(ClusterState clusterState, String... nodeIds) { + DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(clusterState.getNodes()); + org.opensearch.common.collect.List.of(nodeIds).forEach(nodeBuilder::remove); + return allocationService.disassociateDeadNodes(ClusterState.builder(clusterState).nodes(nodeBuilder).build(), false, "test"); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java new file mode 100644 index 0000000000000..60b3a03848830 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataSerializationTests.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.Diff; +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.test.AbstractDiffableSerializationTestCase; + +import java.io.IOException; + +public class DecommissionAttributeMetadataSerializationTests extends AbstractDiffableSerializationTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return DecommissionAttributeMetadata::new; + } + + @Override + protected Metadata.Custom createTestInstance() { + String attributeName = randomAlphaOfLength(6); + String attributeValue = randomAlphaOfLength(6); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); + DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + } + + @Override + protected Metadata.Custom mutateInstance(Metadata.Custom instance) { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) { + DecommissionAttributeMetadata decommissionAttributeMetadata = (DecommissionAttributeMetadata) testInstance; + DecommissionAttribute decommissionAttribute = decommissionAttributeMetadata.decommissionAttribute(); + String attributeName = decommissionAttribute.attributeName(); + String attributeValue = decommissionAttribute.attributeValue(); + DecommissionStatus decommissionStatus = decommissionAttributeMetadata.status(); + if (randomBoolean()) { + decommissionStatus = randomFrom(DecommissionStatus.values()); + } + if (randomBoolean()) { + attributeName = randomAlphaOfLength(6); + } + if (randomBoolean()) { + attributeValue = randomAlphaOfLength(6); + } + return new DecommissionAttributeMetadata(new DecommissionAttribute(attributeName, attributeValue), decommissionStatus); + } + + @Override + protected Writeable.Reader> diffReader() { + return DecommissionAttributeMetadata::readDiffFrom; + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); + } + + @Override + protected Metadata.Custom doParseInstance(XContentParser parser) throws IOException { + assertEquals(XContentParser.Token.START_OBJECT, parser.nextToken()); + DecommissionAttributeMetadata decommissionAttributeMetadata = DecommissionAttributeMetadata.fromXContent(parser); + assertEquals(XContentParser.Token.END_OBJECT, parser.currentToken()); + return new DecommissionAttributeMetadata( + decommissionAttributeMetadata.decommissionAttribute(), + decommissionAttributeMetadata.status() + ); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java new file mode 100644 index 0000000000000..746d4565b0db3 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataTests.java @@ -0,0 +1,52 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.test.AbstractNamedWriteableTestCase; + +import java.io.IOException; +import java.util.Collections; + +public class DecommissionAttributeMetadataTests extends AbstractNamedWriteableTestCase { + @Override + protected DecommissionAttributeMetadata createTestInstance() { + String attributeName = randomAlphaOfLength(6); + String attributeValue = randomAlphaOfLength(6); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); + DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + } + + @Override + protected DecommissionAttributeMetadata mutateInstance(DecommissionAttributeMetadata instance) throws IOException { + return randomValueOtherThan(instance, this::createTestInstance); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry( + Collections.singletonList( + new NamedWriteableRegistry.Entry( + DecommissionAttributeMetadata.class, + DecommissionAttributeMetadata.TYPE, + DecommissionAttributeMetadata::new + ) + ) + ); + } + + @Override + protected Class categoryClass() { + return DecommissionAttributeMetadata.class; + } +} diff --git a/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java new file mode 100644 index 0000000000000..030946f4510a1 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/metadata/DecommissionAttributeMetadataXContentTests.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.cluster.decommission.DecommissionAttribute; +import org.opensearch.cluster.decommission.DecommissionAttributeMetadata; +import org.opensearch.cluster.decommission.DecommissionStatus; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class DecommissionAttributeMetadataXContentTests extends AbstractXContentTestCase { + @Override + protected DecommissionAttributeMetadata createTestInstance() { + String attributeName = randomAlphaOfLength(6); + String attributeValue = randomAlphaOfLength(6); + DecommissionAttribute decommissionAttribute = new DecommissionAttribute(attributeName, attributeValue); + DecommissionStatus decommissionStatus = randomFrom(DecommissionStatus.values()); + return new DecommissionAttributeMetadata(decommissionAttribute, decommissionStatus); + } + + @Override + protected DecommissionAttributeMetadata doParseInstance(XContentParser parser) throws IOException { + return DecommissionAttributeMetadata.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } +}