Skip to content

Commit

Permalink
add integ tests to cover segment upload timeouts
Browse files Browse the repository at this point in the history
Signed-off-by: Varun Bansal <[email protected]>
  • Loading branch information
linuxpi committed May 20, 2024
1 parent d15ded5 commit a4ac198
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.test.OpenSearchTestCase.getShardLevelBlobPath;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequest;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

import static org.opensearch.index.remote.RemoteStorePressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;

/**
 * Integration tests covering remote segment upload timeouts using the
 * multipart-capable mock filesystem repository ({@link MockFsRepositoryPlugin}).
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerMultipartIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(MockFsRepositoryPlugin.class);
    }

    /**
     * Builds the node attributes of the superclass, then re-points the segment,
     * translog and cluster-state repositories at the mock multipart repository type.
     */
    @Override
    public Settings buildRemoteStoreNodeAttributes(Path repoLocation, double ioFailureRate, String skipExceptionBlobList, long maxFailure) {
        Settings settings = super.buildRemoteStoreNodeAttributes(repoLocation, ioFailureRate, skipExceptionBlobList, maxFailure);
        // Locale.ROOT keeps generated setting keys locale-independent (getDefault()
        // could vary formatting on exotic default locales).
        String segmentRepoTypeAttributeKey = String.format(
            Locale.ROOT,
            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
            REPOSITORY_NAME
        );
        String translogRepoTypeAttributeKey = String.format(
            Locale.ROOT,
            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
            TRANSLOG_REPOSITORY_NAME
        );
        // NOTE(review): reuses REPOSITORY_NAME, so this key is identical to
        // segmentRepoTypeAttributeKey and the extra put() below is redundant —
        // confirm whether a distinct state repository name was intended.
        String stateRepoTypeAttributeKey = String.format(
            Locale.ROOT,
            "node.attr." + REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT,
            REPOSITORY_NAME
        );

        return Settings.builder()
            .put(settings)
            .put(segmentRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE)
            .put(translogRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE)
            .put(stateRepoTypeAttributeKey, MockFsRepositoryPlugin.TYPE)
            .build();
    }

    /**
     * Verifies that segment uploads fail when the segment transfer timeout is
     * lowered to 1ms: refresh-triggered uploads should time out and be counted
     * as failed uploads in the remote store stats.
     */
    public void testRemoteRefreshSegmentUploadTimeout() throws Exception {
        Path location = randomRepoPath().toAbsolutePath();
        setup(location, randomDoubleBetween(0.1, 0.15, true), "metadata", 10L);

        // Bug fix: the original chained two setPersistentSettings(...) calls on the
        // same request builder; the second call replaced the first, silently dropping
        // REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED=false. Both settings must be
        // combined into a single Settings.Builder.
        client().admin()
            .cluster()
            .prepareUpdateSettings()
            .setPersistentSettings(
                Settings.builder()
                    .put(REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED.getKey(), false)
                    .put(RemoteStoreSettings.CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(1))
            )
            .get();

        // Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed
        // due to IOExceptions that are thrown while doing uploadBlobs.
        indexData(randomIntBetween(5, 10), randomBoolean());
        logger.info("--> Indexed data");
        logger.info("--> Verify that the segment upload fails");
        // assertBusy's checked exception propagates via `throws Exception`; the
        // original try/catch that re-wrapped it in RuntimeException added nothing.
        assertBusy(() -> {
            RemoteStoreStatsResponse remoteStoreStatsResponse = client().admin()
                .cluster()
                .remoteStoreStats(new RemoteStoreStatsRequest())
                .get();
            Arrays.asList(remoteStoreStatsResponse.getRemoteStoreStats())
                .forEach(remoteStoreStats -> assertTrue(remoteStoreStats.getSegmentStats().totalUploadsFailed > 10));
        }, 10, TimeUnit.SECONDS);
        cleanupRepo();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,46 +76,49 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
});
thread.start();
}
try {
if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName());
}
} catch (InterruptedException e) {
throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName());
}
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
outputStream.write(buffer);
}
if (writeContext.getFileSize() != totalContentRead.get()) {
throw new IOException(
"Incorrect content length read for file "
+ writeContext.getFileName()
+ ", actual file size: "
+ writeContext.getFileSize()
+ ", bytes read: "
+ totalContentRead.get()
);
}

