Skip to content

Commit

Permalink
[Backport 2.x][Remote Store]Add integ tests for remote store stats api (
Browse files Browse the repository at this point in the history
#8397)

Signed-off-by: Dharmesh 💤 <[email protected]>
Co-authored-by: Varun Bansal <[email protected]>
  • Loading branch information
psychbot and linuxpi authored Jul 5, 2023
1 parent bd150c5 commit b110ada
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,36 @@ private Settings defaultIndexSettings() {
.build();
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas, int numberOfShards) {
return Settings.builder()
.put(defaultIndexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.build();
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return remoteStoreIndexSettings(numberOfReplicas, 1);
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards))
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
return remoteTranslogIndexSettings(numberOfReplicas, 1);
}

protected void putRepository(Path path) {
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", path))
);
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.common.UUIDs;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3)
public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
Expand All @@ -29,7 +32,6 @@ public void testStatsResponseFromAllNodes() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
internalCluster().startDataOnlyNodes(3);
if (randomBoolean()) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
Expand All @@ -38,18 +40,7 @@ public void testStatsResponseFromAllNodes() {
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

// Indexing documents along with refreshes and flushes.
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
}
indexDocs();

// Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from
// each of the node in the cluster and check that the response is coming as expected.
Expand All @@ -68,23 +59,98 @@ public void testStatsResponseFromAllNodes() {
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
assertEquals(0, stats.refreshTimeLagMs);
assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber);
assertTrue(stats.uploadBytesStarted > 0);
assertEquals(0, stats.uploadBytesFailed);
assertTrue(stats.uploadBytesSucceeded > 0);
assertTrue(stats.totalUploadsStarted > 0);
assertEquals(0, stats.totalUploadsFailed);
assertTrue(stats.totalUploadsSucceeded > 0);
assertEquals(0, stats.rejectionCount);
assertEquals(0, stats.consecutiveFailuresCount);
assertEquals(0, stats.bytesLag);
assertTrue(stats.uploadBytesMovingAverage > 0);
assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
assertTrue(stats.uploadTimeMovingAverage > 0);
assertResponseStats(stats);
}
}

public void testStatsResponseAllShards() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0, 3));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

indexDocs();

// Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from
// each of the node in the cluster and check that the response is coming as expected.
ClusterState state = getClusterState();
String node = StreamSupport.stream(state.nodes().getDataNodes().values().spliterator(), false)
.map(x -> x.getName())
.findFirst()
.get();
RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = client(node).admin()
.cluster()
.prepareRemoteStoreStats(INDEX_NAME, null);
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertTrue(response.getSuccessfulShards() == 3);
assertTrue(response.getShards() != null && response.getShards().length == 3);
RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats();
assertResponseStats(stats);
}

public void testStatsResponseFromLocalNode() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0, 3));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

indexDocs();

// Step 2 - We find a data node in the cluster. We make the remote store stats api call from
// each of the data node in the cluster and check that only local shards are returned.
ClusterState state = getClusterState();
List<String> nodes = StreamSupport.stream(state.nodes().getDataNodes().values().spliterator(), false)
.map(x -> x.getName())
.collect(Collectors.toList());
for (String node : nodes) {
RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = client(node).admin()
.cluster()
.prepareRemoteStoreStats(INDEX_NAME, null);
remoteStoreStatsRequestBuilder.setLocal(true);
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertTrue(response.getSuccessfulShards() == 1);
assertTrue(response.getShards() != null && response.getShards().length == 1);
RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats();
assertResponseStats(stats);
}
}

private void indexDocs() {
// Indexing documents along with refreshes and flushes.
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
}
}

private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) {
assertEquals(0, stats.refreshTimeLagMs);
assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber);
assertTrue(stats.uploadBytesStarted > 0);
assertEquals(0, stats.uploadBytesFailed);
assertTrue(stats.uploadBytesSucceeded > 0);
assertTrue(stats.totalUploadsStarted > 0);
assertEquals(0, stats.totalUploadsFailed);
assertTrue(stats.totalUploadsSucceeded > 0);
assertEquals(0, stats.rejectionCount);
assertEquals(0, stats.consecutiveFailuresCount);
assertEquals(0, stats.bytesLag);
assertTrue(stats.uploadBytesMovingAverage > 0);
assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
assertTrue(stats.uploadTimeMovingAverage > 0);
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
Expand Down

0 comments on commit b110ada

Please sign in to comment.