diff --git a/docs/reference/modules/cluster/disk_allocator.asciidoc b/docs/reference/modules/cluster/disk_allocator.asciidoc
index 5daa50d9c25b0..c0cc934734d6b 100644
--- a/docs/reference/modules/cluster/disk_allocator.asciidoc
+++ b/docs/reference/modules/cluster/disk_allocator.asciidoc
@@ -40,8 +40,10 @@
 Elasticsearch enforces a read-only index block
 (`index.blocks.read_only_allow_delete`) on every index that has one or more
 shards allocated on the node that has at least one disk exceeding the flood
 stage. This is a last resort to prevent nodes from running out of disk space.
-The index block must be released manually once there is enough disk space
-available to allow indexing operations to continue.
+The index block is automatically released once the disk utilization falls below
+the high watermark.
+In 7.x the automatic release can be disabled by setting the system property
+`es.disk.auto_release_flood_stage_block` to `false`.
 
 NOTE: You can not mix the usage of percentage values and byte values within
 these settings. Either all are set to percentage values, or all are set to byte
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
index 0e18ca3c0ea35..9071d23ce526d 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java
@@ -23,6 +23,8 @@
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.client.Client;
@@ -33,10 +35,12 @@
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.set.Sets;
@@ -47,6 +51,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * Listens for a node to go over the high watermark and kicks off an empty
@@ -65,6 +71,7 @@ public class DiskThresholdMonitor {
     private final RerouteService rerouteService;
     private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
     private final AtomicBoolean checkInProgress = new AtomicBoolean();
+    private final DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
 
     public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
                                 Client client, LongSupplier currentTimeMillisSupplier, RerouteService rerouteService) {
@@ -73,6 +80,10 @@ public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier,
         this.rerouteService = rerouteService;
         this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
         this.client = client;
+        if (diskThresholdSettings.isAutoReleaseIndexEnabled() == false) {
+            deprecationLogger.deprecated("[{}] will be removed in version {}",
+                DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, Version.V_7_4_0.major + 1);
+        }
     }
 
     /**
@@ -136,21 +147,33 @@ public void onNewInfo(ClusterInfo info) {
         }
         final ClusterState state = clusterStateSupplier.get();
         final Set<String> indicesToMarkReadOnly = new HashSet<>();
+        RoutingNodes routingNodes = state.getRoutingNodes();
+        Set<String> indicesNotToAutoRelease = new HashSet<>();
+        markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease);
 
         for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
             final String node = entry.key;
             final DiskUsage usage = entry.value;
             warnAboutDiskIfNeeded(usage);
+            RoutingNode routingNode = routingNodes.node(node);
+            // Only unblock index if all nodes that contain shards of it are below the high disk watermark
             if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
                 usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
-                final RoutingNode routingNode = state.getRoutingNodes().node(node);
                 if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
                     for (ShardRouting routing : routingNode) {
-                        indicesToMarkReadOnly.add(routing.index().getName());
+                        String indexName = routing.index().getName();
+                        indicesToMarkReadOnly.add(indexName);
+                        indicesNotToAutoRelease.add(indexName);
                     }
                 }
             } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
                 usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
+                if (routingNode != null) {
+                    for (ShardRouting routing : routingNode) {
+                        String indexName = routing.index().getName();
+                        indicesNotToAutoRelease.add(indexName);
+                    }
+                }
                 if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
                     reroute = true;
                     explanation = "high disk watermark exceeded on one or more nodes";
@@ -182,7 +205,7 @@ public void onNewInfo(ClusterInfo info) {
             }
         }
 
-        final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2);
+        final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 3);
 
         if (reroute) {
             logger.info("rerouting shards: [{}]", explanation);
@@ -197,30 +220,70 @@ public void onNewInfo(ClusterInfo info) {
         } else {
             listener.onResponse(null);
         }
+
+        Set<String> indicesToAutoRelease = StreamSupport.stream(state.routingTable().indicesRouting()
+            .spliterator(), false)
+            .map(c -> c.key)
+            .filter(index -> indicesNotToAutoRelease.contains(index) == false)
+            .filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
+            .collect(Collectors.toSet());
+
+        if (indicesToAutoRelease.isEmpty() == false) {
+            if (diskThresholdSettings.isAutoReleaseIndexEnabled()) {
+                logger.info("releasing read-only-allow-delete block on indices: [{}]", indicesToAutoRelease);
+                updateIndicesReadOnly(indicesToAutoRelease, listener, false);
+            } else {
+                deprecationLogger.deprecated("[{}] will be removed in version {}",
+                    DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, Version.V_7_4_0.major + 1);
+                logger.debug("[{}] disabled, not releasing read-only-allow-delete block on indices: [{}]",
+                    DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY, indicesToAutoRelease);
+                listener.onResponse(null);
+            }
+        } else {
+            listener.onResponse(null);
+        }
 
         indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
         if (indicesToMarkReadOnly.isEmpty() == false) {
-            markIndicesReadOnly(indicesToMarkReadOnly, ActionListener.wrap(r -> {
-                setLastRunTimeMillis();
-                listener.onResponse(r);
-            }, e -> {
-                logger.debug("marking indices readonly failed", e);
-                setLastRunTimeMillis();
-                listener.onFailure(e);
-            }));
+            updateIndicesReadOnly(indicesToMarkReadOnly, listener, true);
         } else {
             listener.onResponse(null);
         }
     }
 
+    private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages,
+                                                           Set<String> indicesToMarkIneligibleForAutoRelease) {
+        for (RoutingNode routingNode : routingNodes) {
+            if (usages.containsKey(routingNode.nodeId()) == false) {
+                if (routingNode != null) {
+                    for (ShardRouting routing : routingNode) {
+                        String indexName = routing.index().getName();
+                        indicesToMarkIneligibleForAutoRelease.add(indexName);
+                    }
+                }
+            }
+        }
+
+    }
+
     private void setLastRunTimeMillis() {
         lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
     }
 
-    protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
+    protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
         // set read-only block but don't block on the response
-        client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY))
-            .setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build())
-            .execute(ActionListener.map(listener, r -> null));
+        ActionListener<Void> wrappedListener = ActionListener.wrap(r -> {
+            setLastRunTimeMillis();
+            listener.onResponse(r);
+        }, e -> {
+            logger.debug(new ParameterizedMessage("setting indices [{}] read-only failed", readOnly), e);
+            setLastRunTimeMillis();
+            listener.onFailure(e);
+        });
+        Settings readOnlySettings = readOnly ? Settings.builder()
+            .put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, Boolean.TRUE.toString()).build() :
+            Settings.builder().putNull(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE).build();
+        client.admin().indices().prepareUpdateSettings(indicesToUpdate.toArray(Strings.EMPTY_ARRAY))
+            .setSettings(readOnlySettings)
+            .execute(ActionListener.map(wrappedListener, r -> null));
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java
index b8d234e9f1086..2477763a3138c 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java
@@ -72,6 +72,20 @@ public class DiskThresholdSettings {
     private volatile TimeValue rerouteInterval;
     private volatile Double freeDiskThresholdFloodStage;
     private volatile ByteSizeValue freeBytesThresholdFloodStage;
+    private static final boolean autoReleaseIndexEnabled;
+    public static final String AUTO_RELEASE_INDEX_ENABLED_KEY = "es.disk.auto_release_flood_stage_block";
+
+    static {
+        final String property = System.getProperty(AUTO_RELEASE_INDEX_ENABLED_KEY);
+        if (property == null) {
+            autoReleaseIndexEnabled = true;
+        } else if (Boolean.FALSE.toString().equals(property)){
+            autoReleaseIndexEnabled = false;
+        } else {
+            throw new IllegalArgumentException(AUTO_RELEASE_INDEX_ENABLED_KEY + " may only be unset or set to [false] but was [" +
+                property + "]");
+        }
+    }
 
     public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
         final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
@@ -286,6 +300,10 @@ public ByteSizeValue getFreeBytesThresholdFloodStage() {
         return freeBytesThresholdFloodStage;
     }
 
+    public boolean isAutoReleaseIndexEnabled() {
+        return autoReleaseIndexEnabled;
+    }
+
     public boolean includeRelocations() {
         return includeRelocations;
     }
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
index 1b4375364305b..64406888bc239 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java
@@ -31,6 +31,7 @@
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -44,6 +45,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 
 public class DiskThresholdMonitorTests extends ESAllocationTestCase {
@@ -51,7 +53,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
     public void testMarkFloodStageIndicesReadOnly() {
         AllocationService allocation = createAllocationService(Settings.builder()
            .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
-        Settings settings = Settings.EMPTY;
         MetaData metaData = MetaData.builder()
             .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
                 .put("index.routing.allocation.require._id", "node2")).numberOfShards(1).numberOfReplicas(0))
@@ -65,28 +66,25 @@ public void testMarkFloodStageIndicesReadOnly() {
             .addAsNew(metaData.index("test_1"))
             .addAsNew(metaData.index("test_2"))
             .build();
-        ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
-            .metaData(metaData).routingTable(routingTable).build();
-        logger.info("adding two nodes and performing rerouting");
-        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().add(newNode("node1"))
-            .add(newNode("node2"))).build();
-        clusterState = allocation.reroute(clusterState, "reroute");
-        logger.info("start primary shard");
-        clusterState = startInitializingShardsAndReroute(allocation, clusterState);
-        ClusterState finalState = clusterState;
+        final ClusterState clusterState = applyStartedShardsUntilNoChange(
+            ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+                .metaData(metaData).routingTable(routingTable)
+                .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(), allocation);
         AtomicBoolean reroute = new AtomicBoolean(false);
         AtomicReference<Set<String>> indices = new AtomicReference<>();
         AtomicLong currentTime = new AtomicLong();
-        DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
+        DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get,
             (reason, priority, listener) -> {
                 assertTrue(reroute.compareAndSet(false, true));
                 assertThat(priority, equalTo(Priority.HIGH));
                 listener.onResponse(null);
             }) {
+            @Override
-            protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
+            protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
                 assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
+                assertTrue(readOnly);
                 listener.onResponse(null);
             }
         };
@@ -119,7 +117,7 @@ protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
             .blocks(ClusterBlocks.builder().addBlocks(indexMetaData).build()).build();
 
         assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
-        monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
+        monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> anotherFinalClusterState,
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get,
             (reason, priority, listener) -> {
                 assertTrue(reroute.compareAndSet(false, true));
@@ -127,8 +125,9 @@ protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
                 listener.onResponse(null);
             }) {
             @Override
-            protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
+            protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
                 assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
+                assertTrue(readOnly);
                 listener.onResponse(null);
             }
         };
@@ -156,15 +155,15 @@ public void testDoesNotSubmitRerouteTaskTooFrequently() {
             assertTrue(listenerReference.compareAndSet(null, listener));
         }) {
             @Override
-            protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
+            protected void updateIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener, boolean readOnly) {
                 throw new AssertionError("unexpected");
             }
         };
 
         final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
         allDisksOkBuilder = ImmutableOpenMap.builder();
-        allDisksOkBuilder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50));
-        allDisksOkBuilder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 50));
+        allDisksOkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, 50));
+        allDisksOkBuilder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 50));
         final ImmutableOpenMap<String, DiskUsage> allDisksOk = allDisksOkBuilder.build();
 
         final ImmutableOpenMap.Builder<String, DiskUsage> oneDiskAboveWatermarkBuilder = ImmutableOpenMap.builder();
@@ -226,4 +225,144 @@ protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
         monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
         assertNull(listenerReference.get());
     }
+
+    public void testAutoReleaseIndices() {
+        AtomicReference<Set<String>> indicesToMarkReadOnly = new AtomicReference<>();
+        AtomicReference<Set<String>> indicesToRelease = new AtomicReference<>();
+        AllocationService allocation = createAllocationService(Settings.builder()
+            .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
+        MetaData metaData = MetaData.builder()
+            .put(IndexMetaData.builder("test_1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
+            .put(IndexMetaData.builder("test_2").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(1))
+            .build();
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsNew(metaData.index("test_1"))
+            .addAsNew(metaData.index("test_2"))
+            .build();
+        final ClusterState clusterState = applyStartedShardsUntilNoChange(
+            ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
+                .metaData(metaData).routingTable(routingTable)
+                .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(), allocation);
+        assertThat(clusterState.getRoutingTable().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8));
+
+        DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
+            (reason, priority, listener) -> {
+                assertNotNull(listener);
+                assertThat(priority, equalTo(Priority.HIGH));
+                listener.onResponse(null);
+            }) {
+            @Override
+            protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
+                if (readOnly) {
+                    assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
+                } else {
+                    assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
+                }
+                listener.onResponse(null);
+            }
+        };
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
+        assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // Change cluster state so that "test_2" index is blocked (read only)
+        IndexMetaData indexMetaData = IndexMetaData.builder(clusterState.metaData().index("test_2")).settings(Settings.builder()
+            .put(clusterState.metaData()
+                .index("test_2").getSettings())
+            .put(IndexMetaData.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)).build();
+
+        ClusterState clusterStateWithBlocks = ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
+            .put(indexMetaData, true).build())
+            .blocks(ClusterBlocks.builder().addBlocks(indexMetaData).build()).build();
+
+        assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
+        monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterStateWithBlocks,
+            new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L,
+            (reason, priority, listener) -> {
+                assertNotNull(listener);
+                assertThat(priority, equalTo(Priority.HIGH));
+                listener.onResponse(null);
+            }) {
+            @Override
+            protected void updateIndicesReadOnly(Set<String> indicesToUpdate, ActionListener<Void> listener, boolean readOnly) {
+                if (readOnly) {
+                    assertTrue(indicesToMarkReadOnly.compareAndSet(null, indicesToUpdate));
+                } else {
+                    assertTrue(indicesToRelease.compareAndSet(null, indicesToUpdate));
+                }
+                listener.onResponse(null);
+            }
+        };
+        // When free disk on any of node1 or node2 goes below 5% flood watermark, then apply index block on indices not having the block
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
+        assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
+        assertNull(indicesToRelease.get());
+
+        // When free disk on node1 and node2 goes above 10% high watermark, then only release index block
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100)));
+        monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
+        assertNull(indicesToMarkReadOnly.get());
+        assertThat(indicesToRelease.get(), contains("test_2"));
+
+        // When no usage information is present for node2, we don't release the block
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
+        monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
+        assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
+        assertNull(indicesToRelease.get());
+
+        // When disk usage on one node is between the high and flood-stage watermarks, nothing changes
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9)));
+        builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 100)));
+        if (randomBoolean()) {
+            builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
+        }
+        monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
+        assertNull(indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // When disk usage on one node is missing and the other is below the high watermark, nothing changes
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 100)));
+        if (randomBoolean()) {
+            builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
+        }
+        monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
+        assertNull(indicesToMarkReadOnly.get());
+        assertNull(indicesToRelease.get());
+
+        // When disk usage on one node is missing and the other is above the flood-stage watermark, affected indices are blocked
+        indicesToMarkReadOnly.set(null);
+        indicesToRelease.set(null);
+        builder = ImmutableOpenMap.builder();
+        builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4)));
+        if (randomBoolean()) {
+            builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100)));
+        }
+        monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
+        assertThat(indicesToMarkReadOnly.get(), contains("test_1"));
+        assertNull(indicesToRelease.get());
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java
index d9e157187d581..0ce9a89127901 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettingsTests.java
@@ -45,6 +45,9 @@ public void testDefaults() {
         assertEquals(60L, diskThresholdSettings.getRerouteInterval().seconds());
         assertTrue(diskThresholdSettings.isEnabled());
         assertTrue(diskThresholdSettings.includeRelocations());
+        assertEquals(zeroBytes, diskThresholdSettings.getFreeBytesThresholdFloodStage());
+        assertEquals(5.0D, diskThresholdSettings.getFreeDiskThresholdFloodStage(), 0.0D);
+        assertTrue(diskThresholdSettings.isAutoReleaseIndexEnabled());
     }
 
     public void testUpdate() {
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java
index 595c144fa173e..5731b04be4c43 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java
@@ -19,15 +19,21 @@
 package org.elasticsearch.cluster.routing.allocation.decider;
 
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.MockInternalClusterInfoService;
+import org.elasticsearch.cluster.block.ClusterBlockException;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 
@@ -35,12 +41,15 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class MockDiskUsagesIT extends ESIntegTestCase {
@@ -133,4 +142,91 @@ public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
             assertThat("node3 has at least 3 shards", nodesToShardCount.get(realNodeNames.get(2)), greaterThanOrEqualTo(3));
         });
     }
+
+    public void testAutomaticReleaseOfIndexBlock() throws Exception {
+        List<String> nodes = internalCluster().startNodes(3);
+
+        // Wait for all 3 nodes to be up
+        assertBusy(() -> {
+            NodesStatsResponse resp = client().admin().cluster().prepareNodesStats().get();
+            assertThat(resp.getNodes().size(), equalTo(3));
+        });
+
+        // Start with all nodes at 50% usage
+        final MockInternalClusterInfoService cis = (MockInternalClusterInfoService)
+            internalCluster().getInstance(ClusterInfoService.class, internalCluster().getMasterName());
+        cis.setUpdateFrequency(TimeValue.timeValueMillis(100));
+        cis.onMaster();
+        cis.setN1Usage(nodes.get(0), new DiskUsage(nodes.get(0), "n1", "/dev/null", 100, 50));
+        cis.setN2Usage(nodes.get(1), new DiskUsage(nodes.get(1), "n2", "/dev/null", 100, 50));
+        cis.setN3Usage(nodes.get(2), new DiskUsage(nodes.get(2), "n3", "/dev/null", 100, 50));
+
+        final boolean watermarkBytes = randomBoolean(); // we have to consistently use bytes or percentage for the disk watermark settings
+        client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder()
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "15b" : "85%")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), watermarkBytes ? "10b" : "90%")
+            .put(
+                DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(),
+                watermarkBytes ? "5b" : "95%")
+            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "150ms")).get();
+        // Create an index with 6 shards so we can check allocation for it
+        prepareCreate("test").setSettings(Settings.builder()
+            .put("number_of_shards", 6)
+            .put("number_of_replicas", 0)).get();
+        ensureGreen("test");
+
+        // Block until the "fake" cluster info is retrieved at least once
+        assertBusy(() -> {
+            ClusterInfo info = cis.getClusterInfo();
+            logger.info("--> got: {} nodes", info.getNodeLeastAvailableDiskUsages().size());
+            assertThat(info.getNodeLeastAvailableDiskUsages().size(), greaterThan(0));
+        });
+
+        final List<String> realNodeNames = new ArrayList<>();
+        ClusterStateResponse resp = client().admin().cluster().prepareState().get();
+        Iterator<RoutingNode> iter = resp.getState().getRoutingNodes().iterator();
+        while (iter.hasNext()) {
+            RoutingNode node = iter.next();
+            realNodeNames.add(node.nodeId());
+            logger.info("--> node {} has {} shards",
+                node.nodeId(), resp.getState().getRoutingNodes().node(node.nodeId()).numberOfOwningShards());
+        }
+
+        client().prepareIndex("test", "doc", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+        assertSearchHits(client().prepareSearch().get(), "1");
+
+        // Block all nodes so that re-balancing does not occur (BalancedShardsAllocator)
+        cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 3));
+        cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 3));
+        cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 3));
+
+        // Wait until index "test" is blocked
+        assertBusy(() -> {
+            assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("1").setSource("foo", "bar"),
+                IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
+        });
+
+        // Cannot add further documents
+        assertBlocked(client().prepareIndex().setIndex("test").setType("doc").setId("2").setSource("foo", "bar"),
+            IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
+        assertSearchHits(client().prepareSearch().get(), "1");
+
+        // Update the disk usages so all nodes are back under the high and flood watermarks
+        cis.setN1Usage(realNodeNames.get(0), new DiskUsage(nodes.get(0), "n1", "_na_", 100, 11));
+        cis.setN2Usage(realNodeNames.get(1), new DiskUsage(nodes.get(1), "n2", "_na_", 100, 11));
+        cis.setN3Usage(realNodeNames.get(2), new DiskUsage(nodes.get(2), "n3", "_na_", 100, 11));
+
+        // Attempt to create a new document until DiskUsageMonitor unblocks the index
+        assertBusy(() -> {
+            try {
+                client().prepareIndex("test", "doc", "3").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
+                    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
+            } catch (ClusterBlockException e) {
+                throw new AssertionError("retrying", e);
+            }
+        });
+        assertSearchHits(client().prepareSearch().get(), "1", "3");
+    }
 }