Skip to content

Commit

Permalink
Draft changes
Browse files Browse the repository at this point in the history
  • Loading branch information
psychbot committed Aug 3, 2023
1 parent 1b5e64a commit 972b0b5
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.remotestore.repository;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Settings;
import org.opensearch.repositories.RepositoriesService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* RemoteStore Repository Registration helper
*/
/**
 * Helper for validating and registering the remote store segment and translog
 * repositories that a joining node advertises through its node attributes.
 *
 * @opensearch.internal
 */
public class RemoteStoreRepositoryRegistrationHelper {

    // Cluster-level settings naming the repositories used by remote store.
    private static final String SEGMENT_REPOSITORY_SETTING = "cluster.remote_store.segment.repository";
    private static final String TRANSLOG_REPOSITORY_SETTING = "cluster.remote_store.translog.repository";

    // Node-attribute prefixes under which a node publishes its repository config.
    private static final String SEGMENT_ATTRIBUTE_PREFIX = "remote_store.segment.repository";
    private static final String TRANSLOG_ATTRIBUTE_PREFIX = "remote_store.translog.repository";

    /**
     * Returns {@code true} when both the segment and the translog remote store
     * repositories are configured at cluster level, i.e. remote store is enabled.
     *
     * @param currentState the current cluster state
     */
    public static boolean isRemoteStoreEnabled(ClusterState currentState) {
        Settings settings = currentState.getMetadata().settings();
        return settings.get(SEGMENT_REPOSITORY_SETTING) != null && settings.get(TRANSLOG_REPOSITORY_SETTING) != null;
    }

    /**
     * Parses a comma-separated list of {@code key: value} pairs into {@link Settings}.
     *
     * @param stringSettings the serialized settings, e.g. {@code "bucket: b1, endpoint: http://host:9000"}
     * @return the parsed settings
     * @throws IllegalArgumentException if an entry is not a {@code key: value} pair
     */
    private static Settings buildSettings(String stringSettings) {
        Settings.Builder settings = Settings.builder();
        for (String entry : stringSettings.split(",")) {
            // Limit the split to 2 so values that themselves contain ':' (URLs,
            // host:port endpoints) are not truncated at the first colon.
            String[] keyValue = entry.split(":", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("malformed repository setting [" + entry + "], expected [key: value]");
            }
            settings.put(keyValue[0].trim(), keyValue[1].trim());
        }
        return settings.build();
    }

