From 2eb5e97a62b8c76d4e8a2b9e6ca3e5c74120f434 Mon Sep 17 00:00:00 2001 From: INDRAJIT BANERJEE Date: Fri, 30 Jun 2023 17:11:10 +0530 Subject: [PATCH 01/16] Moving get snapshot requests to listener based async calls Signed-off-by: INDRAJIT BANERJEE --- .../get/TransportGetSnapshotsAction.java | 88 ++++++++++--------- .../snapshots/SnapshotResiliencyTests.java | 13 +++ 2 files changed, 59 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index b5445bf544cc6..8dece707d3423 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -37,8 +37,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.CollectionUtil; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.SnapshotsInProgress; @@ -138,57 +138,61 @@ protected void clusterManagerOperation( currentSnapshots.add(snapshotInfo); } - final RepositoryData repositoryData; - if (isCurrentSnapshotsOnly(request.snapshots()) == false) { - repositoryData = PlainActionFuture.get(fut -> repositoriesService.getRepositoryData(repository, fut)); - for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { - allSnapshotIds.put(snapshotId.getName(), snapshotId); + final StepListener repositoryDataListener = new StepListener<>(); + repositoriesService.getRepositoryData(repository, repositoryDataListener); + repositoryDataListener.whenComplete(repositoryData -> { + if (isCurrentSnapshotsOnly(request.snapshots()) == false) { + for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + allSnapshotIds.put(snapshotId.getName(), snapshotId); + } + } else { + repositoryData = null; } - } else { - repositoryData = null; - } - final Set toResolve = new HashSet<>(); - if (isAllSnapshots(request.snapshots())) { - toResolve.addAll(allSnapshotIds.values()); - } else { - for (String snapshotOrPattern : request.snapshots()) { - if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { - toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList())); - } else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { - if (allSnapshotIds.containsKey(snapshotOrPattern)) { - toResolve.add(allSnapshotIds.get(snapshotOrPattern)); - } else if (request.ignoreUnavailable() == false) { - throw new SnapshotMissingException(repository, snapshotOrPattern); - } - } else { - for (Map.Entry entry : allSnapshotIds.entrySet()) { - if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { - toResolve.add(entry.getValue()); + final Set toResolve = new HashSet<>(); + if (isAllSnapshots(request.snapshots())) { + toResolve.addAll(allSnapshotIds.values()); + } else { + for (String snapshotOrPattern : request.snapshots()) { + if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { + toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList())); + } else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { + if (allSnapshotIds.containsKey(snapshotOrPattern)) { + toResolve.add(allSnapshotIds.get(snapshotOrPattern)); + } else if (request.ignoreUnavailable() == false) { + throw new SnapshotMissingException(repository, snapshotOrPattern); + } + } else { + for (Map.Entry entry : allSnapshotIds.entrySet()) { + if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { + toResolve.add(entry.getValue()); + } } } } - } - if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) { - throw new SnapshotMissingException(repository, request.snapshots()[0]); + if (toResolve.isEmpty() + && request.ignoreUnavailable() == false + && isCurrentSnapshotsOnly(request.snapshots()) == false) { + throw new SnapshotMissingException(repository, request.snapshots()[0]); + } } - } - final List snapshotInfos; - if (request.verbose()) { - snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable()); - } else { - if (repositoryData != null) { - // want non-current snapshots as well, which are found in the repository data - snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots); + final List snapshotInfos; + if (request.verbose()) { + snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable()); } else { - // only want current snapshots - snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); - CollectionUtil.timSort(snapshotInfos); + if (repositoryData != null) { + // want non-current snapshots as well, which are found in the repository data + snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots); + } else { + // only want current snapshots + snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); + CollectionUtil.timSort(snapshotInfos); + } } - } - listener.onResponse(new GetSnapshotsResponse(snapshotInfos)); + listener.onResponse(new GetSnapshotsResponse(snapshotInfos)); + }, listener::onFailure); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 0bb2b604e8f1a..b0f5adce58ab8 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -56,6 +56,8 @@ import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsAction; +import org.opensearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; @@ -2136,6 +2138,17 @@ public void onFailure(final Exception e) { indexNameExpressionResolver ) ); + actions.put( + GetSnapshotsAction.INSTANCE, + new TransportGetSnapshotsAction( + transportService, + clusterService, + threadPool, + repositoriesService, + actionFilters, + indexNameExpressionResolver + ) + ); actions.put( ClusterStateAction.INSTANCE, new TransportClusterStateAction( From 04569cfda0fa48b1ba6c4723f7d1ed29b182081f Mon Sep 17 00:00:00 2001 From: INDRAJIT BANERJEE Date: Sun, 2 Jul 2023 20:50:40 +0530 Subject: [PATCH 02/16] Adding test coverage for TransportGetSnapshotsAction Signed-off-by: INDRAJIT BANERJEE --- .../repositories/RepositoriesService.java | 15 +- .../get/GetSnapshotsActionTests.java | 245 ++++++++++++++++++ .../RepositoriesServiceTests.java | 18 +- 3 files changed, 268 insertions(+), 10 deletions(-) create mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsActionTests.java diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 9c56d172f2ea1..ec780e0765ea0 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -149,6 +149,19 @@ public RepositoriesService( deleteRepositoryTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_REPOSITORY_KEY, true); } + public RepositoriesService repositories(Map repositories) { + this.repositories = repositories; + return this; + } + + public Map repositories() { + return this.repositories; + } + + public Map typesRegistry() { + return this.typesRegistry; + } + /** * Registers new repository in the cluster *

@@ -576,7 +589,7 @@ private void archiveRepositoryStats(Repository repository, long clusterStateVers /** * Creates repository holder. This method starts the repository */ - private Repository createRepository(RepositoryMetadata repositoryMetadata, Map factories) { + public Repository createRepository(RepositoryMetadata repositoryMetadata, Map factories) { logger.debug("creating repository [{}][{}]", repositoryMetadata.type(), repositoryMetadata.name()); Repository.Factory factory = factories.get(repositoryMetadata.type()); if (factory == null) { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsActionTests.java new file mode 100644 index 0000000000000..b535c85a5f2df --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsActionTests.java @@ -0,0 +1,245 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.snapshots.get; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.IndicesRequest; + +import org.opensearch.action.get.MultiGetAction; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.service.ClusterApplierService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.index.Index; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.RepositoriesServiceTests; +import org.opensearch.repositories.Repository; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskId; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.CapturingTransport; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; +import org.opensearch.action.ActionRequestValidationException; +import org.junit.After; +import org.junit.Before; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; + +public class GetSnapshotsActionTests extends OpenSearchTestCase { + private TransportService transportService; + private ClusterService clusterService; + private ThreadPool threadPool; + private RepositoriesService repositoriesService; + private final String[] snapshotsName1 = { "test_snapshot" }; + private final String[] currSnapshotOnly = { GetSnapshotsRequest.CURRENT_SNAPSHOT }; + private final String[] snapshotsName2 = { "test_snapshot1", "test_snapshot2" }; + private final String repoName = "test_repo"; + private final String indexName = "test_index"; + private static final String repoType = "internal"; + + private GetSnapshotsActionTests.TestTransportGetSnapshotsAction getSnapshotsAction; + + public class TestTransportGetSnapshotsAction extends TransportGetSnapshotsAction { + TestTransportGetSnapshotsAction() { + super( + GetSnapshotsActionTests.this.transportService, + GetSnapshotsActionTests.this.clusterService, + GetSnapshotsActionTests.this.threadPool, + repositoriesService, + new ActionFilters(Collections.emptySet()), + new GetSnapshotsActionTests.Resolver() + ); + } + + @Override + protected void clusterManagerOperation( + GetSnapshotsRequest request, + ClusterState state, + ActionListener listener + ) { + ClusterState stateWithIndex = ClusterStateCreationUtils.state(indexName, 1, 1); + super.clusterManagerOperation(request, stateWithIndex, listener); + } + } + + @Before + public void setUp() throws Exception { + super.setUp(); + + threadPool = new TestThreadPool("GetSnapshotsActionTests"); + clusterService = createClusterService(threadPool); + CapturingTransport capturingTransport = new CapturingTransport(); + transportService = capturingTransport.createTransportService( + clusterService.getSettings(), + threadPool, + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + boundAddress -> clusterService.localNode(), + null, + Collections.emptySet() + ); + + Map typesRegistry = GetSnapshotsActionTests.getTypesRegistry(threadPool); + repositoriesService = new RepositoriesService( + Settings.EMPTY, + clusterService, + transportService, + typesRegistry, + typesRegistry, + threadPool + ); + repositoriesService.start(); + transportService.start(); + transportService.acceptIncomingRequests(); + getSnapshotsAction = new GetSnapshotsActionTests.TestTransportGetSnapshotsAction(); + } + + private static Map getTypesRegistry(ThreadPool threadPool) { + final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); + when(clusterApplierService.threadPool()).thenReturn(threadPool); + final ClusterService clusterServiceMock = mock(ClusterService.class); + when(clusterServiceMock.getClusterApplierService()).thenReturn(clusterApplierService); + Map typesRegistry = Map.of( + RepositoriesServiceTests.TestRepository.TYPE, + RepositoriesServiceTests.TestRepository::new, + RepositoriesServiceTests.MeteredRepositoryTypeA.TYPE, + metadata -> new RepositoriesServiceTests.MeteredRepositoryTypeA(metadata, clusterServiceMock), + RepositoriesServiceTests.MeteredRepositoryTypeB.TYPE, + metadata -> new RepositoriesServiceTests.MeteredRepositoryTypeB(metadata, clusterServiceMock) + ); + + return typesRegistry; + } + + @After + public void tearDown() throws Exception { + ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); + threadPool = null; + clusterService.close(); + super.tearDown(); + } + + public void testRepoMissing() { + GetSnapshotsRequest repoMissingRequest = new GetSnapshotsRequest(); + getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { + assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); + }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); + + repoMissingRequest = new GetSnapshotsRequest().snapshots(snapshotsName1); + + getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { + assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); + }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); + + repoMissingRequest = new GetSnapshotsRequest().ignoreUnavailable(true); + + getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { + assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); + }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); + + repoMissingRequest = new GetSnapshotsRequest().verbose(true); + + getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { + assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); + }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); + + repoMissingRequest = new GetSnapshotsRequest().verbose(false); + + getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { + assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); + }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); + } + + public void testCurrentRepoSnapshot() { + final Task task = createTask(); + RepositoryMetadata repositoryMetadata = new RepositoryMetadata(repoName, repoType, Settings.EMPTY); + Repository repository = repositoriesService.createRepository(repositoryMetadata, repositoriesService.typesRegistry()); + ; + Map repositoryMap = new HashMap<>(); + repositoryMap.put(repoName, repository); + repositoriesService = repositoriesService.repositories(repositoryMap); + assertEquals(repositoryMap, repositoriesService.repositories()); + + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(currSnapshotOnly) + .ignoreUnavailable(true); + getSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); + }, exception -> { throw new AssertionError(exception); })); + + repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(currSnapshotOnly) + .ignoreUnavailable(true) + .verbose(false); + getSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); + }, exception -> { throw new AssertionError(exception); })); + + repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(currSnapshotOnly) + .ignoreUnavailable(false) + .verbose(false); + getSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); + }, exception -> { throw new AssertionError(exception); })); + + repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(currSnapshotOnly) + .ignoreUnavailable(true) + .verbose(false); + getSnapshotsAction.execute(task, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); + }, exception -> { throw new AssertionError(exception); })); + } + + private static Task createTask() { + return new Task( + randomLong(), + "transport", + MultiGetAction.NAME, + "description", + new TaskId(randomLong() + ":" + randomLong()), + Collections.emptyMap() + ); + } + + static class Resolver extends IndexNameExpressionResolver { + Resolver() { + super(new ThreadContext(Settings.EMPTY)); + } + + @Override + public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { + return request.indices(); + } + + @Override + public Index[] concreteIndices(ClusterState state, IndicesRequest request) { + Index[] out = new Index[request.indices().length]; + for (int x = 0; x < out.length; x++) { + out[x] = new Index(request.indices()[x], "_na_"); + } + return out; + } + } + +} diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index f5295bead19a4..3d48e3f43a235 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -210,15 +210,15 @@ private void assertThrowsOnRegister(String repoName) { expectThrows(RepositoryException.class, () -> repositoriesService.registerRepository(request, null)); } - private static class TestRepository implements Repository { + public static class TestRepository implements Repository { - private static final String TYPE = "internal"; + public static final String TYPE = "internal"; private boolean isClosed; private boolean isStarted; private final RepositoryMetadata metadata; - private TestRepository(RepositoryMetadata metadata) { + public TestRepository(RepositoryMetadata metadata) { this.metadata = metadata; } @@ -409,11 +409,11 @@ public void close() { } } - private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository { - private static final String TYPE = "type-a"; + public static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository { + public static final String TYPE = "type-a"; private static final RepositoryStats STATS = new RepositoryStats(Map.of("GET", 10L)); - private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) { + public MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) { super( metadata, false, @@ -440,11 +440,11 @@ public BlobPath basePath() { } } - private static class MeteredRepositoryTypeB extends MeteredBlobStoreRepository { - private static final String TYPE = "type-b"; + public static class MeteredRepositoryTypeB extends MeteredBlobStoreRepository { + public static final String TYPE = "type-b"; private static final RepositoryStats STATS = new RepositoryStats(Map.of("LIST", 20L)); - private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) { + public MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) { super( metadata, false, From 312fa4c6179781474010d0faef561081fe165d80 Mon Sep 17 00:00:00 2001 From: INDRAJIT BANERJEE Date: Sun, 2 Jul 2023 20:59:03 +0530 Subject: [PATCH 03/16] Adding Changelog Signed-off-by: INDRAJIT BANERJEE --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f75cc6e508e7a..767f2bade8a7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) - Pass localNode info to all plugins on node start ([#7919](https://github.com/opensearch-project/OpenSearch/pull/7919)) - Improved performance of parsing floating point numbers ([#7909](https://github.com/opensearch-project/OpenSearch/pull/7909)) +- Adding test coverage for TransportGetSnapshotsAction ([#8377](https://github.com/opensearch-project/OpenSearch/pull/8377)) ### Deprecated From 77f4876b1f74f85f836a110ad83f9d031f2dcd68 Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Mon, 10 Jul 2023 22:28:47 +0530 Subject: [PATCH 04/16] Adding TransportGetSnapshotsAction integ tests Signed-off-by: Indrajit Banerjee --- .../repositories/RepositoriesService.java | 15 +- .../get/GetSnapshotsActionTests.java | 245 ------------------ .../RepositoriesServiceTests.java | 18 +- .../snapshots/SnapshotResiliencyTests.java | 111 +++++++- 4 files changed, 116 insertions(+), 273 deletions(-) delete mode 100644 server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsActionTests.java diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index ec780e0765ea0..9c56d172f2ea1 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -149,19 +149,6 @@ public RepositoriesService( deleteRepositoryTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_REPOSITORY_KEY, true); } - public RepositoriesService repositories(Map repositories) { - this.repositories = repositories; - return this; - } - - public Map repositories() { - return this.repositories; - } - - public Map typesRegistry() { - return this.typesRegistry; - } - /** * Registers new repository in the cluster *

@@ -589,7 +576,7 @@ private void archiveRepositoryStats(Repository repository, long clusterStateVers /** * Creates repository holder. This method starts the repository */ - public Repository createRepository(RepositoryMetadata repositoryMetadata, Map factories) { + private Repository createRepository(RepositoryMetadata repositoryMetadata, Map factories) { logger.debug("creating repository [{}][{}]", repositoryMetadata.type(), repositoryMetadata.name()); Repository.Factory factory = factories.get(repositoryMetadata.type()); if (factory == null) { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsActionTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsActionTests.java deleted file mode 100644 index b535c85a5f2df..0000000000000 --- a/server/src/test/java/org/opensearch/action/admin/cluster/snapshots/get/GetSnapshotsActionTests.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.action.admin.cluster.snapshots.get; - -import org.opensearch.action.ActionListener; -import org.opensearch.action.IndicesRequest; - -import org.opensearch.action.get.MultiGetAction; -import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.replication.ClusterStateCreationUtils; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.RepositoryMetadata; -import org.opensearch.cluster.service.ClusterApplierService; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.index.Index; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.RepositoriesServiceTests; -import org.opensearch.repositories.Repository; -import org.opensearch.tasks.Task; -import org.opensearch.tasks.TaskId; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.test.transport.CapturingTransport; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.TransportService; -import org.opensearch.action.ActionRequestValidationException; -import org.junit.After; -import org.junit.Before; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.opensearch.test.ClusterServiceUtils.createClusterService; - -public class GetSnapshotsActionTests extends OpenSearchTestCase { - private TransportService transportService; - private ClusterService clusterService; - private ThreadPool threadPool; - private RepositoriesService repositoriesService; - private final String[] snapshotsName1 = { "test_snapshot" }; - private final String[] currSnapshotOnly = { GetSnapshotsRequest.CURRENT_SNAPSHOT }; - private final String[] snapshotsName2 = { "test_snapshot1", "test_snapshot2" }; - private final String repoName = "test_repo"; - private final String indexName = "test_index"; - private static final String repoType = "internal"; - - private GetSnapshotsActionTests.TestTransportGetSnapshotsAction getSnapshotsAction; - - public class TestTransportGetSnapshotsAction extends TransportGetSnapshotsAction { - TestTransportGetSnapshotsAction() { - super( - GetSnapshotsActionTests.this.transportService, - GetSnapshotsActionTests.this.clusterService, - GetSnapshotsActionTests.this.threadPool, - repositoriesService, - new ActionFilters(Collections.emptySet()), - new GetSnapshotsActionTests.Resolver() - ); - } - - @Override - protected void clusterManagerOperation( - GetSnapshotsRequest request, - ClusterState state, - ActionListener listener - ) { - ClusterState stateWithIndex = ClusterStateCreationUtils.state(indexName, 1, 1); - super.clusterManagerOperation(request, stateWithIndex, listener); - } - } - - @Before - public void setUp() throws Exception { - super.setUp(); - - threadPool = new TestThreadPool("GetSnapshotsActionTests"); - clusterService = createClusterService(threadPool); - CapturingTransport capturingTransport = new CapturingTransport(); - transportService = capturingTransport.createTransportService( - clusterService.getSettings(), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> clusterService.localNode(), - null, - Collections.emptySet() - ); - - Map typesRegistry = GetSnapshotsActionTests.getTypesRegistry(threadPool); - repositoriesService = new RepositoriesService( - Settings.EMPTY, - clusterService, - transportService, - typesRegistry, - typesRegistry, - threadPool - ); - repositoriesService.start(); - transportService.start(); - transportService.acceptIncomingRequests(); - getSnapshotsAction = new GetSnapshotsActionTests.TestTransportGetSnapshotsAction(); - } - - private static Map getTypesRegistry(ThreadPool threadPool) { - final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); - when(clusterApplierService.threadPool()).thenReturn(threadPool); - final ClusterService clusterServiceMock = mock(ClusterService.class); - when(clusterServiceMock.getClusterApplierService()).thenReturn(clusterApplierService); - Map typesRegistry = Map.of( - RepositoriesServiceTests.TestRepository.TYPE, - RepositoriesServiceTests.TestRepository::new, - RepositoriesServiceTests.MeteredRepositoryTypeA.TYPE, - metadata -> new RepositoriesServiceTests.MeteredRepositoryTypeA(metadata, clusterServiceMock), - RepositoriesServiceTests.MeteredRepositoryTypeB.TYPE, - metadata -> new RepositoriesServiceTests.MeteredRepositoryTypeB(metadata, clusterServiceMock) - ); - - return typesRegistry; - } - - @After - public void tearDown() throws Exception { - ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); - threadPool = null; - clusterService.close(); - super.tearDown(); - } - - public void testRepoMissing() { - GetSnapshotsRequest repoMissingRequest = new GetSnapshotsRequest(); - getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { - assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); - }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); - - repoMissingRequest = new GetSnapshotsRequest().snapshots(snapshotsName1); - - getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { - assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); - }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); - - repoMissingRequest = new GetSnapshotsRequest().ignoreUnavailable(true); - - getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { - assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); - }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); - - repoMissingRequest = new GetSnapshotsRequest().verbose(true); - - getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { - assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); - }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); - - repoMissingRequest = new GetSnapshotsRequest().verbose(false); - - getSnapshotsAction.execute(null, repoMissingRequest, ActionListener.wrap(repoMissingResponse -> { - assertThrows(ActionRequestValidationException.class, () -> { repoMissingResponse.getSnapshots(); }); - }, exception -> { assertTrue(exception.getMessage().contains("repository is missing")); })); - } - - public void testCurrentRepoSnapshot() { - final Task task = createTask(); - RepositoryMetadata repositoryMetadata = new RepositoryMetadata(repoName, repoType, Settings.EMPTY); - Repository repository = repositoriesService.createRepository(repositoryMetadata, repositoriesService.typesRegistry()); - ; - Map repositoryMap = new HashMap<>(); - repositoryMap.put(repoName, repository); - repositoriesService = repositoriesService.repositories(repositoryMap); - assertEquals(repositoryMap, repositoriesService.repositories()); - - GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) - .snapshots(currSnapshotOnly) - .ignoreUnavailable(true); - getSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); - }, exception -> { throw new AssertionError(exception); })); - - repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) - .snapshots(currSnapshotOnly) - .ignoreUnavailable(true) - .verbose(false); - getSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); - }, exception -> { throw new AssertionError(exception); })); - - repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) - .snapshots(currSnapshotOnly) - .ignoreUnavailable(false) - .verbose(false); - getSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); - }, exception -> { throw new AssertionError(exception); })); - - repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) - .snapshots(currSnapshotOnly) - .ignoreUnavailable(true) - .verbose(false); - getSnapshotsAction.execute(task, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); - }, exception -> { throw new AssertionError(exception); })); - } - - private static Task createTask() { - return new Task( - randomLong(), - "transport", - MultiGetAction.NAME, - "description", - new TaskId(randomLong() + ":" + randomLong()), - Collections.emptyMap() - ); - } - - static class Resolver extends IndexNameExpressionResolver { - Resolver() { - super(new ThreadContext(Settings.EMPTY)); - } - - @Override - public String[] concreteIndexNames(ClusterState state, IndicesRequest request) { - return request.indices(); - } - - @Override - public Index[] concreteIndices(ClusterState state, IndicesRequest request) { - Index[] out = new Index[request.indices().length]; - for (int x = 0; x < out.length; x++) { - out[x] = new Index(request.indices()[x], "_na_"); - } - return out; - } - } - -} diff --git a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java index a9522c5314861..edf5b6c84bc54 100644 --- a/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java @@ -211,15 +211,15 @@ private void assertThrowsOnRegister(String repoName) { expectThrows(RepositoryException.class, () -> repositoriesService.registerRepository(request, null)); } - public static class TestRepository implements Repository { + private static class TestRepository implements Repository { - public static final String TYPE = "internal"; + private static final String TYPE = "internal"; private boolean isClosed; private boolean isStarted; private final RepositoryMetadata metadata; - public TestRepository(RepositoryMetadata metadata) { + private TestRepository(RepositoryMetadata metadata) { this.metadata = metadata; } @@ -422,11 +422,11 @@ public void close() { } } - public static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository { - public static final String TYPE = "type-a"; + private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository { + private static final String TYPE = "type-a"; private static final RepositoryStats STATS = new RepositoryStats(Map.of("GET", 10L)); - public MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) { + private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) { super( metadata, false, @@ -453,11 +453,11 @@ public BlobPath basePath() { } } - public static class MeteredRepositoryTypeB extends MeteredBlobStoreRepository { - public static final String TYPE = "type-b"; + private static class MeteredRepositoryTypeB extends MeteredBlobStoreRepository { + private static final String TYPE = "type-b"; private static final RepositoryStats STATS = new RepositoryStats(Map.of("LIST", 20L)); - public MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) { + private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) { super( metadata, false, diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 0dd65109ee43e..ef6f0c32ae8b0 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -57,6 +57,8 @@ import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsAction; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; @@ -637,7 +639,6 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { ); final StepListener createOtherSnapshotResponseStepListener = new StepListener<>(); - continueOrDie( createSnapshotResponseStepListener, createSnapshotResponse -> client().admin() @@ -645,7 +646,6 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { .prepareCreateSnapshot(repoName, "snapshot-2") .execute(createOtherSnapshotResponseStepListener) ); - final StepListener deleteSnapshotStepListener = new StepListener<>(); continueOrDie( @@ -678,7 +678,6 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); // We end up with two snapshots no matter if the delete worked out or not assertThat(snapshotIds, hasSize(2)); - for (SnapshotId snapshotId : snapshotIds) { final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); @@ -688,6 +687,107 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { } } + public void testTransportGetSnapshotsAction() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + final String[] snapshotsList = { "snapshot-1", "snapshot-2" }; + final String[] indexList = { "index-1", "index-2" }; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( + testClusterNodes.nodes.values().iterator().next().clusterService.state() + ); + + for (int i = 0; i < snapshotsList.length; i++) { + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + final String snapshot = snapshotsList[i]; + final String index = indexList[i]; + continueOrDie( + createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshot) + .setWaitForCompletion(true) + .execute(createSnapshotResponseStepListener) + ); + + final StepListener getSnapshotsResponseStepListener = new StepListener<>(); + continueOrDie(getSnapshotsResponseStepListener, getSnapshotsResponse -> { + client().admin().cluster().prepareGetSnapshots(repoName).execute(getSnapshotsResponseStepListener); + }); + } + deterministicTaskQueue.runAllRunnableTasks(); + + final Repository repository = clusterManagerNode.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + assertThat(snapshotIds, hasSize(2)); + for (SnapshotId snapshotId : snapshotIds) { + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals)); + } + + TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(snapshotsList) + .ignoreUnavailable(true); + if (getSnapshotsAction instanceof TransportGetSnapshotsAction) { + TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); + }, exception -> { throw new AssertionError(exception); })); + } + } + + public void testTransportGetCurrentSnapshotOnly() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = GetSnapshotsRequest.CURRENT_SNAPSHOT; + final String index = "test"; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( + testClusterNodes.nodes.values().iterator().next().clusterService.state() + ); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie( + createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .execute(createSnapshotResponseStepListener) + ); + final StepListener getSnapshotsResponseStepListener = new StepListener<>(); + continueOrDie(getSnapshotsResponseStepListener, getSnapshotsResponse -> { + client().admin().cluster().prepareGetSnapshots(repoName).execute(getSnapshotsResponseStepListener); + }); + deterministicTaskQueue.runAllRunnableTasks(); + + final Repository repository = clusterManagerNode.repositoriesService.repository(repoName); + Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + assertThat(snapshotIds, hasSize(0)); + + final String[] snapshotsList = { GetSnapshotsRequest.CURRENT_SNAPSHOT }; + TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(snapshotsList) + .ignoreUnavailable(true); + if (getSnapshotsAction instanceof TransportGetSnapshotsAction) { + TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(0)); + }, exception -> { throw new AssertionError(exception); })); + } + } + public void testBulkSnapshotDeleteWithAbort() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); @@ -1627,7 +1727,6 @@ public TestClusterNode currentClusterManager(ClusterState state) { } private final class TestClusterNode { - private final Logger logger = LogManager.getLogger(TestClusterNode.class); private final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry( @@ -1671,6 +1770,8 @@ private final class TestClusterNode { private Coordinator coordinator; + private Map actions = new HashMap<>(); + TestClusterNode(DiscoveryNode node) throws IOException { this.node = node; final Environment environment = createEnvironment(node.getName()); @@ -1906,7 +2007,7 @@ public void onFailure(final Exception e) { SegmentReplicationCheckpointPublisher.EMPTY, mock(RemoteRefreshSegmentPressureService.class) ); - Map actions = new HashMap<>(); + final SystemIndices systemIndices = new SystemIndices(emptyMap()); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices); final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService( From dbc4884b1ad6fd6fdf688eb94bd6207d4f8ad3f5 Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Wed, 12 Jul 2023 16:15:43 +0530 Subject: [PATCH 05/16] Adding Minor test fix Signed-off-by: Indrajit Banerjee --- .../snapshots/SnapshotResiliencyTests.java | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index ef6f0c32ae8b0..e03d82e761649 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -705,7 +705,7 @@ public void testTransportGetSnapshotsAction() { final String index = indexList[i]; continueOrDie( createRepoAndIndex(repoName, index, shards), - createIndexResponse -> client().admin() + createSnapshotResponse -> client().admin() .cluster() .prepareCreateSnapshot(repoName, snapshot) .setWaitForCompletion(true) @@ -733,13 +733,12 @@ public void testTransportGetSnapshotsAction() { GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) .snapshots(snapshotsList) .ignoreUnavailable(true); - if (getSnapshotsAction instanceof TransportGetSnapshotsAction) { - TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; - transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); - assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); - }, exception -> { throw new AssertionError(exception); })); - } + + TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("Snapshot list should be registered", repoSnapshotResponse.getSnapshots()); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); + }, exception -> { throw new AssertionError(exception); })); } public void testTransportGetCurrentSnapshotOnly() { @@ -779,13 +778,12 @@ public void testTransportGetCurrentSnapshotOnly() { GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) .snapshots(snapshotsList) .ignoreUnavailable(true); - if (getSnapshotsAction instanceof TransportGetSnapshotsAction) { - TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; - transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("snapshots should be set as we are checking the current snapshot", repoSnapshotResponse.getSnapshots()); - assertThat(repoSnapshotResponse.getSnapshots(), hasSize(0)); - }, exception -> { throw new AssertionError(exception); })); - } + + TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("Empty Snapshot info should be registered for current snapshots", repoSnapshotResponse.getSnapshots()); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(0)); + }, exception -> { throw new AssertionError(exception); })); } public void testBulkSnapshotDeleteWithAbort() { From 1d7bf944a052c58be5dd5b106f2a1ee688bf734c Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Wed, 12 Jul 2023 18:23:45 +0530 Subject: [PATCH 06/16] Adding minor typo fix Signed-off-by: Indrajit Banerjee --- .../java/org/opensearch/snapshots/SnapshotResiliencyTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 158c78f52eee6..51b254aa84ded 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -757,7 +757,7 @@ public void testTransportGetCurrentSnapshotOnly() { continueOrDie( createRepoAndIndex(repoName, index, shards), - createIndexResponse -> client().admin() + createSnapshotResponse -> client().admin() .cluster() .prepareCreateSnapshot(repoName, snapshotName) .setWaitForCompletion(true) From f0b3834d35f138d542a7a0f68e8416b697b351bb Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Mon, 17 Jul 2023 11:34:13 +0530 Subject: [PATCH 07/16] Removing stale tests Signed-off-by: Indrajit Banerjee --- .../snapshots/SnapshotResiliencyTests.java | 51 ------------------- 1 file changed, 51 deletions(-) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 51b254aa84ded..84d95d7830479 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -58,7 +58,6 @@ import org.opensearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsAction; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; -import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.opensearch.action.admin.cluster.snapshots.get.TransportGetSnapshotsAction; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; @@ -711,11 +710,6 @@ public void testTransportGetSnapshotsAction() { .setWaitForCompletion(true) .execute(createSnapshotResponseStepListener) ); - - final StepListener getSnapshotsResponseStepListener = new StepListener<>(); - continueOrDie(getSnapshotsResponseStepListener, getSnapshotsResponse -> { - client().admin().cluster().prepareGetSnapshots(repoName).execute(getSnapshotsResponseStepListener); - }); } deterministicTaskQueue.runAllRunnableTasks(); @@ -741,51 +735,6 @@ public void testTransportGetSnapshotsAction() { }, exception -> { throw new AssertionError(exception); })); } - public void testTransportGetCurrentSnapshotOnly() { - setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); - - String repoName = "repo"; - String snapshotName = GetSnapshotsRequest.CURRENT_SNAPSHOT; - final String index = "test"; - final int shards = randomIntBetween(1, 10); - - TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( - testClusterNodes.nodes.values().iterator().next().clusterService.state() - ); - - final StepListener createSnapshotResponseStepListener = new StepListener<>(); - - continueOrDie( - createRepoAndIndex(repoName, index, shards), - createSnapshotResponse -> client().admin() - .cluster() - .prepareCreateSnapshot(repoName, snapshotName) - .setWaitForCompletion(true) - .execute(createSnapshotResponseStepListener) - ); - final StepListener getSnapshotsResponseStepListener = new StepListener<>(); - continueOrDie(getSnapshotsResponseStepListener, getSnapshotsResponse -> { - client().admin().cluster().prepareGetSnapshots(repoName).execute(getSnapshotsResponseStepListener); - }); - deterministicTaskQueue.runAllRunnableTasks(); - - final Repository repository = clusterManagerNode.repositoriesService.repository(repoName); - Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); - assertThat(snapshotIds, hasSize(0)); - - final String[] snapshotsList = { GetSnapshotsRequest.CURRENT_SNAPSHOT }; - TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); - GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) - .snapshots(snapshotsList) - .ignoreUnavailable(true); - - TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; - transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("Empty Snapshot info should be registered for current snapshots", repoSnapshotResponse.getSnapshots()); - assertThat(repoSnapshotResponse.getSnapshots(), hasSize(0)); - }, exception -> { throw new AssertionError(exception); })); - } - public void testBulkSnapshotDeleteWithAbort() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); From 5bb2e398e162378b50d00dd04318d6eab47ef321 Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Mon, 17 Jul 2023 12:28:37 +0530 Subject: [PATCH 08/16] Cleaning transport tests Signed-off-by: Indrajit Banerjee --- .../snapshots/SnapshotResiliencyTests.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index d160eb3b297ca..d0a59d16a1745 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -713,16 +713,6 @@ public void testTransportGetSnapshotsAction() { } deterministicTaskQueue.runAllRunnableTasks(); - final Repository repository = clusterManagerNode.repositoriesService.repository(repoName); - Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); - assertThat(snapshotIds, hasSize(2)); - for (SnapshotId snapshotId : snapshotIds) { - final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); - assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); - assertEquals(0, snapshotInfo.failedShards()); - assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals)); - } - TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) .snapshots(snapshotsList) @@ -732,6 +722,12 @@ public void testTransportGetSnapshotsAction() { transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { assertNotNull("Snapshot list should be registered", repoSnapshotResponse.getSnapshots()); assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); + List snapshotInfos = repoSnapshotResponse.getSnapshots(); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals)); + } }, exception -> { throw new AssertionError(exception); })); } From ef5aab856ef1b82412fbb01995bd5aab3029aca6 Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Mon, 17 Jul 2023 13:33:54 +0530 Subject: [PATCH 09/16] Cleaning up Changelog Signed-off-by: Indrajit Banerjee --- CHANGELOG.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 793d54bc4ca62..202c6bd19d683 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,10 +45,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) - Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) -- Pass localNode info to all plugins on node start ([#7919](https://github.com/opensearch-project/OpenSearch/pull/7919)) -- Improved performance of parsing floating point numbers ([#7909](https://github.com/opensearch-project/OpenSearch/pull/7909)) -- Move span actions to Scope ([#8411](https://github.com/opensearch-project/OpenSearch/pull/8411)) -- Add wrapper tracer implementation - Removed blocking wait in TransportGetSnapshotsAction which was exhausting generic threadpool ([#8377](https://github.com/opensearch-project/OpenSearch/pull/8377)) ### Deprecated From b9954c04206a1c4f22c8abc58abdac2f99fa82a4 Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Mon, 17 Jul 2023 21:00:45 +0530 Subject: [PATCH 10/16] Adding getRepositoryData call when isCurrentSnapshotsOnly is false Signed-off-by: Indrajit Banerjee --- .../cluster/snapshots/get/TransportGetSnapshotsAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index ddf92268bd879..2ece83651bf5e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -139,9 +139,9 @@ protected void clusterManagerOperation( } final StepListener repositoryDataListener = new StepListener<>(); - repositoriesService.getRepositoryData(repository, repositoryDataListener); repositoryDataListener.whenComplete(repositoryData -> { if (isCurrentSnapshotsOnly(request.snapshots()) == false) { + repositoriesService.getRepositoryData(repository, repositoryDataListener); for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { allSnapshotIds.put(snapshotId.getName(), snapshotId); } From b947da2255998b449f9298e9e07043fc92f089bc Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Fri, 21 Jul 2023 13:27:59 +0530 Subject: [PATCH 11/16] Adding listener callback restricted to nonCurrentSnapshotsOnly Signed-off-by: Indrajit Banerjee --- .../get/TransportGetSnapshotsAction.java | 85 +++++++++---------- .../snapshots/SnapshotResiliencyTests.java | 32 ++++--- 2 files changed, 60 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 2ece83651bf5e..78386028aa8e8 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -70,6 +70,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.Collections.unmodifiableList; @@ -139,60 +140,58 @@ protected void clusterManagerOperation( } final StepListener repositoryDataListener = new StepListener<>(); - repositoryDataListener.whenComplete(repositoryData -> { - if (isCurrentSnapshotsOnly(request.snapshots()) == false) { - repositoriesService.getRepositoryData(repository, repositoryDataListener); - for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { + AtomicReference repositoryData = new AtomicReference<>(null); + if (isCurrentSnapshotsOnly(request.snapshots()) == false) { + repositoriesService.getRepositoryData(repository, repositoryDataListener); + repositoryDataListener.whenComplete(repoData -> { + for (SnapshotId snapshotId : repoData.getSnapshotIds()) { allSnapshotIds.put(snapshotId.getName(), snapshotId); } - } else { - repositoryData = null; - } + repositoryData.set(repoData); + }, listener::onFailure); + } - final Set toResolve = new HashSet<>(); - if (isAllSnapshots(request.snapshots())) { - toResolve.addAll(allSnapshotIds.values()); - } else { - for (String snapshotOrPattern : request.snapshots()) { - if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { - toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList())); - } else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { - if (allSnapshotIds.containsKey(snapshotOrPattern)) { - toResolve.add(allSnapshotIds.get(snapshotOrPattern)); - } else if (request.ignoreUnavailable() == false) { - throw new SnapshotMissingException(repository, snapshotOrPattern); - } - } else { - for (Map.Entry entry : allSnapshotIds.entrySet()) { - if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { - toResolve.add(entry.getValue()); - } + final Set toResolve = new HashSet<>(); + if (isAllSnapshots(request.snapshots())) { + toResolve.addAll(allSnapshotIds.values()); + } else { + for (String snapshotOrPattern : request.snapshots()) { + if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { + toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList())); + } else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { + if (allSnapshotIds.containsKey(snapshotOrPattern)) { + toResolve.add(allSnapshotIds.get(snapshotOrPattern)); + } else if (request.ignoreUnavailable() == false) { + throw new SnapshotMissingException(repository, snapshotOrPattern); + } + } else { + for (Map.Entry entry : allSnapshotIds.entrySet()) { + if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { + toResolve.add(entry.getValue()); } } } + } - if (toResolve.isEmpty() - && request.ignoreUnavailable() == false - && isCurrentSnapshotsOnly(request.snapshots()) == false) { - throw new SnapshotMissingException(repository, request.snapshots()[0]); - } + if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) { + throw new SnapshotMissingException(repository, request.snapshots()[0]); } + } - final List snapshotInfos; - if (request.verbose()) { - snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable()); + final List snapshotInfos; + if (request.verbose()) { + snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable()); + } else { + if (repositoryData.get() != null) { + // want non-current snapshots as well, which are found in the repository data + snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData.get(), currentSnapshots); } else { - if (repositoryData != null) { - // want non-current snapshots as well, which are found in the repository data - snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots); - } else { - // only want current snapshots - snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); - CollectionUtil.timSort(snapshotInfos); - } + // only want current snapshots + snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); + CollectionUtil.timSort(snapshotInfos); } - listener.onResponse(new GetSnapshotsResponse(snapshotInfos)); - }, listener::onFailure); + } + listener.onResponse(new GetSnapshotsResponse(snapshotInfos)); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index d0a59d16a1745..4d8f8947442b7 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -692,6 +692,7 @@ public void testTransportGetSnapshotsAction() { String repoName = "repo"; final String[] snapshotsList = { "snapshot-1", "snapshot-2" }; final String[] indexList = { "index-1", "index-2" }; + final boolean[] snapshotRequestMode = { true, false }; final int shards = randomIntBetween(1, 10); TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( @@ -714,21 +715,24 @@ public void testTransportGetSnapshotsAction() { deterministicTaskQueue.runAllRunnableTasks(); TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); - GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) - .snapshots(snapshotsList) - .ignoreUnavailable(true); - TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; - transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("Snapshot list should be registered", repoSnapshotResponse.getSnapshots()); - assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); - List snapshotInfos = repoSnapshotResponse.getSnapshots(); - for (SnapshotInfo snapshotInfo : snapshotInfos) { - assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); - assertEquals(0, snapshotInfo.failedShards()); - assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals)); - } - }, exception -> { throw new AssertionError(exception); })); + for (boolean mode : snapshotRequestMode) { + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(snapshotsList) + .ignoreUnavailable(mode) + .verbose(mode); + + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("Snapshot list should be registered", repoSnapshotResponse.getSnapshots()); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); + List snapshotInfos = repoSnapshotResponse.getSnapshots(); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals)); + } + }, exception -> { throw new AssertionError(exception); })); + } } public void testBulkSnapshotDeleteWithAbort() { From 891648d4dd3da557a509d367cb9f87cdadea63bc Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Sun, 23 Jul 2023 21:15:09 +0530 Subject: [PATCH 12/16] Calling getRepositoryData when CurrentSnapshotOnly is false Signed-off-by: Indrajit Banerjee --- .../get/TransportGetSnapshotsAction.java | 83 ++++++++++--------- 1 file changed, 43 insertions(+), 40 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 78386028aa8e8..395c83ed18a3e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -70,7 +70,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.Collections.unmodifiableList; @@ -140,58 +139,62 @@ protected void clusterManagerOperation( } final StepListener repositoryDataListener = new StepListener<>(); - AtomicReference repositoryData = new AtomicReference<>(null); if (isCurrentSnapshotsOnly(request.snapshots()) == false) { repositoriesService.getRepositoryData(repository, repositoryDataListener); - repositoryDataListener.whenComplete(repoData -> { - for (SnapshotId snapshotId : repoData.getSnapshotIds()) { + } + repositoryDataListener.whenComplete(repositoryData -> { + if (isCurrentSnapshotsOnly(request.snapshots()) == false) { + for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { allSnapshotIds.put(snapshotId.getName(), snapshotId); } - repositoryData.set(repoData); - }, listener::onFailure); - } + } else { + repositoryData = null; + } - final Set toResolve = new HashSet<>(); - if (isAllSnapshots(request.snapshots())) { - toResolve.addAll(allSnapshotIds.values()); - } else { - for (String snapshotOrPattern : request.snapshots()) { - if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { - toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList())); - } else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { - if (allSnapshotIds.containsKey(snapshotOrPattern)) { - toResolve.add(allSnapshotIds.get(snapshotOrPattern)); - } else if (request.ignoreUnavailable() == false) { - throw new SnapshotMissingException(repository, snapshotOrPattern); - } - } else { - for (Map.Entry entry : allSnapshotIds.entrySet()) { - if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { - toResolve.add(entry.getValue()); + final Set toResolve = new HashSet<>(); + if (isAllSnapshots(request.snapshots())) { + toResolve.addAll(allSnapshotIds.values()); + } else { + for (String snapshotOrPattern : request.snapshots()) { + if (GetSnapshotsRequest.CURRENT_SNAPSHOT.equalsIgnoreCase(snapshotOrPattern)) { + toResolve.addAll(currentSnapshots.stream().map(SnapshotInfo::snapshotId).collect(Collectors.toList())); + } else if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) { + if (allSnapshotIds.containsKey(snapshotOrPattern)) { + toResolve.add(allSnapshotIds.get(snapshotOrPattern)); + } else if (request.ignoreUnavailable() == false) { + throw new SnapshotMissingException(repository, snapshotOrPattern); + } + } else { + for (Map.Entry entry : allSnapshotIds.entrySet()) { + if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) { + toResolve.add(entry.getValue()); + } } } } - } - if (toResolve.isEmpty() && request.ignoreUnavailable() == false && isCurrentSnapshotsOnly(request.snapshots()) == false) { - throw new SnapshotMissingException(repository, request.snapshots()[0]); + if (toResolve.isEmpty() + && request.ignoreUnavailable() == false + && isCurrentSnapshotsOnly(request.snapshots()) == false) { + throw new SnapshotMissingException(repository, request.snapshots()[0]); + } } - } - final List snapshotInfos; - if (request.verbose()) { - snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable()); - } else { - if (repositoryData.get() != null) { - // want non-current snapshots as well, which are found in the repository data - snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData.get(), currentSnapshots); + final List snapshotInfos; + if (request.verbose()) { + snapshotInfos = snapshots(snapshotsInProgress, repository, new ArrayList<>(toResolve), request.ignoreUnavailable()); } else { - // only want current snapshots - snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); - CollectionUtil.timSort(snapshotInfos); + if (repositoryData != null) { + // want non-current snapshots as well, which are found in the repository data + snapshotInfos = buildSimpleSnapshotInfos(toResolve, repositoryData, currentSnapshots); + } else { + // only want current snapshots + snapshotInfos = currentSnapshots.stream().map(SnapshotInfo::basic).collect(Collectors.toList()); + CollectionUtil.timSort(snapshotInfos); + } } - } - listener.onResponse(new GetSnapshotsResponse(snapshotInfos)); + listener.onResponse(new GetSnapshotsResponse(snapshotInfos)); + }, listener::onFailure); } catch (Exception e) { listener.onFailure(e); } From c4a668ea2ce948480887b584b880470976cb380b Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Mon, 24 Jul 2023 12:05:04 +0530 Subject: [PATCH 13/16] refactoring getRepositoryData Signed-off-by: Indrajit Banerjee --- .../cluster/snapshots/get/TransportGetSnapshotsAction.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 395c83ed18a3e..dbf47f2b121e3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -141,14 +141,15 @@ protected void clusterManagerOperation( final StepListener repositoryDataListener = new StepListener<>(); if (isCurrentSnapshotsOnly(request.snapshots()) == false) { repositoriesService.getRepositoryData(repository, repositoryDataListener); + } else { + // Setting repositoryDataListener response to be null if the request has only current snapshot + repositoryDataListener.onResponse(null); } repositoryDataListener.whenComplete(repositoryData -> { - if (isCurrentSnapshotsOnly(request.snapshots()) == false) { + if (repositoryData != null) { for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { allSnapshotIds.put(snapshotId.getName(), snapshotId); } - } else { - repositoryData = null; } final Set toResolve = new HashSet<>(); From 7cee224e3c5ce92fcbb785f7e8367d8200e1cd8f Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Fri, 4 Aug 2023 18:39:03 +0530 Subject: [PATCH 14/16] Adding current in-progress snapshots tests Signed-off-by: Indrajit Banerjee --- .../snapshots/SnapshotResiliencyTests.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 8f2fc1d3a28ec..67e13469470d1 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -735,6 +735,59 @@ public void testTransportGetSnapshotsAction() { } } + public void testTransportGetCurrentSnapshotsAction() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + final String index = "index-1"; + final boolean[] snapshotRequestMode = { true, false }; + final String[] snapshotsList = { GetSnapshotsRequest.CURRENT_SNAPSHOT }; + final int shards = randomIntBetween(1, 10); + + TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( + testClusterNodes.nodes.values().iterator().next().clusterService.state() + ); + + final StepListener createSnapshotResponseListener = new StepListener<>(); + clusterManagerNode.clusterService.addListener(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.state().custom(SnapshotsInProgress.TYPE) != null) { + TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); + TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; + for (boolean mode : snapshotRequestMode) { + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(snapshotsList) + .ignoreUnavailable(mode) + .verbose(mode); + + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("Snapshot list should be registered", repoSnapshotResponse.getSnapshots()); + List snapshotInfos = repoSnapshotResponse.getSnapshots(); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(snapshotsList.length)); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertEquals(SnapshotState.IN_PROGRESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + assertTrue(snapshotInfo.snapshotId().getName().contains("last-snapshot")); + } + }, exception -> { throw new AssertionError(exception); })); + } + SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + assertTrue(snapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + clusterManagerNode.clusterService.removeListener(this); + } + } + }); + continueOrDie( + createRepoAndIndex(repoName, index, shards), + createIndexResponse -> client().admin() + .cluster() + .prepareCreateSnapshot(repoName, GetSnapshotsRequest.CURRENT_SNAPSHOT) + .execute(createSnapshotResponseListener) + ); + deterministicTaskQueue.runAllRunnableTasks(); + } + public void testBulkSnapshotDeleteWithAbort() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); From 3440908f985d3f8c17c5547f6bc0fa1c4c765b7f Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Sat, 5 Aug 2023 16:20:34 +0530 Subject: [PATCH 15/16] refactoring tests Signed-off-by: Indrajit Banerjee --- .../snapshots/SnapshotResiliencyTests.java | 65 ++++++++----------- 1 file changed, 26 insertions(+), 39 deletions(-) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 94215240da1c3..0e7c6f553de3f 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -783,7 +783,6 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = clusterManagerNode.repositoriesService.repository(repoName); Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); - // We end up with two snapshots no matter if the delete worked out or not assertThat(snapshotIds, hasSize(2)); for (SnapshotId snapshotId : snapshotIds) { final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId); @@ -800,7 +799,6 @@ public void testTransportGetSnapshotsAction() { String repoName = "repo"; final String[] snapshotsList = { "snapshot-1", "snapshot-2" }; final String[] indexList = { "index-1", "index-2" }; - final boolean[] snapshotRequestMode = { true, false }; final int shards = randomIntBetween(1, 10); TestClusterNodes.TestClusterNode clusterManagerNode = testClusterNodes.currentClusterManager( @@ -824,23 +822,18 @@ public void testTransportGetSnapshotsAction() { TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; - for (boolean mode : snapshotRequestMode) { - GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) - .snapshots(snapshotsList) - .ignoreUnavailable(mode) - .verbose(mode); - - transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("Snapshot list should be registered", repoSnapshotResponse.getSnapshots()); - assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); - List snapshotInfos = repoSnapshotResponse.getSnapshots(); - for (SnapshotInfo snapshotInfo : snapshotInfos) { - assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); - assertEquals(0, snapshotInfo.failedShards()); - assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals)); - } - }, exception -> { throw new AssertionError(exception); })); - } + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName).snapshots(snapshotsList); + + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("Snapshot list should not be null", repoSnapshotResponse.getSnapshots()); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(2)); + List snapshotInfos = repoSnapshotResponse.getSnapshots(); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + assertTrue(Arrays.stream(snapshotsList).anyMatch(snapshotInfo.snapshotId().getName()::equals)); + } + }, exception -> { throw new AssertionError(exception); })); } public void testTransportGetCurrentSnapshotsAction() { @@ -848,7 +841,6 @@ public void testTransportGetCurrentSnapshotsAction() { String repoName = "repo"; final String index = "index-1"; - final boolean[] snapshotRequestMode = { true, false }; final String[] snapshotsList = { GetSnapshotsRequest.CURRENT_SNAPSHOT }; final int shards = randomIntBetween(1, 10); @@ -863,25 +855,20 @@ public void clusterChanged(ClusterChangedEvent event) { if (event.state().custom(SnapshotsInProgress.TYPE) != null) { TransportAction getSnapshotsAction = clusterManagerNode.actions.get(GetSnapshotsAction.INSTANCE); TransportGetSnapshotsAction transportGetSnapshotsAction = (TransportGetSnapshotsAction) getSnapshotsAction; - for (boolean mode : snapshotRequestMode) { - GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) - .snapshots(snapshotsList) - .ignoreUnavailable(mode) - .verbose(mode); - - transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { - assertNotNull("Snapshot list should be registered", repoSnapshotResponse.getSnapshots()); - List snapshotInfos = repoSnapshotResponse.getSnapshots(); - assertThat(repoSnapshotResponse.getSnapshots(), hasSize(snapshotsList.length)); - for (SnapshotInfo snapshotInfo : snapshotInfos) { - assertEquals(SnapshotState.IN_PROGRESS, snapshotInfo.state()); - assertEquals(0, snapshotInfo.failedShards()); - assertTrue(snapshotInfo.snapshotId().getName().contains("last-snapshot")); - } - }, exception -> { throw new AssertionError(exception); })); - } - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - assertTrue(snapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + GetSnapshotsRequest repoSnapshotRequest = new GetSnapshotsRequest().repository(repoName) + .snapshots(snapshotsList) + .ignoreUnavailable(false) + .verbose(false); + transportGetSnapshotsAction.execute(null, repoSnapshotRequest, ActionListener.wrap(repoSnapshotResponse -> { + assertNotNull("Snapshot list should not be null", repoSnapshotResponse.getSnapshots()); + List snapshotInfos = repoSnapshotResponse.getSnapshots(); + assertThat(repoSnapshotResponse.getSnapshots(), hasSize(snapshotsList.length)); + for (SnapshotInfo snapshotInfo : snapshotInfos) { + assertEquals(SnapshotState.IN_PROGRESS, snapshotInfo.state()); + assertEquals(0, snapshotInfo.failedShards()); + assertTrue(snapshotInfo.snapshotId().getName().contains("last-snapshot")); + } + }, exception -> { throw new AssertionError(exception); })); clusterManagerNode.clusterService.removeListener(this); } } From 0d7e7fb51b92e59a444140559623d4ece610451f Mon Sep 17 00:00:00 2001 From: Indrajit Banerjee Date: Mon, 7 Aug 2023 18:17:41 +0530 Subject: [PATCH 16/16] Fixing typo error of adding back comment Signed-off-by: Indrajit Banerjee --- .../java/org/opensearch/snapshots/SnapshotResiliencyTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 0e7c6f553de3f..c04e2821d7931 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -783,6 +783,7 @@ public void testConcurrentSnapshotCreateAndDeleteOther() { assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = clusterManagerNode.repositoriesService.repository(repoName); Collection snapshotIds = getRepositoryData(repository).getSnapshotIds(); + // We end up with two snapshots no matter if the delete worked out or not assertThat(snapshotIds, hasSize(2)); for (SnapshotId snapshotId : snapshotIds) { final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);