Skip to content

Commit

Permalink
[Segment Replication] Compatibility check for differing lucene codec …
Browse files Browse the repository at this point in the history
…versions (#6730)

* compatCheck

Signed-off-by: Poojita Raj <[email protected]>

* refactor

Signed-off-by: Poojita Raj <[email protected]>

---------

Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj authored Apr 5, 2023
1 parent 65443ad commit c334bbd
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 52 deletions.
20 changes: 17 additions & 3 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ public boolean isSystem() {
return indexSettings.getIndexMetadata().isSystem();
}

/**
* Returns the name of the default codec in codecService
*/
public String getDefaultCodecName() {
return codecService.codec(CodecService.DEFAULT_CODEC).getName();
}

/**
* USE THIS METHOD WITH CARE!
* Returns the primary term the index shard is supposed to be on. In case of primary promotion or when a replica learns about
Expand Down Expand Up @@ -1484,7 +1491,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
return null;
}
if (getEngineOrNull() == null) {
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId));
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()));
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
Expand All @@ -1501,13 +1508,14 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
// getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues.
shardRouting.primary()
? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes()
: store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(),
getEngine().config().getCodec().getName()
)
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())));
}

/**
Expand Down Expand Up @@ -1577,6 +1585,12 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
);
return false;
}
if (localCheckpoint.getCodec().equals(requestCheckpoint.getCodec()) == false) {
logger.trace(
() -> new ParameterizedMessage("Shard does not support the received lucene codec version {}", requestCheckpoint.getCodec())
);
return false;
}
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -147,6 +148,12 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (copyState.getCheckpoint().getCodec().equals(request.getCheckpoint().getCodec()) == false) {
logger.trace("Requested unsupported codec version {}", request.getCheckpoint().getCodec());
throw new CancellableThreads.ExecutionCancelledException(
new ParameterizedMessage("Requested unsupported codec version {}", request.getCheckpoint().getCodec()).toString()
);
}
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
if (segrepHandler != null) {
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public void messageReceived(final ForceSyncRequest request, TransportChannel cha
return;
}
startReplication(
ReplicationCheckpoint.empty(request.getShardId()),
ReplicationCheckpoint.empty(request.getShardId(), indexShard.getDefaultCodecName()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.indices.replication.checkpoint;

import org.opensearch.Version;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
Expand All @@ -30,37 +31,46 @@ public class ReplicationCheckpoint implements Writeable, Comparable<ReplicationC
private final long segmentsGen;
private final long segmentInfosVersion;
private final long length;
private final String codec;

public static ReplicationCheckpoint empty(ShardId shardId) {
return new ReplicationCheckpoint(shardId);
public static ReplicationCheckpoint empty(ShardId shardId, String codec) {
return new ReplicationCheckpoint(shardId, codec);
}

private ReplicationCheckpoint(ShardId shardId) {
private ReplicationCheckpoint(ShardId shardId, String codec) {
this.shardId = shardId;
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
segmentsGen = SequenceNumbers.NO_OPS_PERFORMED;
segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
length = 0L;
this.codec = codec;
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion) {
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L);
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec);
}

public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length) {
public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length, String codec) {
this.shardId = shardId;
this.primaryTerm = primaryTerm;
this.segmentsGen = segmentsGen;
this.segmentInfosVersion = segmentInfosVersion;
this.length = length;
this.codec = codec;
}

public ReplicationCheckpoint(StreamInput in) throws IOException {
shardId = new ShardId(in);
primaryTerm = in.readLong();
segmentsGen = in.readLong();
segmentInfosVersion = in.readLong();
length = in.readLong();
if (in.getVersion().onOrAfter(Version.V_2_7_0)) {
length = in.readLong();
codec = in.readString();
} else {
length = 0L;
codec = null;
}
}

/**
Expand Down Expand Up @@ -102,13 +112,25 @@ public long getLength() {
return length;
}

/**
* Latest supported codec version
*
* @return the codec name
*/
public String getCodec() {
return codec;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
out.writeLong(primaryTerm);
out.writeLong(segmentsGen);
out.writeLong(segmentInfosVersion);
out.writeLong(length);
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeLong(length);
out.writeString(codec);
}
}

