diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index ddfb23f60e121..7be1725138e50 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -940,6 +940,11 @@ final class AsyncTrimTranslogTask extends BaseAsyncTask { .getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10))); } + @Override + protected boolean mustReschedule() { + return indexService.closed.get() == false; + } + @Override protected void runInternal() { indexService.maybeTrimTranslog(); @@ -1031,8 +1036,8 @@ AsyncTranslogFSync getFsyncTask() { // for tests return fsyncTask; } - AsyncGlobalCheckpointTask getGlobalCheckpointTask() { - return globalCheckpointTask; + AsyncTrimTranslogTask getTrimTranslogTask() { // for tests + return trimTranslogTask; } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 7f474d1be24c7..007a13351dfd9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -27,16 +27,24 @@ import org.apache.lucene.index.SegmentReader; import org.apache.lucene.store.Directory; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.index.translog.TranslogConfig; +import org.elasticsearch.index.translog.TranslogDeletionPolicy; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; +import java.util.Map; import java.util.function.Function; /** * NoOpEngine is an engine implementation that does nothing but the bare minimum * required in order to have an 
engine. All attempts to do something (search, - * index, get), throw {@link UnsupportedOperationException}. + * index, get), throw {@link UnsupportedOperationException}. However, NoOpEngine + * allows trimming any existing translog files through the usage of the + * {@link #trimUnreferencedTranslogFiles()} method. */ public final class NoOpEngine extends ReadOnlyEngine { @@ -116,4 +124,52 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl return super.segmentsStats(includeSegmentFileSizes, includeUnloadedSegments); } } + + /** + * This implementation will trim existing translog files using a {@link TranslogDeletionPolicy} + * that retains nothing but the last translog generation from safe commit. + */ + @Override + public void trimUnreferencedTranslogFiles() { + final Store store = this.engineConfig.getStore(); + store.incRef(); + try (ReleasableLock lock = readLock.acquire()) { + ensureOpen(); + final List commits = DirectoryReader.listCommits(store.directory()); + if (commits.size() == 1) { + final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); + final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); + if (translogUuid == null) { + throw new IllegalStateException("commit doesn't contain translog unique id"); + } + if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { + throw new IllegalStateException("commit doesn't contain translog generation id"); + } + final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); + final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid); + + if (minTranslogGeneration < lastCommitGeneration) { + // a translog deletion policy that retains nothing but the last translog generation from safe commit + final TranslogDeletionPolicy translogDeletionPolicy = new
TranslogDeletionPolicy(-1, -1); + translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); + translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); + + try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, + engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { + translog.trimUnreferencedReaders(); + } + } + } + } catch (final Exception e) { + try { + failEngine("translog trimming failed", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw new EngineException(shardId, "failed to trim translog", e); + } finally { + store.decRef(); + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 2d4030a51ce3d..af4d621448ad7 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -42,12 +42,15 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.index.shard.IndexShardTestCase.getEngine; import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.core.IsEqual.equalTo; @@ -370,7 +373,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception { .build(); IndexService indexService = createIndex("test", settings); ensureGreen("test"); - assertTrue(indexService.getRefreshTask().mustReschedule()); + 
assertTrue(indexService.getTrimTranslogTask().mustReschedule()); client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); client().admin().indices().prepareFlush("test").get(); client().admin().indices().prepareUpdateSettings("test") @@ -382,6 +385,48 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception { assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0))); } + public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { + final String indexName = "test"; + IndexService indexService = createIndex(indexName, Settings.builder() + .put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .build()); + + Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); + final Path translogPath = translog.getConfig().getTranslogPath(); + final String translogUuid = translog.getTranslogUUID(); + + final int numDocs = scaledRandomIntBetween(10, 100); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get(); + if (randomBoolean()) { + client().admin().indices().prepareFlush(indexName).get(); + } + } + assertThat(translog.totalOperations(), equalTo(numDocs)); + assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(numDocs)); + assertAcked(client().admin().indices().prepareClose("test")); + + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); + assertTrue(indexService.getTrimTranslogTask().mustReschedule()); + + final long lastCommitedTranslogGeneration; + try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) { + Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); + lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + } + assertBusy(() 
-> { + long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid); + assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration)); + }); + + assertAcked(client().admin().indices().prepareOpen("test")); + + indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); + translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); + assertThat(translog.totalOperations(), equalTo(0)); + assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0)); + } + public void testIllegalFsyncInterval() { Settings settings = Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index 6f74ac23a8e85..f45eab0e05778 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.test.IndexSettingsModule; @@ -42,6 +43,7 @@ import java.io.UncheckedIOException; import java.nio.file.Path; import java.util.Collections; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; @@ -83,7 +85,7 @@ public void testNoopAfterRegularEngine() throws IOException { tracker.updateLocalCheckpoint(allocationId.getId(), i); } - flushAndTrimTranslog(engine); + engine.flush(true, true); long localCheckpoint = engine.getPersistedLocalCheckpoint(); long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo(); @@ -159,6 +161,45 @@ public void testNoOpEngineStats() 
throws Exception { } } + public void testTrimUnreferencedTranslogFiles() throws Exception { + final ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier(); + ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node", + null, true, ShardRoutingState.STARTED, allocationId); + IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build(); + tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table); + tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final int numDocs = scaledRandomIntBetween(10, 3000); + for (int i = 0; i < numDocs; i++) { + engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); + if (rarely()) { + engine.flush(); + } + tracker.updateLocalCheckpoint(allocationId.getId(), i); + } + engine.flush(true, true); + + final String translogUuid = engine.getTranslog().getTranslogUUID(); + final long minFileGeneration = engine.getTranslog().getMinFileGeneration(); + final long currentFileGeneration = engine.getTranslog().currentFileGeneration(); + engine.close(); + + final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); + final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath(); + + final long lastCommitedTranslogGeneration; + try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) { + Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); + lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration)); + } + + assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration)); + noOpEngine.trimUnreferencedTranslogFiles(); + assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), 
equalTo(lastCommitedTranslogGeneration)); + noOpEngine.close(); + } + private void flushAndTrimTranslog(final InternalEngine engine) { engine.flush(true, true); final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();