[Remote Store] Add remote segment upload backpressure integ tests #8197

Merged · 3 commits · Jun 27, 2023
@@ -70,7 +70,7 @@ protected void deleteRepo() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

protected void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
protected String setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that 10-25% of all operations to remote store results in
/// IOException. skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensures that the
@@ -88,13 +88,14 @@ protected void setup(Path repoLocation, double ioFailureRate, String skipExcepti
.put("max_failure_number", maxFailure)
);

internalCluster().startDataOnlyNodes(1);
String dataNodeName = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME);
logger.info("--> Created index={}", INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");
return dataNodeName;
}

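The setup change above now returns the started data node's name so that a test can later look up that node's MockRepository instance and toggle the injected I/O failure rate mid-test. A minimal sketch of the pattern, using the same names as in this PR:

// Sketch: toggle the mock repository's failure injection on a specific data node.
// REPOSITORY_NAME and dataNodeName are assumed to come from the setup(...) call above.
MockRepository repository = (MockRepository) internalCluster()
    .getInstance(RepositoriesService.class, dataNodeName)
    .repository(REPOSITORY_NAME);
repository.setRandomControlIOExceptionRate(1d); // every remote store interaction now fails
// ... index data and observe rejections ...
repository.setRandomControlIOExceptionRate(0d); // heal the repository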
@@ -11,52 +11,149 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT;
import static org.opensearch.index.remote.RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreBackpressureIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
public void testWritesRejectedDueToConsecutiveFailureBreach() throws Exception {
// Here the doc size of the request remains the same throughout the test. After initial indexing, all remote store
// interactions fail, so the consecutive failure limit is breached and further writes are rejected.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 10, ByteSizeUnit.KB.toIntBytes(1), 15, "failure_streak_count");
}

public void testWritesRejectedDueToBytesLagBreach() throws Exception {
// Indexing initially happens with a doc size of 2 bytes, then all remote store interactions start failing. Indexing
// then continues with a doc size of 1KB, so the bytes lag limit is breached and further writes are rejected.
validateBackpressure(ByteSizeUnit.BYTES.toIntBytes(2), 30, ByteSizeUnit.KB.toIntBytes(1), 15, "bytes_lag");
}

public void testWritesRejected() {
public void testWritesRejectedDueToTimeLagBreach() throws Exception {
// Indexing initially happens with a doc size of 1KB, then all remote store interactions start failing. Indexing
// then continues with a doc size of 1 byte, so the time lag limit is breached and further writes are rejected.
validateBackpressure(ByteSizeUnit.KB.toIntBytes(1), 20, ByteSizeUnit.BYTES.toIntBytes(1), 15, "time_lag");
}

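/**
* Indexes initialDocsToIndex docs of initialDocSize bytes each, then fails all remote store interactions and
* indexes onFailureDocsToIndex docs of onFailureDocSize bytes each, asserting that writes are rejected with the
* expected breachMode. Finally heals the repository and verifies that the lag drains and writes succeed again.
*/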
private void validateBackpressure(
int initialDocSize,
int initialDocsToIndex,
int onFailureDocSize,
int onFailureDocsToIndex,
String breachMode
) throws Exception {
Path location = randomRepoPath().toAbsolutePath();
setup(location, 1d, "metadata", Long.MAX_VALUE);
String dataNodeName = setup(location, 0d, "metadata", Long.MAX_VALUE);

Settings request = Settings.builder().put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true).build();
Settings request = Settings.builder()
.put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), true)
.put(MIN_CONSECUTIVE_FAILURES_LIMIT.getKey(), 10)
.build();
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(request)
.get();
assertEquals(clusterUpdateResponse.getPersistentSettings().get(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey()), "true");
assertEquals(clusterUpdateResponse.getPersistentSettings().get(MIN_CONSECUTIVE_FAILURES_LIMIT.getKey()), "10");

logger.info("--> Indexing data");

String jsonString = generateString(initialDocSize);
BytesReference initialSource = new BytesArray(jsonString);
indexDocAndRefresh(initialSource, initialDocsToIndex);

((MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName).repository(REPOSITORY_NAME))
.setRandomControlIOExceptionRate(1d);

jsonString = generateString(onFailureDocSize);
BytesReference onFailureSource = new BytesArray(jsonString);
OpenSearchRejectedExecutionException ex = assertThrows(
OpenSearchRejectedExecutionException.class,
() -> indexData(randomIntBetween(10, 20), randomBoolean())
() -> indexDocAndRefresh(onFailureSource, onFailureDocsToIndex)
);
assertTrue(ex.getMessage().contains("rejected execution on primary shard"));
assertTrue(ex.getMessage().contains(breachMode));

RemoteRefreshSegmentTracker.Stats stats = stats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
assertTrue(stats.rejectionCount > 0);

((MockRepository) internalCluster().getInstance(RepositoriesService.class, dataNodeName).repository(REPOSITORY_NAME))
.setRandomControlIOExceptionRate(0d);

assertBusy(() -> {
RemoteRefreshSegmentTracker.Stats finalStats = stats();
assertEquals(0, finalStats.bytesLag);
assertEquals(0, finalStats.refreshTimeLagMs);
assertEquals(0, finalStats.localRefreshNumber - finalStats.remoteRefreshNumber);
}, 30, TimeUnit.SECONDS);

long rejectionCount = stats.rejectionCount;
stats = stats();
indexDocAndRefresh(initialSource, initialDocsToIndex);
assertEquals(rejectionCount, stats.rejectionCount);
deleteRepo();
}

private RemoteRefreshSegmentTracker.Stats stats() {
String shardId = "0";
RemoteStoreStatsResponse response = client().admin().cluster().prepareRemoteStoreStats(INDEX_NAME, shardId).get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, shardId);
List<RemoteStoreStats> matches = Arrays.stream(response.getShards())
.filter(stat -> indexShardId.equals(stat.getStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
assertTrue(stats.bytesLag > 0);
assertTrue(stats.refreshTimeLagMs > 0);
assertTrue(stats.localRefreshNumber - stats.remoteRefreshNumber > 0);
assertTrue(stats.rejectionCount > 0);
deleteRepo();
return matches.get(0).getStats();
}

private void indexDocAndRefresh(BytesReference source, int iterations) {
for (int i = 0; i < iterations; i++) {
client().prepareIndex(INDEX_NAME).setSource(source, XContentType.JSON).get();
refresh(INDEX_NAME);
}
}

/**
* Generates a JSON string of approximately the given sizeInBytes
*
* @param sizeInBytes size of the string
* @return the generated string
*/
private String generateString(int sizeInBytes) {
StringBuilder sb = new StringBuilder();
sb.append("{");
int i = 0;
// Based on local tests, 1 char occupies 1 byte
while (sb.length() < sizeInBytes) {
String key = "field" + i;
String value = "value" + i;
sb.append("\"").append(key).append("\":\"").append(value).append("\",");
i++;
}
if (sb.length() > 1 && sb.charAt(sb.length() - 1) == ',') {
sb.setLength(sb.length() - 1);
}
sb.append("}");
return sb.toString();
}
}
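The three tests above each trip a different remote segment pressure limit: failure_streak_count after too many consecutive upload failures, bytes_lag when local segments outrun the remote store by too many bytes, and time_lag when the remote refresh falls too far behind. In every case the primary rejects the write with an OpenSearchRejectedExecutionException whose message names the breached limit, which is what the assertions key on. A hedged sketch of telling the modes apart inside a test like the ones above (the message substrings match the test assertions; the handling itself is illustrative):

// Illustrative only: classify a backpressure rejection by the breach mode in its message,
// mirroring the substrings asserted in the tests above.
BytesReference source = new BytesArray(generateString(ByteSizeUnit.KB.toIntBytes(1)));
try {
    client().prepareIndex(INDEX_NAME).setSource(source, XContentType.JSON).get();
} catch (OpenSearchRejectedExecutionException e) {
    String message = e.getMessage(); // contains "rejected execution on primary shard"
    if (message.contains("failure_streak_count")) {
        // consecutive remote upload failures exceeded MIN_CONSECUTIVE_FAILURES_LIMIT
    } else if (message.contains("bytes_lag")) {
        // local segment bytes are too far ahead of what has reached the remote store
    } else if (message.contains("time_lag")) {
        // the remote refresh has fallen too far behind the local refresh
    }
}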
@@ -114,7 +114,7 @@ public long getFailureCount() {
return failureCounter.get();
}

private final double randomControlIOExceptionRate;
private volatile double randomControlIOExceptionRate;

private final double randomDataFileIOExceptionRate;

@@ -246,6 +246,10 @@ public synchronized void unblock() {
this.notifyAll();
}

public void setRandomControlIOExceptionRate(double randomControlIOExceptionRate) {
this.randomControlIOExceptionRate = randomControlIOExceptionRate;
}

public void blockOnDataFiles(boolean blocked) {
blockOnDataFiles = blocked;
}
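Relaxing randomControlIOExceptionRate from final to volatile is what makes the new setter useful: the test thread flips the rate while the repository's I/O threads keep reading it, and volatile guarantees the update is visible to those readers without locking. A minimal, self-contained illustration of the same pattern (the class and names here are hypothetical, not part of MockRepository):

// Hypothetical illustration of the volatile toggle pattern used by MockRepository.
class FailureInjector {
    // volatile: writes by the test thread become visible to I/O threads immediately
    private volatile double failureRate = 0d;

    void setFailureRate(double rate) {
        this.failureRate = rate;
    }

    void maybeFail() throws java.io.IOException {
        if (Math.random() < failureRate) {
            throw new java.io.IOException("injected failure");
        }
    }
}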