Skip to content

Commit

Permalink
Update replicationCheckpoint to include accurate seqNo from provided …
Browse files Browse the repository at this point in the history
…SegmentInfos.

This change updates IndexShard#getlatestSegmentInfos to include the latest max seqNo from the primary's
segmentInfos snapshot.  It also updates the method to return a Tuple so that both can be fetched.
This change also updates replicas to not bump their SegmentInfos version when performing a commit.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Feb 1, 2023
1 parent b1cf2d1 commit be9b402
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https:/opensearch-project/OpenSearch/pull/4827))
- Fixed compression support for h2c protocol ([#4944](https:/opensearch-project/OpenSearch/pull/4944))
- Support OpenSSL Provider with default Netty allocator ([#5460](https:/opensearch-project/OpenSearch/pull/5460))
- Segment Replication - Fixed bug where inaccurate sequence numbers were sent during replication ([#6122](https:/opensearch-project/OpenSearch/pull/6122))

### Security

Expand Down
49 changes: 37 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1447,31 +1447,56 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Returns the latest ReplicationCheckpoint that shard received.
* Compute and return the latest ReplicationCheckpoint for a particular shard.
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> infosAndCheckpoint = getLatestSegmentInfosAndCheckpoint();
if (infosAndCheckpoint == null) {
return null;
}
try (final GatedCloseable<SegmentInfos> ignored = infosAndCheckpoint.v1()) {
return infosAndCheckpoint.v2();
} catch (IOException e) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
}

/**
* Compute and return the latest ReplicationCheckpoint for a shard and a GatedCloseable containing the corresponding SegmentInfos.
* The segments referenced by the SegmentInfos will remain on disk until the GatedCloseable is closed.
*
* Primary shards compute the seqNo used in the replication checkpoint from the fetched SegmentInfos.
* Replica shards compute the seqNo from its latest processed checkpoint, which only increases when refreshing on new segments.
*
* @return A {@link Tuple} containing SegmentInfos wrapped in a {@link GatedCloseable} and the {@link ReplicationCheckpoint} computed from the infos.
*
*/
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
if (indexSettings.isSegRepEnabled() == false) {
return null;
}
if (getEngineOrNull() == null) {
return ReplicationCheckpoint.empty(shardId);
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId));
}
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return Optional.ofNullable(snapshot.get())
.map(
segmentInfos -> new ReplicationCheckpoint(
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> {
try {
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
getProcessedLocalCheckpoint(),
shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
)
)
.orElse(ReplicationCheckpoint.empty(shardId));
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
}
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l
final Map<String, String> userData = new HashMap<>(latestSegmentInfos.getUserData());
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(processedCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
latestSegmentInfos.setUserData(userData, true);
latestSegmentInfos.setUserData(userData, false);
latestSegmentInfos.commit(directory());
directory.sync(latestSegmentInfos.files(true));
directory.syncMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -44,16 +45,12 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar
super("CopyState-" + shard.shardId());
this.requestedReplicationCheckpoint = requestedReplicationCheckpoint;
this.shard = shard;
this.segmentInfosRef = shard.getSegmentInfosSnapshot();
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard
.getLatestSegmentInfosAndCheckpoint();
this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1();
this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2();
SegmentInfos segmentInfos = this.segmentInfosRef.get();
this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos);
this.replicationCheckpoint = new ReplicationCheckpoint(
shard.shardId(),
shard.getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
shard.getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
);
this.commitRef = shard.acquireLastIndexCommit(false);

ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingHelper;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -104,6 +105,60 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
final IndexShard primary = shards.getPrimary();
final IndexShard replica = shards.getReplicas().get(0);

// assert before any indexing:
// replica:
Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> replicaTuple = replica.getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> gatedCloseable = replicaTuple.v1()) {
assertReplicationCheckpoint(replica, gatedCloseable.get(), replicaTuple.v2());
}

// primary:
Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> primaryTuple = primary.getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> gatedCloseable = primaryTuple.v1()) {
assertReplicationCheckpoint(primary, gatedCloseable.get(), primaryTuple.v2());
}
// We use compareTo here instead of equals because we ignore segments gen with replicas performing their own commits.
// However infos version we expect to be equal.
assertEquals(1, primary.getLatestReplicationCheckpoint().compareTo(replica.getLatestReplicationCheckpoint()));

// index and copy segments to replica.
int numDocs = randomIntBetween(10, 100);
shards.indexDocs(numDocs);
primary.refresh("test");
replicateSegments(primary, List.of(replica));

replicaTuple = replica.getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> gatedCloseable = replicaTuple.v1()) {
assertReplicationCheckpoint(replica, gatedCloseable.get(), replicaTuple.v2());
}

primaryTuple = primary.getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> gatedCloseable = primaryTuple.v1()) {
assertReplicationCheckpoint(primary, gatedCloseable.get(), primaryTuple.v2());
}

replicaTuple = replica.getLatestSegmentInfosAndCheckpoint();
try (final GatedCloseable<SegmentInfos> gatedCloseable = replicaTuple.v1()) {
assertReplicationCheckpoint(replica, gatedCloseable.get(), replicaTuple.v2());
}
assertEquals(1, primary.getLatestReplicationCheckpoint().compareTo(replica.getLatestReplicationCheckpoint()));
}
}

private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentInfos, ReplicationCheckpoint checkpoint)
throws IOException {
assertNotNull(segmentInfos);
assertEquals(checkpoint.getSeqNo(), shard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos));
assertEquals(checkpoint.getSegmentInfosVersion(), segmentInfos.getVersion());
assertEquals(checkpoint.getSegmentsGen(), segmentInfos.getGeneration());
}

public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException {
final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory());
assertFalse(indexShard.isSegmentReplicationAllowed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.opensearch.common.collect.Map;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
Expand Down Expand Up @@ -48,14 +49,7 @@ public class CopyStateTests extends IndexShardTestCase {

public void testCopyStateCreation() throws IOException {
final IndexShard mockIndexShard = createMockIndexShard();
ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(
mockIndexShard.shardId(),
mockIndexShard.getOperationPrimaryTerm(),
0L,
mockIndexShard.getProcessedLocalCheckpoint(),
0L
);
CopyState copyState = new CopyState(testCheckpoint, mockIndexShard);
CopyState copyState = new CopyState(ReplicationCheckpoint.empty(mockIndexShard.shardId()), mockIndexShard);
ReplicationCheckpoint checkpoint = copyState.getCheckpoint();
assertEquals(TEST_SHARD_ID, checkpoint.getShardId());
// version was never set so this should be zero
Expand All @@ -73,7 +67,18 @@ public static IndexShard createMockIndexShard() throws IOException {
when(mockShard.store()).thenReturn(mockStore);

SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major);
when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {}));
ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(
mockShard.shardId(),
mockShard.getOperationPrimaryTerm(),
0L,
mockShard.getProcessedLocalCheckpoint(),
0L
);
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>(
new GatedCloseable<>(testSegmentInfos, () -> {}),
testCheckpoint
);
when(mockShard.getLatestSegmentInfosAndCheckpoint()).thenReturn(gatedCloseableReplicationCheckpointTuple);
when(mockStore.getSegmentMetadataMap(testSegmentInfos)).thenReturn(SI_SNAPSHOT.asMap());

IndexCommit mockIndexCommit = mock(IndexCommit.class);
Expand Down

0 comments on commit be9b402

Please sign in to comment.