From 7d7dd50b4b113b476c6d906446f0e618924296a7 Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Fri, 31 May 2024 16:56:15 +0530 Subject: [PATCH] Fixing UTs Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- .../allocation/FailedShardsRoutingTests.java | 40 +++++++++++++++- ...eStoreMigrationAllocationDeciderTests.java | 46 +++++++++++++++++++ .../cluster/OpenSearchAllocationTestCase.java | 3 ++ 3 files changed, 88 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 5e3b74ee138ab..6796759fa8dd4 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -42,19 +42,30 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.command.AllocationCommands; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.snapshots.InternalSnapshotsInfoService; +import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.VersionUtils; +import org.opensearch.test.gateway.TestGatewayAllocator; +import org.opensearch.threadpool.TestThreadPool; import java.util.ArrayList; import java.util.HashSet; @@ -72,11 +83,15 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class FailedShardsRoutingTests extends OpenSearchAllocationTestCase { private final Logger logger = LogManager.getLogger(FailedShardsRoutingTests.class); @@ -826,7 +841,6 @@ private void testReplicaIsPromoted(boolean isSegmentReplicationEnabled) { public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() { FeatureFlags.initializeFeatureFlags(Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build()); - AllocationService allocation = createAllocationService(Settings.builder().build()); // segment replication enabled Settings.Builder settingsBuilder = settings(Version.CURRENT).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT); @@ -849,6 +863,29 @@ public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() { .routingTable(initialRoutingTable) .build(); + BlobPath basePath = BlobPath.cleanPath().add("test"); + RepositoriesService repositoriesService = mock(RepositoriesService.class); + BlobStoreRepository repository = mock(BlobStoreRepository.class); + BlobStore blobStore = mock(BlobStore.class); + when(repository.blobStore()).thenReturn(blobStore); + when(repositoriesService.repository(anyString())).thenReturn(repository); + when(repository.basePath()).thenReturn(basePath); + when(repository.getCompressor()).thenReturn(new DeflateCompressor()); + when(blobStore.isBlobMetadataEnabled()).thenReturn(true); + + TestThreadPool testThreadPool = new TestThreadPool(getTestName()); + final ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(clusterService.state()); + final InternalSnapshotsInfoService internalSnapshotsInfoService = new InternalSnapshotsInfoService( + Settings.builder().put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10)).build(), + clusterService, + () -> repositoriesService, + () -> rerouteService + ); + + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + AllocationService allocation = createAllocationService(Settings.EMPTY, gatewayAllocator, internalSnapshotsInfoService); + ShardId shardId = new ShardId(metadata.index("test").getIndex(), 0); // add a remote node and start primary shard @@ -954,5 +991,6 @@ public void testPreferReplicaOnRemoteNodeForPrimaryPromotion() { || primaryShardRouting3.currentNodeId().equals(nonRemoteNode2.getId()) ); assertEquals(expectedCandidateForSegRep.allocationId(), primaryShardRouting3.allocationId()); + testThreadPool.shutdownNow(); } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java index ee4dbe9738e04..e3e2c0faf4d07 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RerouteService; import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; @@ -51,18 +52,29 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.RemoteStoreMigrationAllocationDecider; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.snapshots.InternalSnapshotsInfoService; +import org.opensearch.test.ClusterServiceUtils; +import org.opensearch.threadpool.TestThreadPool; +import org.junit.After; import java.util.Collections; import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Objects; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; @@ -74,7 +86,11 @@ import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING; import static org.hamcrest.core.Is.is; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RemoteStoreMigrationAllocationDeciderTests extends OpenSearchAllocationTestCase { @@ -109,6 +125,7 @@ public class RemoteStoreMigrationAllocationDeciderTests extends OpenSearchAlloca private RoutingTable routingTable = null; private ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + private TestThreadPool testThreadPool; private void beforeAllocation(String direction) { FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); @@ -138,17 +155,46 @@ private void beforeAllocation(String direction) { getClusterSettings(customSettings) ); + BlobPath basePath = BlobPath.cleanPath().add("test"); + RepositoriesService repositoriesService = mock(RepositoriesService.class); + BlobStoreRepository repository = mock(BlobStoreRepository.class); + BlobStore blobStore = mock(BlobStore.class); + when(repository.blobStore()).thenReturn(blobStore); + when(repositoriesService.repository(anyString())).thenReturn(repository); + when(repository.basePath()).thenReturn(basePath); + when(repository.getCompressor()).thenReturn(new DeflateCompressor()); + when(blobStore.isBlobMetadataEnabled()).thenReturn(true); + + if (Objects.isNull(testThreadPool)) { + testThreadPool = new TestThreadPool(getTestName()); + } + final ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(clusterService.state()); + final InternalSnapshotsInfoService internalSnapshotsInfoService = new InternalSnapshotsInfoService( + Settings.builder().put(INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING.getKey(), randomIntBetween(1, 10)).build(), + clusterService, + () -> repositoriesService, + () -> rerouteService + ); + routingAllocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(remoteStoreMigrationAllocationDecider)), clusterState.getRoutingNodes(), clusterState, null, + internalSnapshotsInfoService, null, 0L ); routingAllocation.debugDecision(true); } + @After + public void tearDown() throws Exception { + testThreadPool.shutdownNow(); + super.tearDown(); + } + private void prepareRoutingTable(boolean isReplicaAllocation, String primaryShardNodeId) { routingTable = RoutingTable.builder() .add( diff --git a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java index f6113860e3907..ee96b0546fa7c 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/OpenSearchAllocationTestCase.java @@ -51,6 +51,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.gateway.GatewayAllocator; +import org.opensearch.snapshots.InternalSnapshotsInfoService; import org.opensearch.snapshots.SnapshotShardSizeInfo; import org.opensearch.snapshots.SnapshotsInfoService; import org.opensearch.test.OpenSearchTestCase; @@ -70,6 +71,7 @@ import static java.util.Collections.emptyMap; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.opensearch.snapshots.InternalSnapshotsInfoService.INTERNAL_SNAPSHOT_INFO_MAX_CONCURRENT_FETCHES_SETTING; public abstract class OpenSearchAllocationTestCase extends OpenSearchTestCase { private static final ClusterSettings EMPTY_CLUSTER_SETTINGS = new ClusterSettings( @@ -103,6 +105,7 @@ public static MockAllocationService createAllocationService(Settings settings, C randomAllocationDeciders(settings, clusterSettings, random), new TestGatewayAllocator(), new BalancedShardsAllocator(settings), + EmptyClusterInfoService.INSTANCE, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES );