Skip to content

Commit

Permalink
ILM: add force-merge step to searchable snapshots action (#60819)
Browse files Browse the repository at this point in the history
This adds a force-merge step to the searchable snapshot action, enabled by default,
but parameterizable using the `force_merge-index" optional boolean.

eg.
```
PUT _ilm/policy/my_policy
{
  "policy": {
    "phases": {
      "cold": {
        "actions": {
          "searchable_snapshot" : {
            "snapshot_repository" : "backing_repo",
            "force_merge_index": true
          }
        }
      }
    }
  }
}
```
  • Loading branch information
andreidan authored Aug 10, 2020
1 parent a62f5fa commit d0a17b2
Show file tree
Hide file tree
Showing 11 changed files with 412 additions and 202 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ tasks.register("verifyVersions") {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https:/elastic/elasticsearch/pull/60819" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/ilm/actions/ilm-forcemerge.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ This action makes the index <<dynamic-index-settings,read-only>>.
To use the `forcemerge` action in the `hot` phase, the `rollover` action *must* be present.
If no rollover action is configured, {ilm-init} will reject the policy.

[NOTE]
The `forcemerge` action is best effort. It might happen that some of the
shards are relocating, in which case they will not be merged.

[[ilm-forcemerge-options]]
==== Options

Expand Down
14 changes: 14 additions & 0 deletions docs/reference/ilm/actions/ilm-searchable-snapshot.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ To keep the snapshot, set `delete_searchable_snapshot` to `false` in the delete
Specifies where to store the snapshot.
See <<snapshots-register-repository>> for more information.

`force_merge_index`::
(Optional, boolean)
Force merges the managed index to one segment.
Defaults to `true`.
If the managed index was already force merged using the
<<ilm-forcemerge, force merge action>> in a previous action
the `searchable snapshot` action force merge step will be a no-op.

[NOTE]
The `forcemerge` action is best effort. It might happen that some of
the shards are relocating, in which case they will not be merged.
The `searchable-snapshot` action will continue executing even if not all shards
are force merged.

[[ilm-searchable-snapshot-ex]]
==== Examples
[source,console]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ protected static void assertOK(Response response) {
* in an non green state
* @param index index to test for
**/
protected static void ensureGreen(String index) throws IOException {
public static void ensureGreen(String index) throws IOException {
ensureHealth(index, (request) -> {
request.addParameter("wait_for_status", "green");
request.addParameter("wait_for_no_relocating_shards", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
Expand All @@ -18,7 +19,7 @@
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand All @@ -31,38 +32,52 @@ public class SearchableSnapshotAction implements LifecycleAction {
public static final String NAME = "searchable_snapshot";

public static final ParseField SNAPSHOT_REPOSITORY = new ParseField("snapshot_repository");
public static final ParseField FORCE_MERGE_INDEX = new ParseField("force_merge_index");
public static final String CONDITIONAL_DATASTREAM_CHECK_KEY = BranchingStep.NAME + "-on-datastream-check";

public static final String RESTORED_INDEX_PREFIX = "restored-";

private static final ConstructingObjectParser<SearchableSnapshotAction, Void> PARSER = new ConstructingObjectParser<>(NAME,
a -> new SearchableSnapshotAction((String) a[0]));
a -> new SearchableSnapshotAction((String) a[0], a[1] == null || (boolean) a[1]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), SNAPSHOT_REPOSITORY);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FORCE_MERGE_INDEX);
}

public static SearchableSnapshotAction parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

private final String snapshotRepository;
private final boolean forceMergeIndex;

public SearchableSnapshotAction(String snapshotRepository) {
public SearchableSnapshotAction(String snapshotRepository, boolean forceMergeIndex) {
if (Strings.hasText(snapshotRepository) == false) {
throw new IllegalArgumentException("the snapshot repository must be specified");
}
this.snapshotRepository = snapshotRepository;
this.forceMergeIndex = forceMergeIndex;
}

public SearchableSnapshotAction(String snapshotRepository) {
this(snapshotRepository, true);
}

public SearchableSnapshotAction(StreamInput in) throws IOException {
this(in.readString());
this(in.readString(), in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readBoolean() : true);
}

boolean isForceMergeIndex() {
return forceMergeIndex;
}

@Override
public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
StepKey checkNoWriteIndex = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
StepKey waitForNoFollowerStepKey = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
StepKey forceMergeStepKey = new StepKey(phase, NAME, ForceMergeStep.NAME);
StepKey waitForSegmentCountKey = new StepKey(phase, NAME, SegmentCountStep.NAME);
StepKey generateSnapshotNameKey = new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME);
StepKey cleanSnapshotKey = new StepKey(phase, NAME, CleanupSnapshotStep.NAME);
StepKey createSnapshotKey = new StepKey(phase, NAME, CreateSnapshotStep.NAME);
Expand All @@ -77,8 +92,14 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {

CheckNotDataStreamWriteIndexStep checkNoWriteIndexStep = new CheckNotDataStreamWriteIndexStep(checkNoWriteIndex,
waitForNoFollowerStepKey);
WaitForNoFollowersStep waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, generateSnapshotNameKey,
client);
final WaitForNoFollowersStep waitForNoFollowersStep;
if (forceMergeIndex) {
waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, forceMergeStepKey, client);
} else {
waitForNoFollowersStep = new WaitForNoFollowersStep(waitForNoFollowerStepKey, generateSnapshotNameKey, client);
}
ForceMergeStep forceMergeStep = new ForceMergeStep(forceMergeStepKey, waitForSegmentCountKey, client, 1);
SegmentCountStep segmentCountStep = new SegmentCountStep(waitForSegmentCountKey, generateSnapshotNameKey, client, 1);
GenerateSnapshotNameStep generateSnapshotNameStep = new GenerateSnapshotNameStep(generateSnapshotNameKey, cleanSnapshotKey,
snapshotRepository);
CleanupSnapshotStep cleanupSnapshotStep = new CleanupSnapshotStep(cleanSnapshotKey, createSnapshotKey, client);
Expand Down Expand Up @@ -108,9 +129,25 @@ public List<Step> toSteps(Client client, String phase, StepKey nextStepKey) {
SwapAliasesAndDeleteSourceIndexStep swapAliasesAndDeleteSourceIndexStep = new SwapAliasesAndDeleteSourceIndexStep(swapAliasesKey,
null, client, RESTORED_INDEX_PREFIX);

return Arrays.asList(checkNoWriteIndexStep, waitForNoFollowersStep, generateSnapshotNameStep, cleanupSnapshotStep,
createSnapshotBranchingStep, mountSnapshotStep, waitForGreenIndexHealthStep, copyMetadataStep, copySettingsStep,
isDataStreamBranchingStep, replaceDataStreamBackingIndex, deleteSourceIndexStep, swapAliasesAndDeleteSourceIndexStep);
List<Step> steps = new ArrayList<>();
steps.add(checkNoWriteIndexStep);
steps.add(waitForNoFollowersStep);
if (forceMergeIndex) {
steps.add(forceMergeStep);
steps.add(segmentCountStep);
}
steps.add(generateSnapshotNameStep);
steps.add(cleanupSnapshotStep);
steps.add(createSnapshotBranchingStep);
steps.add(mountSnapshotStep);
steps.add(waitForGreenIndexHealthStep);
steps.add(copyMetadataStep);
steps.add(copySettingsStep);
steps.add(isDataStreamBranchingStep);
steps.add(replaceDataStreamBackingIndex);
steps.add(deleteSourceIndexStep);
steps.add(swapAliasesAndDeleteSourceIndexStep);
return steps;
}

@Override
Expand All @@ -126,12 +163,16 @@ public String getWriteableName() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(snapshotRepository);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(forceMergeIndex);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SNAPSHOT_REPOSITORY.getPreferredName(), snapshotRepository);
builder.field(FORCE_MERGE_INDEX.getPreferredName(), forceMergeIndex);
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public void evaluateCondition(Metadata metadata, Index index, Listener listener,
if (idxSegments == null || (response.getShardFailures() != null && response.getShardFailures().length > 0)) {
final DefaultShardOperationFailedException[] failures = response.getShardFailures();
logger.info("[{}] retrieval of segment counts after force merge did not succeed, " +
"there were {} shard failures. " +
"failures: {}",
"there were {} shard failures. failures: {}",
index.getName(),
response.getFailedShards(),
failures == null ? "n/a" : Strings.collectionToDelimitedString(Arrays.stream(failures)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,73 @@ public class SearchableSnapshotActionTests extends AbstractActionTestCase<Search
@Override
public void testToSteps() {
String phase = randomAlphaOfLengthBetween(1, 10);
StepKey expectedFirstStep = new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME);
StepKey expectedSecondStep = new StepKey(phase, NAME, WaitForNoFollowersStep.NAME);
StepKey expectedThirdStep = new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME);
StepKey expectedFourthStep = new StepKey(phase, NAME, CleanupSnapshotStep.NAME);
StepKey expectedFifthStep = new StepKey(phase, NAME, CreateSnapshotStep.NAME);
StepKey expectedSixthStep = new StepKey(phase, NAME, MountSnapshotStep.NAME);
StepKey expectedSeventhStep = new StepKey(phase, NAME, WaitForIndexColorStep.NAME);
StepKey expectedEighthStep = new StepKey(phase, NAME, CopyExecutionStateStep.NAME);
StepKey expectedNinthStep = new StepKey(phase, NAME, CopySettingsStep.NAME);
StepKey expectedTenthStep = new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY);
StepKey expectedElevenStep = new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME);
StepKey expectedTwelveStep = new StepKey(phase, NAME, DeleteStep.NAME);
StepKey expectedThirteenStep = new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME);

SearchableSnapshotAction action = createTestInstance();
StepKey nextStepKey = new StepKey(phase, randomAlphaOfLengthBetween(1, 5), randomAlphaOfLengthBetween(1, 5));

List<Step> steps = action.toSteps(null, phase, nextStepKey);
assertThat(steps.size(), is(13));

assertThat(steps.get(0).getKey(), is(expectedFirstStep));
assertThat(steps.get(1).getKey(), is(expectedSecondStep));
assertThat(steps.get(2).getKey(), is(expectedThirdStep));
assertThat(steps.get(3).getKey(), is(expectedFourthStep));
assertThat(steps.get(4).getKey(), is(expectedFifthStep));
assertThat(steps.get(5).getKey(), is(expectedSixthStep));
assertThat(steps.get(6).getKey(), is(expectedSeventhStep));
assertThat(steps.get(7).getKey(), is(expectedEighthStep));
assertThat(steps.get(8).getKey(), is(expectedNinthStep));
assertThat(steps.get(9).getKey(), is(expectedTenthStep));
assertThat(steps.get(10).getKey(), is(expectedElevenStep));
assertThat(steps.get(11).getKey(), is(expectedTwelveStep));
assertThat(steps.get(12).getKey(), is(expectedThirteenStep));

AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(4);
assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedFourthStep));
assertThat(steps.size(), is(action.isForceMergeIndex() ? 15 : 13));

List<StepKey> expectedSteps = action.isForceMergeIndex() ? expectedStepKeysWithForceMerge(phase) :
expectedStepKeysNoForceMerge(phase);

assertThat(steps.get(0).getKey(), is(expectedSteps.get(0)));
assertThat(steps.get(1).getKey(), is(expectedSteps.get(1)));
assertThat(steps.get(2).getKey(), is(expectedSteps.get(2)));
assertThat(steps.get(3).getKey(), is(expectedSteps.get(3)));
assertThat(steps.get(4).getKey(), is(expectedSteps.get(4)));
assertThat(steps.get(5).getKey(), is(expectedSteps.get(5)));
assertThat(steps.get(6).getKey(), is(expectedSteps.get(6)));
assertThat(steps.get(7).getKey(), is(expectedSteps.get(7)));
assertThat(steps.get(8).getKey(), is(expectedSteps.get(8)));
assertThat(steps.get(9).getKey(), is(expectedSteps.get(9)));
assertThat(steps.get(10).getKey(), is(expectedSteps.get(10)));
assertThat(steps.get(11).getKey(), is(expectedSteps.get(11)));
assertThat(steps.get(12).getKey(), is(expectedSteps.get(12)));

if (action.isForceMergeIndex()) {
assertThat(steps.get(13).getKey(), is(expectedSteps.get(13)));
AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(6);
assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedSteps.get(5)));
} else {
AsyncActionBranchingStep branchStep = (AsyncActionBranchingStep) steps.get(4);
assertThat(branchStep.getNextKeyOnIncompleteResponse(), is(expectedSteps.get(3)));
}
}

private List<StepKey> expectedStepKeysWithForceMerge(String phase) {
return List.of(
new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
new StepKey(phase, NAME, WaitForNoFollowersStep.NAME),
new StepKey(phase, NAME, ForceMergeStep.NAME),
new StepKey(phase, NAME, SegmentCountStep.NAME),
new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME),
new StepKey(phase, NAME, CleanupSnapshotStep.NAME),
new StepKey(phase, NAME, CreateSnapshotStep.NAME),
new StepKey(phase, NAME, MountSnapshotStep.NAME),
new StepKey(phase, NAME, WaitForIndexColorStep.NAME),
new StepKey(phase, NAME, CopyExecutionStateStep.NAME),
new StepKey(phase, NAME, CopySettingsStep.NAME),
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY),
new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME),
new StepKey(phase, NAME, DeleteStep.NAME),
new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME));
}

private List<StepKey> expectedStepKeysNoForceMerge(String phase) {
return List.of(
new StepKey(phase, NAME, CheckNotDataStreamWriteIndexStep.NAME),
new StepKey(phase, NAME, WaitForNoFollowersStep.NAME),
new StepKey(phase, NAME, GenerateSnapshotNameStep.NAME),
new StepKey(phase, NAME, CleanupSnapshotStep.NAME),
new StepKey(phase, NAME, CreateSnapshotStep.NAME),
new StepKey(phase, NAME, MountSnapshotStep.NAME),
new StepKey(phase, NAME, WaitForIndexColorStep.NAME),
new StepKey(phase, NAME, CopyExecutionStateStep.NAME),
new StepKey(phase, NAME, CopySettingsStep.NAME),
new StepKey(phase, NAME, SearchableSnapshotAction.CONDITIONAL_DATASTREAM_CHECK_KEY),
new StepKey(phase, NAME, ReplaceDataStreamBackingIndexStep.NAME),
new StepKey(phase, NAME, DeleteStep.NAME),
new StepKey(phase, NAME, SwapAliasesAndDeleteSourceIndexStep.NAME));
}

@Override
Expand All @@ -79,6 +110,6 @@ protected SearchableSnapshotAction mutateInstance(SearchableSnapshotAction insta
}

static SearchableSnapshotAction randomInstance() {
return new SearchableSnapshotAction(randomAlphaOfLengthBetween(5, 10));
return new SearchableSnapshotAction(randomAlphaOfLengthBetween(5, 10), randomBoolean());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -36,12 +37,15 @@
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLengthBetween;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.rest.ESRestTestCase.ensureGreen;

/**
* This class provides the operational REST functions needed to control an ILM time series lifecycle.
Expand Down Expand Up @@ -204,4 +208,36 @@ public static Map<String, Object> getOnlyIndexSettings(RestClient client, String
}
}

public static void createIndexWithSettings(RestClient client, String index, String alias, Settings.Builder settings)
throws IOException {
createIndexWithSettings(client, index, alias, settings, randomBoolean());
}

public static void createIndexWithSettings(RestClient client, String index, String alias, Settings.Builder settings,
boolean useWriteIndex) throws IOException {
Request request = new Request("PUT", "/" + index);

String writeIndexSnippet = "";
if (useWriteIndex) {
writeIndexSnippet = "\"is_write_index\": true";
}
request.setJsonEntity("{\n \"settings\": " + Strings.toString(settings.build())
+ ", \"aliases\" : { \"" + alias + "\": { " + writeIndexSnippet + " } } }");
client.performRequest(request);
// wait for the shards to initialize
ensureGreen(index);
}

@SuppressWarnings("unchecked")
public static Integer getNumberOfSegments(RestClient client, String index) throws IOException {
Response response = client.performRequest(new Request("GET", index + "/_segments"));
XContentType entityContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue());
Map<String, Object> responseEntity = XContentHelper.convertToMap(entityContentType.xContent(),
response.getEntity().getContent(), false);
responseEntity = (Map<String, Object>) responseEntity.get("indices");
responseEntity = (Map<String, Object>) responseEntity.get(index);
responseEntity = (Map<String, Object>) responseEntity.get("shards");
List<Map<String, Object>> shards = (List<Map<String, Object>>) responseEntity.get("0");
return (Integer) shards.get(0).get("num_search_segments");
}
}
Loading

0 comments on commit d0a17b2

Please sign in to comment.