Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance search preference based routing for WRR #6834

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https:/opensearch-project/OpenSearch/pull/6791))
- Enable sort optimization for all NumericTypes ([#6464](https:/opensearch-project/OpenSearch/pull/6464)
- Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https:/opensearch-project/OpenSearch/pull/6331))
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https:/opensearch-project/OpenSearch/pull/6834))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https:/opensearch-project/OpenSearch/pull/6490))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.List;

import static org.opensearch.cluster.routing.OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING;

/**
* This class contains logic to find next shard to retry search request in case of failure from other shard copy.
* This decides if retryable shard search requests can be tried on shard copies present in data
Expand Down Expand Up @@ -72,9 +74,13 @@ public SearchShardTarget findNext(
Runnable onShardSkipped
) {
SearchShardTarget next = shardIt.nextOrNull();
if (ignoreWeightedRouting(clusterState)) {
return next;
}

while (next != null && WeightedRoutingUtils.isWeighedAway(next.getNodeId(), clusterState)) {
SearchShardTarget nextShard = next;
if (canFailOpen(nextShard.getShardId(), exception, clusterState)) {
if (canFailOpen(nextShard.getShardId(), shardIt.size(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.getShardId()), exception);
getWeightedRoutingStats().updateFailOpenCount();
break;
Expand All @@ -98,10 +104,13 @@ public SearchShardTarget findNext(
*/
public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState clusterState, Exception exception, Runnable onShardSkipped) {
ShardRouting next = shardsIt.nextOrNull();
if (ignoreWeightedRouting(clusterState)) {
return next;
}

while (next != null && WeightedRoutingUtils.isWeighedAway(next.currentNodeId(), clusterState)) {
ShardRouting nextShard = next;
if (canFailOpen(nextShard.shardId(), exception, clusterState)) {
if (canFailOpen(nextShard.shardId(), shardsIt.size(), exception, clusterState)) {
logger.info(() -> new ParameterizedMessage("{}: Fail open executed due to exception", nextShard.shardId()), exception);
getWeightedRoutingStats().updateFailOpenCount();
break;
Expand All @@ -117,8 +126,8 @@ public ShardRouting findNext(final ShardsIterator shardsIt, ClusterState cluster
* @return true if can fail open ie request shard copies present in nodes with weighted shard
* routing weight set to zero
*/
private boolean canFailOpen(ShardId shardId, Exception exception, ClusterState clusterState) {
return isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
private boolean canFailOpen(ShardId shardId, int shardItSize, Exception exception, ClusterState clusterState) {
return shardItSize == 1 || isInternalFailure(exception) || hasInActiveShardCopies(clusterState, shardId);
}

private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardId) {
Expand All @@ -131,6 +140,10 @@ private boolean hasInActiveShardCopies(ClusterState clusterState, ShardId shardI
return false;
}

private boolean ignoreWeightedRouting(ClusterState clusterState) {
return IGNORE_WEIGHTED_SHARD_ROUTING.get(clusterState.getMetadata().settings());
}

public WeightedRoutingStats getWeightedRoutingStats() {
return WeightedRoutingStats.getInstance();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,12 @@ public ShardIterator activeInitializingShardsWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
boolean isFailOpenEnabled
boolean isFailOpenEnabled,
@Nullable Integer seed
) {
final int seed = shufflerForWeightedRouting.nextSeed();
if (seed == null) {
seed = shufflerForWeightedRouting.nextSeed();
}
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);

// append shards for attribute value with weight zero, so that shard search requests can be tried on
Expand All @@ -350,6 +353,7 @@ public ShardIterator activeInitializingShardsWeightedIt(
logger.debug("no shard copies found for shard id [{}] for node attribute with weight zero", shardId);
}
}

return new PlainShardIterator(shardId, ordered);
}

Expand All @@ -371,21 +375,6 @@ private List<ShardRouting> activeInitializingShardsWithWeights(
return orderedListWithDistinctShards;
}

/**
* Returns an iterator over active and initializing shards, shards are ordered by weighted
* round-robin scheduling policy. Uses the passed seed to shuffle the shards.
*
*/
public ShardIterator activeInitializingShardsSimpleWeightedIt(
WeightedRouting weightedRouting,
DiscoveryNodes nodes,
double defaultWeight,
int seed
) {
List<ShardRouting> ordered = activeInitializingShardsWithWeights(weightedRouting, nodes, defaultWeight, seed);
return new PlainShardIterator(shardId, ordered);
}

/**
* Returns a list containing shard routings ordered using weighted round-robin scheduling.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,30 @@ public class OperationRouting {

public static final Setting<Boolean> STRICT_WEIGHTED_SHARD_ROUTING_ENABLED = Setting.boolSetting(
"cluster.routing.weighted.strict",
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Boolean> IGNORE_WEIGHTED_SHARD_ROUTING = Setting.boolSetting(
"cluster.routing.ignore_weighted_routing",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private static final List<Preference> WEIGHTED_ROUTING_RESTRICTED_PREFERENCES = Arrays.asList(
Preference.ONLY_NODES,
Preference.PREFER_NODES
);

private volatile List<String> awarenessAttributes;
private volatile boolean useAdaptiveReplicaSelection;
private volatile boolean ignoreAwarenessAttr;
private volatile double weightedRoutingDefaultWeight;
private volatile boolean isFailOpenEnabled;
private volatile boolean isStrictWeightedShardRouting;
private volatile boolean ignoreWeightedRouting;

public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
// whether to ignore awareness attributes when routing requests
Expand All @@ -116,11 +130,13 @@ public OperationRouting(Settings settings, ClusterSettings clusterSettings) {
this.weightedRoutingDefaultWeight = WEIGHTED_ROUTING_DEFAULT_WEIGHT.get(settings);
this.isFailOpenEnabled = WEIGHTED_ROUTING_FAILOPEN_ENABLED.get(settings);
this.isStrictWeightedShardRouting = STRICT_WEIGHTED_SHARD_ROUTING_ENABLED.get(settings);
this.ignoreWeightedRouting = IGNORE_WEIGHTED_SHARD_ROUTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(USE_ADAPTIVE_REPLICA_SELECTION_SETTING, this::setUseAdaptiveReplicaSelection);
clusterSettings.addSettingsUpdateConsumer(IGNORE_AWARENESS_ATTRIBUTES_SETTING, this::setIgnoreAwarenessAttributes);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_DEFAULT_WEIGHT, this::setWeightedRoutingDefaultWeight);
clusterSettings.addSettingsUpdateConsumer(WEIGHTED_ROUTING_FAILOPEN_ENABLED, this::setFailOpenEnabled);
clusterSettings.addSettingsUpdateConsumer(STRICT_WEIGHTED_SHARD_ROUTING_ENABLED, this::setStrictWeightedShardRouting);
clusterSettings.addSettingsUpdateConsumer(IGNORE_WEIGHTED_SHARD_ROUTING, this::setIgnoreWeightedRouting);
}

void setUseAdaptiveReplicaSelection(boolean useAdaptiveReplicaSelection) {
Expand All @@ -143,6 +159,10 @@ void setStrictWeightedShardRouting(boolean strictWeightedShardRouting) {
this.isStrictWeightedShardRouting = strictWeightedShardRouting;
}

void setIgnoreWeightedRouting(boolean isWeightedRoundRobinEnabled) {
this.ignoreWeightedRouting = isWeightedRoundRobinEnabled;
}

public boolean isIgnoreAwarenessAttr() {
return ignoreAwarenessAttr;
}
Expand Down Expand Up @@ -314,11 +334,7 @@ private ShardIterator preferenceActiveShardIterator(
}
}
preferenceType = Preference.parse(preference);
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference type based routing not allowed with strict weighted shard routing enabled"
);
}
checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata);
switch (preferenceType) {
case PREFER_NODES:
final Set<String> nodesIds = Arrays.stream(preference.substring(Preference.PREFER_NODES.type().length() + 1).split(","))
Expand All @@ -344,11 +360,16 @@ private ShardIterator preferenceActiveShardIterator(
// for a different element in the list by also incorporating the
// shard ID into the hash of the user-supplied preference key.
routingHash = 31 * routingHash + indexShard.shardId.hashCode();
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet() && isStrictWeightedShardRouting) {
return indexShard.activeInitializingShardsSimpleWeightedIt(
if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting(
isStrictWeightedShardRouting,
ignoreWeightedRouting,
weightedRoutingMetadata
)) {
return indexShard.activeInitializingShardsWeightedIt(
weightedRoutingMetadata.getWeightedRouting(),
nodes,
getWeightedRoutingDefaultWeight(),
isFailOpenEnabled,
routingHash
);
} else if (ignoreAwarenessAttributes()) {
Expand All @@ -365,12 +386,13 @@ private ShardIterator shardRoutings(
@Nullable Map<String, Long> nodeCounts,
@Nullable WeightedRoutingMetadata weightedRoutingMetadata
) {
if (weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet()) {
if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) {
return indexShard.activeInitializingShardsWeightedIt(
weightedRoutingMetadata.getWeightedRouting(),
nodes,
getWeightedRoutingDefaultWeight(),
isFailOpenEnabled
isFailOpenEnabled,
null
);
} else if (ignoreAwarenessAttributes()) {
if (useAdaptiveReplicaSelection) {
Expand Down Expand Up @@ -438,4 +460,15 @@ private static int calculateScaledShardId(IndexMetadata indexMetadata, String ef
return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
}

private void checkPreferenceBasedRoutingAllowed(Preference preference, @Nullable WeightedRoutingMetadata weightedRoutingMetadata) {
if (WeightedRoutingUtils.shouldPerformStrictWeightedRouting(
isStrictWeightedShardRouting,
ignoreWeightedRouting,
weightedRoutingMetadata
) && WEIGHTED_ROUTING_RESTRICTED_PREFERENCES.contains(preference)) {
throw new PreferenceBasedSearchNotAllowedException(
"Preference type based routing not allowed with strict weighted shard routing enabled"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,16 @@ public static boolean isWeighedAway(String nodeId, ClusterState clusterState) {
}
return false;
}

public static boolean shouldPerformWeightedRouting(boolean ignoreWeightedRouting, WeightedRoutingMetadata weightedRoutingMetadata) {
return !ignoreWeightedRouting && weightedRoutingMetadata != null && weightedRoutingMetadata.getWeightedRouting().isSet();
}

public static boolean shouldPerformStrictWeightedRouting(
boolean isStrictWeightedShardRouting,
boolean ignoreWeightedRouting,
WeightedRoutingMetadata weightedRoutingMetadata
) {
return isStrictWeightedShardRouting && shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ public void apply(Settings value, Settings current, Settings previous) {
OperationRouting.WEIGHTED_ROUTING_DEFAULT_WEIGHT,
OperationRouting.WEIGHTED_ROUTING_FAILOPEN_ENABLED,
OperationRouting.STRICT_WEIGHTED_SHARD_ROUTING_ENABLED,
OperationRouting.IGNORE_WEIGHTED_SHARD_ROUTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
Expand Down
Loading