Skip to content

Commit

Permalink
Merge branch '2.x' into fix_segrep_ut_2x
Browse files Browse the repository at this point in the history
  • Loading branch information
dreamer-89 authored Sep 6, 2022
2 parents 72a6b06 + 35ddbd8 commit 58a202e
Show file tree
Hide file tree
Showing 16 changed files with 301 additions and 230 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https:/opensearch-project/OpenSearch/pull/4363))
- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https:/opensearch-project/OpenSearch/pull/4365))
- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https:/opensearch-project/OpenSearch/pull/4414))
- [Segment Replication] Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https:/opensearch-project/OpenSearch/pull/4366))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.apache.lucene.index.SegmentInfos;
import org.junit.BeforeClass;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
Expand Down Expand Up @@ -458,13 +457,56 @@ private void assertSegmentStats(int numberOfReplicas) throws IOException {
ClusterState state = client(internalCluster().getMasterName()).admin().cluster().prepareState().get().getState();
final DiscoveryNode replicaNode = state.nodes().resolveNode(replicaShardRouting.currentNodeId());
IndexShard indexShard = getIndexShard(replicaNode.getName());
final String lastCommitSegmentsFileName = SegmentInfos.getLastCommitSegmentsFileName(indexShard.store().directory());
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.
SegmentInfos.readCommit(indexShard.store().directory(), lastCommitSegmentsFileName);
indexShard.store().readLastCommittedSegmentsInfo();
}
}
}

public void testDropPrimaryDuringReplication() throws Exception {
final Settings settings = Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
createIndex(INDEX_NAME, settings);
internalCluster().startDataOnlyNodes(6);
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
// don't wait for replication to complete, stop the primary immediately.
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNode));
ensureYellow(INDEX_NAME);

// start another replica.
internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// index another doc and refresh - without this the new replica won't catch up.
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
assertSegmentStats(6);
}
}

/**
* Waits until the replica is caught up to the latest primary segments gen.
* @throws Exception
Expand All @@ -483,10 +525,12 @@ private void waitForReplicaUpdate() throws Exception {
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);
// if we don't have any segments yet, proceed.
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();
logger.debug("Primary Segments: {}", primaryShardSegments.getSegments());
if (primaryShardSegments.getSegments().isEmpty() == false) {
final Map<String, Segment> latestPrimarySegments = getLatestSegments(primaryShardSegments);
final Long latestPrimaryGen = latestPrimarySegments.values().stream().findFirst().map(Segment::getGeneration).get();
for (ShardSegments shardSegments : replicaShardSegments) {
logger.debug("Replica {} Segments: {}", shardSegments.getShardRouting(), shardSegments.getSegments());
final boolean isReplicaCaughtUpToPrimary = shardSegments.getSegments()
.stream()
.anyMatch(segment -> segment.getGeneration() == latestPrimaryGen);
Expand Down
133 changes: 68 additions & 65 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -122,6 +123,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata;

/**
* A Store provides plain access to files written by an opensearch index shard. Each shard
Expand Down Expand Up @@ -334,6 +336,51 @@ public MetadataSnapshot getMetadata(SegmentInfos segmentInfos) throws IOExceptio
return new MetadataSnapshot(segmentInfos, directory, logger);
}

/**
* Segment Replication method - Fetch a map of StoreFileMetadata for segments, ignoring Segment_N files.
* @param segmentInfos {@link SegmentInfos} from which to compute metadata.
* @return {@link Map} map file name to {@link StoreFileMetadata}.
*/
public Map<String, StoreFileMetadata> getSegmentMetadataMap(SegmentInfos segmentInfos) throws IOException {
assert indexSettings.isSegRepEnabled();
return loadMetadata(segmentInfos, directory, logger, true).fileMetadata;
}

/**
* Segment Replication method
* Returns a diff between the Maps of StoreFileMetadata that can be used for getting list of files to copy over to a replica for segment replication. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both maps and they can be considered the same ie. they don't need to be recovered</li>
* <li>different: they exist in both maps but their they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
*/
public static RecoveryDiff segmentReplicationDiff(Map<String, StoreFileMetadata> source, Map<String, StoreFileMetadata> target) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
for (StoreFileMetadata value : source.values()) {
if (value.name().startsWith(IndexFileNames.SEGMENTS)) {
continue;
}
if (target.containsKey(value.name()) == false) {
missing.add(value);
} else {
final StoreFileMetadata fileMetadata = target.get(value.name());
if (fileMetadata.isSame(value)) {
identical.add(value);
} else {
different.add(value);
}
}
}
return new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
}

