Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remove] TrimUnsafeCommit logic for legacy 6.x indexes #2225

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,6 @@ public void upgradeAllNodesAndPluginsToNextVersion(List<Provider<RegularFile>> p
writeUnicastHostsFiles();
}

/**
 * Restarts by calling {@code stop(false)} and then {@code start()}, in that order.
 * NOTE(review): the meaning of the {@code false} argument is defined by {@code stop},
 * which is declared outside this excerpt -- confirm against its declaration.
 */
public void fullRestart() {
stop(false);
start();
}

public void nextNodeToNextVersion() {
OpenSearchNode node = upgradeNodeToNextVersion();
node.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1335,45 +1335,6 @@ public void testTurnOffTranslogRetentionAfterUpgraded() throws Exception {
}
}

/**
 * Verifies that the documents indexed while running against the old cluster are all
 * returned by a search once the index is green.
 * <p>
 * When {@code isRunningAgainstOldCluster()} is true, the test creates an index with one shard and
 * one replica plus a randomized set of translog settings, indexes a random number of documents
 * (0 to 100) with occasional flushes, and stores the document count under the key
 * {@code doc_count}. The statements after that block run unconditionally: they wait for the index
 * to become green, load the stored count and assert that a search reports that many total hits.
 *
 * @throws Exception if any of the REST calls or helper methods fail
 */
public void testRecoveryWithTranslogRetentionDisabled() throws Exception {
kartg marked this conversation as resolved.
Show resolved Hide resolved
if (isRunningAgainstOldCluster()) {
final Settings.Builder settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1);
// The soft-deletes setting is only randomized when the oldest node is before 2.0.0.
if (minimumNodeVersion().before(Version.V_2_0_0)) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), randomBoolean());
}
// NOTE(review): "-1" presumably disables size-based translog retention (cf. the test name) -- confirm.
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1");
}
// NOTE(review): a 1kb threshold presumably forces frequent translog generation rollovers -- confirm.
if (randomBoolean()) {
settings.put(IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING.getKey(), "1kb");
}
createIndex(index, settings.build());
ensureGreen(index);
int numDocs = randomIntBetween(0, 100);
// Document ids are the loop counter rendered as a string; a flush is interleaved only rarely.
for (int i = 0; i < numDocs; i++) {
indexDocument(Integer.toString(i));
if (rarely()) {
flush(index, randomBoolean());
}
}
client().performRequest(new Request("POST", "/" + index + "/_refresh"));
if (randomBoolean()) {
ensurePeerRecoveryRetentionLeasesRenewedAndSynced(index);
}
// Randomly finish with a regular flush, a synced flush, or no flush at all.
if (randomBoolean()) {
flush(index, randomBoolean());
} else if (randomBoolean()) {
syncedFlush(index, randomBoolean());
}
// Persist the expected count so the assertions below can read it back via loadInfoDocument.
saveInfoDocument("doc_count", Integer.toString(numDocs));
}
// Everything below runs regardless of the isRunningAgainstOldCluster() result.
ensureGreen(index);
final int numDocs = Integer.parseInt(loadInfoDocument("doc_count"));
assertTotalHits(numDocs, entityAsMap(client().performRequest(new Request("GET", "/" + index + "/_search"))));
}

