diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java index 2c8f8f5c286c8..da91d273f3bce 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/state/TransportClusterStateAction.java @@ -51,7 +51,6 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.discovery.Discovery; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.node.NodeClosedException; import org.opensearch.threadpool.ThreadPool; @@ -84,7 +83,8 @@ public TransportClusterStateAction( ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - @Nullable RemoteClusterStateService remoteClusterStateService) { + @Nullable RemoteClusterStateService remoteClusterStateService + ) { super( ClusterStateAction.NAME, false, diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index a9d04e5b1cd3f..8a3a7d6a93ea7 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -50,7 +50,6 @@ import org.opensearch.cluster.NotClusterManagerException; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.coordination.ClusterStateTermVersion; -import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; @@ -65,7 +64,6 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; -import org.opensearch.discovery.Discovery; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.gateway.remote.RemoteManifestManager; @@ -439,8 +437,10 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) { termVersion.getTerm(), termVersion.getVersion() ); - ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService - .getClusterMetadataManifestByFileName(appliedState.stateUUID(), manifestFile); + ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getClusterMetadataManifestByFileName( + appliedState.stateUUID(), + manifestFile + ); ClusterState clusterStateFromRemote = remoteClusterStateService.getClusterStateForManifest( appliedState.getClusterName().value(), clusterMetadataManifest, @@ -448,7 +448,7 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) { true ); - if(clusterStateFromRemote!=null) { + if (clusterStateFromRemote != null) { logger.trace("Using the remote cluster-state fetched from local node, ClusterStateTermVersion {}", termVersion); return clusterStateFromRemote; } diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/term/TransportGetTermVersionAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/term/TransportGetTermVersionAction.java index 542fcf0f03a79..e01bf1d116473 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/term/TransportGetTermVersionAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/term/TransportGetTermVersionAction.java @@ -15,13 +15,11 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.coordination.ClusterStateTermVersion; -import org.opensearch.cluster.coordination.Coordinator; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.discovery.Discovery; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -42,7 +40,8 @@ public TransportGetTermVersionAction( ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver) { + IndexNameExpressionResolver indexNameExpressionResolver + ) { super( GetTermVersionAction.NAME, false, diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index c32c1fb25e6b3..c650530caa57e 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -527,16 +527,15 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod reposToSkip.add(joiningNodeRepoName); } } -//if non-or1 - //mix of remote-state (enabled and disabled) - //all of them settings + // if non-or1 + // mix of remote-state (enabled and disabled) + // all of them settings - // publishes to all nodes + // publishes to all nodes - //commits - - //30s still commit [] + // commits + // 30s still commit [] if (STRICT.equals(remoteStoreCompatibilityMode)) { DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0)); diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 79be249008d6a..c504e28895a85 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -13,14 +13,11 @@ package org.opensearch.action.support.clustermanager; -import org.mockito.Mock; -import org.mockito.Mockito; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; -import org.opensearch.action.admin.cluster.state.TransportClusterStateAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.ThreadedActionListener; @@ -92,10 +89,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static java.util.Collections.emptySet; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.when; +import org.mockito.Mockito; + import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -105,6 +100,9 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; public class TransportClusterManagerNodeActionTests extends OpenSearchTestCase { private static ThreadPool threadPool; @@ -220,7 +218,8 @@ public void writeTo(StreamOutput out) throws IOException { } class Action extends TransportClusterManagerNodeAction { - private boolean localExecuteSupported = false; + private boolean localExecuteSupported = false; + Action(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) { super( actionName, @@ -233,11 +232,17 @@ class Action extends TransportClusterManagerNodeAction { ); } - protected boolean localExecuteSupportedByAction() { return localExecuteSupported; } - Action(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, RemoteClusterStateService clusterStateService) { + + Action( + String actionName, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + RemoteClusterStateService clusterStateService + ) { this(actionName, transportService, clusterService, threadPool); this.remoteClusterStateService = clusterStateService; this.localExecuteSupported = true; @@ -743,34 +748,32 @@ public void testFetchFromRemoteStore() throws InterruptedException, BrokenBarrie ClusterState state = clusterService.state(); RemoteClusterStateService clusterStateService = Mockito.mock(RemoteClusterStateService.class); Request request = new Request(); - ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(state.term() + 1).stateVersion(state.version() + 1).build(); + ClusterMetadataManifest manifest = ClusterMetadataManifest.builder() + .clusterTerm(state.term() + 1) + .stateVersion(state.version() + 1) + .build(); when(clusterStateService.getClusterMetadataManifestByFileName(eq(state.stateUUID()), any())).thenReturn(manifest); when(clusterStateService.getClusterStateForManifest(state.getClusterName().value(), manifest, localNode.getId(), true)).thenReturn( - buildClusterState(state, state.term() + 1, state.version() + 1)); + buildClusterState(state, state.term() + 1, state.version() + 1) + ); PlainActionFuture listener = new PlainActionFuture<>(); Action action = new Action("internal:testAction", transportService, clusterService, threadPool, clusterStateService); action.execute(request, listener); - - CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0]; - // mismatch term and version - GetTermVersionResponse termResp = new GetTermVersionResponse( - new ClusterStateTermVersion( - state.getClusterName(), - state.metadata().clusterUUID(), - state.term() + 1, - state.version() + 1 - ) - ); - transport.handleResponse(capturedRequest.requestId, termResp); - //no more transport calls - assertThat(transport.capturedRequests().length, equalTo(1)); + CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0]; + // mismatch term and version + GetTermVersionResponse termResp = new GetTermVersionResponse( + new ClusterStateTermVersion(state.getClusterName(), state.metadata().clusterUUID(), state.term() + 1, state.version() + 1) + ); + transport.handleResponse(capturedRequest.requestId, termResp); + // no more transport calls + assertThat(transport.capturedRequests().length, equalTo(1)); assertTrue(listener.isDone()); } - private ClusterState buildClusterState(ClusterState state , long term, long version) { + private ClusterState buildClusterState(ClusterState state, long term, long version) { CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder().term(term); Metadata newMetadata = Metadata.builder().coordinationMetadata(coordMetadataBuilder.build()).build(); return ClusterState.builder(state).version(version).metadata(newMetadata).build(); diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/term/ClusterTermVersionIT.java b/server/src/test/java/org/opensearch/action/support/clustermanager/term/ClusterTermVersionIT.java index f5ee368e05284..e5221fa778890 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/term/ClusterTermVersionIT.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/term/ClusterTermVersionIT.java @@ -8,15 +8,10 @@ package org.opensearch.action.support.clustermanager.term; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import org.opensearch.action.admin.cluster.state.ClusterStateAction; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.admin.indices.create.CreateIndexResponse; -import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; -import org.opensearch.action.support.ActiveShardCount; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterName; @@ -33,6 +28,10 @@ import org.opensearch.test.transport.MockTransportService; import org.opensearch.transport.TransportService; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -124,7 +123,6 @@ public void testDatanodeWithSlowClusterApplierFallbackToPublish() throws Excepti String master = internalCluster().getClusterManagerName(); - AtomicBoolean processState = new AtomicBoolean(); ClusterService cmClsService = internalCluster().getInstance(ClusterService.class, datas.get(0)); cmClsService.addStateApplier(new ClusterStateApplier() { @@ -145,38 +143,47 @@ public void applyClusterState(ClusterChangedEvent event) { ensureGreen(); - GetTermVersionResponse respBeforeUpdate = internalCluster().getInstance(Client.class, master).execute(GetTermVersionAction.INSTANCE, new GetTermVersionRequest()).get(); + GetTermVersionResponse respBeforeUpdate = internalCluster().getInstance(Client.class, master) + .execute(GetTermVersionAction.INSTANCE, new GetTermVersionRequest()) + .get(); processState.set(true); String index = "index_1"; - ActionFuture startCreateIndex1 = prepareCreate(index) - .setSettings(Settings.builder() + ActionFuture startCreateIndex1 = prepareCreate(index).setSettings( + Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE) - .build()) - .execute(); - + .build() + ).execute(); - ActionFuture startCreateIndex2 = prepareCreate("index_2").setSettings(Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE) - .build()).execute(); + ActionFuture startCreateIndex2 = prepareCreate("index_2").setSettings( + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE) + .build() + ).execute(); - //wait for cluster-manager to publish new state + // wait for cluster-manager to publish new state waitUntil(() -> { try { - //node is yet to ack commit to cluster-manager , only the state-update corresponding to index_1 should have been published - GetTermVersionResponse respAfterUpdate = internalCluster().getInstance(Client.class, master).execute(GetTermVersionAction.INSTANCE, new GetTermVersionRequest()).get(); - logger.info("data has latest , {} , {}", respAfterUpdate.getClusterStateTermVersion().getTerm(), respAfterUpdate.getClusterStateTermVersion().getVersion()); - return respBeforeUpdate.getClusterStateTermVersion().getVersion() + 1 == respAfterUpdate.getClusterStateTermVersion().getVersion(); + // node is yet to ack commit to cluster-manager , only the state-update corresponding to index_1 should have been published + GetTermVersionResponse respAfterUpdate = internalCluster().getInstance(Client.class, master) + .execute(GetTermVersionAction.INSTANCE, new GetTermVersionRequest()) + .get(); + logger.info( + "data has latest , {} , {}", + respAfterUpdate.getClusterStateTermVersion().getTerm(), + respAfterUpdate.getClusterStateTermVersion().getVersion() + ); + return respBeforeUpdate.getClusterStateTermVersion().getVersion() + 1 == respAfterUpdate.getClusterStateTermVersion() + .getVersion(); } catch (Exception e) { throw new RuntimeException(e); } }, 100, TimeUnit.SECONDS); - addCallCountInterceptor(master, callCounters); ClusterStateResponse stateResponseD = internalCluster().getInstance(Client.class, datas.get(0)) .admin() diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 571d79ae84211..d6313ce6711c2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -46,7 +46,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.ParseField; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; @@ -2335,8 +2334,9 @@ public void testReadLatestMetadataManifestSuccess() throws IOException { } public void testReadGlobalMetadata() throws IOException { -// when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(new NamedXContentRegistry( -// List.of(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(IndexGraveyard.TYPE), IndexGraveyard::fromXContent)))); + // when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(new NamedXContentRegistry( + // List.of(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(IndexGraveyard.TYPE), + // IndexGraveyard::fromXContent)))); final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build(); remoteClusterStateService.start(); @@ -2351,7 +2351,10 @@ public void testReadGlobalMetadata() throws IOException { .coordinationMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(COORDINATION_METADATA, "mock-coordination-file")) .settingMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(SETTING_METADATA, "mock-setting-file")) .templatesMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(TEMPLATES_METADATA, "mock-templates-file")) - .put(IndexGraveyard.TYPE, new ClusterMetadataManifest.UploadedMetadataAttribute(IndexGraveyard.TYPE, "mock-custom-" +IndexGraveyard.TYPE+ "-file")) + .put( + IndexGraveyard.TYPE, + new ClusterMetadataManifest.UploadedMetadataAttribute(IndexGraveyard.TYPE, "mock-custom-" + IndexGraveyard.TYPE + "-file") + ) .nodeId("nodeA") .opensearchVersion(VersionUtils.randomOpenSearchVersion(random())) .previousClusterUUID("prev-cluster-uuid") @@ -2359,7 +2362,10 @@ public void testReadGlobalMetadata() throws IOException { .indicesRouting(List.of()) .build(); - Metadata expectedMetadata = Metadata.builder().clusterUUID("cluster-uuid").persistentSettings(Settings.builder().put("readonly", true).build()).build(); + Metadata expectedMetadata = Metadata.builder() + .clusterUUID("cluster-uuid") + .persistentSettings(Settings.builder().put("readonly", true).build()) + .build(); mockBlobContainerForGlobalMetadata(mockBlobStoreObjects(), expectedManifest, expectedMetadata); ClusterState newClusterState = remoteClusterStateService.getLatestClusterState( diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 1a0e6c4ccc737..183df37dc8c90 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2413,7 +2413,8 @@ public void onFailure(final Exception e) { threadPool, actionFilters, indexNameExpressionResolver, - null) + null + ) ); actions.put( IndicesShardStoresAction.INSTANCE, @@ -2454,7 +2455,8 @@ public void onFailure(final Exception e) { clusterService, threadPool, actionFilters, - indexNameExpressionResolver) + indexNameExpressionResolver + ) ); DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry();