Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fallback on Term-Version check mismatch - Remote cluster-state #15424

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Added
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- Fallback to Remote cluster-state on Term-Version check mismatch ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.state.ClusterStateAction;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.PublicationTransportHandler;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.hamcrest.Matchers.is;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateTermVersionIT extends RemoteStoreBaseIntegTestCase {
private static final String INDEX_NAME = "test-index";
private static final String INDEX_NAME_1 = "test-index-1";
List<BlobPath> indexRoutingPaths;
AtomicInteger indexRoutingFiles = new AtomicInteger();
private final RemoteStoreEnums.PathType pathType = RemoteStoreEnums.PathType.HASHED_PREFIX;

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.put(
RemoteRoutingTableBlobStore.REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING.getKey(),
RemoteStoreEnums.PathType.HASHED_PREFIX.toString()
)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO)
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
.build();
}

public void testRemoteClusterStateFallback() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);

String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNodes[0]);

String cm = internalCluster().getClusterManagerName();
primaryService.addRequestHandlingBehavior(
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME,
(handler, request, channel, task) -> {
// not committing the state
logger.info("ignoring the commit from cluster-manager {}", request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
);

String index = "index_1";
createIndex(
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), Long.MAX_VALUE)
.build()
);
logger.info("created index {}", index);
Map<String, AtomicInteger> callCounters = Map.ofEntries(
Map.entry(ClusterStateAction.NAME, new AtomicInteger()),
Map.entry(GetTermVersionAction.NAME, new AtomicInteger())
);

addCallCountInterceptor(cm, callCounters);

ClusterStateResponse stateResponseM = client(cm).admin().cluster().state(new ClusterStateRequest()).actionGet();

ClusterStateResponse stateResponseD = client(dataNodes[0]).admin().cluster().state(new ClusterStateRequest()).actionGet();
assertEquals(stateResponseM, stateResponseD);
assertThat(callCounters.get(ClusterStateAction.NAME).get(), is(0));
assertThat(callCounters.get(GetTermVersionAction.NAME).get(), is(1));

}

public void testNoRemoteClusterStateFound() throws Exception {
BlobStoreRepository repository = prepareClusterAndVerifyRepository();

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

RemoteManifestManager remoteManifestManager = remoteClusterStateService.getRemoteManifestManager();
Optional<ClusterMetadataManifest> latestManifest = remoteManifestManager.getLatestClusterMetadataManifest(
getClusterState().getClusterName().value(),
getClusterState().getMetadata().clusterUUID()
);

String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new);
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNodes[0]);
primaryService.addRequestHandlingBehavior(
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME,
(handler, request, channel, task) -> {
// not committing the state
logger.info("ignoring the commit from cluster-manager {}", request);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
);

ClusterState state = internalCluster().clusterService().state();
String cm = internalCluster().getClusterManagerName();
MockTransportService cmservice = (MockTransportService) internalCluster().getInstance(TransportService.class, cm);
cmservice.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
channel.sendResponse(
new GetTermVersionResponse(new ClusterStateTermVersion(state.getClusterName(), state.stateUUID(), -1, -1), true)
);
});

Map<String, AtomicInteger> callCounters = Map.ofEntries(
Map.entry(ClusterStateAction.NAME, new AtomicInteger()),
Map.entry(GetTermVersionAction.NAME, new AtomicInteger())
);

addCallCountInterceptor(cm, callCounters);

ClusterStateResponse stateResponseM = client(cm).admin().cluster().state(new ClusterStateRequest()).actionGet();
ClusterStateResponse stateResponseD = client(dataNodes[0]).admin().cluster().state(new ClusterStateRequest()).actionGet();
assertEquals(stateResponseM, stateResponseD);
assertThat(callCounters.get(ClusterStateAction.NAME).get(), is(1));
assertThat(callCounters.get(GetTermVersionAction.NAME).get(), is(1));

}

private void addCallCountInterceptor(String nodeName, Map<String, AtomicInteger> callCounters) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, nodeName);
for (var ctrEnty : callCounters.entrySet()) {
primaryService.addRequestHandlingBehavior(ctrEnty.getKey(), (handler, request, channel, task) -> {
ctrEnty.getValue().incrementAndGet();
logger.info("--> {} response redirect", ctrEnty.getKey());
handler.messageReceived(request, channel, task);
});
}
}

