diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java index 927dbf9995778..70e283791fc3e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteRestoreSnapshotIT.java @@ -938,17 +938,8 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio Thread thread = new Thread(() -> { try { String snapshotName = "snapshot-concurrent-" + snapshotIndex; - CreateSnapshotResponse createSnapshotResponse2 = client().admin() - .cluster() - .prepareCreateSnapshot(snapshotRepoName, snapshotName) - .setWaitForCompletion(true) - .get(); - SnapshotInfo snapshotInfo = createSnapshotResponse2.getSnapshotInfo(); - assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); - assertThat(snapshotInfo.successfulShards(), greaterThan(0)); - assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); - assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName)); - assertThat(snapshotInfo.getPinnedTimestamp(), greaterThan(0L)); + client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, snapshotName).setWaitForCompletion(true).get(); + logger.info("Snapshot completed {}", snapshotName); } catch (Exception e) {} }); threads.add(thread); @@ -963,15 +954,19 @@ public void testConcurrentSnapshotV2CreateOperation() throws InterruptedExceptio thread.join(); } - // Validate that only one snapshot has been created + // Sleeping 10 sec for earlier created snapshot to complete runNextQueuedOperation and be ready for next snapshot + // We can't put `waitFor` since we don't have visibility on its completion + Thread.sleep(TimeValue.timeValueSeconds(10).seconds()); + client().admin().cluster().prepareCreateSnapshot(snapshotRepoName, "snapshot-cleanup-timestamp").setWaitForCompletion(true).get(); Repository repository = internalCluster().getInstance(RepositoriesService.class).repository(snapshotRepoName); PlainActionFuture repositoryDataPlainActionFuture = new PlainActionFuture<>(); repository.getRepositoryData(repositoryDataPlainActionFuture); - RepositoryData repositoryData = repositoryDataPlainActionFuture.get(); - assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(1)); - forceSyncPinnedTimestamps(); - assertEquals(RemoteStorePinnedTimestampService.getPinnedEntities().size(), repositoryData.getSnapshotIds().size()); + assertThat(repositoryData.getSnapshotIds().size(), greaterThanOrEqualTo(2)); + waitUntil(() -> { + forceSyncPinnedTimestamps(); + return RemoteStorePinnedTimestampService.getPinnedEntities().size() == repositoryData.getSnapshotIds().size(); + }); } public void testConcurrentSnapshotV2CreateOperation_MasterChange() throws Exception { diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index 2df094ebe540b..6688c7dd0431a 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -79,6 +79,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; +import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; import org.opensearch.common.collect.Tuple; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; @@ -466,34 +467,35 @@ public TimeValue timeout() { * @param listener snapshot creation listener */ public void createSnapshotV2(final CreateSnapshotRequest request, final ActionListener listener) { - long pinnedTimestamp = System.currentTimeMillis(); final String repositoryName = request.repository(); final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); + validate(repositoryName, snapshotName); + + final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot + Snapshot snapshot = new Snapshot(repositoryName, snapshotId); + long pinnedTimestamp = System.currentTimeMillis(); + try { + updateSnapshotPinnedTimestamp(snapshot, pinnedTimestamp); + } catch (Exception e) { + listener.onFailure(e); + return; + } Repository repository = repositoriesService.repository(repositoryName); - validate(repositoryName, snapshotName); repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.URGENT) { private SnapshotsInProgress.Entry newEntry; - - private SnapshotId snapshotId; - - private Snapshot snapshot; - boolean enteredLoop; @Override public ClusterState execute(ClusterState currentState) { // move to in progress - snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot Repository repository = repositoriesService.repository(repositoryName); - if (repository.isReadOnly()) { listener.onFailure( new RepositoryException(repository.getMetadata().name(), "cannot create snapshot-v2 in a readonly repository") ); } - snapshot = new Snapshot(repositoryName, snapshotId); final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); createSnapshotPreValidations(currentState, repositoryData, repositoryName, snapshotName); @@ -593,59 +595,46 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl pinnedTimestamp ); final Version version = minCompatibleVersion(newState.nodes().getMinNodeVersion(), repositoryData, null); - final StepListener pinnedTimestampListener = new StepListener<>(); - pinnedTimestampListener.whenComplete(repoData -> { - repository.finalizeSnapshot( - shardGenerations, - repositoryData.getGenId(), - metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()), - snapshotInfo, - version, - state -> stateWithoutSnapshot(state, snapshot), - Priority.IMMEDIATE, - new ActionListener() { - @Override - public void onResponse(RepositoryData repositoryData) { - if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) { - leaveRepoLoop(repositoryName); - failSnapshotCompletionListeners( - snapshot, - new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") - ); - listener.onFailure( - new SnapshotException( - repositoryName, - snapshotName, - "Aborting snapshot-v2, no longer cluster manager" - ) - ); - return; - } - listener.onResponse(snapshotInfo); - // For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations - // can get queued . This is triggering them. - runNextQueuedOperation(repositoryData, repositoryName, true); - cleanOrphanTimestamp(repositoryName, repositoryData); - } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); + repository.finalizeSnapshot( + shardGenerations, + repositoryData.getGenId(), + metadataForSnapshot(newState.metadata(), request.includeGlobalState(), false, dataStreams, newEntry.indices()), + snapshotInfo, + version, + state -> stateWithoutSnapshot(state, snapshot), + Priority.IMMEDIATE, + new ActionListener() { + @Override + public void onResponse(RepositoryData repositoryData) { + if (clusterService.state().nodes().isLocalNodeElectedClusterManager() == false) { leaveRepoLoop(repositoryName); - // cleaning up in progress snapshot here - stateWithoutSnapshotV2(newState); - listener.onFailure(e); + failSnapshotCompletionListeners( + snapshot, + new SnapshotException(snapshot, "Aborting snapshot-v2, no longer cluster manager") + ); + listener.onFailure( + new SnapshotException(repositoryName, snapshotName, "Aborting snapshot-v2, no longer cluster manager") + ); + return; } + listener.onResponse(snapshotInfo); + logger.info("created snapshot-v2 [{}] in repository [{}]", repositoryName, snapshotName); + // For snapshot-v2, we don't allow concurrent snapshots . But meanwhile non-v2 snapshot operations + // can get queued . This is triggering them. + runNextQueuedOperation(repositoryData, repositoryName, true); + cleanOrphanTimestamp(repositoryName, repositoryData); } - ); - }, e -> { - logger.error("Failed to update pinned timestamp for snapshot-v2 {} {} {} ", repositoryName, snapshotName, e); - leaveRepoLoop(repositoryName); - // cleaning up in progress snapshot here - stateWithoutSnapshotV2(newState); - listener.onFailure(e); - }); - updateSnapshotPinnedTimestamp(repositoryData, snapshot, pinnedTimestamp, pinnedTimestampListener); + + @Override + public void onFailure(Exception e) { + logger.error("Failed to finalize snapshot repo {} for snapshot-v2 {} ", repositoryName, snapshotName); + leaveRepoLoop(repositoryName); + // cleaning up in progress snapshot here + stateWithoutSnapshotV2(newState); + listener.onFailure(e); + } + } + ); } @Override @@ -733,30 +722,30 @@ private void createSnapshotPreValidations( ensureNoCleanupInProgress(currentState, repositoryName, snapshotName); } - private void updateSnapshotPinnedTimestamp( - RepositoryData repositoryData, - Snapshot snapshot, - long timestampToPin, - ActionListener listener - ) { + private void updateSnapshotPinnedTimestamp(Snapshot snapshot, long timestampToPin) throws Exception { + CountDownLatch latch = new CountDownLatch(1); + SetOnce ex = new SetOnce<>(); + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e); + ex.set(e); + } + }; remoteStorePinnedTimestampService.pinTimestamp( timestampToPin, getPinningEntity(snapshot.getRepository(), snapshot.getSnapshotId().getUUID()), - new ActionListener() { - @Override - public void onResponse(Void unused) { - logger.debug("Timestamp pinned successfully for snapshot {}", snapshot.getSnapshotId().getName()); - listener.onResponse(repositoryData); - } - - @Override - public void onFailure(Exception e) { - logger.error("Failed to pin timestamp for snapshot {} with exception {}", snapshot.getSnapshotId().getName(), e); - listener.onFailure(e); - - } - } + new LatchedActionListener<>(listener, latch) ); + latch.await(); + if (ex.get() != null) { + throw ex.get(); + } } public static String getPinningEntity(String repositoryName, String snapshotUUID) {