Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto-release of read-only-allow-delete block when disk utilization fa… #42559

Merged
merged 15 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/reference/modules/cluster/disk_allocator.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ Elasticsearch enforces a read-only index block
(`index.blocks.read_only_allow_delete`) on every index that has one or more
shards allocated on the node that has at least one disk exceeding the flood
stage. This is a last resort to prevent nodes from running out of disk space.
The index block must be released manually once there is enough disk space
available to allow indexing operations to continue.
The index block is automatically released once the disk utilization falls below
the high watermark.
The automatic release can however be disabled in 7.x through a system property
`es.disk.auto_release_flood_stage_block`

NOTE: You can not mix the usage of percentage values and byte values within
these settings. Either all are set to percentage values, or all are set to byte
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.client.Client;
Expand All @@ -33,10 +35,12 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
Expand All @@ -47,6 +51,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* Listens for a node to go over the high watermark and kicks off an empty
Expand All @@ -65,6 +71,7 @@ public class DiskThresholdMonitor {
private final RerouteService rerouteService;
private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
private final AtomicBoolean checkInProgress = new AtomicBoolean();
private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);

public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) {
Expand All @@ -73,6 +80,10 @@ public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterSta
this.rerouteService = rerouteService;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.client = client;
if (diskThresholdSettings.isAutoReleaseIndexEnabled() == false) {
deprecationLogger.deprecated("[{}] will be removed in version {}",
DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, Version.V_7_4_0.major + 1);
}
}

/**
Expand Down Expand Up @@ -136,21 +147,33 @@ public void onNewInfo(ClusterInfo info) {
}
final ClusterState state = clusterStateSupplier.get();
final Set<String> indicesToMarkReadOnly = new HashSet<>();
RoutingNodes routingNodes = state.getRoutingNodes();
Set<String> indicesNotToAutoRelease = new HashSet<>();
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease);

for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
final String node = entry.key;
final DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);
RoutingNode routingNode = routingNodes.node(node);
// Only unblock index if all nodes that contain shards of it are below the high disk watermark
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
final RoutingNode routingNode = state.getRoutingNodes().node(node);
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
for (ShardRouting routing : routingNode) {
indicesToMarkReadOnly.add(routing.index().getName());
String indexName = routing.index().getName();
indicesToMarkReadOnly.add(indexName);
indicesNotToAutoRelease.add(indexName);
}
}
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
if (routingNode != null) {
for (ShardRouting routing : routingNode) {
String indexName = routing.index().getName();
indicesNotToAutoRelease.add(indexName);
}
}
if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
reroute = true;
explanation = "high disk watermark exceeded on one or more nodes";
Expand Down Expand Up @@ -182,7 +205,7 @@ public void onNewInfo(ClusterInfo info) {
}
}

final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2);
final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3);

if (reroute) {
logger.info("rerouting shards: [{}]", explanation);
Expand All @@ -197,30 +220,70 @@ public void onNewInfo(ClusterInfo info) {
} else {
listener.onResponse(null);
}
Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting()
.spliterator(), false)
.map(c -> c.key)
.filter(index -> indicesNotToAutoRelease.contains(index) == false)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.collect(Collectors.toSet());

if (indicesToAutoRelease.isEmpty() == false) {
if (diskThresholdSettings.isAutoReleaseIndexEnabled()) {
logger.info("releasing read-only-allow-delete block on indices: [{}]", indicesToAutoRelease);
updateIndicesReadOnly(indicesToAutoRelease, listener, false);
} else {
deprecationLogger.deprecated("[{}] will be removed in version {}",
DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, Version.V_7_4_0.major + 1);
logger.debug("[{}] disabled, not releasing read-only-allow-delete block on indices: [{}]",
DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, indicesToAutoRelease);
listener.onResponse(null);
}
} else {
listener.onResponse(null);
}

indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
if (indicesToMarkReadOnly.isEmpty() == false) {
markIndicesReadOnly(indicesToMarkReadOnly, ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
logger.debug("marking indices readonly failed", e);
setLastRunTimeMillis();
listener.onFailure(e);
}));
updateIndicesReadOnly(indicesToMarkReadOnly, listener, true);
} else {
listener.onResponse(null);
}
}

private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages,
Set<String> indicesToMarkIneligibleForAutoRelease) {
for (RoutingNode routingNode : routingNodes) {
if (usages.containsKey(routingNode.nodeId()) == false) {
if (routingNode != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

routingNode cannot be null here.

for (ShardRouting routing : routingNode) {
String indexName = routing.index().getName();
indicesToMarkIneligibleForAutoRelease.add(indexName);
}
}
}
}

}

private void setLastRunTimeMillis() {
lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
}

protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
// set read-only block but don't block on the response
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY))
.setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build())
.execute(ActionListener.map(listener, r -> null));
ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
logger.debug(new ParameterizedMessage("setting indices [{}] read-only failed", readOnly), e);
setLastRunTimeMillis();
listener.onFailure(e);
});
Settings readOnlySettings = readOnly ? Settings.builder()
.put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()).build() :
Settings.builder().putNull(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE).build();
client.admin().indices().prepareUpdateSettings(indicesToUpdate.toArray(Strings.EMPTY_ARRAY))
.setSettings(readOnlySettings)
.execute(ActionListener.map(wrappedListener, r -> null));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ public class DiskThresholdSettings {
private volatile TimeValue rerouteInterval;
private volatile Double freeDiskThresholdFloodStage;
private volatile ByteSizeValue freeBytesThresholdFloodStage;
private static final boolean autoReleaseIndexEnabled;
public static final String AUTO_RELEASE_INDEX_ENABLED_KEY = "es.disk.auto_release_flood_stage_block";

static {
final String property = System.getProperty(AUTO_RELEASE_INDEX_ENABLED_KEY);
if (property == null) {
autoReleaseIndexEnabled = true;
} else if (Boolean.FALSE.toString().equals(property)){
autoReleaseIndexEnabled = false;
} else {
throw new IllegalArgumentException(AUTO_RELEASE_INDEX_ENABLED_KEY + " may only be unset or set to [false] but was [" +
property + "]");
}
}

public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
Expand Down Expand Up @@ -286,6 +300,10 @@ public ByteSizeValue getFreeBytesThresholdFloodStage() {
return freeBytesThresholdFloodStage;
}

public boolean isAutoReleaseIndexEnabled() {
return autoReleaseIndexEnabled;
}

public boolean includeRelocations() {
return includeRelocations;
}
Expand Down
Loading