
Auto-release of read-only-allow-delete block when disk utilization fa… #42559

Merged Aug 7, 2019 (15 commits). Changes from 6 commits.
docs/reference/modules/cluster/disk_allocator.asciidoc (6 changes: 4 additions & 2 deletions)

@@ -40,8 +40,10 @@ Elasticsearch enforces a read-only index block
(`index.blocks.read_only_allow_delete`) on every index that has one or more
shards allocated on the node that has at least one disk exceeding the flood
stage. This is a last resort to prevent nodes from running out of disk space.
The index block is automatically released once the disk utilization falls below
the high watermark. However, the automatic release can be disabled in 7.x by
setting the system property `es.disk.auto_release_flood_stage_block` to `false`.
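As a hedged illustration (the exact config file location depends on the installation), the 7.x opt-out could be applied by passing the JVM system property at node startup, e.g. via `config/jvm.options`:

```
-Des.disk.auto_release_flood_stage_block=false
```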

NOTE: You cannot mix the usage of percentage values and byte values within
these settings. Either all are set to percentage values, or all are set to byte
@@ -33,19 +33,23 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Listens for a node to go over the high watermark and kicks off an empty
@@ -64,6 +68,7 @@ public class DiskThresholdMonitor {
private final RerouteService rerouteService;
private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
private final AtomicBoolean checkInProgress = new AtomicBoolean();
private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);

public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) {
@@ -135,14 +140,23 @@ public void onNewInfo(ClusterInfo info) {
}
final ClusterState state = clusterStateSupplier.get();
final Set<String> indicesToMarkReadOnly = new HashSet<>();
RoutingNodes routingNodes = state.getRoutingNodes();
Set<String> indicesToMarkIneligibleForAutoRelease = new HashSet<>();
// Only auto-release indices on nodes that have a disk-usage response in the node stats
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesToMarkIneligibleForAutoRelease);

for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
final String node = entry.key;
final DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);
RoutingNode routingNode = routingNodes.node(node);
// Only unblock an index if all nodes holding its shards are below the high disk watermark
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()
|| usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
markIneligiblityForAutoRelease(routingNode, indicesToMarkIneligibleForAutoRelease);
Contributor: As far as I can tell this does essentially the same thing as the loop on lines 161-163. indicesToMarkIneligibleForAutoRelease ends up being indicesToMarkReadOnly plus any indices with shards on nodes for whom we don't know the disk usage. I think it'd be simpler to use this fact in the calculation of indicesToAutoRelease below rather than constructing these two almost-identical sets.

Contributor: Apologies, I see the difference now: we're looking at the high thresholds, not the flood_stage thresholds. Still, I think we should combine this with the other check on the high thresholds below.

Contributor (Author): Here is the definition of each:

1. indicesToMarkIneligibleForAutoRelease = indices_on_nodes_missing_disk_usage + indices_on_nodes_below_high_watermark

2. indicesToMarkReadOnly = indices_on_nodes_below_flood_stage (ignored if already blocked)

3. indicesToAutoRelease = all_indices_having_block - indicesToMarkIneligibleForAutoRelease

I think each of them is clear and distinctly represents a set. Any changes we make will affect readability.

Contributor: I think it's far too early to declare that this is the most readable solution we can find. A case in point: you saw my earlier confusion about which thresholds this condition is checking. Also, we're duplicating a rather complicated condition that appears a few lines lower down. And we're not marking any indices as ineligible for auto-release, so the name of that set is a bit odd; let's call it something like indicesNotToAutoRelease.

In master, this method flows through the thresholds in a much more obvious manner: work on nodes above the flood_stage watermark, then nodes between high and flood_stage, then nodes between low and high, and finally nodes below low. I think it'd be much better to fit into this flow. Why not add indices to indicesNotToAutoRelease in the existing loop in the first block, and have a similar loop in the second block for nodes between the high and flood_stage watermarks?

Contributor (Author): I meant that the sets specifically are well-defined, not the logic in general. I have simplified the if-block, which I agree is the right thing to do.
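The set algebra discussed above can be sketched in isolation. This is a standalone illustration with hypothetical index names, not code from this PR:

```java
import java.util.HashSet;
import java.util.Set;

public class AutoReleaseSets {

    // indicesToAutoRelease = all indices currently carrying the block,
    // minus those marked ineligible for auto-release
    public static Set<String> computeAutoRelease(Set<String> blockedIndices,
                                                 Set<String> indicesNotToAutoRelease) {
        Set<String> result = new HashSet<>(blockedIndices);
        result.removeAll(indicesNotToAutoRelease);
        return result;
    }

    public static void main(String[] args) {
        // "logs-1" is blocked but one of its nodes is still over the high watermark;
        // "logs-3" is blocked and every node holding it has recovered disk space.
        Set<String> blocked = Set.of("logs-1", "logs-3");
        Set<String> notToRelease = Set.of("logs-1", "logs-2");
        System.out.println(computeAutoRelease(blocked, notToRelease)); // prints [logs-3]
    }
}
```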

}
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
for (ShardRouting routing : routingNode) {
indicesToMarkReadOnly.add(routing.index().getName());
@@ -181,7 +195,7 @@ public void onNewInfo(ClusterInfo info) {
}
}

final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3);

if (reroute) {
logger.info("rerouting shards: [{}]", explanation);
Expand All @@ -196,30 +210,70 @@ public void onNewInfo(ClusterInfo info) {
} else {
listener.onResponse(null);
}
// Get set of indices that are eligible to be automatically unblocked
// (only collect indices that are currently blocked)
final String[] indices = state.routingTable().indicesRouting().keys().toArray(String.class);
Set<String> indicesToAutoRelease = Arrays.stream(indices)
.filter(index -> indicesToMarkIneligibleForAutoRelease.contains(index) == false)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.collect(Collectors.toCollection(HashSet::new));

if (indicesToAutoRelease.isEmpty() == false) {
if (diskThresholdSettings.isAutoReleaseIndexEnabled()) {
logger.info("Releasing read-only allow delete block on indices: [{}]", indicesToAutoRelease);
updateIndicesReadOnly(indicesToAutoRelease, listener, false);
} else {
deprecationLogger.deprecated("[{}] will be removed in 8.0.0", DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY);
listener.onResponse(null);
}
} else {
listener.onResponse(null);
}

indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
if (indicesToMarkReadOnly.isEmpty() == false) {
updateIndicesReadOnly(indicesToMarkReadOnly, listener, true);
} else {
listener.onResponse(null);
}
}

private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages,
Set<String> indicesToMarkIneligibleForAutoRelease) {
for (RoutingNode routingNode : routingNodes) {
if (usages.containsKey(routingNode.nodeId()) == false) {
markIneligiblityForAutoRelease(routingNode, indicesToMarkIneligibleForAutoRelease);
}
}
}

private void markIneligiblityForAutoRelease(RoutingNode routingNode, Set<String> indicesToMarkIneligibleForAutoRelease) {
if (routingNode != null) {
for (ShardRouting routing : routingNode) {
String indexName = routing.index().getName();
indicesToMarkIneligibleForAutoRelease.add(indexName);
}
}
}

private void setLastRunTimeMillis() {
lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
}
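The pattern in setLastRunTimeMillis — getAndUpdate combined with Math.max — keeps the timestamp monotonically non-decreasing even if a stale (smaller) clock reading arrives. A minimal standalone sketch of that pattern (not this class):

```java
import java.util.concurrent.atomic.AtomicLong;

public class MonotonicTimestamp {
    private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);

    public void record(long nowMillis) {
        // Never move the stored value backwards, even under concurrent updates
        lastRunTimeMillis.getAndUpdate(l -> Math.max(l, nowMillis));
    }

    public long value() {
        return lastRunTimeMillis.get();
    }

    public static void main(String[] args) {
        MonotonicTimestamp t = new MonotonicTimestamp();
        t.record(1000L);
        t.record(500L); // stale reading is ignored
        System.out.println(t.value()); // prints 1000
    }
}
```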

protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
// set or clear the read-only block but don't block on the response
String value = readOnly ? Boolean.TRUE.toString() : null;
ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
logger.debug("marking indices read-only [{}] failed", readOnly, e);
setLastRunTimeMillis();
listener.onFailure(e);
});
client.admin().indices().prepareUpdateSettings(indicesToUpdate.toArray(Strings.EMPTY_ARRAY))
.setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, value).build())
.execute(ActionListener.map(wrappedListener, r -> null));
}
}
@@ -72,6 +72,20 @@ public class DiskThresholdSettings {
private volatile TimeValue rerouteInterval;
private volatile Double freeDiskThresholdFloodStage;
private volatile ByteSizeValue freeBytesThresholdFloodStage;
private static boolean autoReleaseIndexEnabled;
public static final String AUTO_RELEASE_INDEX_ENABLED_KEY = "es.disk.auto_release_flood_stage_block";

static {
final String property = System.getProperty(AUTO_RELEASE_INDEX_ENABLED_KEY);
if (property == null) {
autoReleaseIndexEnabled = true;
} else if (Boolean.FALSE.toString().equals(property)) {
autoReleaseIndexEnabled = false;
} else {
throw new IllegalArgumentException(AUTO_RELEASE_INDEX_ENABLED_KEY + " may only be unset or set to [false] but was [" +
property + "]");
}
}
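The static block above accepts only an unset property or the exact string "false"; any other value fails node startup. A standalone sketch of that validation as a plain function (a hypothetical helper, not part of the PR):

```java
public class AutoReleaseProperty {

    // Mirrors the validation in the static block: null -> enabled,
    // "false" -> disabled, anything else -> rejected.
    public static boolean parse(String property) {
        if (property == null) {
            return true; // auto-release is enabled by default
        }
        if (Boolean.FALSE.toString().equals(property)) {
            return false;
        }
        throw new IllegalArgumentException(
            "es.disk.auto_release_flood_stage_block may only be unset or set to [false] but was ["
                + property + "]");
    }

    public static void main(String[] args) {
        System.out.println(parse(null));    // prints true
        System.out.println(parse("false")); // prints false
    }
}
```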

public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
@@ -286,6 +300,10 @@ public ByteSizeValue getFreeBytesThresholdFloodStage() {
return freeBytesThresholdFloodStage;
}

public boolean isAutoReleaseIndexEnabled() {
return autoReleaseIndexEnabled;
}

public boolean includeRelocations() {
return includeRelocations;
}