From c334bbde881c61e66b7b4768e2d43455f1c827d9 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 4 Apr 2023 17:03:42 -0700 Subject: [PATCH] [Segment Replication] Compatibility check for differing lucene codec versions (#6730) * compatCheck Signed-off-by: Poojita Raj * refactor Signed-off-by: Poojita Raj --------- Signed-off-by: Poojita Raj --- .../opensearch/index/shard/IndexShard.java | 20 ++++++-- .../OngoingSegmentReplications.java | 7 +++ .../SegmentReplicationTargetService.java | 2 +- .../checkpoint/ReplicationCheckpoint.java | 43 +++++++++++++---- .../gateway/PrimaryShardAllocatorTests.java | 46 +++++++++++------- .../index/seqno/ReplicationTrackerTests.java | 28 +++++++++-- .../SegmentReplicationIndexShardTests.java | 7 +-- .../OngoingSegmentReplicationsTests.java | 47 ++++++++++++++++++- .../PrimaryShardReplicationSourceTests.java | 33 +++++++++++-- .../SegmentReplicationSourceServiceTests.java | 9 +++- .../SegmentReplicationTargetServiceTests.java | 11 +++-- .../SegmentReplicationTargetTests.java | 4 +- .../PublishCheckpointActionTests.java | 5 +- .../replication/common/CopyStateTests.java | 15 +++++- .../index/shard/IndexShardTestCase.java | 7 ++- 15 files changed, 232 insertions(+), 52 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0072d1594da0e..ab83d9b0f4bf1 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -494,6 +494,13 @@ public boolean isSystem() { return indexSettings.getIndexMetadata().isSystem(); } + /** + * Returns the name of the default codec in codecService + */ + public String getDefaultCodecName() { + return codecService.codec(CodecService.DEFAULT_CODEC).getName(); + } + /** * USE THIS METHOD WITH CARE! * Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about @@ -1484,7 +1491,7 @@ public Tuple, ReplicationCheckpoint> getLatestSegme return null; } if (getEngineOrNull() == null) { - return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)); + return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())); } // do not close the snapshot - caller will close it. final GatedCloseable snapshot = getSegmentInfosSnapshot(); @@ -1501,13 +1508,14 @@ public Tuple, ReplicationCheckpoint> getLatestSegme // getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues. shardRouting.primary() ? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum() - : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes() + : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(), + getEngine().config().getCodec().getName() ) ); } catch (IOException e) { throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e); } - }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); + }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()))); } /** @@ -1577,6 +1585,12 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp ); return false; } + if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) { + logger.trace( + () -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec()) + ); + return false; + } return true; } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 6f04c6cf6f665..3ab0a7539fb06 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -14,6 +14,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; @@ -147,6 +148,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener { if (segrepHandler != null) { logger.warn("Override handler for allocation id {}", request.getTargetAllocationId()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index bf626ff93760c..1858449e13ae8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -401,7 +401,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha return; } startReplication( - ReplicationCheckpoint.empty(request.getShardId()), + ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()), indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 57e667b06a223..32521fb0cd944 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.checkpoint; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -30,29 +31,32 @@ public class ReplicationCheckpoint implements Writeable, Comparable tracker.shardAllocationId.equals(id) == false) .collect(Collectors.toSet()); - final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 1, 1, 1L); - final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 2, 50L); - final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 3, 100L); + final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 1, + 1, + 1L, + Codec.getDefault().getName() + ); + final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 2, + 2, + 50L, + Codec.getDefault().getName() + ); + final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint( + tracker.shardId(), + 0L, + 2, + 3, + 100L, + Codec.getDefault().getName() + ); tracker.setLatestReplicationCheckpoint(initialCheckpoint); tracker.setLatestReplicationCheckpoint(secondCheckpoint); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 014a37249612b..c4db88782638f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -8,6 +8,7 @@ package org.opensearch.index.shard; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; import org.opensearch.ExceptionsHelper; @@ -306,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode()); assertEquals(true, primaryShard.routingEntry().primary()); - spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard); + spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard); // Verify that checkpoint is not processed as shard routing is primary. verify(spy, times(0)).startReplication(any(), any(), any()); @@ -1020,7 +1021,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun private void resolveCheckpointInfoResponseListener(ActionListener listener, IndexShard primary) { try { - final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId), primary); + final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); @@ -1034,7 +1035,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); final SegmentReplicationTarget target = targetService.startReplication( - ReplicationCheckpoint.empty(replica.shardId), + ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()), replica, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 78767ee1dcf8c..6e27a4db6afec 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -17,6 +17,7 @@ import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.xcontent.XContentType; import org.opensearch.index.IndexService; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -47,7 +48,7 @@ public class OngoingSegmentReplicationsTests extends IndexShardTestCase { private final IndicesService mockIndicesService = mock(IndicesService.class); - private ReplicationCheckpoint testCheckpoint; + private ReplicationCheckpoint testCheckpoint, olderCodecTestCheckpoint; private DiscoveryNode primaryDiscoveryNode; private DiscoveryNode replicaDiscoveryNode; private IndexShard primary; @@ -73,8 +74,12 @@ public void setUp() throws Exception { ShardId testShardId = primary.shardId(); + CodecService codecService = new CodecService(null, null); + String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName(); + // This mirrors the creation of the ReplicationCheckpoint inside CopyState - testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L); + testCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, defaultCodecName); + olderCodecTestCheckpoint = new ReplicationCheckpoint(testShardId, primary.getOperationPrimaryTerm(), 0L, 0L, "Lucene94"); IndexService mockIndexService = mock(IndexService.class); when(mockIndicesService.indexServiceSafe(testShardId.getIndex())).thenReturn(mockIndexService); when(mockIndexService.getShard(testShardId.id())).thenReturn(primary); @@ -89,6 +94,44 @@ public void tearDown() throws Exception { super.tearDown(); } + public void testSuccessfulCodecCompatibilityCheck() throws Exception { + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + // replica checkpoint is on same/higher lucene codec than primary + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + } + + public void testFailCodecCompatibilityCheck() throws Exception { + indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); + primary.refresh("Test"); + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + // replica checkpoint is on lower/older lucene codec than primary + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + olderCodecTestCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + try { + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + } catch (CancellableThreads.ExecutionCancelledException ex) { + Assert.assertTrue(ex.getMessage().contains("Requested unsupported codec version")); + } + } + public void testPrepareAndSendSegments() throws IOException { indexDoc(primary, "1", "{\"foo\" : \"baz\"}", XContentType.JSON, "foobar"); primary.refresh("Test"); diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index 10b747b822819..fdd707ae88195 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.util.Version; import org.junit.Assert; import org.opensearch.action.ActionListener; @@ -93,7 +94,13 @@ public void tearDown() throws Exception { } public void testGetCheckpointMetadata() { - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, mock(ActionListener.class)); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); assertEquals(1, requestList.length); @@ -104,7 +111,13 @@ public void testGetCheckpointMetadata() { } public void testGetSegmentFiles() { - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST); replicationSource.getSegmentFiles( REPLICATION_ID, @@ -126,7 +139,13 @@ public void testGetSegmentFiles() { */ public void testTransportTimeoutForGetSegmentFilesAction() { long fileSize = (long) (Math.pow(10, 9)); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", fileSize, "checksum", Version.LATEST); replicationSource.getSegmentFiles( REPLICATION_ID, @@ -145,7 +164,13 @@ public void testTransportTimeoutForGetSegmentFilesAction() { public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), PRIMARY_TERM, SEGMENTS_GEN, VERSION); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); StoreFileMetadata testMetadata = new StoreFileMetadata("testFile", 1L, "checksum", Version.LATEST); replicationSource.getSegmentFiles( REPLICATION_ID, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 0d05b1ec8679e..41022b77b46e1 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import org.apache.lucene.codecs.Codec; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; @@ -55,7 +56,13 @@ public void setUp() throws Exception { when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard); // This mirrors the creation of the ReplicationCheckpoint inside CopyState - testCheckpoint = new ReplicationCheckpoint(testShardId, mockIndexShard.getOperationPrimaryTerm(), 0L, 0L); + testCheckpoint = new ReplicationCheckpoint( + testShardId, + mockIndexShard.getOperationPrimaryTerm(), + 0L, + 0L, + Codec.getDefault().getName() + ); testThreadPool = new TestThreadPool("test", Settings.EMPTY); CapturingTransport transport = new CapturingTransport(); localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index bae0afb5bcc3b..357a88c27fc46 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.CancellableThreads; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; @@ -62,10 +63,12 @@ public void setUp() throws Exception { .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()) .build(); + CodecService codecService = new CodecService(null, null); + String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName(); primaryShard = newStartedShard(true, settings); replicaShard = newShard(false, settings, new NRTReplicationEngineFactory()); recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard)); - checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L); + checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, defaultCodecName); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); when(replicationSourceFactory.get(replicaShard)).thenReturn(replicationSource); @@ -76,13 +79,15 @@ public void setUp() throws Exception { initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm(), initialCheckpoint.getSegmentsGen(), - initialCheckpoint.getSegmentInfosVersion() + 1 + initialCheckpoint.getSegmentInfosVersion() + 1, + defaultCodecName ); newPrimaryCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm() + 1, initialCheckpoint.getSegmentsGen(), - initialCheckpoint.getSegmentInfosVersion() + 1 + initialCheckpoint.getSegmentInfosVersion() + 1, + defaultCodecName ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 0c766c66413dd..a029d87f4a575 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.StringField; @@ -106,7 +107,8 @@ public void setUp() throws Exception { spyIndexShard.shardId(), spyIndexShard.getPendingPrimaryTerm(), testSegmentInfos.getGeneration(), - testSegmentInfos.version + testSegmentInfos.version, + Codec.getDefault().getName() ); } diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index 8a67292703da0..c851edf5e1bc8 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication.checkpoint; +import org.apache.lucene.codecs.Codec; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActionTestUtils; @@ -104,7 +105,7 @@ public void testPublishCheckpointActionOnPrimary() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1, Codec.getDefault().getName()); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); action.shardOperationOnPrimary(request, indexShard, ActionTestUtils.assertNoFailureListener(result -> { @@ -139,7 +140,7 @@ public void testPublishCheckpointActionOnReplica() { mockTargetService ); - final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 11, 1, Codec.getDefault().getName()); final PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index a87a8de206a39..e3b48302ae6ef 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -8,12 +8,14 @@ package org.opensearch.indices.replication.common; +import org.apache.lucene.codecs.Codec; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; +import org.opensearch.index.codec.CodecService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.index.shard.ShardId; @@ -49,7 +51,10 @@ public class CopyStateTests extends IndexShardTestCase { public void testCopyStateCreation() throws IOException { final IndexShard mockIndexShard = createMockIndexShard(); - CopyState copyState = new CopyState(ReplicationCheckpoint.empty(mockIndexShard.shardId()), mockIndexShard); + CopyState copyState = new CopyState( + ReplicationCheckpoint.empty(mockIndexShard.shardId(), new CodecService(null, null).codec("default").getName()), + mockIndexShard + ); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero @@ -67,7 +72,13 @@ public static IndexShard createMockIndexShard() throws IOException { when(mockShard.store()).thenReturn(mockStore); SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); - ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(mockShard.shardId(), mockShard.getOperationPrimaryTerm(), 0L, 0L); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( + mockShard.shardId(), + mockShard.getOperationPrimaryTerm(), + 0L, + 0L, + Codec.getDefault().getName() + ); final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( new GatedCloseable<>(testSegmentInfos, () -> {}), testCheckpoint diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 7b81bd45cc8b6..80e603bd80420 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1299,7 +1299,10 @@ public void getCheckpointMetadata( ActionListener listener ) { try { - final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primaryShard.shardId), primaryShard); + final CopyState copyState = new CopyState( + ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getDefaultCodecName()), + primaryShard + ); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); @@ -1353,7 +1356,7 @@ public final List replicateSegments(IndexShard primary for (IndexShard replica : replicaShards) { final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica); final SegmentReplicationTarget target = targetService.startReplication( - ReplicationCheckpoint.empty(replica.shardId), + ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()), replica, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override