    /**
     * Builds {@link RepositoryMetadata} from the node attributes published under
     * {@code attributePrefix} ({@code .name}, {@code .type}, {@code .settings}).
     */
    private static RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String attributePrefix) {
        String name = node.getAttributes().get(attributePrefix + ".name");
        String type = node.getAttributes().get(attributePrefix + ".type");
        String stringSettings = node.getAttributes().get(attributePrefix + ".settings");

        assert name != null : "Name cannot be null";
        assert type != null : "Type cannot be null";
        assert stringSettings != null : "Settings cannot be null";

        return new RepositoryMetadata(name, type, buildSettings(stringSettings));
    }

    /**
     * Builds the segment repository metadata from the node's attributes.
     *
     * @param node the joining node advertising the repository
     */
    public static RepositoryMetadata buildSegmentRepositoryMetadata(DiscoveryNode node) {
        return buildRepositoryMetadata(node, SEGMENT_ATTRIBUTE_PREFIX);
    }

    /**
     * Builds the translog repository metadata from the node's attributes.
     *
     * @param node the joining node advertising the repository
     */
    public static RepositoryMetadata buildTranslogRepositoryMetadata(DiscoveryNode node) {
        return buildRepositoryMetadata(node, TRANSLOG_ATTRIBUTE_PREFIX);
    }

    /**
     * Validates that the repositories advertised by {@code node} match any
     * already-registered repositories of the same name, and registers those that
     * are missing from the cluster state.
     *
     * @param currentState the current cluster state
     * @param node         the joining node
     * @return the (possibly updated) cluster state
     */
    public static synchronized ClusterState validateOrAddRemoteStoreRepository(ClusterState currentState, DiscoveryNode node) {
        RepositoryMetadata newSegmentRepositoryMetadata = buildSegmentRepositoryMetadata(node);
        RepositoryMetadata newTranslogRepositoryMetadata = buildTranslogRepositoryMetadata(node);

        // The cluster-level repository names must agree with what the node advertises.
        assert currentState.getMetadata().settings().get(SEGMENT_REPOSITORY_SETTING).equals(newSegmentRepositoryMetadata.name());
        assert currentState.getMetadata().settings().get(TRANSLOG_REPOSITORY_SETTING).equals(newTranslogRepositoryMetadata.name());

        boolean segmentStoreRepositoryFound = false;
        boolean translogStoreRepositoryFound = false;

        RepositoriesMetadata repositories = currentState.metadata().custom(RepositoriesMetadata.TYPE);
        if (repositories != null) {
            for (RepositoryMetadata existingRepositoryMetadata : repositories.repositories()) {
                if (existingRepositoryMetadata.name().equals(newSegmentRepositoryMetadata.name())) {
                    assert existingRepositoryMetadata.equalsIgnoreGenerations(newSegmentRepositoryMetadata);
                    segmentStoreRepositoryFound = true;
                }
                if (existingRepositoryMetadata.name().equals(newTranslogRepositoryMetadata.name())) {
                    assert existingRepositoryMetadata.equalsIgnoreGenerations(newTranslogRepositoryMetadata);
                    translogStoreRepositoryFound = true;
                }
            }
        }

        // Register each missing repository sequentially, threading the updated
        // state through so the second registration sees the first.
        if (!segmentStoreRepositoryFound) {
            currentState = addRepositoryInformation(currentState, newSegmentRepositoryMetadata).build();
        }
        if (!translogStoreRepositoryFound) {
            currentState = addRepositoryInformation(currentState, newTranslogRepositoryMetadata).build();
        }
        return currentState;
    }

    /**
     * Returns a cluster-state builder with {@code newRepositoryMetadata} merged
     * into the {@link RepositoriesMetadata} custom. If an identical repository
     * (ignoring generations) is already registered, the state is left unchanged.
     */
    private static ClusterState.Builder addRepositoryInformation(ClusterState currentState, RepositoryMetadata newRepositoryMetadata) {
        RepositoriesService.validate(newRepositoryMetadata.name());

        Metadata metadata = currentState.metadata();
        Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata());
        RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
        if (repositories == null) {
            repositories = new RepositoriesMetadata(Collections.singletonList(newRepositoryMetadata));
        } else {
            boolean found = false;
            List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);

            for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
                if (repositoryMetadata.name().equals(newRepositoryMetadata.name())) {
                    if (newRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)) {
                        // Previous version is the same as this one — no update is needed.
                        return new ClusterState.Builder(currentState);
                    }
                    found = true;
                    repositoriesMetadata.add(newRepositoryMetadata);
                } else {
                    repositoriesMetadata.add(repositoryMetadata);
                }
            }
            if (!found) {
                repositoriesMetadata.add(newRepositoryMetadata);
            }
            repositories = new RepositoriesMetadata(repositoriesMetadata);
        }
        mdBuilder.putCustom(RepositoriesMetadata.TYPE, repositories);
        return ClusterState.builder(currentState).metadata(mdBuilder);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Remote store repository registration helpers. */
package org.opensearch.action.admin.cluster.remotestore.repository;
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,8 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback
// we are checking source node commission status here to reject any join request coming from a decommissioned node
// even before executing the join task to fail fast
JoinTaskExecutor.ensureNodeCommissioned(joinRequest.getSourceNode(), stateForJoinValidation.metadata());