try {
// bulks need to succeed for segment files to be generated
if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) {
completionListener.onFailure(
new RuntimeException(
new CorruptIndexException(
"Data integrity check failure for file: " + writeContext.getFileName(),
writeContext.getFileName()
Thread thread = new Thread(() -> {
try {
try {
if (!latch.await(TRANSFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
throw new IOException("Timed out waiting for file transfer to complete for " + writeContext.getFileName());
}
} catch (InterruptedException e) {
throw new IOException("Await interrupted on CountDownLatch, transfer failed for " + writeContext.getFileName());
}
try (OutputStream outputStream = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW)) {
outputStream.write(buffer);
}
if (writeContext.getFileSize() != totalContentRead.get()) {
throw new IOException(
"Incorrect content length read for file "
+ writeContext.getFileName()
+ ", actual file size: "
+ writeContext.getFileSize()
+ ", bytes read: "
+ totalContentRead.get()
);
}

// bulks need to succeed for segment files to be generated
if (isSegmentFile(writeContext.getFileName()) && triggerDataIntegrityFailure) {
completionListener.onFailure(
new RuntimeException(
new CorruptIndexException(
"Data integrity check failure for file: " + writeContext.getFileName(),
writeContext.getFileName()
)
)
)
);
} else {
writeContext.getUploadFinalizer().accept(true);
completionListener.onResponse(null);
);
} else {
writeContext.getUploadFinalizer().accept(true);
completionListener.onResponse(null);
}
} catch (Exception e) {
completionListener.onFailure(e);
}
} catch (Exception e) {
completionListener.onFailure(e);
}

});
thread.start();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ public interface ClusterAdminClient extends OpenSearchClient {

void remoteStoreStats(RemoteStoreStatsRequest request, ActionListener<RemoteStoreStatsResponse> listener);

ActionFuture<RemoteStoreStatsResponse> remoteStoreStats(RemoteStoreStatsRequest request);

RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,11 @@ public void remoteStoreStats(final RemoteStoreStatsRequest request, final Action
execute(RemoteStoreStatsAction.INSTANCE, request, listener);
}

/**
 * Future-returning variant of
 * {@code remoteStoreStats(RemoteStoreStatsRequest, ActionListener)}: delegates
 * to the same {@code RemoteStoreStatsAction} transport action but hands the
 * caller an {@link ActionFuture} instead of notifying a listener.
 */
@Override
public ActionFuture<RemoteStoreStatsResponse> remoteStoreStats(final RemoteStoreStatsRequest request) {
return execute(RemoteStoreStatsAction.INSTANCE, request);
}

@Override
public RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId) {
RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = new RemoteStoreStatsRequestBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public class RemoteStoreSettings {
/**
 * Timeout for transferring segment files to the remote segment store.
 * Defaults to 3 hours. The diff-garbled source listed both the old minimum
 * (10 minutes) and the new minimum (1 ms) as consecutive arguments, which is
 * not valid Java; this reconstructs the post-commit form. The 1 ms floor
 * exists so integration tests can force upload timeouts deterministically.
 */
public static final Setting<TimeValue> CLUSTER_REMOTE_SEGMENT_TRANSFER_TIMEOUT_SETTING = Setting.timeSetting(
    "cluster.remote_store.segment.transfer_timeout",
    TimeValue.timeValueHours(3),
    TimeValue.timeValueMillis(1),
    Property.NodeScope,
    Property.Dynamic
);
Expand Down

0 comments on commit a4ac198

Please sign in to comment.