@Override
Expand All @@ -124,7 +146,8 @@ public boolean equals(Object o) {
return primaryTerm == that.primaryTerm
&& segmentsGen == that.segmentsGen
&& segmentInfosVersion == that.segmentInfosVersion
&& Objects.equals(shardId, that.shardId);
&& Objects.equals(shardId, that.shardId)
&& codec.equals(that.codec);
}

@Override
Expand Down Expand Up @@ -155,6 +178,8 @@ public String toString() {
+ segmentInfosVersion
+ ", size="
+ length
+ ", codec="
+ codec
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.gateway;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.CorruptIndexException;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
Expand Down Expand Up @@ -61,6 +62,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.IndexId;
Expand Down Expand Up @@ -220,9 +222,9 @@ public void testPreferReplicaWithHighestPrimaryTerm() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -253,9 +255,9 @@ public void testPreferReplicaWithNullReplicationCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false);
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -319,9 +321,9 @@ public void testPreferReplicaWithHighestSegmentInfoVersion() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -351,9 +353,9 @@ public void testOutOfSyncHighestRepCheckpointIsIgnored() {
allocId1,
allocId3
);
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
testAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -384,9 +386,9 @@ public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() {
allocId2,
allocId3
);
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2));
testAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName()));
testAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName()));
testAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2, Codec.getDefault().getName()));
allocateAllUnassigned(allocation);
assertThat(allocation.routingNodesChanged(), equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
Expand Down Expand Up @@ -809,11 +811,23 @@ public TestAllocator addData(
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), null);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
null
);
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
return addData(node, allocationId, primary, ReplicationCheckpoint.empty(shardId), storeException);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
storeException
);
}

public TestAllocator addData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.index.seqno;

import org.apache.lucene.codecs.Codec;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.cluster.routing.AllocationId;
Expand Down Expand Up @@ -1800,9 +1801,30 @@ public void testSegmentReplicationCheckpointTracking() {
.filter(id -> tracker.shardAllocationId.equals(id) == false)
.collect(Collectors.toSet());

final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 1, 1, 1L);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 2, 50L);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(tracker.shardId(), 0L, 2, 3, 100L);
final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
1,
1,
1L,
Codec.getDefault().getName()
);
final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
2,
50L,
Codec.getDefault().getName()
);
final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
tracker.shardId(),
0L,
2,
3,
100L,
Codec.getDefault().getName()
);

tracker.setLatestReplicationCheckpoint(initialCheckpoint);
tracker.setLatestReplicationCheckpoint(secondCheckpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.shard;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.SegmentInfos;
import org.junit.Assert;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -306,7 +307,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException {
assertEquals(false, primaryShard.getReplicationTracker().isPrimaryMode());
assertEquals(true, primaryShard.routingEntry().primary());

spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L), spyShard);
spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard);

// Verify that checkpoint is not processed as shard routing is primary.
verify(spy, times(0)).startReplication(any(), any(), any());
Expand Down Expand Up @@ -1020,7 +1021,7 @@ private void assertDocCounts(IndexShard indexShard, int expectedPersistedDocCoun

private void resolveCheckpointInfoResponseListener(ActionListener<CheckpointInfoResponse> listener, IndexShard primary) {
try {
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId), primary);
final CopyState copyState = new CopyState(ReplicationCheckpoint.empty(primary.shardId, primary.getDefaultCodecName()), primary);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
Expand All @@ -1034,7 +1035,7 @@ private void startReplicationAndAssertCancellation(IndexShard replica, SegmentRe
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
final SegmentReplicationTarget target = targetService.startReplication(
ReplicationCheckpoint.empty(replica.shardId),
ReplicationCheckpoint.empty(replica.shardId, replica.getDefaultCodecName()),
replica,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
Expand Down
Loading

0 comments on commit c334bbd

Please sign in to comment.