/**
* Renames all the given files from the key of the map to the
* value of the map. All successfully renamed files are removed from the map in-place.
Expand Down Expand Up @@ -709,31 +756,34 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}

/**
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
* Segment Replication method -
* This method deletes every file in this store that is not referenced by the passed in SegmentInfos or
* part of the latest on-disk commit point.
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param localSnapshot The local snapshot from in memory SegmentInfos.
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException {
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
assert indexSettings.isSegRepEnabled();
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo()));
cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true));
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection<String> additionalFiles)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
|| (additionalFiles != null && additionalFiles.contains(existingFile))) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
Expand Down Expand Up @@ -825,17 +875,9 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
latestSegmentInfos.setUserData(userData, true);
latestSegmentInfos.commit(directory());

// similar to TrimUnsafeCommits, create a commit with an appending IW, this will delete old commits and ensure all files
// associated with the SegmentInfos.commit are fsynced.
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(directory);
assert existingCommits.isEmpty() == false : "Expected at least one commit but none found";
final IndexCommit lastIndexCommit = existingCommits.get(existingCommits.size() - 1);
assert latestSegmentInfos.getSegmentsFileName().equals(lastIndexCommit.getSegmentsFileName());
try (IndexWriter writer = newAppendingIndexWriter(directory, lastIndexCommit)) {
writer.setLiveCommitData(lastIndexCommit.getUserData().entrySet());
writer.commit();
}
directory.sync(latestSegmentInfos.files(true));
directory.syncMetaData();
cleanupAndPreserveLatestCommitPoint("After commit", latestSegmentInfos);
} finally {
metadataLock.writeLock().unlock();
}
Expand Down Expand Up @@ -1033,6 +1075,11 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger) throws IOException {
return loadMetadata(segmentInfos, directory, logger, false);
}

static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory directory, Logger logger, boolean ignoreSegmentsFile)
throws IOException {
long numDocs = Lucene.getNumDocs(segmentInfos);
Map<String, String> commitUserDataBuilder = new HashMap<>();
commitUserDataBuilder.putAll(segmentInfos.getUserData());
Expand Down Expand Up @@ -1067,8 +1114,10 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
if (maxVersion == null) {
maxVersion = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
}
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
if (ignoreSegmentsFile == false) {
final String segmentsFile = segmentInfos.getSegmentsFileName();
checksumFromLuceneFile(directory, segmentsFile, builder, logger, maxVersion, true);
}
return new LoadedMetadata(unmodifiableMap(builder), unmodifiableMap(commitUserDataBuilder), numDocs);
}

Expand Down Expand Up @@ -1148,7 +1197,6 @@ public Map<String, StoreFileMetadata> asMap() {
* Helper method used to group store files according to segment and commit.
*
* @see MetadataSnapshot#recoveryDiff(MetadataSnapshot)
* @see MetadataSnapshot#segmentReplicationDiff(MetadataSnapshot)
*/
private Iterable<List<StoreFileMetadata>> getGroupedFilesIterable() {
final Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();
Expand Down Expand Up @@ -1241,51 +1289,6 @@ public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
return recoveryDiff;
}

/**
* Segment Replication method
* Returns a diff between the two snapshots that can be used for getting list of files to copy over to a replica for segment replication. The given snapshot is treated as the
* target and this snapshot as the source. The returned diff will hold a list of files that are:
* <ul>
* <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>
* <li>different: they exist in both snapshots but their they are not identical</li>
* <li>missing: files that exist in the source but not in the target</li>
* </ul>
*/
public RecoveryDiff segmentReplicationDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetadata> identical = new ArrayList<>();
final List<StoreFileMetadata> different = new ArrayList<>();
final List<StoreFileMetadata> missing = new ArrayList<>();
final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();
for (List<StoreFileMetadata> segmentFiles : getGroupedFilesIterable()) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetadata meta : segmentFiles) {
StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetadata == null) {
// Do not consider missing files as inconsistent in SegRep as replicas may lag while primary updates
// documents and generate new files specific to a segment
missing.add(meta);
} else if (storeFileMetadata.isSame(meta) == false) {
consistent = false;
different.add(meta);
} else {
identicalFiles.add(meta);
}
}
if (consistent) {
identical.addAll(identicalFiles);
} else {
different.addAll(identicalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(
Collections.unmodifiableList(identical),
Collections.unmodifiableList(different),
Collections.unmodifiableList(missing)
);
return recoveryDiff;
}

/**
* Returns the number of files in this snapshot
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;
import java.util.Set;
import java.util.Map;

/**
* Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos
Expand All @@ -28,52 +27,41 @@
public class CheckpointInfoResponse extends TransportResponse {

private final ReplicationCheckpoint checkpoint;
private final Store.MetadataSnapshot snapshot;
private final Map<String, StoreFileMetadata> metadataMap;
private final byte[] infosBytes;
// pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos
// but are still referenced by the latest commit point (Segments_N).
private final Set<StoreFileMetadata> pendingDeleteFiles;

public CheckpointInfoResponse(
final ReplicationCheckpoint checkpoint,
final Store.MetadataSnapshot snapshot,
final byte[] infosBytes,
final Set<StoreFileMetadata> additionalFiles
final Map<String, StoreFileMetadata> metadataMap,
final byte[] infosBytes
) {
this.checkpoint = checkpoint;
this.snapshot = snapshot;
this.metadataMap = metadataMap;
this.infosBytes = infosBytes;
this.pendingDeleteFiles = additionalFiles;
}

public CheckpointInfoResponse(StreamInput in) throws IOException {
this.checkpoint = new ReplicationCheckpoint(in);
this.snapshot = new Store.MetadataSnapshot(in);
this.metadataMap = in.readMap(StreamInput::readString, StoreFileMetadata::new);
this.infosBytes = in.readByteArray();
this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
checkpoint.writeTo(out);
snapshot.writeTo(out);
out.writeMap(metadataMap, StreamOutput::writeString, (valueOut, fc) -> fc.writeTo(valueOut));
out.writeByteArray(infosBytes);
out.writeCollection(pendingDeleteFiles);
}

public ReplicationCheckpoint getCheckpoint() {
return checkpoint;
}

public Store.MetadataSnapshot getSnapshot() {
return snapshot;
public Map<String, StoreFileMetadata> getMetadataMap() {
return metadataMap;
}

public byte[] getInfosBytes() {
return infosBytes;
}

public Set<StoreFileMetadata> getPendingDeleteFiles() {
return pendingDeleteFiles;
}
}
Loading

0 comments on commit 58a202e

Please sign in to comment.