Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Elasticsearch 6.8 support to Document Migrations #921

Merged
merged 12 commits into from
Aug 29, 2024
Merged
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ TrafficCapture/**/out/
opensearch-cluster-cdk/
test/opensearch-cluster-cdk/
TrafficCapture/dockerSolution/src/main/docker/migrationConsole/staging

# Directory used to temporarily store snapshots during testing
DocumentsFromSnapshotMigration/docker/snapshots/
8 changes: 8 additions & 0 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ DockerServiceProps[] dockerServices = [
dockerImageName:"reindex_from_snapshot",
inputDir:"./docker",
taskDependencies:["copyDockerRuntimeJars"]]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_5_6",
dockerImageName:"empty_elasticsearch_source_5_6",
inputDir:"./docker/TestSource_ES_5_6"]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_6_8",
dockerImageName:"empty_elasticsearch_source_6_8",
inputDir:"./docker/TestSource_ES_6_8"]),
Expand All @@ -112,6 +115,7 @@ for (dockerService in dockerServices) {
dependsOn dep
}
inputDir = project.file(dockerService.inputDir)
// platform.set("linux/amd64")
buildArgs = dockerService.buildArgs
images.add("migrations/${dockerService.dockerImageName}:${hash}")
images.add("migrations/${dockerService.dockerImageName}:${version}")
Expand All @@ -123,6 +127,10 @@ dockerCompose {
useComposeFiles = ['docker/docker-compose-es710.yml']
projectName = 'rfs-compose'

es56 {
useComposeFiles = ['docker/docker-compose-es56.yml']
}

es68 {
useComposeFiles = ['docker/docker-compose-es68.yml']
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM docker.elastic.co/elasticsearch/elasticsearch:5.6.16 AS base

# Configure Elastic
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE
RUN echo "xpack.security.enabled: false" >> $ELASTIC_SEARCH_CONFIG_FILE
RUN echo "bootstrap.system_call_filter: false" >> $ELASTIC_SEARCH_CONFIG_FILE
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

# Make our snapshot directory
USER root
RUN mkdir /snapshots && chown elasticsearch /snapshots
USER elasticsearch

# We do not install the S3 Repo plugin here, because it is not compatible with modern
# IAM Roles. Specifically, it does not support the AWS_SESSION_TOKEN environment variable.
# We will instead take snapshots into a mounted local volume.

# Additionally, we will rely on the base image's default entrypoint command to start the
# Elasticsearch service.
47 changes: 47 additions & 0 deletions DocumentsFromSnapshotMigration/docker/docker-compose-es56.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
version: '3.7'
services:

elasticsearchsource:
image: 'migrations/empty_elasticsearch_source_5_6:latest'
platform: linux/amd64
networks:
- migrations
environment:
- path.repo=/snapshots
- AWS_ACCESS_KEY_ID=${access_key}
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
ports:
- '19200:9200'
volumes:
- ./snapshots:/snapshots

reindex-from-snapshot:
image: 'migrations/reindex_from_snapshot:latest'
depends_on:
elasticsearchsource:
condition: service_started
opensearchtarget:
condition: service_started
networks:
- migrations
environment:
- AWS_ACCESS_KEY_ID=${access_key}
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
volumes:
- ./snapshots:/snapshots

opensearchtarget:
image: 'opensearchproject/opensearch:2.11.1'
environment:
- discovery.type=single-node
- plugins.security.disabled=true
networks:
- migrations
ports:
- "29200:9200"

networks:
migrations:
driver: bridge
10 changes: 2 additions & 8 deletions DocumentsFromSnapshotMigration/docker/docker-compose-es68.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ services:
networks:
- migrations
environment:
- discovery.type=single-node
- path.repo=/snapshots
- AWS_ACCESS_KEY_ID=${access_key}
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
ports:
- '19200:9200'
volumes:
- snapshotStorage:/snapshots
- ./snapshots:/snapshots

# Sample command to kick off RFS here: https:/opensearch-project/opensearch-migrations/blob/main/RFS/README.md#using-docker
reindex-from-snapshot:
image: 'migrations/reindex_from_snapshot:latest'
depends_on:
Expand All @@ -31,7 +29,7 @@ services:
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
volumes:
- snapshotStorage:/snapshots
- ./snapshots:/snapshots

opensearchtarget:
image: 'opensearchproject/opensearch:2.11.1'
Expand All @@ -46,7 +44,3 @@ services:
networks:
migrations:
driver: bridge

volumes:
snapshotStorage:
driver: local
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.rfs.cms.LeaseExpireTrigger;
import com.rfs.cms.OpenSearchWorkCoordinator;
import com.rfs.cms.ScopedWorkCoordinator;
import com.rfs.common.ClusterVersion;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
Expand All @@ -35,15 +36,13 @@
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.common.SourceResourceProvider;
import com.rfs.common.SourceResourceProviderFactory;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.http.ConnectionContext;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.tracing.RootWorkCoordinationContext;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
peternied marked this conversation as resolved.
Show resolved Hide resolved
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.ShardWorkPreparer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -132,6 +131,11 @@ public static class Args {
description = "Optional. The maximum number of connections to simultaneously " +
"used to communicate to the target, default 10")
int maxConnections = 10;

@Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Possible "
+ "values include: ES_6_8, ES_7_10, ES_7_17. Default: ES_7_10"), required = false,
converter = ClusterVersion.ArgsConverter.class)
public ClusterVersion sourceVersion = ClusterVersion.ES_7_10;
peternied marked this conversation as resolved.
Show resolved Hide resolved
}

public static class NoWorkLeftException extends Exception {
Expand Down Expand Up @@ -212,20 +216,22 @@ public static void main(String[] args) throws Exception {
} else {
sourceRepo = new FileSystemRepo(snapshotLocalDirPath);
}
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);

SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(arguments.sourceVersion);

SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
repoAccessor,
luceneDirPath,
ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES
sourceResourceProvider.getBufferSizeInBytes()
);

run(
LuceneDocumentsReader.getFactory(ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE,
ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD),
LuceneDocumentsReader.getFactory(sourceResourceProvider.getSoftDeletesPossible(),
sourceResourceProvider.getSoftDeletesFieldData()),
reindexer,
workCoordinator,
arguments.initialLeaseDuration,
Expand Down
Loading