Skip to content

Commit

Permalink
Optimise clone operation for incremental full cluster snapshots (open…
Browse files Browse the repository at this point in the history
…search-project#16296)

* Optimise clone operation for incremental full cluster snapshots

Signed-off-by: Ashish Singh <[email protected]>

* Add UTs

Signed-off-by: Ashish Singh <[email protected]>

* Add CHANGELOG

Signed-off-by: Ashish Singh <[email protected]>

---------

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Oct 14, 2024
1 parent 5947002 commit ee29108
Show file tree
Hide file tree
Showing 3 changed files with 504 additions and 58 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove identity-related feature flagged code from the RestController ([#15430](https:/opensearch-project/OpenSearch/pull/15430))
- Remove Identity FeatureFlag ([#16024](https:/opensearch-project/OpenSearch/pull/16024))
- Ensure RestHandler.Wrapper delegates all implementations to the wrapped handler ([#16154](https:/opensearch-project/OpenSearch/pull/16154))

- Optimise clone operation for incremental full cluster snapshots ([#16296](https:/opensearch-project/OpenSearch/pull/16296))

### Deprecated

Expand Down
121 changes: 64 additions & 57 deletions server/src/main/java/org/opensearch/snapshots/SnapshotsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1510,7 +1510,8 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS

private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());

private void runReadyClone(
// Made to package private to be able to test the method in UTs
void runReadyClone(
Snapshot target,
SnapshotId sourceSnapshot,
ShardSnapshotStatus shardStatusBefore,
Expand All @@ -1534,69 +1535,75 @@ public void onFailure(Exception e) {
@Override
protected void doRun() {
final String localNodeId = clusterService.localNode().getId();
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
try {
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
if (remoteStoreIndexShallowCopy == false) {
executeClone(localNodeId, false);
} else {
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
try {
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(
repositoryData,
sourceSnapshot,
repoShardId.index()
);
final boolean cloneRemoteStoreIndexShardSnapshot = indexMetadata.getSettings()
.getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
executeClone(localNodeId, cloneRemoteStoreIndexShardSnapshot);
} catch (IOException e) {
logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName());
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
}, this::onFailure));
}
}

private void executeClone(String localNodeId, boolean cloneRemoteStoreIndexShardSnapshot) {
if (currentlyCloning.add(repoShardId)) {
if (cloneRemoteStoreIndexShardSnapshot) {
repository.cloneRemoteStoreIndexShardSnapshot(
sourceSnapshot,
repoShardId.index()
target.getSnapshotId(),
repoShardId,
shardStatusBefore.generation(),
remoteStoreLockManagerFactory,
getCloneCompletionListener(localNodeId)
);
final boolean cloneRemoteStoreIndexShardSnapshot = remoteStoreIndexShallowCopy
&& indexMetadata.getSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false);
final SnapshotId targetSnapshot = target.getSnapshotId();
final ActionListener<String> listener = ActionListener.wrap(
generation -> innerUpdateSnapshotState(
new ShardSnapshotUpdate(
target,
repoShardId,
new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)
),
ActionListener.runBefore(
ActionListener.wrap(
v -> logger.trace(
"Marked [{}] as successfully cloned from [{}] to [{}]",
repoShardId,
sourceSnapshot,
targetSnapshot
),
e -> {
logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
failAllListenersOnMasterFailOver(e);
}
),
() -> currentlyCloning.remove(repoShardId)
)
),
e -> {
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
} else {
repository.cloneShardSnapshot(
sourceSnapshot,
target.getSnapshotId(),
repoShardId,
shardStatusBefore.generation(),
getCloneCompletionListener(localNodeId)
);
if (currentlyCloning.add(repoShardId)) {
if (cloneRemoteStoreIndexShardSnapshot) {
repository.cloneRemoteStoreIndexShardSnapshot(
sourceSnapshot,
targetSnapshot,
}
}
}

private ActionListener<String> getCloneCompletionListener(String localNodeId) {
return ActionListener.wrap(
generation -> innerUpdateSnapshotState(
new ShardSnapshotUpdate(target, repoShardId, new ShardSnapshotStatus(localNodeId, ShardState.SUCCESS, generation)),
ActionListener.runBefore(
ActionListener.wrap(
v -> logger.trace(
"Marked [{}] as successfully cloned from [{}] to [{}]",
repoShardId,
shardStatusBefore.generation(),
remoteStoreLockManagerFactory,
listener
);
} else {
repository.cloneShardSnapshot(
sourceSnapshot,
targetSnapshot,
repoShardId,
shardStatusBefore.generation(),
listener
);
}
}
} catch (IOException e) {
logger.warn("Failed to get index-metadata from repository data for index [{}]", repoShardId.index().getName());
target.getSnapshotId()
),
e -> {
logger.warn("Cluster state update after successful shard clone [{}] failed", repoShardId);
failAllListenersOnMasterFailOver(e);
}
),
() -> currentlyCloning.remove(repoShardId)
)
),
e -> {
logger.warn("Exception [{}] while trying to clone shard [{}]", e, repoShardId);
failCloneShardAndUpdateClusterState(target, sourceSnapshot, repoShardId);
}
}, this::onFailure));
);
}
});
}
Expand Down
Loading

0 comments on commit ee29108

Please sign in to comment.