Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Elasticsearch 6.8 support to Document Migrations #921

Merged
merged 12 commits into from
Aug 29, 2024
Merged
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ TrafficCapture/**/out/
opensearch-cluster-cdk/
test/opensearch-cluster-cdk/
TrafficCapture/dockerSolution/src/main/docker/migrationConsole/staging

# Directory used to temporarily store snapshots during testing
DocumentsFromSnapshotMigration/docker/snapshots/
8 changes: 8 additions & 0 deletions DocumentsFromSnapshotMigration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ DockerServiceProps[] dockerServices = [
dockerImageName:"reindex_from_snapshot",
inputDir:"./docker",
taskDependencies:["copyDockerRuntimeJars"]]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_5_6",
dockerImageName:"empty_elasticsearch_source_5_6",
inputDir:"./docker/TestSource_ES_5_6"]),
new DockerServiceProps([projectName:"emptyElasticsearchSource_6_8",
dockerImageName:"empty_elasticsearch_source_6_8",
inputDir:"./docker/TestSource_ES_6_8"]),
Expand All @@ -112,6 +115,7 @@ for (dockerService in dockerServices) {
dependsOn dep
}
inputDir = project.file(dockerService.inputDir)
// platform.set("linux/amd64")
buildArgs = dockerService.buildArgs
images.add("migrations/${dockerService.dockerImageName}:${hash}")
images.add("migrations/${dockerService.dockerImageName}:${version}")
Expand All @@ -123,6 +127,10 @@ dockerCompose {
useComposeFiles = ['docker/docker-compose-es710.yml']
projectName = 'rfs-compose'

es56 {
useComposeFiles = ['docker/docker-compose-es56.yml']
}

es68 {
useComposeFiles = ['docker/docker-compose-es68.yml']
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
FROM docker.elastic.co/elasticsearch/elasticsearch:5.6.16 AS base

# Configure Elasticsearch for single-node test use.
# The three settings are appended in a single RUN so the image gets one layer
# instead of three.
# NOTE(review): "discovery.type: single-node" and "xpack.security.enabled" may not
# both be honored by ES 5.6 exactly as in 6.x/7.x — confirm against 5.6 docs.
ENV ELASTIC_SEARCH_CONFIG_FILE=/usr/share/elasticsearch/config/elasticsearch.yml
RUN echo "discovery.type: single-node" >> $ELASTIC_SEARCH_CONFIG_FILE && \
    echo "xpack.security.enabled: false" >> $ELASTIC_SEARCH_CONFIG_FILE && \
    echo "bootstrap.system_call_filter: false" >> $ELASTIC_SEARCH_CONFIG_FILE
# NOTE(review): the bundled JDK path below exists in ES 7.x images; verify it is
# present in the 5.6.16 image (harmless if the directory is missing).
ENV PATH=${PATH}:/usr/share/elasticsearch/jdk/bin/

# Make our snapshot directory, owned by the elasticsearch user so the service
# (which runs unprivileged) can write snapshots into it.
USER root
RUN mkdir /snapshots && chown elasticsearch /snapshots
USER elasticsearch

# We do not install the S3 Repo plugin here, because it is not compatible with modern
# IAM Roles. Specifically, it does not support the AWS_SESSION_TOKEN environment variable.
# We will instead take snapshots into a mounted local volume.

# Additionally, we will rely on the base image's default entrypoint command to start the
# Elasticsearch service.
47 changes: 47 additions & 0 deletions DocumentsFromSnapshotMigration/docker/docker-compose-es56.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
version: '3.7'
services:

  # ES 5.6 source cluster used as the snapshot source for RFS testing.
  elasticsearchsource:
    image: 'migrations/empty_elasticsearch_source_5_6:latest'
    # Image is built for amd64 only; required so the container also runs on ARM hosts.
    platform: linux/amd64
    networks:
      - migrations
    environment:
      # Register /snapshots as an allowed snapshot repository location.
      - path.repo=/snapshots
      # AWS credentials are passed through from the invoking shell (may be unset
      # when only local snapshots are used).
      - AWS_ACCESS_KEY_ID=${access_key}
      - AWS_SECRET_ACCESS_KEY=${secret_key}
      - AWS_SESSION_TOKEN=${session_token}
    ports:
      - '19200:9200'
    volumes:
      # Host-mounted directory shared with the reindex-from-snapshot service below.
      - ./snapshots:/snapshots

  # Reindex-From-Snapshot worker; reads snapshots from the shared volume and
  # writes documents into the OpenSearch target.
  reindex-from-snapshot:
    image: 'migrations/reindex_from_snapshot:latest'
    depends_on:
      elasticsearchsource:
        condition: service_started
      opensearchtarget:
        condition: service_started
    networks:
      - migrations
    environment:
      - AWS_ACCESS_KEY_ID=${access_key}
      - AWS_SECRET_ACCESS_KEY=${secret_key}
      - AWS_SESSION_TOKEN=${session_token}
    volumes:
      - ./snapshots:/snapshots

  # Single-node OpenSearch target with the security plugin disabled for test use.
  opensearchtarget:
    image: 'opensearchproject/opensearch:2.11.1'
    environment:
      - discovery.type=single-node
      - plugins.security.disabled=true
    networks:
      - migrations
    ports:
      - "29200:9200"

networks:
  migrations:
    driver: bridge
10 changes: 2 additions & 8 deletions DocumentsFromSnapshotMigration/docker/docker-compose-es68.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ services:
networks:
- migrations
environment:
- discovery.type=single-node
- path.repo=/snapshots
- AWS_ACCESS_KEY_ID=${access_key}
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
ports:
- '19200:9200'
volumes:
- snapshotStorage:/snapshots
- ./snapshots:/snapshots

# Sample command to kick off RFS here: https://github.com/opensearch-project/opensearch-migrations/blob/main/RFS/README.md#using-docker
reindex-from-snapshot:
image: 'migrations/reindex_from_snapshot:latest'
depends_on:
Expand All @@ -31,7 +29,7 @@ services:
- AWS_SECRET_ACCESS_KEY=${secret_key}
- AWS_SESSION_TOKEN=${session_token}
volumes:
- snapshotStorage:/snapshots
- ./snapshots:/snapshots

opensearchtarget:
image: 'opensearchproject/opensearch:2.11.1'
Expand All @@ -46,7 +44,3 @@ services:
networks:
migrations:
driver: bridge

volumes:
snapshotStorage:
driver: local
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.rfs.cms.LeaseExpireTrigger;
import com.rfs.cms.OpenSearchWorkCoordinator;
import com.rfs.cms.ScopedWorkCoordinator;
import com.rfs.common.ClusterVersion;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
Expand All @@ -35,15 +36,13 @@
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.common.SourceResourceProvider;
import com.rfs.common.SourceResourceProviderFactory;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.http.ConnectionContext;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.tracing.RootWorkCoordinationContext;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
peternied marked this conversation as resolved.
Show resolved Hide resolved
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.ShardWorkPreparer;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -132,6 +131,11 @@ public static class Args {
description = "Optional. The maximum number of connections to simultaneously " +
"used to communicate to the target, default 10")
int maxConnections = 10;

@Parameter(names = { "--source-version" }, description = ("Optional. Version of the source cluster. Possible "
+ "values include: ES_6_8, ES_7_10, ES_7_17. Default: ES_7_10"), required = false,
converter = ClusterVersion.ArgsConverter.class)
public ClusterVersion sourceVersion = ClusterVersion.ES_7_10;
peternied marked this conversation as resolved.
Show resolved Hide resolved
}

public static class NoWorkLeftException extends Exception {
Expand Down Expand Up @@ -212,20 +216,22 @@ public static void main(String[] args) throws Exception {
} else {
sourceRepo = new FileSystemRepo(snapshotLocalDirPath);
}
SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);

IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);

SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(arguments.sourceVersion);

SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
repoAccessor,
luceneDirPath,
ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES
sourceResourceProvider.getBufferSizeInBytes()
);

run(
LuceneDocumentsReader.getFactory(ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE,
ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD),
LuceneDocumentsReader.getFactory(sourceResourceProvider.getSoftDeletesPossible(),
sourceResourceProvider.getSoftDeletesFieldData()),
reindexer,
workCoordinator,
arguments.initialLeaseDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.rfs.cms.CoordinateWorkHttpClient;
import com.rfs.cms.LeaseExpireTrigger;
import com.rfs.cms.OpenSearchWorkCoordinator;
import com.rfs.common.ClusterVersion;
import com.rfs.common.DefaultSourceRepoAccessor;
import com.rfs.common.DocumentReindexer;
import com.rfs.common.FileSystemRepo;
Expand All @@ -49,16 +50,14 @@
import com.rfs.common.SnapshotRepo;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.common.SourceResourceProvider;
import com.rfs.common.SourceResourceProviderFactory;
import com.rfs.common.http.ConnectionContextTestParams;
import com.rfs.framework.PreloadedSearchClusterContainer;
import com.rfs.framework.SearchClusterContainer;
import com.rfs.http.SearchClusterRequests;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.version_es_7_10.ElasticsearchConstants_ES_7_10;
import com.rfs.version_es_7_10.IndexMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.ShardMetadataFactory_ES_7_10;
import com.rfs.version_es_7_10.SnapshotRepoProvider_ES_7_10;
import com.rfs.worker.DocumentsRunner;
import lombok.AllArgsConstructor;
import lombok.Lombok;
Expand Down Expand Up @@ -176,7 +175,8 @@ public void testDocumentMigration(
osTargetContainer.getHttpHostAddress(),
runCounter,
clockJitter,
testDocMigrationContext
testDocMigrationContext,
baseSourceImageVersion.getParserVersion()
),
executorService
)
Expand Down Expand Up @@ -304,7 +304,8 @@ private int migrateDocumentsSequentially(
String targetAddress,
AtomicInteger runCounter,
Random clockJitter,
DocumentMigrationTestContext testContext
DocumentMigrationTestContext testContext,
ClusterVersion parserVersion
peternied marked this conversation as resolved.
Show resolved Hide resolved
) {
for (int runNumber = 1;; ++runNumber) {
try {
Expand All @@ -314,7 +315,8 @@ private int migrateDocumentsSequentially(
indexAllowlist,
targetAddress,
clockJitter,
testContext
testContext,
parserVersion
);
if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
return runNumber;
Expand Down Expand Up @@ -362,7 +364,8 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
List<String> indexAllowlist,
String targetAddress,
Random clockJitter,
DocumentMigrationTestContext context
DocumentMigrationTestContext context,
ClusterVersion parserVersion
) throws RfsMigrateDocuments.NoWorkLeftException {
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
var shouldThrow = new AtomicBoolean();
Expand All @@ -377,23 +380,25 @@ private DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
return d;
};

SourceResourceProvider sourceResourceProvider = SourceResourceProviderFactory.getProvider(parserVersion);

DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
repoAccessor,
tempDir,
ElasticsearchConstants_ES_7_10.BUFFER_SIZE_IN_BYTES
sourceResourceProvider.getBufferSizeInBytes()
);

SnapshotRepo.Provider repoDataProvider = new SnapshotRepoProvider_ES_7_10(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = new IndexMetadataFactory_ES_7_10(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = new ShardMetadataFactory_ES_7_10(repoDataProvider);
SnapshotRepo.Provider repoDataProvider = sourceResourceProvider.getSnapshotRepoProvider(sourceRepo);
IndexMetadata.Factory indexMetadataFactory = sourceResourceProvider.getIndexMetadataFactory(repoDataProvider);
ShardMetadata.Factory shardMetadataFactory = sourceResourceProvider.getShardMetadataFactory(repoDataProvider);
final int ms_window = 1000;
final var nextClockShift = (int) (clockJitter.nextDouble() * ms_window) - (ms_window / 2);
log.info("nextClockShift=" + nextClockShift);


Function<Path, LuceneDocumentsReader> readerFactory = path -> new FilteredLuceneDocumentsReader(path, ElasticsearchConstants_ES_7_10.SOFT_DELETES_POSSIBLE,
ElasticsearchConstants_ES_7_10.SOFT_DELETES_FIELD, terminatingDocumentFilter);
Function<Path, LuceneDocumentsReader> readerFactory = path -> new FilteredLuceneDocumentsReader(path, sourceResourceProvider.getSoftDeletesPossible(),
peternied marked this conversation as resolved.
Show resolved Hide resolved
sourceResourceProvider.getSoftDeletesFieldData(), terminatingDocumentFilter);

return RfsMigrateDocuments.run(
readerFactory,
Expand Down
16 changes: 14 additions & 2 deletions RFS/src/main/java/com/rfs/common/ClusterVersion.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.rfs.common;

import java.util.List;
import java.util.Locale;

import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.ParameterException;

Expand All @@ -9,20 +11,30 @@
public enum ClusterVersion {
ES_6_8,
ES_7_10,
ES_7_17,
OS_1_3,
OS_2_11;

public static final List<ClusterVersion> SOURCE_VERSIONS = List.of(ES_6_8, ES_7_10, ES_7_17);
public static final List<ClusterVersion> TARGET_VERSIONS = List.of(OS_2_11);

public static class ArgsConverter implements IStringConverter<ClusterVersion> {
@Override
public ClusterVersion convert(String value) {
switch (value) {
String lowerCasedValue = value.toLowerCase();

Check warning on line 24 in RFS/src/main/java/com/rfs/common/ClusterVersion.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/ClusterVersion.java#L24

Added line #L24 was not covered by tests
switch (lowerCasedValue) {
case "es_6_8":
return ClusterVersion.ES_6_8;
case "es_7_10":
return ClusterVersion.ES_7_10;
case "es_7_17":
return ClusterVersion.ES_7_17;

Check warning on line 31 in RFS/src/main/java/com/rfs/common/ClusterVersion.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/ClusterVersion.java#L31

Added line #L31 was not covered by tests
case "os_1_3":
return ClusterVersion.OS_1_3;

Check warning on line 33 in RFS/src/main/java/com/rfs/common/ClusterVersion.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/ClusterVersion.java#L33

Added line #L33 was not covered by tests
case "os_2_11":
return ClusterVersion.OS_2_11;
default:
throw new ParameterException("Invalid source version: " + value);
throw new ParameterException("Invalid cluster version: " + value);

Check warning on line 37 in RFS/src/main/java/com/rfs/common/ClusterVersion.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/ClusterVersion.java#L37

Added line #L37 was not covered by tests
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions RFS/src/main/java/com/rfs/common/SourceResourceProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.rfs.common;

import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;

/**
 * Abstraction over the source-cluster-version-specific resources needed to read a
 * snapshot: parsers for the repository and shard/index metadata, plus Lucene-level
 * constants that vary between Elasticsearch versions.
 */
public interface SourceResourceProvider {
    // Parses the snapshot repository's top-level metadata for this source version.
    SnapshotRepo.Provider getSnapshotRepoProvider(SourceRepo sourceRepo);
    // Factory for reading per-index metadata out of the given repository provider.
    IndexMetadata.Factory getIndexMetadataFactory(SnapshotRepo.Provider repoDataProvider);
    // Factory for reading per-shard metadata out of the given repository provider.
    ShardMetadata.Factory getShardMetadataFactory(SnapshotRepo.Provider repoDataProvider);

    // Buffer size used when unpacking snapshot shard blobs for this version.
    int getBufferSizeInBytes();
    // Whether Lucene soft deletes can occur in indices written by this version.
    boolean getSoftDeletesPossible();
    // Name of the Lucene soft-deletes field for this version (if soft deletes are possible).
    String getSoftDeletesFieldData();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.rfs.common;

import com.rfs.version_es_6_8.SourceResourceProvider_ES_6_8;
import com.rfs.version_es_7_10.SourceResourceProvider_ES_7_10;

public class SourceResourceProviderFactory {

Check warning on line 6 in RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java#L6

Added line #L6 was not covered by tests
public static SourceResourceProvider getProvider(ClusterVersion version) {
switch (version) {
case ES_6_8:
return new SourceResourceProvider_ES_6_8();
case ES_7_10:
return new SourceResourceProvider_ES_7_10();
case ES_7_17:
// We don't currently distinguish between 7.10 and 7.17
return new SourceResourceProvider_ES_7_10();

Check warning on line 15 in RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java#L15

Added line #L15 was not covered by tests
default:
throw new IllegalArgumentException("Invalid version: " + version);

Check warning on line 17 in RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java

View check run for this annotation

Codecov / codecov/patch

RFS/src/main/java/com/rfs/common/SourceResourceProviderFactory.java#L17

Added line #L17 was not covered by tests
}
}

}
Loading