Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise clone operation for incremental full cluster snapshots #16296

Merged
merged 3 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove Identity FeatureFlag ([#16024](https:/opensearch-project/OpenSearch/pull/16024))
- Ensure RestHandler.Wrapper delegates all implementations to the wrapped handler ([#16154](https:/opensearch-project/OpenSearch/pull/16154))
- Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https:/opensearch-project/OpenSearch/pull/16273))

- Optimise clone operation for incremental full cluster snapshots ([#16296](https:/opensearch-project/OpenSearch/pull/16296))

### Deprecated

Expand Down
121 changes: 64 additions & 57 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,8 @@

private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());

private void runReadyClone(
// Made to package private to be able to test the method in UTs
void runReadyClone(
Snapshot target,
SnapshotId sourceSnapshot,
ShardSnapshotStatus shardStatusBefore,
Expand All @@ -1351,69 +1352,75 @@
@Override
protected void doRun() {
final String localNodeId = clusterService.localNode().getId();
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
try {
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
if (remoteStoreIndexShallowCopy == false) {
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
executeClone(localNodeId, false);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
} else {
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
try {
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
sourceSnapshot,
repoShardId.index()
);
final boolean cloneRemoteStoreIndexShardSnapshot = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
executeClone(localNodeId, cloneRemoteStoreIndexShardSnapshot);
} catch (IOException e) {
logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName());
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
}, this::onFailure));
}
}

private void executeClone(String localNodeId, boolean cloneRemoteStoreIndexShardSnapshot) {
if (currentlyCloning.add(repoShardId)) {
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
if (cloneRemoteStoreIndexShardSnapshot) {
repository.cloneRemoteStoreIndexShardSnapshot(
sourceSnapshot,
repoShardId.index()
target.getSnapshotId(),
repoShardId,
shardStatusBefore.generation(),
remoteStoreLockManagerFactory,
getCloneCompletionListener(localNodeId)
);
final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy
&& indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
final SnapshotId targetSnapshot = target.getSnapshotId();
final ActionListener<String> listener = ActionListener.wrap(
generation -> innerUpdateSnapshotState(
new ShardSnapshotUpdate(
target,
repoShardId,
new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)
),
ActionListener.runBefore(
ActionListener.wrap(
v -> logger.trace(
"Marked [{}] as successfully cloned from [{}] to [{}]",
repoShardId,
sourceSnapshot,
targetSnapshot
),
e -> {
logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
failAllListenersOnMasterFailOver(e);
}
),
() -> currentlyCloning.remove(repoShardId)
)
),
e -> {
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
} else {
repository.cloneShardSnapshot(
sourceSnapshot,
target.getSnapshotId(),
repoShardId,
shardStatusBefore.generation(),
getCloneCompletionListener(localNodeId)
);
if (currentlyCloning.add(repoShardId)) {
if (cloneRemoteStoreIndexShardSnapshot) {
repository.cloneRemoteStoreIndexShardSnapshot(
sourceSnapshot,
targetSnapshot,
}
}
}

private ActionListener<String> getCloneCompletionListener(String localNodeId) {
return ActionListener.wrap(
generation -> innerUpdateSnapshotState(
new ShardSnapshotUpdate(target, repoShardId, new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)),
ActionListener.runBefore(
ActionListener.wrap(
v -> logger.trace(

Check warning on line 1405 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1405

Added line #L1405 was not covered by tests
"Marked [{}] as successfully cloned from [{}] to [{}]",
repoShardId,
shardStatusBefore.generation(),
remoteStoreLockManagerFactory,
listener
);
} else {
repository.cloneShardSnapshot(
sourceSnapshot,
targetSnapshot,
repoShardId,
shardStatusBefore.generation(),
listener
);
}
}
} catch (IOException e) {
logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName());
target.getSnapshotId()

Check warning on line 1409 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1409

Added line #L1409 was not covered by tests
),
e -> {
logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
failAllListenersOnMasterFailOver(e);
}

Check warning on line 1414 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1412-L1414

Added lines #L1412 - L1414 were not covered by tests
),
() -> currentlyCloning.remove(repoShardId)

Check warning on line 1416 in server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java#L1416

Added line #L1416 was not covered by tests
)
),
e -> {
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
}, this::onFailure));
);
}
});
}
Expand Down
Loading
Loading