Trim translog for closed indices #43156

Merged: 6 commits, Jun 28, 2019
IndexService.java

@@ -940,6 +940,11 @@ final class AsyncTrimTranslogTask extends BaseAsyncTask {
.getSettings().getAsTime(INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING, TimeValue.timeValueMinutes(10)));
}

@Override
protected boolean mustReschedule() {
return indexService.closed.get() == false;
}

@Override
protected void runInternal() {
indexService.maybeTrimTranslog();
@@ -1031,8 +1036,8 @@ AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}

AsyncGlobalCheckpointTask getGlobalCheckpointTask() {
return globalCheckpointTask;
AsyncTrimTranslogTask getTrimTranslogTask() { // for tests
return trimTranslogTask;
}

/**
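The mustReschedule override above is what keeps the trim task alive: the task re-queues itself as long as the owning IndexService has not been closed, which, per the new test below, includes indices that are in the CLOSED state. A minimal, self-contained sketch of that reschedule-if-still-needed pattern (a simplified stand-in for BaseAsyncTask, not the actual Elasticsearch scheduler; all names here are illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for a BaseAsyncTask-style periodic task: after each run the
// task re-queues itself only while mustReschedule() returns true, so flipping the
// flag stops the loop.
public class PeriodicTaskSketch {

    abstract static class PeriodicTask implements Runnable {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private final long intervalMillis;

        PeriodicTask(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        void start() {
            if (mustReschedule()) {
                scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
            } else {
                scheduler.shutdown(); // no more runs once rescheduling is no longer needed
            }
        }

        @Override
        public void run() {
            try {
                runInternal();
            } finally {
                start(); // reschedule only if mustReschedule() still holds
            }
        }

        protected abstract boolean mustReschedule();

        protected abstract void runInternal();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean serviceClosed = new AtomicBoolean(false);
        PeriodicTask trimTask = new PeriodicTask(100) {
            @Override
            protected boolean mustReschedule() {
                // mirrors the PR: keep running until the owning service is closed
                return serviceClosed.get() == false;
            }

            @Override
            protected void runInternal() {
                System.out.println("trimming translog...");
            }
        };
        trimTask.start();
        Thread.sleep(350);       // let the task fire a few times
        serviceClosed.set(true); // after this, mustReschedule() is false and the task stops
    }
}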
NoOpEngine.java

@@ -27,16 +27,24 @@
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
* NoOpEngine is an engine implementation that does nothing but the bare minimum
* required in order to have an engine. All attempts to do something (search,
* index, get), throw {@link UnsupportedOperationException}.
* index, get), throw {@link UnsupportedOperationException}. However, NoOpEngine
* allows trimming any existing translog files through the
* {@link #trimUnreferencedTranslogFiles()} method.
*/
public final class NoOpEngine extends ReadOnlyEngine {

@@ -116,4 +124,52 @@ public SegmentsStats segmentsStats(boolean includeSegmentFileSizes, boolean incl
return super.segmentsStats(includeSegmentFileSizes, includeUnloadedSegments);
}
}

/**
* This implementation will trim existing translog files using a {@link TranslogDeletionPolicy}
* that retains nothing but the last translog generation from safe commit.
*/
@Override
public void trimUnreferencedTranslogFiles() {
final Store store = this.engineConfig.getStore();
store.incRef();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
if (commits.size() == 1) {
Review comment (Contributor): are we guaranteed to have this on a successful close? How do we know that previous commits have been cleaned up on verify-before-close?

final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);

if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1);
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);

try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
}
}
}
} catch (final Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog", e);
} finally {
store.decRef();
}
}
}
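The deletion policy above pins both the last-commit generation and the minimum generation needed for recovery to lastCommitGeneration, so every older translog generation becomes unreferenced and trimUnreferencedReaders() can delete it. A small standalone sketch of that arithmetic (hypothetical helper names, not Elasticsearch code):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Illustrative only: with the deletion policy pinned to lastCommitGeneration,
// generations in [minOnDiskGeneration, lastCommitGeneration) are unreferenced
// and get removed; lastCommitGeneration itself is retained.
public final class TranslogTrimSketch {

    static List<Long> generationsToDelete(long minOnDiskGeneration, long lastCommitGeneration) {
        return LongStream.range(minOnDiskGeneration, lastCommitGeneration)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // e.g. translog-3 .. translog-7 exist on disk and the last commit records generation 7
        System.out.println(generationsToDelete(3, 7)); // prints [3, 4, 5, 6]
    }
}

This matches the guard in the diff: trimming only happens when minTranslogGeneration < lastCommitGeneration, i.e. when the list in this sketch would be non-empty.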
IndexServiceTests.java

@@ -42,12 +42,15 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.index.shard.IndexShardTestCase.getEngine;
import static org.elasticsearch.test.InternalSettingsPlugin.TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.core.IsEqual.equalTo;
@@ -370,7 +373,7 @@ public void testAsyncTranslogTrimActuallyWorks() throws Exception {
.build();
IndexService indexService = createIndex("test", settings);
ensureGreen("test");
assertTrue(indexService.getRefreshTask().mustReschedule());
assertTrue(indexService.getTrimTranslogTask().mustReschedule());
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
client().admin().indices().prepareFlush("test").get();
client().admin().indices().prepareUpdateSettings("test")
@@ -382,6 +385,48 @@
assertBusy(() -> assertThat(IndexShardTestCase.getTranslog(shard).totalOperations(), equalTo(0)));
}

public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception {
final String indexName = "test";
IndexService indexService = createIndex(indexName, Settings.builder()
.put(TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.build());

Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
final Path translogPath = translog.getConfig().getTranslogPath();
final String translogUuid = translog.getTranslogUUID();

final int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex().setIndex(indexName).setId(String.valueOf(i)).setSource("{\"foo\": \"bar\"}", XContentType.JSON).get();
if (randomBoolean()) {
client().admin().indices().prepareFlush(indexName).get();
}
}
assertThat(translog.totalOperations(), equalTo(numDocs));
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(numDocs));
assertAcked(client().admin().indices().prepareClose("test"));

indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
assertTrue(indexService.getTrimTranslogTask().mustReschedule());

final long lastCommitedTranslogGeneration;
try (Engine.IndexCommitRef indexCommitRef = getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) {
Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertBusy(() -> {
long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid);
assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration));
});

assertAcked(client().admin().indices().prepareOpen("test"));

indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index());
translog = IndexShardTestCase.getTranslog(indexService.getShard(0));
assertThat(translog.totalOperations(), equalTo(0));
assertThat(translog.stats().estimatedNumberOfOperations(), equalTo(0));
}

public void testIllegalFsyncInterval() {
Settings settings = Settings.builder()
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "0ms") // disable
NoOpEngineTests.java

@@ -35,13 +35,15 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.test.IndexSettingsModule;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.hamcrest.Matchers.equalTo;
@@ -83,7 +85,7 @@ public void testNoopAfterRegularEngine() throws IOException {
tracker.updateLocalCheckpoint(allocationId.getId(), i);
}

flushAndTrimTranslog(engine);
engine.flush(true, true);

long localCheckpoint = engine.getPersistedLocalCheckpoint();
long maxSeqNo = engine.getSeqNoStats(100L).getMaxSeqNo();
@@ -159,6 +161,45 @@ public void testNoOpEngineStats() throws Exception {
}
}

public void testTrimUnreferencedTranslogFiles() throws Exception {
final ReplicationTracker tracker = (ReplicationTracker) engine.config().getGlobalCheckpointSupplier();
ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node",
null, true, ShardRoutingState.STARTED, allocationId);
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);

final int numDocs = scaledRandomIntBetween(10, 3000);
for (int i = 0; i < numDocs; i++) {
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
if (rarely()) {
engine.flush();
}
tracker.updateLocalCheckpoint(allocationId.getId(), i);
}
engine.flush(true, true);

final String translogUuid = engine.getTranslog().getTranslogUUID();
final long minFileGeneration = engine.getTranslog().getMinFileGeneration();
final long currentFileGeneration = engine.getTranslog().currentFileGeneration();
engine.close();

final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker));
final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath();

final long lastCommitedTranslogGeneration;
try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) {
Map<String, String> lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData();
lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY));
assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration));
}

assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration));
noOpEngine.trimUnreferencedTranslogFiles();
assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration));
noOpEngine.close();
}

private void flushAndTrimTranslog(final InternalEngine engine) {
engine.flush(true, true);
final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy();