From b5ebb42b4fb935c84320e6f8875712d2a6b07fe8 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 12 Jun 2019 13:57:54 +0200 Subject: [PATCH 1/5] Trim translog for replicated closed indices --- .../org/elasticsearch/index/IndexService.java | 9 +++- .../index/engine/NoOpEngine.java | 42 ++++++++++++++++ .../index/IndexServiceTests.java | 34 ++++++++++++- .../index/engine/NoOpEngineTests.java | 49 +++++++++++++++---- 4 files changed, 122 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 7c3dc0fe497be..776bf3c521413 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -938,6 +938,11 @@ final class AsyncTrimTranslogTask extends BaseAsyncTask { .getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10))); } + @Override + protected boolean mustReschedule() { + return indexService.closed.get() == false; + } + @Override protected void runInternal() { indexService.maybeTrimTranslog(); @@ -1029,8 +1034,8 @@ AsyncTranslogFSync getFsyncTask() { // for tests return fsyncTask; } - AsyncGlobalCheckpointTask getGlobalCheckpointTask() { - return globalCheckpointTask; + AsyncTrimTranslogTask getTrimTranslogTask() { // for tests + return trimTranslogTask; } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 7f474d1be24c7..60c8e1740a6d5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -27,10 +27,15 @@ import org.apache.lucene.index.SegmentReader; import org.apache.lucene.store.Directory; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; import java.util.function.Function; /** @@ -116,4 +121,41 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl return super.segmentsStats(includeSegmentFileSizes, includeUnloadedSegments); } } + + @Override + public void trimUnreferencedTranslogFiles() { + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); + final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); + if (translogUuid == null) { + throw new IllegalStateException("commit doesn't contain translog unique id"); + } + if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { + throw new IllegalStateException("commit doesn't contain translog generation id"); + } + final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); + final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid); + + if (minTranslogGeneration < lastCommitGeneration) { + // a translog deletion policy that retains nothing but the last translog generation from safe commit + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1); + translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); + translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); + + try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, + engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier())) { + translog.trimUnreferencedReaders(); + } + } + } catch (final Exception e) { + try { + failEngine("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new EngineException(shardId, "failed to trim translog", e); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 2d4030a51ce3d..48ae9ecabebc0 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -44,10 +44,12 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.index.shard.IndexShardTestCase.getEngine; import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.core.IsEqual.equalTo; @@ -370,7 +372,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception { .build(); IndexService indexService = createIndex("test", settings); ensureGreen("test"); - assertTrue(indexService.getRefreshTask().mustReschedule()); + assertTrue(indexService.getTrimTranslogTask().mustReschedule()); client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); client().admin().indices().prepareFlush("test").get(); client().admin().indices().prepareUpdateSettings("test") @@ -382,6 +384,36 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception { assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0))); } + public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { + final String indexName = "test"; + IndexService indexService = createIndex(indexName, Settings.builder() + .put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .build()); + + final Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); + final int numDocs = scaledRandomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); + if (randomBoolean()) { + client().admin().indices().prepareFlush(indexName).get(); + } + } + assertAcked(client().admin().indices().prepareClose("test")); + + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); + assertTrue(indexService.getTrimTranslogTask().mustReschedule()); + + final long lastCommitedTranslogGeneration; + try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) { + Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); + lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + } + assertBusy(() -> { + long minTranslogGen = Translog.readMinTranslogGeneration(translog.getConfig().getTranslogPath(), translog.getTranslogUUID()); + assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration)); + }); + } + public void testIllegalFsyncInterval() { Settings settings = Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index de32e3e43077d..48ffb0e4cddcc 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -35,13 +35,14 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.TranslogDeletionPolicy; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; import java.util.Collections; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; @@ -83,7 +84,7 @@ public void testNoopAfterRegularEngine() throws IOException { tracker.updateLocalCheckpoint(allocationId.getId(), i); } - flushAndTrimTranslog(engine); + engine.flush(true, true); long localCheckpoint = engine.getLocalCheckpoint(); long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo(); @@ -127,7 +128,7 @@ public void testNoOpEngineStats() throws Exception { } } engine.getLocalCheckpointTracker().waitForOpsToComplete(numDocs + deletions - 1); - flushAndTrimTranslog(engine); + engine.flush(true, true); } final DocsStats expectedDocStats; @@ -157,12 +158,42 @@ public void testNoOpEngineStats() throws Exception { } } - private void flushAndTrimTranslog(final InternalEngine engine) { - engine.flush(true, true); - final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy(); - deletionPolicy.setRetentionSizeInBytes(-1); - deletionPolicy.setRetentionAgeInMillis(-1); - deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration); + public void testTrimUnreferencedTranslogFiles() throws Exception { + final ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier(); + ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node", + null, true, ShardRoutingState.STARTED, allocationId); + IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build(); + tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table); + tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final int numDocs = scaledRandomIntBetween(10, 3000); + for (int i = 0; i < numDocs; i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + if (rarely()) { + engine.flush(); + } + tracker.updateLocalCheckpoint(allocationId.getId(), i); + } engine.flush(true, true); + + final String translogUuid = engine.getTranslog().getTranslogUUID(); + final long minFileGeneration = engine.getTranslog().getMinFileGeneration(); + final long currentFileGeneration = engine.getTranslog().currentFileGeneration(); + engine.close(); + + final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); + final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath(); + + final long lastCommitedTranslogGeneration; + try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) { + Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); + lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration)); + } + + assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration)); + noOpEngine.trimUnreferencedTranslogFiles(); + assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration)); + noOpEngine.close(); } } From 1d48c4e99bb59d340fd362615108224191d7e402 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 19 Jun 2019 17:48:53 +0200 Subject: [PATCH 2/5] Update NoOpEngine javadoc --- .../main/java/org/elasticsearch/index/engine/NoOpEngine.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 60c8e1740a6d5..ff436738d7f62 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -41,7 +41,9 @@ /** * NoOpEngine is an engine implementation that does nothing but the bare minimum * required in order to have an engine. All attempts to do something (search, - * index, get), throw {@link UnsupportedOperationException}. + * index, get), throw {@link UnsupportedOperationException}. However, NoOpEngine + * allows to trim any existing translog files through the usage of the + * {{@link #trimUnreferencedTranslogFiles()}} method. */ public final class NoOpEngine extends ReadOnlyEngine { From efd339fb1358ec08056072daa33956eff8eb64a6 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 19 Jun 2019 17:56:09 +0200 Subject: [PATCH 3/5] Update trimUnreferencedTranslogFiles() doc --- .../main/java/org/elasticsearch/index/engine/NoOpEngine.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index ff436738d7f62..d1103871d4229 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -124,6 +124,10 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl } } + /** + * This implementation will trim existing translog files using a {@link TranslogDeletionPolicy} + * that retains nothing but the last translog generation from safe commit. + */ @Override public void trimUnreferencedTranslogFiles() { try (ReleasableLock lock = readLock.acquire()) { From fe62689dd6a3c11c6a647cbaea9ed5bce221b706 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 24 Jun 2019 17:57:19 +0200 Subject: [PATCH 4/5] Check translog operations and stats in test --- .../elasticsearch/index/IndexServiceTests.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 48ae9ecabebc0..af4d621448ad7 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.Map; @@ -390,7 +391,10 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { .put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") .build()); - final Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); + Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); + final Path translogPath = translog.getConfig().getTranslogPath(); + final String translogUuid = translog.getTranslogUUID(); + final int numDocs = scaledRandomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); @@ -398,6 +402,8 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { client().admin().indices().prepareFlush(indexName).get(); } } + assertThat(translog.totalOperations(), equalTo(numDocs)); + assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(numDocs)); assertAcked(client().admin().indices().prepareClose("test")); indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); @@ -409,9 +415,16 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); } assertBusy(() -> { - long minTranslogGen = Translog.readMinTranslogGeneration(translog.getConfig().getTranslogPath(), translog.getTranslogUUID()); + long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid); assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration)); }); + + assertAcked(client().admin().indices().prepareOpen("test")); + + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); + translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); + assertThat(translog.totalOperations(), equalTo(0)); + assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0)); } public void testIllegalFsyncInterval() { From 6f08262b318bf368f6c66684ca8921aaa1eafd75 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 25 Jun 2019 16:28:54 +0200 Subject: [PATCH 5/5] check number of commits --- .../index/engine/NoOpEngine.java | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index d1103871d4229..e6292cc6bcfa2 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -28,6 +28,7 @@ import org.apache.lucene.store.Directory; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogDeletionPolicy; @@ -130,29 +131,34 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl */ @Override public void trimUnreferencedTranslogFiles() { + final Store store = this.engineConfig.getStore(); + store.incRef(); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); - final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); - if (translogUuid == null) { - throw new IllegalStateException("commit doesn't contain translog unique id"); - } - if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog generation id"); - } - final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); - final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid); + final List commits = DirectoryReader.listCommits(store.directory()); + if (commits.size() == 1) { + final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); + final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); + if (translogUuid == null) { + throw new IllegalStateException("commit doesn't contain translog unique id"); + } + if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { + throw new IllegalStateException("commit doesn't contain translog generation id"); + } + final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); + final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid); - if (minTranslogGeneration < lastCommitGeneration) { - // a translog deletion policy that retains nothing but the last translog generation from safe commit - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1); - translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); + if (minTranslogGeneration < lastCommitGeneration) { + // a translog deletion policy that retains nothing but the last translog generation from safe commit + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1); + translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); + translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); - try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, + try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier())) { - translog.trimUnreferencedReaders(); + translog.trimUnreferencedReaders(); + } } } } catch (final Exception e) { @@ -162,6 +168,8 @@ public void trimUnreferencedTranslogFiles() { e.addSuppressed(inner); } throw new EngineException(shardId, "failed to trim translog", e); + } finally { + store.decRef(); } } }