Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SegmentReplication flaky integ tests #8134

Merged
merged 5 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
Expand Down Expand Up @@ -95,11 +96,16 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends SegmentReplicationBaseIT {

@Before
private void setup() {
internalCluster().startClusterManagerOnlyNode();
mch2 marked this conversation as resolved.
Show resolved Hide resolved
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
Expand All @@ -125,7 +131,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
String nodeC = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
Expand All @@ -134,10 +140,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);
Expand All @@ -160,10 +166,10 @@ public void testRestartPrimary() throws Exception {

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;
Expand All @@ -190,8 +196,8 @@ public void testCancelPrimaryAllocation() throws Exception {
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
final Settings settings = Settings.builder()
.put(indexSettings())
.put(
Expand Down Expand Up @@ -233,8 +239,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}

public void testIndexReopenClose() throws Exception {
final String primary = internalCluster().startNode();
final String replica = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
final String replica = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -274,8 +280,8 @@ public void testMultipleShards() throws Exception {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -310,8 +316,8 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -351,14 +357,13 @@ public void testReplicationAfterForceMerge() throws Exception {
* This test verifies that segment replication does not fail for closed indices
*/
public void testClosedIndices() {
internalCluster().startClusterManagerOnlyNode();
List<String> nodes = new ArrayList<>();
// start 1st node so that it contains the primary
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startDataOnlyNode());
createIndex(INDEX_NAME, super.indexSettings());
ensureYellowAndNoInitializingShards(INDEX_NAME);
// start 2nd node so that it contains the replica
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startDataOnlyNode());
ensureGreen(INDEX_NAME);

logger.info("--> Close index");
Expand All @@ -373,8 +378,7 @@ public void testClosedIndices() {
* @throws Exception when issue is encountered
*/
public void testNodeDropWithOngoingReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
Expand All @@ -385,7 +389,7 @@ public void testNodeDropWithOngoingReplication() throws Exception {
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
// Get replica allocation id
Expand Down Expand Up @@ -447,11 +451,11 @@ public void testNodeDropWithOngoingReplication() throws Exception {
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
Expand Down Expand Up @@ -506,7 +510,7 @@ public void testCancellation() throws Exception {
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

Expand All @@ -529,7 +533,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
Expand All @@ -544,8 +548,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startDataOnlyNode();
final String nodeB = internalCluster().startDataOnlyNode();

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -591,9 +595,9 @@ public void testDeleteOperations() throws Exception {
*/
public void testReplicationPostDeleteAndForceMerge() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
final int initialDocCount = scaledRandomIntBetween(10, 200);
for (int i = 0; i < initialDocCount; i++) {
Expand Down Expand Up @@ -648,7 +652,6 @@ public void testReplicationPostDeleteAndForceMerge() throws Exception {
}

public void testUpdateOperations() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);
Expand Down Expand Up @@ -702,7 +705,6 @@ public void testDropPrimaryDuringReplication() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replica_count)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, settings);
final List<String> dataNodes = internalCluster().startDataOnlyNodes(6);
Expand Down Expand Up @@ -742,11 +744,10 @@ public void testDropPrimaryDuringReplication() throws Exception {
}

public void testReplicaHasDiffFilesThanPrimary() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final IndexShard replicaShard = getIndexShard(replicaNode, INDEX_NAME);
Expand Down Expand Up @@ -796,9 +797,9 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
Expand Down Expand Up @@ -848,7 +849,7 @@ public void testPressureServiceStats() throws Exception {
assertEquals(0, replicaNode_service.nodeStats().getShardStats().get(primaryShard.shardId()).getReplicaStats().size());

// start another replica.
String replicaNode_2 = internalCluster().startNode();
String replicaNode_2 = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
String docId = String.valueOf(initialDocCount + 1);
client().prepareIndex(INDEX_NAME).setId(docId).setSource("foo", "bar").get();
Expand Down Expand Up @@ -887,10 +888,10 @@ public void testPressureServiceStats() throws Exception {
public void testScrollCreatedOnReplica() throws Exception {
assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled());
// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

// index 100 docs
Expand Down Expand Up @@ -981,15 +982,15 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
);

// create the cluster with one primary node containing primary shard and replica node containing replica shard
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to control refreshes
.put("index.refresh_interval", -1)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int initialDocCount = 10;
Expand Down Expand Up @@ -1104,10 +1105,10 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
}

public void testPitCreatedOnReplica() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME)
.setId("1")
Expand Down Expand Up @@ -1234,13 +1235,13 @@ public void testPitCreatedOnReplica() throws Exception {
*/
public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception {
final List<String> nodes = new ArrayList<>();
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startDataOnlyNode();
nodes.add(primaryNode);
final Settings settings = Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
createIndex(INDEX_NAME, settings);
ensureGreen(INDEX_NAME);
// start a replica node, initially will be empty with no shard assignment.
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startDataOnlyNode();
nodes.add(replicaNode);

// index a doc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.remotestore;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -26,7 +25,6 @@
* This makes sure that the constructs/flows that are being tested with Segment Replication, holds true after enabling
* remote store.
*/
@LuceneTestCase.AwaitsFix(bugUrl = "https:/opensearch-project/OpenSearch/issues/7643")
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT {

Expand All @@ -49,7 +47,7 @@ protected Settings featureFlagSettings() {
}

@Before
public void setup() {
private void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
Expand Down
30 changes: 23 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1579,13 +1579,21 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
if (indexSettings.isSegRepEnabled() == false) {
return null;
}

Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>(
new GatedCloseable<>(null, () -> {}),
ReplicationCheckpoint.empty(shardId, getDefaultCodecName())
);

if (getEngineOrNull() == null) {
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName()));
return nullSegmentInfosEmptyCheckpoint;
}
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> {
try {
GatedCloseable<SegmentInfos> snapshot = null;
try {
snapshot = getSegmentInfosSnapshot();
if (snapshot.get() != null) {
SegmentInfos segmentInfos = snapshot.get();
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
Expand All @@ -1601,10 +1609,18 @@ public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegme
getEngine().config().getCodec().getName()
)
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId, getDefaultCodecName())));
} catch (IOException | AlreadyClosedException e) {
logger.error("Error Fetching SegmentInfos and latest checkpoint", e);
if (snapshot != null) {
try {
snapshot.close();
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e);
}
}
}
return nullSegmentInfosEmptyCheckpoint;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ public void testRefreshSuccessOnSecondAttempt() throws Exception {
/**
* Tests retry flow after snapshot and metadata files have been uploaded to remote store in the failed attempt.
* Snapshot and metadata files created in failed attempt should not break retry.
* @throws Exception
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
*/
public void testRefreshSuccessAfterFailureInFirstAttemptAfterSnapshotAndMetadataUpload() throws Exception {
int succeedOnAttempt = 1;
Expand Down