Skip to content

Commit

Permalink
adding test for remote-cluster-service
Browse files Browse the repository at this point in the history
  • Loading branch information
rajiv-kv committed Aug 30, 2024
1 parent d5ec82d commit 71a447e
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -84,7 +83,8 @@ public TransportClusterStateAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
@Nullable RemoteClusterStateService remoteClusterStateService) {
@Nullable RemoteClusterStateService remoteClusterStateService
) {
super(
ClusterStateAction.NAME,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -65,7 +64,6 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.discovery.Discovery;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteManifestManager;
Expand Down Expand Up @@ -439,16 +437,18 @@ public ClusterState getStateFromLocalNode(ClusterStateTermVersion termVersion) {
termVersion.getTerm(),
termVersion.getVersion()
);
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService
.getClusterMetadataManifestByFileName(appliedState.stateUUID(), manifestFile);
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getClusterMetadataManifestByFileName(
appliedState.stateUUID(),
manifestFile
);
ClusterState clusterStateFromRemote = remoteClusterStateService.getClusterStateForManifest(
appliedState.getClusterName().value(),
clusterMetadataManifest,
appliedState.nodes().getLocalNode().getId(),
true
);

if(clusterStateFromRemote!=null) {
if (clusterStateFromRemote != null) {
logger.trace("Using the remote cluster-state fetched from local node, ClusterStateTermVersion {}", termVersion);
return clusterStateFromRemote;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.Coordinator;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.discovery.Discovery;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand All @@ -42,7 +40,8 @@ public TransportGetTermVersionAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
GetTermVersionAction.NAME,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,16 +527,15 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
reposToSkip.add(joiningNodeRepoName);
}
}
//if non-or1
//mix of remote-state (enabled and disabled)
//all of them settings
// if non-or1
// mix of remote-state (enabled and disabled)
// all of them settings

// publishes to all nodes
// publishes to all nodes

//commits

//30s still commit []
// commits

// 30s still commit []

if (STRICT.equals(remoteStoreCompatibilityMode)) {
DiscoveryNode existingNode = remoteRoutingTableNode.orElseGet(() -> existingNodes.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@

package org.opensearch.action.support.clustermanager;

import org.mockito.Mock;
import org.mockito.Mockito;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.admin.cluster.state.TransportClusterStateAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.ThreadedActionListener;
Expand Down Expand Up @@ -92,10 +89,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.emptySet;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
import org.mockito.Mockito;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand All @@ -105,6 +100,9 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

public class TransportClusterManagerNodeActionTests extends OpenSearchTestCase {
private static ThreadPool threadPool;
Expand Down Expand Up @@ -220,7 +218,8 @@ public void writeTo(StreamOutput out) throws IOException {
}

class Action extends TransportClusterManagerNodeAction<Request, Response> {
private boolean localExecuteSupported = false;
private boolean localExecuteSupported = false;

Action(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
super(
actionName,
Expand All @@ -233,11 +232,17 @@ class Action extends TransportClusterManagerNodeAction<Request, Response> {
);
}


protected boolean localExecuteSupportedByAction() {
return localExecuteSupported;
}
Action(String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, RemoteClusterStateService clusterStateService) {

Action(
String actionName,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
RemoteClusterStateService clusterStateService
) {
this(actionName, transportService, clusterService, threadPool);
this.remoteClusterStateService = clusterStateService;
this.localExecuteSupported = true;
Expand Down Expand Up @@ -743,34 +748,32 @@ public void testFetchFromRemoteStore() throws InterruptedException, BrokenBarrie
ClusterState state = clusterService.state();
RemoteClusterStateService clusterStateService = Mockito.mock(RemoteClusterStateService.class);
Request request = new Request();
ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(state.term() + 1).stateVersion(state.version() + 1).build();
ClusterMetadataManifest manifest = ClusterMetadataManifest.builder()
.clusterTerm(state.term() + 1)
.stateVersion(state.version() + 1)
.build();
when(clusterStateService.getClusterMetadataManifestByFileName(eq(state.stateUUID()), any())).thenReturn(manifest);

when(clusterStateService.getClusterStateForManifest(state.getClusterName().value(), manifest, localNode.getId(), true)).thenReturn(
buildClusterState(state, state.term() + 1, state.version() + 1));
buildClusterState(state, state.term() + 1, state.version() + 1)
);

PlainActionFuture<Response> listener = new PlainActionFuture<>();
Action action = new Action("internal:testAction", transportService, clusterService, threadPool, clusterStateService);
action.execute(request, listener);


CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
// mismatch term and version
GetTermVersionResponse termResp = new GetTermVersionResponse(
new ClusterStateTermVersion(
state.getClusterName(),
state.metadata().clusterUUID(),
state.term() + 1,
state.version() + 1
)
);
transport.handleResponse(capturedRequest.requestId, termResp);
//no more transport calls
assertThat(transport.capturedRequests().length, equalTo(1));
CapturingTransport.CapturedRequest capturedRequest = transport.capturedRequests()[0];
// mismatch term and version
GetTermVersionResponse termResp = new GetTermVersionResponse(
new ClusterStateTermVersion(state.getClusterName(), state.metadata().clusterUUID(), state.term() + 1, state.version() + 1)
);
transport.handleResponse(capturedRequest.requestId, termResp);
// no more transport calls
assertThat(transport.capturedRequests().length, equalTo(1));
assertTrue(listener.isDone());
}

private ClusterState buildClusterState(ClusterState state , long term, long version) {
private ClusterState buildClusterState(ClusterState state, long term, long version) {
CoordinationMetadata.Builder coordMetadataBuilder = CoordinationMetadata.builder().term(term);
Metadata newMetadata = Metadata.builder().coordinationMetadata(coordMetadataBuilder.build()).build();
return ClusterState.builder(state).version(version).metadata(newMetadata).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,10 @@

package org.opensearch.action.support.clustermanager.term;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterName;
Expand All @@ -33,6 +28,10 @@
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -124,7 +123,6 @@ public void testDatanodeWithSlowClusterApplierFallbackToPublish() throws Excepti

String master = internalCluster().getClusterManagerName();


AtomicBoolean processState = new AtomicBoolean();
ClusterService cmClsService = internalCluster().getInstance(ClusterService.class, datas.get(0));
cmClsService.addStateApplier(new ClusterStateApplier() {
Expand All @@ -145,38 +143,47 @@ public void applyClusterState(ClusterChangedEvent event) {

ensureGreen();

GetTermVersionResponse respBeforeUpdate = internalCluster().getInstance(Client.class, master).execute(GetTermVersionAction.INSTANCE, new GetTermVersionRequest()).get();
GetTermVersionResponse respBeforeUpdate = internalCluster().getInstance(Client.class, master)
.execute(GetTermVersionAction.INSTANCE, new GetTermVersionRequest())
.get();

processState.set(true);
String index = "index_1";
ActionFuture<CreateIndexResponse> startCreateIndex1 = prepareCreate(index)
.setSettings(Settings.builder()
ActionFuture<CreateIndexResponse> startCreateIndex1 = prepareCreate(index).setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
.build())
.execute();

.build()
).execute();

ActionFuture<CreateIndexResponse> startCreateIndex2 = prepareCreate("index_2").setSettings(Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
.build()).execute();
ActionFuture<CreateIndexResponse> startCreateIndex2 = prepareCreate("index_2").setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
.build()
).execute();

//wait for cluster-manager to publish new state
// wait for cluster-manager to publish new state
waitUntil(() -> {
try {
//node is yet to ack commit to cluster-manager , only the state-update corresponding to index_1 should have been published
GetTermVersionResponse respAfterUpdate = internalCluster().getInstance(Client.class, master).execute(GetTermVersionAction.INSTANCE, new GetTermVersionRequest()).get();
logger.info("data has latest , {} , {}", respAfterUpdate.getClusterStateTermVersion().getTerm(), respAfterUpdate.getClusterStateTermVersion().getVersion());
return respBeforeUpdate.getClusterStateTermVersion().getVersion() + 1 == respAfterUpdate.getClusterStateTermVersion().getVersion();
// node is yet to ack commit to cluster-manager , only the state-update corresponding to index_1 should have been published
GetTermVersionResponse respAfterUpdate = internalCluster().getInstance(Client.class, master)
.execute(GetTermVersionAction.INSTANCE, new GetTermVersionRequest())
.get();
logger.info(
"data has latest , {} , {}",
respAfterUpdate.getClusterStateTermVersion().getTerm(),
respAfterUpdate.getClusterStateTermVersion().getVersion()
);
return respBeforeUpdate.getClusterStateTermVersion().getVersion() + 1 == respAfterUpdate.getClusterStateTermVersion()
.getVersion();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 100, TimeUnit.SECONDS);


addCallCountInterceptor(master, callCounters);
ClusterStateResponse stateResponseD = internalCluster().getInstance(Client.class, datas.get(0))
.admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.ParseField;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
Expand Down Expand Up @@ -2335,8 +2334,9 @@ public void testReadLatestMetadataManifestSuccess() throws IOException {
}

public void testReadGlobalMetadata() throws IOException {
// when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(new NamedXContentRegistry(
// List.of(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(IndexGraveyard.TYPE), IndexGraveyard::fromXContent))));
// when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(new NamedXContentRegistry(
// List.of(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(IndexGraveyard.TYPE),
// IndexGraveyard::fromXContent))));
final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();

Expand All @@ -2351,15 +2351,21 @@ public void testReadGlobalMetadata() throws IOException {
.coordinationMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(COORDINATION_METADATA, "mock-coordination-file"))
.settingMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(SETTING_METADATA, "mock-setting-file"))
.templatesMetadata(new ClusterMetadataManifest.UploadedMetadataAttribute(TEMPLATES_METADATA, "mock-templates-file"))
.put(IndexGraveyard.TYPE, new ClusterMetadataManifest.UploadedMetadataAttribute(IndexGraveyard.TYPE, "mock-custom-" +IndexGraveyard.TYPE+ "-file"))
.put(
IndexGraveyard.TYPE,
new ClusterMetadataManifest.UploadedMetadataAttribute(IndexGraveyard.TYPE, "mock-custom-" + IndexGraveyard.TYPE + "-file")
)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.routingTableVersion(1)
.indicesRouting(List.of())
.build();

Metadata expectedMetadata = Metadata.builder().clusterUUID("cluster-uuid").persistentSettings(Settings.builder().put("readonly", true).build()).build();
Metadata expectedMetadata = Metadata.builder()
.clusterUUID("cluster-uuid")
.persistentSettings(Settings.builder().put("readonly", true).build())
.build();
mockBlobContainerForGlobalMetadata(mockBlobStoreObjects(), expectedManifest, expectedMetadata);

ClusterState newClusterState = remoteClusterStateService.getLatestClusterState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2413,7 +2413,8 @@ public void onFailure(final Exception e) {
threadPool,
actionFilters,
indexNameExpressionResolver,
null)
null
)
);
actions.put(
IndicesShardStoresAction.INSTANCE,
Expand Down Expand Up @@ -2454,7 +2455,8 @@ public void onFailure(final Exception e) {
clusterService,
threadPool,
actionFilters,
indexNameExpressionResolver)
indexNameExpressionResolver
)
);

DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry();
Expand Down

0 comments on commit 71a447e

Please sign in to comment.