diff --git a/CHANGELOG.md b/CHANGELOG.md
index d8d68c51ceed5..63e4486012a67 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -188,6 +188,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Fix condition to remove index create block ([#9437](https://github.com/opensearch-project/OpenSearch/pull/9437))
 - Add support to clear archived index setting ([#9019](https://github.com/opensearch-project/OpenSearch/pull/9019))
 - [Segment Replication] Fixed bug where replica shard temporarily serves stale data during an engine reset ([#9495](https://github.com/opensearch-project/OpenSearch/pull/9495))
+- [Segment Replication] Fixed bug where bytes behind metric is not accurate ([#9686](https://github.com/opensearch-project/OpenSearch/pull/9686))
 
 ### Security
 
diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java
index 686fc78dcec8a..75083a929b491 100644
--- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java
+++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java
@@ -114,6 +114,7 @@ private void printClusterRouting() throws IOException, ParseException {
      * This test verifies that segment replication does not break when primary shards are on lower OS version. It does this
      * by verifying replica shards contains same number of documents as primary's.
      */
+    @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9685")
     public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
         if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
             logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
index 48bfce1013f17..4b6d72b86ff62 100644
--- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
+++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java
@@ -57,6 +57,8 @@
 import org.opensearch.index.shard.AbstractIndexShardComponent;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.ReplicationGroup;
+import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;
 
@@ -1290,27 +1292,25 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
                         && entry.getValue().inSync
                         && replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
                 )
-                .map(entry -> buildShardStats(latestReplicationCheckpoint.getLength(), entry.getKey(), entry.getValue()))
+                .map(entry -> buildShardStats(entry.getKey(), entry.getValue()))
                 .collect(Collectors.toUnmodifiableSet());
         }
         return Collections.emptySet();
     }
 
-    private SegmentReplicationShardStats buildShardStats(
-        final long latestCheckpointLength,
-        final String allocationId,
-        final CheckpointState checkpointState
-    ) {
-        final Map<ReplicationCheckpoint, SegmentReplicationLagTimer> checkpointTimers = checkpointState.checkpointTimers;
+    private SegmentReplicationShardStats buildShardStats(final String allocationId, final CheckpointState cps) {
+        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(
+            latestReplicationCheckpoint.getMetadataMap(),
+            cps.visibleReplicationCheckpoint != null ? cps.visibleReplicationCheckpoint.getMetadataMap() : Collections.emptyMap()
+        );
+        final long bytesBehind = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
         return new SegmentReplicationShardStats(
             allocationId,
-            checkpointTimers.size(),
-            checkpointState.visibleReplicationCheckpoint == null
-                ? latestCheckpointLength
-                : Math.max(latestCheckpointLength - checkpointState.visibleReplicationCheckpoint.getLength(), 0),
-            checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
-            checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
-            checkpointState.lastCompletedReplicationLag
+            cps.checkpointTimers.size(),
+            bytesBehind,
+            cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::time).max().orElse(0),
+            cps.checkpointTimers.values().stream().mapToLong(SegmentReplicationLagTimer::totalElapsedTime).max().orElse(0),
+            cps.lastCompletedReplicationLag
         );
     }
 
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index e398cab23a085..352876e54547e 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1610,6 +1610,7 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
             snapshot = getSegmentInfosSnapshot();
             if (snapshot.get() != null) {
                 SegmentInfos segmentInfos = snapshot.get();
+                final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
                 return new Tuple<>(
                     snapshot,
                     new ReplicationCheckpoint(
@@ -1617,8 +1618,9 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
                         getOperationPrimaryTerm(),
                         segmentInfos.getGeneration(),
                         segmentInfos.getVersion(),
-                        store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum(),
-                        getEngine().config().getCodec().getName()
+                        metadataMap.values().stream().mapToLong(StoreFileMetadata::length).sum(),
+                        getEngine().config().getCodec().getName(),
+                        metadataMap
                     )
                 );
             }
diff --git a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
index 207620573886d..1cec20ec3f6cc 100644
--- a/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
+++ b/server/src/main/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadata.java
@@ -10,13 +10,16 @@
 
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.Version;
 import org.opensearch.core.index.Index;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.store.RemoteSegmentStoreDirectory;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -110,11 +113,13 @@ public void write(IndexOutput out) throws IOException {
 
     public static RemoteSegmentMetadata read(IndexInput indexInput) throws IOException {
         Map<String, String> metadata = indexInput.readMapOfStrings();
-        ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput);
+        final Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap = RemoteSegmentMetadata
+            .fromMapOfStrings(metadata);
+        ReplicationCheckpoint replicationCheckpoint = readCheckpointFromIndexInput(indexInput, uploadedSegmentMetadataMap);
         int byteArraySize = (int) indexInput.readLong();
         byte[] segmentInfosBytes = new byte[byteArraySize];
         indexInput.readBytes(segmentInfosBytes, 0, byteArraySize);
-        return new RemoteSegmentMetadata(RemoteSegmentMetadata.fromMapOfStrings(metadata), segmentInfosBytes, replicationCheckpoint);
+        return new RemoteSegmentMetadata(uploadedSegmentMetadataMap, segmentInfosBytes, replicationCheckpoint);
     }
 
     public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicationCheckpoint, IndexOutput out) throws IOException {
@@ -131,14 +136,30 @@ public static void writeCheckpointToIndexOutput(ReplicationCheckpoint replicatio
         out.writeString(replicationCheckpoint.getCodec());
     }
 
-    private static ReplicationCheckpoint readCheckpointFromIndexInput(IndexInput in) throws IOException {
+    private static ReplicationCheckpoint readCheckpointFromIndexInput(
+        IndexInput in,
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegmentMetadataMap
+    ) throws IOException {
         return new ReplicationCheckpoint(
             new ShardId(new Index(in.readString(), in.readString()), in.readVInt()),
             in.readLong(),
             in.readLong(),
             in.readLong(),
             in.readLong(),
-            in.readString()
+            in.readString(),
+            toStoreFileMetadata(uploadedSegmentMetadataMap)
         );
     }
+
+    private static Map<String, StoreFileMetadata> toStoreFileMetadata(
+        Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> metadata
+    ) {
+        return metadata.entrySet()
+            .stream()
+            // TODO: Version here should be read from UploadedSegmentMetadata.
+            .map(
+                entry -> new StoreFileMetadata(entry.getKey(), entry.getValue().getLength(), entry.getValue().getChecksum(), Version.LATEST)
+            )
+            .collect(Collectors.toMap(StoreFileMetadata::name, Function.identity()));
+    }
 }
diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
index 70c3e71ba18b9..521522803c726 100644
--- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
+++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java
@@ -15,8 +15,11 @@
 import org.opensearch.core.common.io.stream.Writeable;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.seqno.SequenceNumbers;
+import org.opensearch.index.store.StoreFileMetadata;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -32,6 +35,7 @@ public class ReplicationCheckpoint implements Writeable, Comparable<Replication
     private final long segmentInfosVersion;
     private final long length;
     private final String codec;
+    private final Map<String, StoreFileMetadata> metadataMap;
 
     public static ReplicationCheckpoint empty(ShardId shardId) {
         return empty(shardId, "");
@@ -48,19 +52,29 @@ private ReplicationCheckpoint(ShardId shardId, String codec) {
         segmentInfosVersion = SequenceNumbers.NO_OPS_PERFORMED;
         length = 0L;
         this.codec = codec;
+        this.metadataMap = Collections.emptyMap();
     }
 
     public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, String codec) {
-        this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec);
-    }
-
-    public ReplicationCheckpoint(ShardId shardId, long primaryTerm, long segmentsGen, long segmentInfosVersion, long length, String codec) {
+        this(shardId, primaryTerm, segmentsGen, segmentInfosVersion, 0L, codec, Collections.emptyMap());
+    }
+
+    public ReplicationCheckpoint(
+        ShardId shardId,
+        long primaryTerm,
+        long segmentsGen,
+        long segmentInfosVersion,
+        long length,
+        String codec,
+        Map<String, StoreFileMetadata> metadataMap
+    ) {
         this.shardId = shardId;
         this.primaryTerm = primaryTerm;
         this.segmentsGen = segmentsGen;
         this.segmentInfosVersion = segmentInfosVersion;
         this.length = length;
         this.codec = codec;
+        this.metadataMap = metadataMap;
     }
 
     public ReplicationCheckpoint(StreamInput in) throws IOException {
@@ -75,6 +89,11 @@ public ReplicationCheckpoint(StreamInput in) throws IOException {
             length = 0L;
             codec = null;
         }
+        if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
+            this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new);
+        } else {
+            this.metadataMap = Collections.emptyMap();
+        }
     }
 
     /**
@@ -135,6 +154,9 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeLong(length);
             out.writeString(codec);
         }
+        if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
+            out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
+        }
     }
 
     @Override
@@ -169,6 +191,10 @@ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) {
             || (primaryTerm == other.getPrimaryTerm() && segmentInfosVersion > other.getSegmentInfosVersion());
     }
 
+    public Map<String, StoreFileMetadata> getMetadataMap() {
+        return metadataMap;
+    }
+
     @Override
     public String toString() {
         return "ReplicationCheckpoint{"
diff --git a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
index ab87d31d15e2f..28c95ddf13fc4 100644
--- a/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
+++ b/server/src/test/java/org/opensearch/index/seqno/ReplicationTrackerTests.java
@@ -33,6 +33,7 @@
 package org.opensearch.index.seqno;
 
 import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.util.Version;
 import org.opensearch.action.support.replication.ReplicationResponse;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.routing.AllocationId;
@@ -50,6 +51,7 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.SegmentReplicationShardStats;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.indices.replication.common.SegmentReplicationLagTimer;
@@ -1826,29 +1828,35 @@ public void testSegmentReplicationCheckpointTracking() {
 
         initializingIds.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
 
+        final StoreFileMetadata segment_1 = new StoreFileMetadata("segment_1", 1L, "abcd", Version.LATEST);
+        final StoreFileMetadata segment_2 = new StoreFileMetadata("segment_2", 50L, "abcd", Version.LATEST);
+        final StoreFileMetadata segment_3 = new StoreFileMetadata("segment_3", 100L, "abcd", Version.LATEST);
         final ReplicationCheckpoint initialCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             1,
             1,
             1L,
-            Codec.getDefault().getName()
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1)
         );
         final ReplicationCheckpoint secondCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             2,
             2,
-            50L,
-            Codec.getDefault().getName()
+            51L,
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1, "segment_2", segment_2)
         );
         final ReplicationCheckpoint thirdCheckpoint = new ReplicationCheckpoint(
             tracker.shardId(),
             0L,
             2,
             3,
-            100L,
-            Codec.getDefault().getName()
+            151L,
+            Codec.getDefault().getName(),
+            Map.of("segment_1", segment_1, "segment_2", segment_2, "segment_3", segment_3)
        );
 
         tracker.setLatestReplicationCheckpoint(initialCheckpoint);
@@ -1864,7 +1872,7 @@ public void testSegmentReplicationCheckpointTracking() {
         assertEquals(expectedIds.size(), groupStats.size());
         for (SegmentReplicationShardStats shardStat : groupStats) {
             assertEquals(3, shardStat.getCheckpointsBehindCount());
-            assertEquals(100L, shardStat.getBytesBehindCount());
+            assertEquals(151L, shardStat.getBytesBehindCount());
             assertTrue(shardStat.getCurrentReplicationLagMillis() >= shardStat.getCurrentReplicationTimeMillis());
         }
 
@@ -1881,7 +1889,7 @@ public void testSegmentReplicationCheckpointTracking() {
         assertEquals(expectedIds.size(), groupStats.size());
         for (SegmentReplicationShardStats shardStat : groupStats) {
             assertEquals(2, shardStat.getCheckpointsBehindCount());
-            assertEquals(99L, shardStat.getBytesBehindCount());
+            assertEquals(150L, shardStat.getBytesBehindCount());
         }
 
         for (String id : expectedIds) {
@@ -1938,7 +1946,8 @@ public void testSegmentReplicationCheckpointTrackingInvalidAllocationIDs() {
             1,
             1,
             1L,
-            Codec.getDefault().getName()
+            Codec.getDefault().getName(),
+            Collections.emptyMap()
         );
         tracker.setLatestReplicationCheckpoint(initialCheckpoint);
         tracker.startReplicationLagTimers(initialCheckpoint);
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index 29daa3936e8bb..b7972810dddb9 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -30,6 +30,8 @@
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
 import org.opensearch.index.IndexSettings;
+import org.opensearch.index.SegmentReplicationShardStats;
+import org.opensearch.index.engine.DocIdSeqNoAndSource;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.engine.InternalEngineFactory;
 import org.opensearch.index.engine.NRTReplicationEngine;
@@ -69,6 +71,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -805,6 +808,65 @@ public void testQueryDuringEngineResetShowsDocs() throws Exception {
         }
     }
 
+    public void testSegmentReplicationStats() throws Exception {
+        final NRTReplicationEngineFactory engineFactory = new NRTReplicationEngineFactory();
+        final NRTReplicationEngineFactory spy = spy(engineFactory);
+        try (ReplicationGroup shards = createGroup(1, settings, indexMapping, spy, createTempDir())) {
+            final IndexShard primaryShard = shards.getPrimary();
+            final IndexShard replicaShard = shards.getReplicas().get(0);
+            shards.startAll();
+
+            assertReplicaCaughtUp(primaryShard);
+
+            shards.indexDocs(10);
+            shards.refresh("test");
+
+            final ReplicationCheckpoint primaryCheckpoint = primaryShard.getLatestReplicationCheckpoint();
+            final long initialCheckpointSize = primaryCheckpoint.getMetadataMap()
+                .values()
+                .stream()
+                .mapToLong(StoreFileMetadata::length)
+                .sum();
+
+            Set<SegmentReplicationShardStats> postRefreshStats = primaryShard.getReplicationStats();
+            SegmentReplicationShardStats shardStats = postRefreshStats.stream().findFirst().get();
+            assertEquals(1, shardStats.getCheckpointsBehindCount());
+            assertEquals(initialCheckpointSize, shardStats.getBytesBehindCount());
+            replicateSegments(primaryShard, shards.getReplicas());
+            assertReplicaCaughtUp(primaryShard);
+            shards.assertAllEqual(10);
+
+            final List<DocIdSeqNoAndSource> docIdAndSeqNos = getDocIdAndSeqNos(primaryShard);
+            for (DocIdSeqNoAndSource docIdAndSeqNo : docIdAndSeqNos.subList(0, 5)) {
+                deleteDoc(primaryShard, docIdAndSeqNo.getId());
+                // delete on replica for xlog.
+                deleteDoc(replicaShard, docIdAndSeqNo.getId());
+            }
+            primaryShard.forceMerge(new ForceMergeRequest().maxNumSegments(1).flush(true));
+
+            final Map<String, StoreFileMetadata> segmentMetadataMap = primaryShard.getSegmentMetadataMap();
+            final Store.RecoveryDiff diff = Store.segmentReplicationDiff(segmentMetadataMap, replicaShard.getSegmentMetadataMap());
+            final long sizeAfterDeleteAndCommit = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
+
+            final Set<SegmentReplicationShardStats> statsAfterFlush = primaryShard.getReplicationStats();
+            shardStats = statsAfterFlush.stream().findFirst().get();
+            assertEquals(sizeAfterDeleteAndCommit, shardStats.getBytesBehindCount());
+            assertEquals(1, shardStats.getCheckpointsBehindCount());
+
+            replicateSegments(primaryShard, shards.getReplicas());
+            assertReplicaCaughtUp(primaryShard);
+            shards.assertAllEqual(5);
+        }
+    }
+
+    private void assertReplicaCaughtUp(IndexShard primaryShard) {
+        Set<SegmentReplicationShardStats> initialStats = primaryShard.getReplicationStats();
+        assertEquals(initialStats.size(), 1);
+        SegmentReplicationShardStats shardStats = initialStats.stream().findFirst().get();
+        assertEquals(0, shardStats.getCheckpointsBehindCount());
+        assertEquals(0, shardStats.getBytesBehindCount());
+    }
+
     /**
      * Assert persisted and searchable doc counts. This method should not be used while docs are concurrently indexed because
      * it asserts point in time seqNos are relative to the doc counts.
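Note on the core of the change: the old bytes-behind metric was `max(latestCheckpoint.length - visibleCheckpoint.length, 0)`, a subtraction of total checkpoint sizes that misreports whenever segment files are replaced rather than appended, for example after a force-merge (the scenario the new `testSegmentReplicationStats` exercises). The patch instead attaches the per-file `StoreFileMetadata` map to each `ReplicationCheckpoint` and sums the lengths of the files the replica is still missing, via `Store.segmentReplicationDiff(...).missing`. Below is a minimal standalone sketch of that idea; the `FileMeta` record is a hypothetical stand-in for `StoreFileMetadata`, and `bytesBehind` is a simplified analogue of the diff computation, not the OpenSearch API itself:

```java
import java.util.Map;

public class BytesBehindSketch {

    // Hypothetical stand-in for StoreFileMetadata: file name, length, checksum.
    record FileMeta(String name, long length, String checksum) {}

    // Simplified analogue of summing Store.segmentReplicationDiff(...).missing:
    // a file counts toward bytes behind if the replica lacks it entirely or
    // holds a different version of it (detected via checksum).
    static long bytesBehind(Map<String, FileMeta> primary, Map<String, FileMeta> replica) {
        return primary.values().stream().filter(file -> {
            FileMeta onReplica = replica.get(file.name());
            return onReplica == null || onReplica.checksum().equals(file.checksum()) == false;
        }).mapToLong(FileMeta::length).sum();
    }

    public static void main(String[] args) {
        // After a force-merge the primary's checkpoint holds one merged file;
        // the replica still has the two pre-merge segments (51 bytes total).
        Map<String, FileMeta> primary = Map.of("_m.cfs", new FileMeta("_m.cfs", 60L, "ef01"));
        Map<String, FileMeta> replica = Map.of(
            "_0.cfs", new FileMeta("_0.cfs", 1L, "abcd"),
            "_1.cfs", new FileMeta("_1.cfs", 50L, "cdef")
        );

        // Length subtraction would report max(60 - 51, 0) = 9 bytes, although
        // the replica must still fetch the entire 60-byte merged file.
        System.out.println(bytesBehind(primary, replica)); // prints 60
    }
}
```

The trade-off is that the metadata map now travels with every checkpoint, which is why its serialization in `ReplicationCheckpoint` is gated on `Version.V_2_10_0` for wire compatibility with older nodes.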