private BlobStoreRepository prepareClusterAndVerifyRepository() throws Exception {
clusterSettingsSuppliedByTest = true;
Path segmentRepoPath = randomRepoPath();
Path translogRepoPath = randomRepoPath();
Path remoteRoutingTableRepoPath = randomRepoPath();
Settings settings = buildRemoteStoreNodeAttributes(
REPOSITORY_NAME,
segmentRepoPath,
REPOSITORY_2_NAME,
translogRepoPath,
REMOTE_ROUTING_TABLE_REPO,
remoteRoutingTableRepoPath,
false
);
prepareCluster(1, 3, INDEX_NAME, 1, 5, settings);
ensureGreen(INDEX_NAME);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REMOTE_ROUTING_TABLE_REPO);

return repository;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import org.opensearch.cluster.metadata.Metadata.Custom;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand Down Expand Up @@ -80,7 +82,8 @@ public TransportClusterStateAction(
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
@Nullable RemoteClusterStateService remoteClusterStateService
) {
super(
ClusterStateAction.NAME,
Expand All @@ -93,6 +96,7 @@ public TransportClusterStateAction(
indexNameExpressionResolver
);
this.localExecuteSupported = true;
this.remoteClusterStateService = remoteClusterStateService;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.cluster.ClusterStateObserver;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException;
Expand All @@ -63,6 +64,8 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
Expand All @@ -74,6 +77,7 @@
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand All @@ -95,6 +99,8 @@
protected final ClusterService clusterService;
protected final IndexNameExpressionResolver indexNameExpressionResolver;

protected RemoteClusterStateService remoteClusterStateService;

private final String executor;

protected TransportClusterManagerNodeAction(
Expand Down Expand Up @@ -378,9 +384,12 @@
response.getClusterStateTermVersion(),
isLatestClusterStatePresentOnLocalNode
);
if (isLatestClusterStatePresentOnLocalNode) {
onLatestLocalState.accept(clusterState);

ClusterState stateFromNode = getStateFromLocalNode(response);
if (stateFromNode != null) {
onLatestLocalState.accept(stateFromNode);
} else {
// fallback to clusterManager
onStaleLocalState.accept(clusterManagerNode, clusterState);
}
}
Expand All @@ -405,6 +414,52 @@
};
}

private ClusterState getStateFromLocalNode(GetTermVersionResponse termVersionResponse) {
ClusterStateTermVersion termVersion = termVersionResponse.getClusterStateTermVersion();
ClusterState appliedState = clusterService.state();
if (termVersion.equals(new ClusterStateTermVersion(appliedState))) {
logger.trace("Using the applied State from local, ClusterStateTermVersion {}", termVersion);
return appliedState;
}

ClusterState preCommitState = clusterService.preCommitState();
if (preCommitState != null && termVersion.equals(new ClusterStateTermVersion(preCommitState))) {
logger.trace("Using the published state from local, ClusterStateTermVersion {}", termVersion);
return preCommitState;

Check warning on line 428 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L427-L428

Added lines #L427 - L428 were not covered by tests
}

if (remoteClusterStateService != null && termVersionResponse.isStatePresentInRemote()) {
try {
ClusterStateTermVersion clusterStateTermVersion = termVersionResponse.getClusterStateTermVersion();
Optional<ClusterMetadataManifest> clusterMetadataManifest = remoteClusterStateService
.getClusterMetadataManifestByTermVersion(
clusterStateTermVersion.getClusterName().value(),
clusterStateTermVersion.getClusterUUID(),
clusterStateTermVersion.getTerm(),
clusterStateTermVersion.getVersion()
);
if (clusterMetadataManifest.isEmpty()) {
logger.trace("could not find manifest in remote-store for ClusterStateTermVersion {}", termVersion);
return null;

Check warning on line 443 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L442-L443

Added lines #L442 - L443 were not covered by tests
}
ClusterState clusterStateFromRemote = remoteClusterStateService.getClusterStateForManifest(
appliedState.getClusterName().value(),
clusterMetadataManifest.get(),
appliedState.nodes().getLocalNode().getId(),
true
);

if (clusterStateFromRemote != null) {
logger.trace("Using the remote cluster-state fetched from local node, ClusterStateTermVersion {}", termVersion);
return clusterStateFromRemote;
}
} catch (Exception e) {
logger.trace("Error while fetching from remote cluster state", e);
rajiv-kv marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 458 in server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java#L456-L458

Added lines #L456 - L458 were not covered by tests
}
return null;
}

private boolean checkForBlock(Request request, ClusterState localClusterState) {
final ClusterBlockException blockException = checkBlock(request, localClusterState);
if (blockException != null) {
Expand Down
Loading
Loading