JoinTaskExecutor.ensureRemoteStoreNodesCompatibility(joinRequest.getSourceNode(), stateForJoinValidation.metadata());
}
sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public class JoinHelper {

private final ClusterManagerService clusterManagerService;
private final TransportService transportService;
private volatile JoinTaskExecutor joinTaskExecutor;
private volatile RemoteStoreJoinTaskExecutor joinTaskExecutor;

private final TimeValue joinTimeout; // only used for Zen1 joining
private final NodeHealthService nodeHealthService;
Expand All @@ -125,7 +125,7 @@ public class JoinHelper {

private final AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();

private final Supplier<JoinTaskExecutor> joinTaskExecutorGenerator;
private final Supplier<RemoteStoreJoinTaskExecutor> joinTaskExecutorGenerator;
private final Consumer<Boolean> nodeCommissioned;
private final NamedWriteableRegistry namedWriteableRegistry;
private final AtomicReference<Tuple<Long, BytesReference>> serializedState = new AtomicReference<>();
Expand All @@ -152,7 +152,7 @@ public class JoinHelper {
this.nodeCommissioned = nodeCommissioned;
this.namedWriteableRegistry = namedWriteableRegistry;

this.joinTaskExecutorGenerator = () -> new JoinTaskExecutor(settings, allocationService, logger, rerouteService) {
this.joinTaskExecutorGenerator = () -> new RemoteStoreJoinTaskExecutor(settings, allocationService, logger, rerouteService) {

private final long term = currentTermSupplier.getAsLong();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.opensearch.cluster.decommission.NodeDecommissionedException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.RepositoriesMetadata;
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RerouteService;
Expand All @@ -61,6 +63,8 @@

import static org.opensearch.cluster.decommission.DecommissionHelper.nodeCommissioned;
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.buildSegmentRepositoryMetadata;
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.buildTranslogRepositoryMetadata;

/**
* Main executor for Nodes joining the OpenSearch cluster
Expand Down Expand Up @@ -187,6 +191,8 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// we have added the same check in handleJoinRequest method and adding it here as this method
// would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness
ensureNodeCommissioned(node, currentState.metadata());

ensureRemoteStoreNodesCompatibility(node, currentState.metadata());
nodesBuilder.add(node);
nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
Expand Down Expand Up @@ -422,6 +428,29 @@ public static void ensureNodeCommissioned(DiscoveryNode node, Metadata metadata)
}
}

public static void ensureRemoteStoreNodesCompatibility(DiscoveryNode node, Metadata metadata) {
assert metadata.settings()
.get("cluster.remote_store.segment.repository")
.equals(node.getAttributes().get("remote_store.segment.repository.name"));
assert metadata.settings()
.get("cluster.remote_store.translog.repository")
.equals(node.getAttributes().get("remote_store.translog.repository.name"));

RepositoriesMetadata repositories = metadata.custom(RepositoriesMetadata.TYPE);
if (repositories != null && node != null) {
for (RepositoryMetadata existingRepositoryMetadata : repositories.repositories()) {
if (existingRepositoryMetadata.name().equals(node.getAttributes().get("remote_store.segment.repository.name"))) {
RepositoryMetadata newSegmentRepositoryMetadata = buildSegmentRepositoryMetadata(node);
existingRepositoryMetadata.equalsIgnoreGenerations(newSegmentRepositoryMetadata);
}
if (existingRepositoryMetadata.name().equals(node.getAttributes().get("remote_store.translog.repository.name"))) {
RepositoryMetadata newTranslogRepositoryMetadata = buildTranslogRepositoryMetadata(node);
existingRepositoryMetadata.equalsIgnoreGenerations(newTranslogRepositoryMetadata);
}
}
}
}

public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
Expand All @@ -430,6 +459,7 @@ public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoin
ensureNodesCompatibility(node.getVersion(), state.getNodes());
ensureIndexCompatibility(node.getVersion(), state.getMetadata());
ensureNodeCommissioned(node, state.getMetadata());
ensureRemoteStoreNodesCompatibility(node, state.metadata());
});
validators.addAll(onJoinValidators);
return Collections.unmodifiableCollection(validators);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.opensearch.cluster.coordination;

import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RerouteService;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;

import java.util.List;

import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.validateOrAddRemoteStoreRepository;
import static org.opensearch.action.admin.cluster.remotestore.repository.RemoteStoreRepositoryRegistrationHelper.isRemoteStoreEnabled;

/**
 * A {@link JoinTaskExecutor} that, when remote store is enabled, validates or
 * registers the remote store repositories advertised by joining nodes before
 * delegating the actual join handling to the base executor.
 *
 * @opensearch.internal
 */
public class RemoteStoreJoinTaskExecutor extends JoinTaskExecutor {

    public RemoteStoreJoinTaskExecutor(
        Settings settings,
        AllocationService allocationService,
        Logger logger,
        RerouteService rerouteService
    ) {
        super(settings, allocationService, logger, rerouteService);
    }

    // TODO: Add verifyRepositoryListener to validate if all the nodes are aware of the repository.
    @Override
    public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> joiningNodes) throws Exception {
        ClusterState newState = currentState;

        if (isRemoteStoreEnabled(currentState)) {
            for (final Task joinTask : joiningNodes) {
                // Election/cluster-manager bookkeeping tasks carry no node to
                // validate, and nodes already present with the same roles have
                // been validated before.
                if (joinTask.isBecomeClusterManagerTask() || joinTask.isFinishElectionTask()) {
                    continue;
                }
                if (currentState.nodes().nodeExistsWithSameRoles(joinTask.node())) {
                    continue;
                }
                // Thread the updated state through so each join sees the
                // repositories registered for the previous one.
                newState = validateOrAddRemoteStoreRepository(newState, joinTask.node());
            }
        }
        return super.execute(newState, joiningNodes);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ private Repository createRepository(RepositoryMetadata repositoryMetadata, Map<S
}
}

private static void validate(final String repositoryName) {
public static void validate(final String repositoryName) {
if (org.opensearch.core.common.Strings.hasLength(repositoryName) == false) {
throw new RepositoryException(repositoryName, "cannot be empty");
}
Expand Down

0 comments on commit 972b0b5

Please sign in to comment.