diff --git a/CHANGELOG.md b/CHANGELOG.md index 5e4d984cadbbf..f0ded8bd0fe36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791)) - Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464) - Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331)) +- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index 1482e73efdace..197a364ddeaa0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -49,6 +49,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -513,6 +514,292 @@ public void testShardRoutingWithNetworkDisruption_FailOpenEnabled() throws Excep assertNoSearchInAZ("a"); } + public void testStrictWeightedRoutingWithCustomString_FailOpenEnabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + String customPreference = randomAlphaOfLength(10); + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(customPreference) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + logger.info("--> shards should fail due to network disruption"); + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + try { + assertSearchInAZ("b"); + } catch (AssertionError ae) { + assertSearchInAZ("c"); + } + assertNoSearchInAZ("a"); + } + + public void testStrictWeightedRoutingWithCustomString_FailOpenDisabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", false) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("b").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Set hitNodes = new HashSet<>(); + Future[] responses = new Future[50]; + String customPreference = randomAlphaOfLength(10); + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(customPreference) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + logger.info("--> shards should fail due to network disruption"); + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertNotEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } catch (Exception t) { + fail("search should not fail"); + } + } + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + Set expectedHotNodes = new HashSet<>(); + for (DiscoveryNode node : dataNodes) { + if (node.getAttributes().getOrDefault("zone", "").equals("b")) { + expectedHotNodes.add(node.getId()); + } + } + + assertEquals(expectedHotNodes, hitNodes); + + assertSearchInAZ("b"); + assertNoSearchInAZ("c"); + assertNoSearchInAZ("a"); + } + + /** + * Should failopen shards even if failopen enabled with custom search preference. + * @throws Exception + */ + public void testStrictWeightedRoutingWithShardPrefNetworkDisruption_FailOpenEnabled() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> creating network partition disruption"); + final String clusterManagerNode1 = internalCluster().getClusterManagerName(); + Set nodesInOneSide = Stream.of(clusterManagerNode1, nodeMap.get("c").get(0)).collect(Collectors.toCollection(HashSet::new)); + Set nodesInOtherSide = Stream.of(nodeMap.get("a").get(0)).collect(Collectors.toCollection(HashSet::new)); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide), + NetworkDisruption.UNRESPONSIVE + ); + internalCluster().setDisruptionScheme(networkDisruption); + + logger.info("--> network disruption is started"); + networkDisruption.startDisrupting(); + + Future[] responses = new Future[50]; + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + ShardId shardId = internalCluster().clusterService() + .state() + .getRoutingTable() + .index("test") + .randomAllActiveShardsIt() + .getShardRoutings() + .stream() + .filter(shard -> { + return dataNodes.get(shard.currentNodeId()).getAttributes().getOrDefault("zone", "").equals("c"); + }) + .findFirst() + .get() + .shardId(); + + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("c").get(0)) + .prepareSearch("test") + .setPreference(String.format(Locale.ROOT, "_shards:%s", shardId.getId())) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + logger.info("--> network disruption is stopped"); + networkDisruption.stopDisrupting(); + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + } catch (Exception t) { + fail("search should not fail"); + } + } + + assertNoSearchInAZ("a"); + try { + assertSearchInAZ("c"); + } catch (AssertionError ae) { + assertSearchInAZ("b"); + } + } + + public void testStrictWeightedRoutingWithShardPref() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + ShardId shardId = internalCluster().clusterService() + .state() + .getRoutingTable() + .index("test") + .randomAllActiveShardsIt() + .getShardRoutings() + .stream() + .filter(shard -> { + return dataNodes.get(shard.currentNodeId()).getAttributes().getOrDefault("zone", "").equals("c"); + }) + .findFirst() + .get() + .shardId(); + + Future[] responses = new Future[50]; + logger.info("--> making search requests"); + for (int i = 0; i < 50; i++) { + responses[i] = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch("test") + .setPreference(String.format(Locale.ROOT, "_shards:%s", shardId.getId())) + .setSize(100) + .setQuery(QueryBuilders.matchAllQuery()) + .execute(); + } + + for (int i = 0; i < 50; i++) { + try { + SearchResponse searchResponse = responses[i].get(); + assertEquals(searchResponse.getFailedShards(), 0); + assertNotEquals(searchResponse.getHits().getTotalHits().value, 0); + } catch (Exception t) { + fail("search should not fail"); + } + } + assertNoSearchInAZ("c"); + } + private void assertNoSearchInAZ(String az) { ImmutableOpenMap dataNodes = internalCluster().clusterService().state().nodes().getDataNodes(); String dataNodeId = null; @@ -806,7 +1093,6 @@ public void testMultiGetWithNetworkDisruption_FailOpenDisabled() throws Exceptio * Assert that preference search with custom string doesn't hit a node in weighed away az */ public void testStrictWeightedRoutingWithCustomString() { - Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -844,13 +1130,18 @@ public void testStrictWeightedRoutingWithCustomString() { .get(); // make search requests with custom string - searchResponse = internalCluster().client(nodeMap.get("a").get(0)) + internalCluster().client(nodeMap.get("a").get(0)) .prepareSearch() .setSize(20) .setPreference(customPreference) + .setQuery(QueryBuilders.matchAllQuery()) .get(); // assert search on data nodes on az c (weighed away az) - assertSearchInAZ("c"); + try { + assertSearchInAZ("c"); + } catch (AssertionError ae) { + assertSearchInAZ("a"); + } } @@ -889,24 +1180,61 @@ public void testPreferenceSearchWithWeightedRouting() { SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) .prepareSearch() - .setSize(0) - .setPreference("_local") + .setPreference(randomFrom("_local", "_prefer_nodes:" + "zone:a", customPreference)) .get(); assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); - searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + searchResponse = internalCluster().client(nodeMap.get("a").get(0)) .prepareSearch() - .setSize(0) .setPreference( "_only_nodes:" + nodeIDMap.get(nodeInZoneA) + "," + nodeIDMap.get(nodeInZoneB) + "," + nodeIDMap.get(nodeInZoneC) ) .get(); assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + } - searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + public void testPreferenceSearchWithIgnoreWeightedRouting() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", false) + .put("cluster.routing.ignore_weighted_routing", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 2; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + + String customPreference = randomAlphaOfLength(10); + String nodeInZoneA = nodeMap.get("a").get(0); + String nodeInZoneB = nodeMap.get("b").get(0); + String nodeInZoneC = nodeMap.get("c").get(0); + + Map nodeIDMap = new HashMap<>(); + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + for (DiscoveryNode node : dataNodes) { + nodeIDMap.put(node.getName(), node.getId()); + } + + SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) .prepareSearch() - .setSize(0) - .setPreference("_prefer_nodes:zone:a") + .setPreference(randomFrom("_local", "_prefer_nodes:" + "zone:a", customPreference)) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("a").get(0)) + .prepareSearch() + .setPreference( + "_only_nodes:" + nodeIDMap.get(nodeInZoneA) + "," + nodeIDMap.get(nodeInZoneB) + "," + nodeIDMap.get(nodeInZoneC) + ) .get(); assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); } @@ -915,7 +1243,6 @@ public void testPreferenceSearchWithWeightedRouting() { * Assert that preference based search with preference type is not allowed with strict weighted shard routing */ public void testStrictWeightedRouting() { - Settings commonSettings = Settings.builder() .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") @@ -935,11 +1262,6 @@ public void testStrictWeightedRouting() { setShardRoutingWeights(weights); String nodeInZoneA = nodeMap.get("a").get(0); - assertThrows( - PreferenceBasedSearchNotAllowedException.class, - () -> internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference("_local").get() - ); - assertThrows( PreferenceBasedSearchNotAllowedException.class, () -> internalCluster().client(nodeMap.get("b").get(0)) @@ -957,7 +1279,48 @@ public void testStrictWeightedRouting() { .setPreference("_prefer_nodes:" + nodeInZoneA) .get() ); + } + public void testStrictWeightedRoutingAllowedForSomeSearchPrefs() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .put("cluster.routing.weighted.fail_open", true) + .put("cluster.routing.weighted.strict", true) + .build(); + + int nodeCountPerAZ = 1; + Map> nodeMap = setupCluster(nodeCountPerAZ, commonSettings); + + int numShards = 10; + int numReplicas = 1; + setUpIndexing(numShards, numReplicas); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + setShardRoutingWeights(weights); + String nodeInZoneA = nodeMap.get("a").get(0); + String customPreference = randomAlphaOfLength(10); + + SearchResponse searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_only_local:" + nodeInZoneA) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)) + .prepareSearch() + .setSize(0) + .setPreference("_local:" + nodeInZoneA) + .get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference("_shards:1").get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); + + searchResponse = internalCluster().client(nodeMap.get("b").get(0)).prepareSearch().setSize(0).setPreference(customPreference).get(); + assertEquals(RestStatus.OK.getStatus(), searchResponse.status().getStatus()); } /** diff --git a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java index dbef876c9a258..72c189f20eaf6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/FailAwareWeightedRouting.java @@ -20,6 +20,8 @@ import java.util.List; +import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING; + /** * This class contains logic to find next shard to retry search request in case of failure from other shard copy. * This decides if retryable shard search requests can be tried on shard copies present in data @@ -72,9 +74,13 @@ public SearchShardTarget findNext( Runnable onShardSkipped ) { SearchShardTarget next = shardIt.nextOrNull(); + if (ignoreWeightedRouting(clusterState)) { + return next; + } + while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) { SearchShardTarget nextShard = next; - if (canFailOpen(nextShard.getShardId(), exception, clusterState)) { + if (canFailOpen(nextShard.getShardId(), shardIt.size(), exception, clusterState)) { logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception); getWeightedRoutingStats().updateFailOpenCount(); break; @@ -98,10 +104,13 @@ public SearchShardTarget findNext( */ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception, Runnable onShardSkipped) { ShardRouting next = shardsIt.nextOrNull(); + if (ignoreWeightedRouting(clusterState)) { + return next; + } while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) { ShardRouting nextShard = next; - if (canFailOpen(nextShard.shardId(), exception, clusterState)) { + if (canFailOpen(nextShard.shardId(), shardsIt.size(), exception, clusterState)) { logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception); getWeightedRoutingStats().updateFailOpenCount(); break; @@ -117,8 +126,8 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster * @return true if can fail open ie request shard copies present in nodes with weighted shard * routing weight set to zero */ - private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) { - return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); + private boolean canFailOpen(ShardId shardId, int shardItSize, Exception exception, ClusterState clusterState) { + return shardItSize == 1 || isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId); } private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) { @@ -131,6 +140,10 @@ private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardI return false; } + private boolean ignoreWeightedRouting(ClusterState clusterState) { + return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings()); + } + public WeightedRoutingStats getWeightedRoutingStats() { return WeightedRoutingStats.getInstance(); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 6a877838ece95..711a750ade712 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -324,9 +324,12 @@ public ShardIterator activeInitializingShardsWeightedIt( WeightedRouting weightedRouting, DiscoveryNodes nodes, double defaultWeight, - boolean isFailOpenEnabled + boolean isFailOpenEnabled, + @Nullable Integer seed ) { - final int seed = shufflerForWeightedRouting.nextSeed(); + if (seed == null) { + seed = shufflerForWeightedRouting.nextSeed(); + } List ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed); // append shards for attribute value with weight zero, so that shard search requests can be tried on @@ -350,6 +353,7 @@ public ShardIterator activeInitializingShardsWeightedIt( logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId); } } + return new PlainShardIterator(shardId, ordered); } @@ -371,21 +375,6 @@ private List activeInitializingShardsWithWeights( return orderedListWithDistinctShards; } - /** - * Returns an iterator over active and initializing shards, shards are ordered by weighted - * round-robin scheduling policy. Uses the passed seed to shuffle the shards. - * - */ - public ShardIterator activeInitializingShardsSimpleWeightedIt( - WeightedRouting weightedRouting, - DiscoveryNodes nodes, - double defaultWeight, - int seed - ) { - List ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed); - return new PlainShardIterator(shardId, ordered); - } - /** * Returns a list containing shard routings ordered using weighted round-robin scheduling. */ diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index b247936245151..566eefe2c4f1a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -93,16 +93,30 @@ public class OperationRouting { public static final Setting STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting( "cluster.routing.weighted.strict", + true, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting IGNORE_WEIGHTED_SHARD_ROUTING = Setting.boolSetting( + "cluster.routing.ignore_weighted_routing", false, Setting.Property.Dynamic, Setting.Property.NodeScope ); + + private static final List WEIGHTED_ROUTING_RESTRICTED_PREFERENCES = Arrays.asList( + Preference.ONLY_NODES, + Preference.PREFER_NODES + ); + private volatile List awarenessAttributes; private volatile boolean useAdaptiveReplicaSelection; private volatile boolean ignoreAwarenessAttr; private volatile double weightedRoutingDefaultWeight; private volatile boolean isFailOpenEnabled; private volatile boolean isStrictWeightedShardRouting; + private volatile boolean ignoreWeightedRouting; public OperationRouting(Settings settings, ClusterSettings clusterSettings) { // whether to ignore awareness attributes when routing requests @@ -116,11 +130,13 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) { this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings); this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings); this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings); + this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings); clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection); clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight); clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled); clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting); + clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting); } void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) { @@ -143,6 +159,10 @@ void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) { this.isStrictWeightedShardRouting = strictWeightedShardRouting; } + void setIgnoreWeightedRouting(boolean isWeightedRoundRobinEnabled) { + this.ignoreWeightedRouting = isWeightedRoundRobinEnabled; + } + public boolean isIgnoreAwarenessAttr() { return ignoreAwarenessAttr; } @@ -314,11 +334,7 @@ private ShardIterator preferenceActiveShardIterator( } } preferenceType = Preference.parse(preference); - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) { - throw new PreferenceBasedSearchNotAllowedException( - "Preference type based routing not allowed with strict weighted shard routing enabled" - ); - } + checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata); switch (preferenceType) { case PREFER_NODES: final Set nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")) @@ -344,11 +360,16 @@ private ShardIterator preferenceActiveShardIterator( // for a different element in the list by also incorporating the // shard ID into the hash of the user-supplied preference key. routingHash = 31 * routingHash + indexShard.shardId.hashCode(); - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) { - return indexShard.activeInitializingShardsSimpleWeightedIt( + if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting( + isStrictWeightedShardRouting, + ignoreWeightedRouting, + weightedRoutingMetadata + )) { + return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, getWeightedRoutingDefaultWeight(), + isFailOpenEnabled, routingHash ); } else if (ignoreAwarenessAttributes()) { @@ -365,12 +386,13 @@ private ShardIterator shardRoutings( @Nullable Map nodeCounts, @Nullable WeightedRoutingMetadata weightedRoutingMetadata ) { - if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) { + if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) { return indexShard.activeInitializingShardsWeightedIt( weightedRoutingMetadata.getWeightedRouting(), nodes, getWeightedRoutingDefaultWeight(), - isFailOpenEnabled + isFailOpenEnabled, + null ); } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { @@ -438,4 +460,15 @@ private static int calculateScaledShardId(IndexMetadata indexMetadata, String ef return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor(); } + private void checkPreferenceBasedRoutingAllowed(Preference preference, @Nullable WeightedRoutingMetadata weightedRoutingMetadata) { + if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting( + isStrictWeightedShardRouting, + ignoreWeightedRouting, + weightedRoutingMetadata + ) && WEIGHTED_ROUTING_RESTRICTED_PREFERENCES.contains(preference)) { + throw new PreferenceBasedSearchNotAllowedException( + "Preference type based routing not allowed with strict weighted shard routing enabled" + ); + } + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java index c7d40cbbbea61..72387aad0fa45 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingUtils.java @@ -53,4 +53,16 @@ public static boolean isWeighedAway(String nodeId, ClusterState clusterState) { } return false; } + + public static boolean shouldPerformWeightedRouting(boolean ignoreWeightedRouting, WeightedRoutingMetadata weightedRoutingMetadata) { + return !ignoreWeightedRouting && weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet(); + } + + public static boolean shouldPerformStrictWeightedRouting( + boolean isStrictWeightedShardRouting, + boolean ignoreWeightedRouting, + WeightedRoutingMetadata weightedRoutingMetadata + ) { + return isStrictWeightedShardRouting && shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index edc5389798fac..b7979407f97a0 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -546,6 +546,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT, OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED, OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, + OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java index 5c3a2454c4074..c0164f1afd924 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/FailAwareWeightedRoutingTests.java @@ -42,6 +42,10 @@ public class FailAwareWeightedRoutingTests extends OpenSearchTestCase { private ClusterState setUpCluster() { + return setUpCluster(Settings.EMPTY); + } + + private ClusterState setUpCluster(Settings transientSettings) { ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); // set up nodes @@ -78,7 +82,7 @@ private ClusterState setUpCluster() { Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); WeightedRouting weightedRouting = new WeightedRouting("zone", weights); WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, 0); - Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()).transientSettings(transientSettings); metadataBuilder.putCustom(WeightedRoutingMetadata.TYPE, weightedRoutingMetadata); clusterState = ClusterState.builder(clusterState).metadata(metadataBuilder).build(); @@ -143,6 +147,124 @@ public void testFindNextWithoutFailOpen() throws IOException { assertEquals(1, shardSkipped.get()); } + public void testFindNextWithJustOneShardInStandbyZone() throws IOException { + ClusterState clusterState = setUpCluster(); + + AtomicInteger shardSkipped = new AtomicInteger(); + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + // fail open is not executed since fail open conditions don't met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNull(next); + assertEquals(0, shardSkipped.get()); + } + + public void testFindNextWithIgnoreWeightedRoutingTrue() throws IOException { + ClusterState clusterState = setUpCluster(Settings.builder().put("cluster.routing.ignore_weighted_routing", true).build()); + + AtomicInteger shardSkipped = new AtomicInteger(); + // set up index + IndexMetadata indexMetadata = IndexMetadata.builder("test") + .settings( + Settings.builder() + .put(SETTING_VERSION_CREATED, Version.CURRENT) + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 2) + .put(SETTING_CREATION_DATE, System.currentTimeMillis()) + ) + .build(); + + ShardRouting shardRoutingA = TestShardRouting.newShardRouting("test", 0, "node_zone_a", true, ShardRoutingState.STARTED); + ShardRouting shardRoutingB = TestShardRouting.newShardRouting("test", 0, "node_zone_b", false, ShardRoutingState.STARTED); + ShardRouting shardRoutingC = TestShardRouting.newShardRouting("test", 0, "node_zone_c", false, ShardRoutingState.STARTED); + + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.metadata()); + metadataBuilder.put(indexMetadata, false).generateClusterUuidIfNeeded(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + + final ShardId shardId = new ShardId("test", "_na_", 0); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingBuilder.addShard(shardRoutingA); + indexShardRoutingBuilder.addShard(shardRoutingB); + indexShardRoutingBuilder.addShard(shardRoutingC); + + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + clusterState = ClusterState.builder(clusterState).routingTable(routingTableBuilder.build()).build(); + + List shardRoutings = new ArrayList<>(); + shardRoutings.add(shardRoutingA); + shardRoutings.add(shardRoutingB); + shardRoutings.add(shardRoutingC); + + String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); + SearchShardIterator searchShardIterator = new SearchShardIterator( + clusterAlias, + shardId, + shardRoutings, + OriginalIndicesTests.randomOriginalIndices() + ); + + // fail open is not executed since fail open conditions don't met + SearchShardTarget next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNotNull(next); + next = FailAwareWeightedRouting.getInstance() + .findNext(searchShardIterator, clusterState, new OpenSearchRejectedExecutionException(), () -> shardSkipped.incrementAndGet()); + assertNull(next); + assertEquals(0, shardSkipped.get()); + } + public void testFindNextWithFailOpenDueTo5xx() throws IOException { ClusterState clusterState = setUpCluster(); diff --git a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java index 866939e6ac3d7..59ea0dfca559a 100644 --- a/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java +++ b/server/src/test/java/org/opensearch/cluster/structure/RoutingIteratorTests.java @@ -552,7 +552,7 @@ public void testWeightedRoutingWithDifferentWeights() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; @@ -565,7 +565,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(3, shardIterator.size()); weights = Map.of("zone1", -1.0, "zone2", 0.0, "zone3", 1.0); @@ -573,7 +573,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(1, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -584,7 +584,7 @@ public void testWeightedRoutingWithDifferentWeights() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); assertEquals(3, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -646,7 +646,7 @@ public void testWeightedRoutingInMemoryStore() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); ShardRouting shardRouting; shardRouting = shardIterator.nextOrNull(); @@ -660,7 +660,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -675,7 +675,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -690,7 +690,7 @@ public void testWeightedRoutingInMemoryStore() { shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, false, null); assertEquals(2, shardIterator.size()); shardRouting = shardIterator.nextOrNull(); assertNotNull(shardRouting); @@ -755,7 +755,7 @@ public void testWeightedRoutingShardState() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); assertEquals(3, shardIterator.size()); ShardRouting shardRouting; @@ -834,21 +834,21 @@ public void testWeightedRoutingShardStateWithDifferentWeights() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting1 = shardIterator.nextOrNull(); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting2 = shardIterator.nextOrNull(); shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, null); ShardRouting shardRouting3 = shardIterator.nextOrNull(); @@ -864,8 +864,8 @@ public void testWeightedRoutingShardStateWithDifferentWeights() { /** * Test to validate that simple weighted shard routing with seed return same shard routing on each call */ - public void testActiveInitializingShardsSimpleWeightedIt() { - TestThreadPool threadPool = new TestThreadPool("testActiveInitializingShardsSimpleWeightedIt"); + public void testActiveInitializingShardsWeightedItWithCustomSeed() { + TestThreadPool threadPool = new TestThreadPool("testActiveInitializingShardsWeightedItWithCustomSeed"); try { Settings.Builder settings = Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) @@ -912,16 +912,15 @@ public void testActiveInitializingShardsSimpleWeightedIt() { ShardIterator shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsSimpleWeightedIt(weightedRouting, clusterState.nodes(), 1, 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, 1); ShardRouting shardRouting1 = shardIterator.nextOrNull(); for (int i = 0; i < 50; i++) { - shardIterator = clusterState.routingTable() .index("test") .shard(0) - .activeInitializingShardsSimpleWeightedIt(weightedRouting, clusterState.nodes(), 1, 1); + .activeInitializingShardsWeightedIt(weightedRouting, clusterState.nodes(), 1, true, 1); ShardRouting shardRouting2 = shardIterator.nextOrNull();