diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 45cf83a1bad74..08cd7d0ab02db 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -66,7 +66,6 @@ public class CoordinationState { // persisted state private final PersistedState persistedState; - private final PersistedState remotePersistedState; // transient state private VoteCollection joinVotes; @@ -76,7 +75,7 @@ public class CoordinationState { private VotingConfiguration lastPublishedConfiguration; private VoteCollection publishVotes; - public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy, PersistedState remotePersistedState) { + public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, ElectionStrategy electionStrategy) { this.localNode = localNode; // persisted state @@ -90,7 +89,6 @@ public CoordinationState(DiscoveryNode localNode, PersistedState persistedState, this.lastPublishedVersion = 0L; this.lastPublishedConfiguration = persistedState.getLastAcceptedState().getLastAcceptedConfiguration(); this.publishVotes = new VoteCollection(); - this.remotePersistedState = remotePersistedState; } public long getCurrentTerm() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index ad6a5efcc221f..0274073ddfdc7 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -45,7 +45,6 @@ import org.opensearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; -import org.opensearch.cluster.coordination.CoordinationState.PersistedState; import org.opensearch.cluster.coordination.CoordinationState.VoteCollection; import org.opensearch.cluster.coordination.FollowersChecker.FollowerCheckRequest; import org.opensearch.cluster.coordination.JoinHelper.InitialJoinAccumulator; @@ -182,7 +181,6 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private JoinHelper.JoinAccumulator joinAccumulator; private Optional currentPublication = Optional.empty(); private final NodeHealthService nodeHealthService; - private final Supplier remotePersistedStateSupplier; /** * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. @@ -203,8 +201,7 @@ public Coordinator( Random random, RerouteService rerouteService, ElectionStrategy electionStrategy, - NodeHealthService nodeHealthService, - Supplier remotePersistedStateSupplier + NodeHealthService nodeHealthService ) { this.settings = settings; this.transportService = transportService; @@ -289,7 +286,6 @@ public Coordinator( joinHelper::logLastFailedJoinAttempt ); this.nodeHealthService = nodeHealthService; - this.remotePersistedStateSupplier = remotePersistedStateSupplier; this.localNodeCommissioned = true; } @@ -825,7 +821,7 @@ boolean publicationInProgress() { protected void doStart() { synchronized (mutex) { CoordinationState.PersistedState persistedState = persistedStateSupplier.get(); - coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy, remotePersistedStateSupplier.get())); + coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy)); peerFinder.setCurrentTerm(getCurrentTerm()); configuredHostsResolver.start(); final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState(); @@ -1312,10 +1308,6 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId()) leaderChecker.setCurrentNodes(publishNodes); followersChecker.setCurrentNodes(publishNodes); lagDetector.setTrackedNodes(publishNodes); - - PersistedState remotePersistedState = remotePersistedStateSupplier.get(); - assert remotePersistedState != null : "Remote state has not been initialized"; - remotePersistedState.setLastAcceptedState(clusterState); publication.start(followersChecker.getFaultyNodes()); } } catch (Exception e) { diff --git a/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java b/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java deleted file mode 100644 index 8028aa2035d63..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/store/ClusterMetadataMarker.java +++ /dev/null @@ -1,293 +0,0 @@ -package org.opensearch.cluster.store; - -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import org.opensearch.core.ParseField; -import org.opensearch.core.common.Strings; -import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.common.io.stream.Writeable; -import org.opensearch.core.xcontent.ConstructingObjectParser; -import org.opensearch.core.xcontent.MediaTypeRegistry; -import org.opensearch.core.xcontent.ToXContentFragment; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.core.xcontent.XContentParser.Token; - -/** - * Marker file which contains the details of the uploaded entity metadata - * - * @opensearch.internal - */ -public class ClusterMetadataMarker implements Writeable, ToXContentFragment { - - private static final ParseField INDICES_FIELD = new ParseField("indices"); - private static final ParseField TERM_FIELD = new ParseField("term"); - private static final ParseField VERSION_FIELD = new ParseField("version"); - private static final ParseField CLUSTER_UUID_FIELD = new ParseField("cluster_uuid"); - private static final ParseField STATE_UUID_FIELD = new ParseField("state_uuid"); - - private static Map indices(Object[] fields) { - return (Map) fields[0]; - } - - private static long term(Object[] fields) { - return (long) fields[1]; - } - - private static long version(Object[] fields) { - return (long) fields[2]; - } - - private static String clusterUUID(Object[] fields) { - return (String) fields[3]; - } - - private static String stateUUID(Object[] fields) { - return (String) fields[4]; - } - - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("cluster_metadata_marker", - fields -> new ClusterMetadataMarker(indices(fields), term(fields), version(fields), clusterUUID(fields), stateUUID(fields))); - - static { - PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> { - Map uploadMetadataMap = new HashMap<>(); - while (p.nextToken() != Token.END_OBJECT) { - UploadedIndexMetadata uploadMetadata = UploadedIndexMetadata.fromXContent(p); - uploadMetadataMap.put(uploadMetadata.getIndexName(), uploadMetadata); - } - return uploadMetadataMap; - }, INDICES_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), TERM_FIELD); - PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_UUID_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_UUID_FIELD); - } - - private final Map indices; - private final long term; - private final long version; - private final String clusterUUID; - private final String stateUUID; - - public Map getIndices() { - return indices; - } - - public long getTerm() { - return term; - } - - public long getVersion() { - return version; - } - - public String getClusterUUID() { - return clusterUUID; - } - - public String getStateUUID() { - return stateUUID; - } - - public ClusterMetadataMarker(Map indices, long term, long version, String clusterUUID, String stateUUID) { - this.indices = Collections.unmodifiableMap(indices); - this.term = term; - this.version = version; - this.clusterUUID = clusterUUID; - this.stateUUID = stateUUID; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(INDICES_FIELD.getPreferredName()); - { - for (UploadedIndexMetadata uploadedIndexMetadata : indices.values()) { - uploadedIndexMetadata.toXContent(builder, params); - } - } - builder.endObject(); - builder.field(TERM_FIELD.getPreferredName(), getTerm()).field(VERSION_FIELD.getPreferredName(), getVersion()) - .field(CLUSTER_UUID_FIELD.getPreferredName(), getClusterUUID()).field(STATE_UUID_FIELD.getPreferredName(), getStateUUID()); - return builder; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeMap(indices, StreamOutput::writeString, (stream, uploadedMetadata) -> uploadedMetadata.writeTo(stream)); - out.writeVLong(term); - out.writeVLong(version); - out.writeString(clusterUUID); - out.writeString(stateUUID); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final ClusterMetadataMarker that = (ClusterMetadataMarker) o; - return Objects.equals(indices, that.indices) && term == that.term && version == that.version && Objects.equals(clusterUUID, that.clusterUUID) - && Objects.equals(stateUUID, that.stateUUID); - } - - @Override - public int hashCode() { - return Objects.hash(indices, term, version, clusterUUID, stateUUID); - } - - @Override - public String toString() { - return Strings.toString(MediaTypeRegistry.JSON, this); - } - - public static ClusterMetadataMarker fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); - } - - /** - * Builder for ClusterMetadataMarker - * - * @opensearch.internal - */ - public static class Builder { - - private final Map indices; - private long term; - private long version; - private String clusterUUID; - private String stateUUID; - - - public void term(long term) { - this.term = term; - } - - public void version(long version) { - this.version = version; - } - - public void clusterUUID(String clusterUUID) { - this.clusterUUID = clusterUUID; - } - - public void stateUUID(String stateUUID) { - this.stateUUID = stateUUID; - } - - public Map getIndices() { - return indices; - } - - public Builder() { - indices = new HashMap<>(); - } - - public ClusterMetadataMarker build() { - return new ClusterMetadataMarker(indices, term, version, clusterUUID, stateUUID); - } - - } - - /** - * Metadata for uploaded index metadata - * - * @opensearch.internal - */ - public static class UploadedIndexMetadata implements Writeable, ToXContentFragment { - - private static final ParseField INDEX_NAME_FIELD = new ParseField("index_name"); - private static final ParseField INDEX_UUID_FIELD = new ParseField("index_uuid"); - private static final ParseField UPLOADED_FILENAME_FIELD = new ParseField("uploaded_filename"); - - private static String indexName(Object[] fields) { - return (String) fields[0]; - } - - private static String indexUUID(Object[] fields) { - return (String) fields[1]; - } - - private static String uploadedFilename(Object[] fields) { - return (String) fields[2]; - } - - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("uploaded_index_metadata", - fields -> new UploadedIndexMetadata(indexName(fields), indexUUID(fields), uploadedFilename(fields))); - - static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_NAME_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), INDEX_UUID_FIELD); - PARSER.declareString(ConstructingObjectParser.constructorArg(), UPLOADED_FILENAME_FIELD); - } - - private final String indexName; - private final String indexUUID; - private final String uploadedFilename; - - public UploadedIndexMetadata(String indexName, String indexUUID, String uploadedFileName) { - this.indexName = indexName; - this.indexUUID = indexUUID; - this.uploadedFilename = uploadedFileName; - } - - public String getUploadedFilename() { - return uploadedFilename; - } - - public String getIndexName() { - return indexName; - } - - public String getIndexUUID() { - return indexUUID; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return builder.startObject(getIndexName()).field(INDEX_NAME_FIELD.getPreferredName(), getIndexName()) - .field(INDEX_UUID_FIELD.getPreferredName(), getIndexUUID()).field(UPLOADED_FILENAME_FIELD.getPreferredName(), getUploadedFilename()) - .endObject(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeString(indexName); - out.writeString(indexUUID); - out.writeString(uploadedFilename); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final UploadedIndexMetadata that = (UploadedIndexMetadata) o; - return Objects.equals(indexName, that.indexName) && Objects.equals(indexUUID, that.indexUUID) && Objects.equals(uploadedFilename, - that.uploadedFilename); - } - - @Override - public int hashCode() { - return Objects.hash(indexName, indexUUID, uploadedFilename); - } - - @Override - public String toString() { - return Strings.toString(MediaTypeRegistry.JSON, this); - } - - public static UploadedIndexMetadata fromXContent(XContentParser parser) throws IOException { - return PARSER.parse(parser, null); - } - } -} diff --git a/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java deleted file mode 100644 index 535c011dd7a98..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/store/RemoteClusterStateService.java +++ /dev/null @@ -1,188 +0,0 @@ -package org.opensearch.cluster.store; - -import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STATE_REPOSITORY_SETTING; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.store.ClusterMetadataMarker.UploadedIndexMetadata; -import org.opensearch.common.blobstore.BlobContainer; -import org.opensearch.common.settings.Settings; -import org.opensearch.indices.IndicesService; -import org.opensearch.repositories.RepositoriesService; -import org.opensearch.repositories.Repository; -import org.opensearch.repositories.RepositoryMissingException; -import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; - -/** - * A Service which provides APIs to upload and download cluster metadata from remote store. - * - * @opensearch.internal - */ -public class RemoteClusterStateService { - - public static final String METADATA_NAME_FORMAT = "%s.dat"; - - public static final String METADATA_MARKER_NAME_FORMAT = "%s"; - - public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( - "index-metadata", - METADATA_NAME_FORMAT, - IndexMetadata::fromXContent - ); - - public static final ChecksumBlobStoreFormat CLUSTER_METADATA_MARKER_FORMAT = new ChecksumBlobStoreFormat<>( - "cluster-metadata-marker", - METADATA_MARKER_NAME_FORMAT, - ClusterMetadataMarker::fromXContent - ); - private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class); - - private static final String DELIMITER = "__"; - - private final Supplier repositoriesService; - private final Settings settings; - private BlobStoreRepository blobStoreRepository; - - public RemoteClusterStateService(Supplier repositoriesService, Settings settings) { - this.repositoriesService = repositoriesService; - this.settings = settings; - } - - public ClusterMetadataMarker writeFullMetadata(long currentTerm, ClusterState clusterState) throws IOException { - if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) { - logger.error("Local node is not elected cluster manager. Exiting"); - return null; - } - setRepository(); - if (blobStoreRepository == null) { - logger.error("Unable to set repository"); - return null; - } - - final Map allUploadedIndexMetadata = new HashMap<>(); - //todo parallel upload - // any validations before/after upload ? - for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { - //123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 - final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(), - indexMetadata, indexMetadataFileName(indexMetadata)); - final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(), - indexMetadataKey); - allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); - } - return uploadMarker(clusterState, allUploadedIndexMetadata); - } - - private void setRepository() { - try { - if (blobStoreRepository != null) { - return; - } - if (IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING.get(settings)) { - final String remoteStoreRepo = CLUSTER_REMOTE_STATE_REPOSITORY_SETTING.get(settings); - final Repository repository = repositoriesService.get().repository(remoteStoreRepo); - assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - blobStoreRepository = (BlobStoreRepository) repository; - } else { - logger.info("remote store is not enabled"); - } - } catch (RepositoryMissingException e) { - logger.error("Remote state repository is missing", e); - } - } - - public ClusterMetadataMarker writeIncrementalMetadata(long currentTerm, ClusterState previousClusterState, ClusterState clusterState, - ClusterMetadataMarker previousMarker) throws IOException { - assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term(); - final Map indexMetadataVersionByName = new HashMap<>(); - for (final IndexMetadata indexMetadata : previousClusterState.metadata().indices().values()) { - indexMetadataVersionByName.putIfAbsent(indexMetadata.getIndex().getName(), indexMetadata.getVersion()); - } - - int numIndicesUpdated = 0; - int numIndicesUnchanged = 0; - final Map allUploadedIndexMetadata = new HashMap<>(previousMarker.getIndices()); - for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { - final Long previousVersion = indexMetadataVersionByName.get(indexMetadata.getIndex().getName()); - if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { - logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", indexMetadata.getIndex(), previousVersion, - indexMetadata.getVersion()); - numIndicesUpdated++; - final String indexMetadataKey = writeIndexMetadata(clusterState.getClusterName().value(), clusterState.getMetadata().clusterUUID(), - indexMetadata, indexMetadataFileName(indexMetadata)); - final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata(indexMetadata.getIndex().getName(), indexMetadata.getIndexUUID(), - indexMetadataKey); - allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); - } else { - numIndicesUnchanged++; - } - indexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); - } - - for (String removedIndexName : indexMetadataVersionByName.keySet()) { - allUploadedIndexMetadata.remove(removedIndexName); - } - return uploadMarker(clusterState, allUploadedIndexMetadata); - } - - public ClusterState getLatestClusterState(String clusterUUID) { - //todo - return null; - } - - public ClusterMetadataMarker uploadMarker(ClusterState clusterState, Map uploadedIndexMetadata) - throws IOException { - synchronized (this) { - final String markerFileName = getMarkerFileName(clusterState.term(), clusterState.version()); - final ClusterMetadataMarker marker = new ClusterMetadataMarker(uploadedIndexMetadata, clusterState.term(), clusterState.getVersion(), - clusterState.metadata().clusterUUID(), - clusterState.stateUUID()); - writeMetadataMarker(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), marker, markerFileName); - return marker; - } - } - - public String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata indexMetadata, String fileName) throws IOException { - final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, indexMetadata.getIndexUUID()); - INDEX_METADATA_FORMAT.write(indexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor()); - // returning full path - return indexMetadataContainer.path().buildAsString() + fileName; - } - - public void writeMetadataMarker(String clusterName, String clusterUUID, ClusterMetadataMarker marker, String fileName) throws IOException { - final BlobContainer metadataMarkerContainer = markerContainer(clusterName, clusterUUID); - RemoteClusterStateService.CLUSTER_METADATA_MARKER_FORMAT.write(marker, metadataMarkerContainer, fileName, blobStoreRepository.getCompressor()); - } - - public BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) { - //123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX - return blobStoreRepository.blobStore() - .blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("index").add(indexUUID)); - } - - public BlobContainer markerContainer(String clusterName, String clusterUUID) { - //123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker - return blobStoreRepository.blobStore() - .blobContainer(blobStoreRepository.basePath().add(clusterName).add("cluster-state").add(clusterUUID).add("marker")); - } - - private static String getMarkerFileName(long term, long version) { - //123456789012_test-cluster/cluster-state/dsgYj10Nkso7/marker/2147483642_2147483637_456536447_marker - return String.join(DELIMITER, "marker", String.valueOf(Long.MAX_VALUE - term), String.valueOf(Long.MAX_VALUE - version), - String.valueOf(Long.MAX_VALUE - System.currentTimeMillis())); - } - - - private static String indexMetadataFileName(IndexMetadata indexMetadata) { - return String.join(DELIMITER, "metadata", String.valueOf(indexMetadata.getVersion()), String.valueOf(System.currentTimeMillis())); - } - - -} diff --git a/server/src/main/java/org/opensearch/cluster/store/package-info.java b/server/src/main/java/org/opensearch/cluster/store/package-info.java deleted file mode 100644 index 7b7a9dc165eb7..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/store/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -/** - * Package containing class to perform operations on remote cluster state - */ -package org.opensearch.cluster.store; diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 2b198cfc949b6..32d14a3519659 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -677,8 +677,7 @@ public void apply(Settings value, Settings current, Settings previous) { List.of( IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING, IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING, - IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING, - IndicesService.CLUSTER_REMOTE_STATE_REPOSITORY_SETTING + IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING ), List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH), List.of( diff --git a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java index bd35e89a1bbf6..68fce4d9b9bb4 100644 --- a/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java +++ b/server/src/main/java/org/opensearch/discovery/DiscoveryModule.java @@ -42,7 +42,6 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterApplier; import org.opensearch.cluster.service.ClusterManagerService; -import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.Randomness; import org.opensearch.common.network.NetworkService; import org.opensearch.common.settings.ClusterSettings; @@ -130,8 +129,7 @@ public DiscoveryModule( Path configFile, GatewayMetaState gatewayMetaState, RerouteService rerouteService, - NodeHealthService nodeHealthService, - RemoteClusterStateService remoteClusterStateService + NodeHealthService nodeHealthService ) { final Collection> joinValidators = new ArrayList<>(); final Map> hostProviders = new HashMap<>(); @@ -207,8 +205,7 @@ public DiscoveryModule( new Random(Randomness.get().nextLong()), rerouteService, electionStrategy, - nodeHealthService, - gatewayMetaState::getRemotePersistedState + nodeHealthService ); } else { throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]"); diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index 30c50c50898c9..af894bdbc117e 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -52,8 +52,6 @@ import org.opensearch.cluster.metadata.MetadataIndexUpgradeService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.cluster.store.ClusterMetadataMarker; -import org.opensearch.cluster.store.RemoteClusterStateService; import org.opensearch.common.SetOnce; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; @@ -62,7 +60,6 @@ import org.opensearch.common.util.concurrent.OpenSearchThreadPoolExecutor; import org.opensearch.common.util.io.IOUtils; import org.opensearch.env.NodeMetadata; -import org.opensearch.indices.IndicesService; import org.opensearch.node.Node; import org.opensearch.plugins.MetadataUpgrader; import org.opensearch.threadpool.ThreadPool; @@ -87,40 +84,31 @@ /** * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts. * - * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that the state being - * loaded when constructing the instance of this class is not necessarily the state that will be used as {@link ClusterState#metadata()} because it might be - * stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and non-stale state, and cluster-manager-ineligible nodes - * receive the real cluster state from the elected cluster-manager after joining the cluster. + * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that + * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link + * ClusterState#metadata()} because it might be stale or incomplete. Cluster-manager-eligible nodes must perform an election to find a complete and + * non-stale state, and cluster-manager-ineligible nodes receive the real cluster state from the elected cluster-manager after joining the cluster. * * @opensearch.internal */ public class GatewayMetaState implements Closeable { - private static final Logger logger = LogManager.getLogger(GatewayMetaState.class); - /** - * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially stale (since - * it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is restarted as a - * cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state. + * Fake node ID for a voting configuration written by a cluster-manager-ineligible data node to indicate that its on-disk state is potentially + * stale (since it is written asynchronously after application, rather than before acceptance). This node ID means that if the node is + * restarted as a cluster-manager-eligible node then it does not win any elections until it has received a fresh cluster state. */ public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG"; // Set by calling start() private final SetOnce persistedState = new SetOnce<>(); - private final SetOnce remotePersistedState = new SetOnce<>(); - public PersistedState getPersistedState() { final PersistedState persistedState = this.persistedState.get(); assert persistedState != null : "not started"; return persistedState; } - public PersistedState getRemotePersistedState() { - final PersistedState remotePersistedState = this.remotePersistedState.get(); - return remotePersistedState; - } - public Metadata getMetadata() { return getPersistedState().getLastAcceptedState().metadata(); } @@ -132,8 +120,7 @@ public void start( MetaStateService metaStateService, MetadataIndexUpgradeService metadataIndexUpgradeService, MetadataUpgrader metadataUpgrader, - PersistedClusterStateService persistedClusterStateService, - RemoteClusterStateService remoteClusterStateService + PersistedClusterStateService persistedClusterStateService ) { assert persistedState.get() == null : "should only start once, but already have " + persistedState.get(); @@ -157,7 +144,6 @@ public void start( } PersistedState persistedState = null; - PersistedState remotePersistedState = null; boolean success = false; try { final ClusterState clusterState = prepareInitialClusterState( @@ -171,7 +157,6 @@ public void start( if (DiscoveryNode.isClusterManagerNode(settings)) { persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState); - remotePersistedState = new RemotePersistedState(remoteClusterStateService); } else { persistedState = new AsyncLucenePersistedState( settings, @@ -197,7 +182,6 @@ public void start( } this.persistedState.set(persistedState); - this.remotePersistedState.set(remotePersistedState); } catch (IOException e) { throw new OpenSearchException("failed to load metadata", e); } @@ -250,8 +234,8 @@ Metadata upgradeMetadataForNode( } /** - * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current version. The MetadataIndexUpgradeService - * might also update obsolete settings if needed. + * This method calls {@link MetadataIndexUpgradeService} to makes sure that indices are compatible with the current + * version. The MetadataIndexUpgradeService might also update obsolete settings if needed. * * @return input metadata if no upgrade is needed or an upgraded metadata */ @@ -615,82 +599,4 @@ public void close() throws IOException { IOUtils.close(persistenceWriter.getAndSet(null)); } } - - /** - * Encapsulates the writing of metadata to a remote store using {@link RemoteClusterStateService}. - */ - public static class RemotePersistedState implements PersistedState { - - //todo check diff between currentTerm and clusterState term - private long currentTerm; - private ClusterState lastAcceptedState; - private ClusterMetadataMarker lastAcceptedMarker; - private final RemoteClusterStateService remoteClusterStateService; - //todo Is this needed? - private boolean writeNextStateFully; - - public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService, final long currentTerm, final ClusterState lastAcceptedState) { - this.remoteClusterStateService = remoteClusterStateService; - this.currentTerm = currentTerm; - this.lastAcceptedState = lastAcceptedState; - - // todo write state to remote only for active master - } - - public RemotePersistedState(final RemoteClusterStateService remoteClusterStateService) { - this.remoteClusterStateService = remoteClusterStateService; - } - - @Override - public long getCurrentTerm() { - return currentTerm; - } - - @Override - public ClusterState getLastAcceptedState() { - return lastAcceptedState; - } - - @Override - public void setCurrentTerm(long currentTerm) { - // no-op - } - - @Override - public void setLastAcceptedState(ClusterState clusterState) { - try { - final ClusterMetadataMarker marker; - if (shouldWriteFullClusterState(clusterState)) { - marker = remoteClusterStateService.writeFullMetadata(currentTerm, clusterState); - } else { - marker = remoteClusterStateService.writeIncrementalMetadata(currentTerm, lastAcceptedState, clusterState, lastAcceptedMarker); - } - lastAcceptedState = clusterState; - lastAcceptedMarker = marker; - } catch (Exception e) { - handleExceptionOnWrite(e); - } - } - - private boolean shouldWriteFullClusterState(ClusterState clusterState) { - if (lastAcceptedState == null || lastAcceptedMarker == null || lastAcceptedState.term() != clusterState.term()) { - return true; - } - return false; - } - - @Override - public void markLastAcceptedStateAsCommitted() { - // no-op - } - - @Override - public void close() throws IOException { - PersistedState.super.close(); - } - - private void handleExceptionOnWrite(Exception e) { - throw ExceptionsHelper.convertToRuntime(e); - } - } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index a4cf451501ae2..297beff981722 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -272,16 +272,6 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); - /** - * Used to specify default repo to use for translog upload for remote store backed indices - */ - public static final Setting CLUSTER_REMOTE_STATE_REPOSITORY_SETTING = Setting.simpleString( - "cluster.remote_store.state.repository", - "", - Property.NodeScope, - Property.Final - ); - /** * This setting is used to set the refresh interval when the {@code index.refresh_interval} index setting is not * provided during index creation or when the existing {@code index.refresh_interval} index setting is set as null. diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d59b02a041cf0..30ecc7f76b069 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -35,7 +35,44 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; -import org.opensearch.cluster.store.RemoteClusterStateService; +import org.opensearch.ExceptionsHelper; +import org.opensearch.common.SetOnce; +import org.opensearch.common.settings.SettingsException; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexingPressureService; +import org.opensearch.index.recovery.RemoteStoreRestoreService; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.index.store.remote.filecache.FileCacheCleaner; +import org.opensearch.index.store.remote.filecache.FileCacheFactory; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.SegmentReplicationSourceService; +import org.opensearch.extensions.ExtensionsManager; +import org.opensearch.extensions.NoopExtensionsManager; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsProbe; +import org.opensearch.plugins.ExtensionAwarePlugin; +import org.opensearch.plugins.SearchPipelinePlugin; +import org.opensearch.telemetry.tracing.NoopTracerFactory; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.TracerFactory; +import org.opensearch.search.backpressure.SearchBackpressureService; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.pipeline.SearchPipelineService; +import org.opensearch.tasks.TaskCancellationMonitoringService; +import org.opensearch.tasks.TaskCancellationMonitoringSettings; +import org.opensearch.tasks.TaskResourceTrackingService; +import org.opensearch.tasks.consumer.TopNSearchTasksLogger; +import org.opensearch.threadpool.RunnableTaskExecutionListener; +import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; +import org.opensearch.telemetry.TelemetryModule; +import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.watcher.ResourceWatcherService; +import org.opensearch.core.Assertions; import org.opensearch.Build; import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; @@ -964,8 +1001,6 @@ protected Node( ); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); - final RemoteClusterStateService remoteClusterStateService = new RemoteClusterStateService(repositoriesServiceReference::get, settings); - final DiscoveryModule discoveryModule = new DiscoveryModule( settings, threadPool, @@ -980,8 +1015,7 @@ protected Node( environment.configDir(), gatewayMetaState, rerouteService, - fsHealthService, - remoteClusterStateService + fsHealthService ); final SearchPipelineService searchPipelineService = new SearchPipelineService( clusterService, @@ -1104,7 +1138,6 @@ protected Node( b.bind(MetadataUpgrader.class).toInstance(metadataUpgrader); b.bind(MetaStateService.class).toInstance(metaStateService); b.bind(PersistedClusterStateService.class).toInstance(lucenePersistedStateFactory); - b.bind(RemoteClusterStateService.class).toInstance(remoteClusterStateService); b.bind(IndicesService.class).toInstance(indicesService); b.bind(AliasValidator.class).toInstance(aliasValidator); b.bind(MetadataCreateIndexService.class).toInstance(metadataCreateIndexService); @@ -1307,8 +1340,7 @@ public Node start() throws NodeValidationException { injector.getInstance(MetaStateService.class), injector.getInstance(MetadataIndexUpgradeService.class), injector.getInstance(MetadataUpgrader.class), - injector.getInstance(PersistedClusterStateService.class), - injector.getInstance(RemoteClusterStateService.class) + injector.getInstance(PersistedClusterStateService.class) ); if (Assertions.ENABLED) { try { diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index c5b18378c2060..693022a60cc09 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -789,10 +789,6 @@ public RepositoryMetadata getMetadata() { return metadata; } - public Compressor getCompressor() { - return compressor; - } - @Override public RepositoryStats stats() { final BlobStore store = blobStore.get();