public void testResize() throws Exception {
int numDocs;
if (isRunningAgainstOldCluster()) {
Expand Down
117 changes: 0 additions & 117 deletions qa/translog-policy/build.gradle

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogCorruptedException;
Expand All @@ -115,7 +114,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -252,7 +250,7 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
trimUnsafeCommits(engineConfig);
store.trimUnsafeCommits(engineConfig.getTranslogConfig().getTranslogPath());
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier(), seqNo -> {
final LocalCheckpointTracker tracker = getLocalCheckpointTracker();
assert tracker != null || getTranslog().isOpen() == false;
Expand Down Expand Up @@ -2955,15 +2953,6 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
return true;
}

/**
 * Gathers the inputs for {@code Store#trimUnsafeCommits} from the engine configuration and
 * delegates to it.
 * <p>
 * The translog UUID is taken from the user data of the store's last committed segments info.
 * That UUID and the configured translog path are used to read the global checkpoint and the
 * minimum translog generation, which are passed to the store together with the version the
 * index was created with.
 *
 * @param engineConfig configuration supplying the store, the translog path and the index settings
 * @throws IOException if reading the commit data or the translog values fails
 */
private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException {
    final Store shardStore = engineConfig.getStore();
    final String uuid = shardStore.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
    final Path translogLocation = engineConfig.getTranslogConfig().getTranslogPath();
    // Arguments are evaluated left to right: global checkpoint, then min generation, then created version.
    shardStore.trimUnsafeCommits(
        Translog.readGlobalCheckpoint(translogLocation, uuid),
        Translog.readMinTranslogGeneration(translogLocation, uuid),
        engineConfig.getIndexSettings().getIndexVersionCreated()
    );
}

/**
* Restores the live version map and local checkpoint of this engine using documents (including soft-deleted)
* after the local checkpoint in the safe commit. This step ensures the live version map and checkpoint tracker
Expand Down
25 changes: 7 additions & 18 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -1597,27 +1597,16 @@ public void ensureIndexHasHistoryUUID() throws IOException {
* commit on the replica will cause exception as the new last commit c3 will have recovery_translog_gen=1. The recovery
* translog generation of a commit is calculated based on the current local checkpoint. The local checkpoint of c3 is 1
* while the local checkpoint of c2 is 2.
* <p>
* 3. Commit without translog can be used in recovery. An old index, which was created before multiple-commits is introduced
* (v6.2), may not have a safe commit. If that index has a snapshotted commit without translog and an unsafe commit,
* the policy can consider the snapshotted commit as a safe commit for recovery even if the commit does not have a translog.
*/
public void trimUnsafeCommits(
final long lastSyncedGlobalCheckpoint,
final long minRetainedTranslogGen,
final org.opensearch.Version indexVersionCreated
) throws IOException {
public void trimUnsafeCommits(final Path translogPath) throws IOException {
metadataLock.writeLock().lock();
try {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(directory);
if (existingCommits.isEmpty()) {
throw new IllegalArgumentException("No index found to trim");
}
final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1);
final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit startingIndexCommit;
// TODO: Asserts the starting commit is a safe commit once peer-recovery sets global checkpoint.
startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
assert existingCommits.isEmpty() == false : "No index found to trim";
final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1);
final String translogUUID = lastIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be fetching the translogUUID as:

readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY)

instead?

Otherwise, based on my reading of the code, startingIndexCommit will always be the same as lastIndexCommit, making the two if-conditions in this method dead code.

Copy link
Collaborator Author

@nknize nknize Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll need to dig a little deeper but I think we want to trim in the corner case that the two are not equal (e.g., the commits are unsafe)? Note: this is just a rote refactor of the existing code that was in the InternalEngine.trimUnsafeCommits. We can dig a little deeper in the test case and address in a followup?

Copy link
Member

@kartg kartg Mar 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that this change doesn't look like an equivalent refactor.

Previously, InternalEngine computed the lastSyncedGlobalCheckpoint using the translog UUID from the latest SegmentInfos. Meanwhile, lastIndexCommit is the last/oldest commit. With this change, lastSyncedGlobalCheckpoint is now derived through lastIndexCommit rather than the latest one, and the if-conditions have not changed.

I recognize that I don't fully understand the unsafe commit check. So is this still the right check to make?

Note - both DirectoryReader.listCommits above and store.readLastCommittedSegmentsInfo() in InternalEngine start with the same method call - SegmentInfos.readLatestCommit() on the directory. The difference is that the former not only returns the latest commit, but also fetches commits from the other segments_N files in the directory.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I take that back. I just realized that I was mis-interpreting the logic in DirectoryReader.listCommits - the last commit isn't the oldest but rather the most recent. Feel free to ignore my rambling above :)

final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
final IndexCommit startingIndexCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);

if (translogUUID.equals(startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY)) == false) {
throw new IllegalStateException(
Expand All @@ -1628,7 +1617,7 @@ public void trimUnsafeCommits(
+ "]"
);
}
if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
if (startingIndexCommit.equals(lastIndexCommit) == false) {
try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
// this achieves two things:
// - by committing a new commit based on the starting commit, it make sure the starting commit will be opened
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6094,7 +6094,7 @@ public void testTrimUnsafeCommits() throws Exception {
minTranslogGen = engine.getTranslog().getMinFileGeneration();
}

store.trimUnsafeCommits(globalCheckpoint.get(), minTranslogGen, config.getIndexSettings().getIndexVersionCreated());
store.trimUnsafeCommits(config.getTranslogConfig().getTranslogPath());
long safeMaxSeqNo = commitMaxSeqNo.stream()
.filter(s -> s <= globalCheckpoint.get())
.reduce((s1, s2) -> s2) // get the last one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,7 @@ protected static void createIndex(String name, Settings settings, String mapping
entity += "}";
if (settings.getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) == false) {
expectSoftDeletesWarning(request, name);
} else if (settings.hasValue(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey())
|| settings.hasValue(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey())) {
expectTranslogRetentionWarning(request);
}
}
request.setJsonEntity(entity);
client().performRequest(request);
}
Expand Down Expand Up @@ -1025,21 +1022,6 @@ protected static void expectSoftDeletesWarning(Request request, String indexName
}
}

/**
 * Installs a warnings handler on {@code request} that accounts for the translog retention
 * deprecation message, depending on which nodes are on or after {@code LegacyESVersion.V_7_7_0}.
 * <ul>
 *   <li>All nodes on or after that version: the handler returns {@code true} whenever the
 *       warnings differ from the single expected message.</li>
 *   <li>Only some nodes on or after that version: the handler returns {@code true} only for a
 *       non-empty warnings list that differs from the expected message.</li>
 *   <li>No node on or after that version: the request is left untouched.</li>
 * </ul>
 * NOTE(review): a handler result of {@code true} presumably fails the request -- confirm against
 * the REST client's warnings handler contract.
 *
 * @param request the request whose options are replaced when a handler is installed
 */
protected static void expectTranslogRetentionWarning(Request request) {
    final String deprecationMessage = "Translog retention settings [index.translog.retention.age] "
        + "and [index.translog.retention.size] are deprecated and effectively ignored. They will be removed in a future version.";
    final List<String> expectedWarnings = Collections.singletonList(deprecationMessage);
    final Builder requestOptions = RequestOptions.DEFAULT.toBuilder();

    final boolean everyNodeWarns = nodeVersions.stream().allMatch(v -> v.onOrAfter(LegacyESVersion.V_7_7_0));
    if (everyNodeWarns == false && nodeVersions.stream().noneMatch(v -> v.onOrAfter(LegacyESVersion.V_7_7_0))) {
        // No node is new enough to be covered by either case; keep the request's options as they are.
        return;
    }

    if (everyNodeWarns) {
        requestOptions.setWarningsHandler(warnings -> warnings.equals(expectedWarnings) == false);
    } else {
        // Mixed versions: an empty warnings list is also acceptable.
        requestOptions.setWarningsHandler(warnings -> warnings.isEmpty() == false && warnings.equals(expectedWarnings) == false);
    }
    request.setOptions(requestOptions);
}

protected static Map<String, Object> getIndexSettings(String index) throws IOException {
Request request = new Request("GET", "/" + index + "/_settings");
request.addParameter("flat_settings", "true");
Expand Down