From 16cafd1688eb927b6d4cd01363974f3d14d2bedc Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 24 May 2022 23:13:59 -0700 Subject: [PATCH 1/3] Add SegmentReplicationTargetService to orchestrate replication events. This change introduces boilerplate classes for Segment Replication and a target service to orchestrate replication events. It also includes two refactors of peer recovery components for reuse. 1. Renames RecoveryFileChunkRequest to FileChunkRequest and extracts code to handle throttling into ReplicationTarget. 2. Extracts a component to execute retryable requests over the transport layer. Signed-off-by: Marc Handalian --- .../index/store/CorruptedFileIT.java | 8 +- .../org/opensearch/recovery/RelocationIT.java | 4 +- .../recovery/TruncatedRecoveryIT.java | 4 +- ...hunkRequest.java => FileChunkRequest.java} | 8 +- .../recovery/PeerRecoveryTargetService.java | 119 ++++--------- .../indices/recovery/RecoveryState.java | 1 + .../indices/recovery/RecoveryTarget.java | 3 +- .../recovery/RemoteRecoveryTargetHandler.java | 124 +++---------- .../recovery/RetryableTransportClient.java | 139 +++++++++++++++ .../replication/CheckpointInfoResponse.java | 79 ++++++++ .../replication/GetSegmentFilesResponse.java | 40 +++++ .../replication/SegmentReplicationSource.java | 50 ++++++ .../SegmentReplicationSourceFactory.java | 41 +++++ .../replication/SegmentReplicationState.java | 84 +++++++++ .../replication/SegmentReplicationTarget.java | 115 ++++++++++++ .../SegmentReplicationTargetService.java | 168 ++++++++++++++++++ .../common/ReplicationCollection.java | 2 +- .../replication/common/ReplicationState.java | 2 + .../replication/common/ReplicationTarget.java | 91 +++++++++- .../PeerRecoveryTargetServiceTests.java | 8 +- .../SegmentReplicationTargetServiceTests.java | 149 ++++++++++++++++ 21 files changed, 1028 insertions(+), 211 deletions(-) rename server/src/main/java/org/opensearch/indices/recovery/{RecoveryFileChunkRequest.java => FileChunkRequest.java} (95%) create mode 100644 server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java b/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java index 3a5e21fc8ef65..ee2067c591cef 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java @@ -77,7 +77,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import
org.opensearch.indices.recovery.RecoveryFileChunkRequest; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.plugins.Plugin; import org.opensearch.snapshots.SnapshotState; @@ -397,7 +397,7 @@ public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionExc internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), (connection, requestId, action, request, options) -> { if (corrupt.get() && action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { - RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; + FileChunkRequest req = (FileChunkRequest) request; byte[] array = BytesRef.deepCopyOf(req.content().toBytesRef()).bytes; int i = randomIntBetween(0, req.content().length() - 1); array[i] = (byte) ~array[i]; // flip one byte in the content @@ -474,11 +474,11 @@ public void testCorruptionOnNetworkLayer() throws ExecutionException, Interrupte internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), (connection, requestId, action, request, options) -> { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { - RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; + FileChunkRequest req = (FileChunkRequest) request; if (truncate && req.length() > 1) { BytesRef bytesRef = req.content().toBytesRef(); BytesArray array = new BytesArray(bytesRef.bytes, bytesRef.offset, (int) req.length() - 1); - request = new RecoveryFileChunkRequest( + request = new FileChunkRequest( req.recoveryId(), req.requestSeqNo(), req.shardId(), diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java index 06475f1e7ac9d..1f16cc0363686 100644 --- a/server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java @@ -67,7 +67,7 @@ import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import org.opensearch.indices.recovery.RecoveryFileChunkRequest; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.plugins.Plugin; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -809,7 +809,7 @@ public void sendRequest( TransportRequestOptions options ) throws IOException { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { - RecoveryFileChunkRequest chunkRequest = (RecoveryFileChunkRequest) request; + FileChunkRequest chunkRequest = (FileChunkRequest) request; if (chunkRequest.name().startsWith(IndexFileNames.SEGMENTS)) { // corrupting the segments_N files in order to make sure future recovery re-send files logger.debug("corrupting [{}] to {}. 
file name: [{}]", action, connection.getNode(), chunkRequest.name()); diff --git a/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java index 1708454faf7b3..b5d7bd476059d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/recovery/TruncatedRecoveryIT.java @@ -43,7 +43,7 @@ import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.index.query.QueryBuilders; import org.opensearch.indices.recovery.PeerRecoveryTargetService; -import org.opensearch.indices.recovery.RecoveryFileChunkRequest; +import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.node.RecoverySettingsChunkSizePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -146,7 +146,7 @@ public void testCancelRecoveryAndResume() throws Exception { internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), (connection, requestId, action, request, options) -> { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { - RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; + FileChunkRequest req = (FileChunkRequest) request; logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk()); if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) { latch.countDown(); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryFileChunkRequest.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkRequest.java similarity index 95% rename from server/src/main/java/org/opensearch/indices/recovery/RecoveryFileChunkRequest.java rename to server/src/main/java/org/opensearch/indices/recovery/FileChunkRequest.java index 886de8d56645c..3594495224481 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryFileChunkRequest.java +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkRequest.java @@ -43,11 +43,11 @@ import java.io.IOException; /** - * Request for a recovery file chunk + * Request containing a file chunk. 
* * @opensearch.internal */ -public final class RecoveryFileChunkRequest extends RecoveryTransportRequest { +public final class FileChunkRequest extends RecoveryTransportRequest { private final boolean lastChunk; private final long recoveryId; private final ShardId shardId; @@ -58,7 +58,7 @@ public final class RecoveryFileChunkRequest extends RecoveryTransportRequest { private final int totalTranslogOps; - public RecoveryFileChunkRequest(StreamInput in) throws IOException { + public FileChunkRequest(StreamInput in) throws IOException { super(in); recoveryId = in.readLong(); shardId = new ShardId(in); @@ -75,7 +75,7 @@ public RecoveryFileChunkRequest(StreamInput in) throws IOException { sourceThrottleTimeInNanos = in.readLong(); } - public RecoveryFileChunkRequest( + public FileChunkRequest( long recoveryId, final long requestSeqNo, ShardId shardId, diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index e13022afa81ba..8db8e68e191ab 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -36,20 +36,17 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.RateLimiter; -import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.action.support.ChannelActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.settings.Settings; @@ -60,7 +57,6 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.RecoveryEngineException; import org.opensearch.index.mapper.MapperException; -import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IllegalIndexShardStateException; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; @@ -71,7 +67,6 @@ import org.opensearch.index.translog.TranslogCorruptedException; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; -import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTimer; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -148,7 +143,7 @@ public PeerRecoveryTargetService( transportService.registerRequestHandler( Actions.FILE_CHUNK, ThreadPool.Names.GENERIC, - RecoveryFileChunkRequest::new, + FileChunkRequest::new, new FileChunkTransportRequestHandler() ); transportService.registerRequestHandler( @@ -354,12 +349,13 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand @Override 
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) { try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { - final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.PREPARE_TRANSLOG, request); + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.PREPARE_TRANSLOG, request); if (listener == null) { return; } - recoveryRef.get().prepareForTranslogOperations(request.totalTranslogOps(), listener); + recoveryTarget.prepareForTranslogOperations(request.totalTranslogOps(), listener); } } } @@ -369,12 +365,13 @@ class FinalizeRecoveryRequestHandler implements TransportRequestHandler<RecoveryFinalizeRecoveryRequest> @Override public void messageReceived(final RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception { try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { - final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FINALIZE, request); + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FINALIZE, request); if (listener == null) { return; } - recoveryRef.get().finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener); + recoveryTarget.finalizeRecovery(request.globalCheckpoint(), request.trimAboveSeqNo(), listener); } } } @@ -399,8 +396,7 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin throws IOException { try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); - final ActionListener<Void> listener = createOrFinishListener( - recoveryRef, + final ActionListener<Void> listener = recoveryTarget.createOrFinishListener( channel, Actions.TRANSLOG_OPS, request, @@ -484,20 +480,20 @@ class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> @Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception { try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { - final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILES_INFO, request); + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILES_INFO, request); if (listener == null) { return; } - recoveryRef.get() - .receiveFileInfo( - request.phase1FileNames, - request.phase1FileSizes, - request.phase1ExistingFileNames, - request.phase1ExistingFileSizes, - request.totalTranslogOps, - listener - ); + recoveryTarget.receiveFileInfo( + request.phase1FileNames, + request.phase1FileSizes, + request.phase1ExistingFileNames, + request.phase1ExistingFileSizes, + request.totalTranslogOps, + listener + ); } } } @@ -507,90 +503,37 @@ class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanFilesRequest> @Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception { try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { - final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.CLEAN_FILES, request); + final RecoveryTarget recoveryTarget = recoveryRef.get(); + final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.CLEAN_FILES, request); if (listener == null) { return; } - recoveryRef.get() - .cleanFiles(request.totalTranslogOps(), request.getGlobalCheckpoint(), request.sourceMetaSnapshot(), listener); + recoveryTarget.cleanFiles( + request.totalTranslogOps(), + request.getGlobalCheckpoint(), + request.sourceMetaSnapshot(), + listener + ); }
} } - class FileChunkTransportRequestHandler implements TransportRequestHandler<RecoveryFileChunkRequest> { + class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> { // How many bytes we've copied since we last called RateLimiter.pause final AtomicLong bytesSinceLastPause = new AtomicLong(); @Override - public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception { + public void messageReceived(FileChunkRequest request, TransportChannel channel, Task task) throws Exception { try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); - final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, Actions.FILE_CHUNK, request); - if (listener == null) { - return; - } - - final ReplicationLuceneIndex indexState = recoveryTarget.state().getIndex(); - if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) { - indexState.addSourceThrottling(request.sourceThrottleTimeInNanos()); - } - - RateLimiter rateLimiter = recoverySettings.rateLimiter(); - if (rateLimiter != null) { - long bytes = bytesSinceLastPause.addAndGet(request.content().length()); - if (bytes > rateLimiter.getMinPauseCheckBytes()) { - // Time to pause - bytesSinceLastPause.addAndGet(-bytes); - long throttleTimeInNanos = rateLimiter.pause(bytes); - indexState.addTargetThrottling(throttleTimeInNanos); - recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos); - } - } - recoveryTarget.writeFileChunk( - request.metadata(), - request.position(), - request.content(), - request.lastChunk(), - request.totalTranslogOps(), - listener - ); + final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request); + recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); } } } - private ActionListener<Void> createOrFinishListener( - final ReplicationRef<RecoveryTarget> recoveryRef, - final TransportChannel channel, - final String action, - final RecoveryTransportRequest request - ) { - return createOrFinishListener(recoveryRef, channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE); - } - - private ActionListener<Void> createOrFinishListener( - final ReplicationRef<RecoveryTarget> recoveryRef, - final TransportChannel channel, - final String action, - final RecoveryTransportRequest request, - final CheckedFunction<Void, TransportResponse, Exception> responseFn - ) { - final RecoveryTarget recoveryTarget = recoveryRef.get(); - final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request); - final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn); - - final long requestSeqNo = request.requestSeqNo(); - final ActionListener<Void> listener; - if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { - listener = recoveryTarget.markRequestReceivedAndCreateListener(requestSeqNo, voidListener); - } else { - listener = voidListener; - } - - return listener; - } - class RecoveryRunner extends AbstractRunnable { final long recoveryId; diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java index a3c7adb755145..57208ab029bf4 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryState.java @@ -260,6 +260,7 @@ public Translog getTranslog() { return translog; } +
@Override public ReplicationTimer getTimer() { return timer; } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 92897ab19ad64..1735bb015c90c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -37,6 +37,7 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.opensearch.Assertions; import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.cluster.node.DiscoveryNode; @@ -141,7 +142,7 @@ public String description() { } @Override - public void notifyListener(Exception e, boolean sendShardFailure) { + public void notifyListener(OpenSearchException e, boolean sendShardFailure) { listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index fd6de6322bb0a..b5e23619e761d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -32,41 +32,25 @@ package org.opensearch.indices.recovery; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; -import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchException; -import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionListener; -import org.opensearch.action.ActionListenerResponseHandler; -import org.opensearch.action.support.RetryableAction; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.breaker.CircuitBreakingException; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.CancellableThreads; -import org.opensearch.common.util.concurrent.ConcurrentCollections; -import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; -import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.ConnectTransportException; import org.opensearch.transport.EmptyTransportResponseHandler; -import org.opensearch.transport.RemoteTransportException; -import org.opensearch.transport.SendRequestTransportException; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -77,15 +61,11 @@ */ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { - private static final Logger logger = LogManager.getLogger(RemoteRecoveryTargetHandler.class); - private 
final TransportService transportService; - private final ThreadPool threadPool; private final long recoveryId; private final ShardId shardId; private final DiscoveryNode targetNode; private final RecoverySettings recoverySettings; - private final Map> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap(); private final TransportRequestOptions translogOpsRequestOptions; private final TransportRequestOptions fileChunkRequestOptions; @@ -94,8 +74,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final AtomicLong requestSeqNoGenerator = new AtomicLong(0); private final Consumer onSourceThrottle; - private final boolean retriesSupported; - private volatile boolean isCancelled = false; + private final RetryableTransportClient retryableTransportClient; public RemoteRecoveryTargetHandler( long recoveryId, @@ -106,7 +85,11 @@ public RemoteRecoveryTargetHandler( Consumer onSourceThrottle ) { this.transportService = transportService; - this.threadPool = transportService.getThreadPool(); + this.retryableTransportClient = new RetryableTransportClient( + transportService, + targetNode, + recoverySettings.internalActionRetryTimeout() + ); this.recoveryId = recoveryId; this.shardId = shardId; this.targetNode = targetNode; @@ -120,7 +103,6 @@ public RemoteRecoveryTargetHandler( .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionTimeout()) .build(); - this.retriesSupported = targetNode.getVersion().onOrAfter(LegacyESVersion.V_7_9_0); } public DiscoveryNode targetNode() { @@ -137,12 +119,9 @@ public void prepareForTranslogOperations(int totalTranslogOps, ActionListener reader = in -> TransportResponse.Empty.INSTANCE; final ActionListener responseListener = ActionListener.map(listener, r -> null); - executeRetryableAction(action, request, options, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } @Override @@ -156,12 +135,9 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe globalCheckpoint, trimAboveSeqNo ); - final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(recoverySettings.internalActionLongTimeout()) - .build(); final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; final ActionListener responseListener = ActionListener.map(listener, r -> null); - executeRetryableAction(action, request, options, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } @Override @@ -200,7 +176,7 @@ public void indexTranslogOperations( ); final Writeable.Reader reader = RecoveryTranslogOperationsResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r.localCheckpoint); - executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader); } @Override @@ -224,12 +200,9 @@ public void receiveFileInfo( phase1ExistingFileSizes, totalTranslogOps ); - final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; final ActionListener responseListener = ActionListener.map(listener, r -> null); - executeRetryableAction(action, request, options, responseListener, reader); + 
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } @Override @@ -249,12 +222,9 @@ public void cleanFiles( totalTranslogOps, globalCheckpoint ); - final TransportRequestOptions options = TransportRequestOptions.builder() - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; final ActionListener responseListener = ActionListener.map(listener, r -> null); - executeRetryableAction(action, request, options, responseListener, reader); + retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } @Override @@ -294,7 +264,7 @@ public void writeFileChunk( * see how many translog ops we accumulate while copying files across the network. A future optimization * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ - final RecoveryFileChunkRequest request = new RecoveryFileChunkRequest( + final FileChunkRequest request = new FileChunkRequest( recoveryId, requestSeqNo, shardId, @@ -306,71 +276,17 @@ public void writeFileChunk( throttleTimeInNanos ); final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - executeRetryableAction(action, request, fileChunkRequestOptions, ActionListener.map(listener, r -> null), reader); + retryableTransportClient.executeRetryableAction( + action, + request, + fileChunkRequestOptions, + ActionListener.map(listener, r -> null), + reader + ); } @Override public void cancel() { - isCancelled = true; - if (onGoingRetryableActions.isEmpty()) { - return; - } - final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("recovery was cancelled"); - // Dispatch to generic as cancellation calls can come on the cluster state applier thread - threadPool.generic().execute(() -> { - for (RetryableAction action : onGoingRetryableActions.values()) { - action.cancel(exception); - } - onGoingRetryableActions.clear(); - }); - } - - private void executeRetryableAction( - String action, - RecoveryTransportRequest request, - TransportRequestOptions options, - ActionListener actionListener, - Writeable.Reader reader - ) { - final Object key = new Object(); - final ActionListener removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key)); - final TimeValue initialDelay = TimeValue.timeValueMillis(200); - final TimeValue timeout = recoverySettings.internalActionRetryTimeout(); - final RetryableAction retryableAction = new RetryableAction(logger, threadPool, initialDelay, timeout, removeListener) { - - @Override - public void tryAction(ActionListener listener) { - transportService.sendRequest( - targetNode, - action, - request, - options, - new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC) - ); - } - - @Override - public boolean shouldRetry(Exception e) { - return retriesSupported && retryableException(e); - } - }; - onGoingRetryableActions.put(key, retryableAction); - retryableAction.run(); - if (isCancelled) { - retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("recovery was cancelled")); - } - } - - private static boolean retryableException(Exception e) { - if (e instanceof ConnectTransportException) { - return true; - } else if (e instanceof SendRequestTransportException) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - return cause instanceof ConnectTransportException; - } else if (e instanceof RemoteTransportException) { - final 
Throwable cause = ExceptionsHelper.unwrapCause(e); - return cause instanceof CircuitBreakingException || cause instanceof OpenSearchRejectedExecutionException; - } - return false; + retryableTransportClient.cancel(); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java new file mode 100644 index 0000000000000..b78fc994e9b13 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.LegacyESVersion; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionListenerResponseHandler; +import org.opensearch.action.support.RetryableAction; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.breaker.CircuitBreakingException; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ConnectTransportException; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.SendRequestTransportException; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestOptions; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportService; + +import java.util.Map; + +/** + * Client that implements retry functionality for transport layer requests. + * + * @opensearch.internal + */ +public final class RetryableTransportClient { + + private static final Logger logger = LogManager.getLogger(RetryableTransportClient.class); + + private final ThreadPool threadPool; + private final Map<Object, RetryableAction<?>> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap(); + private volatile boolean isCancelled = false; + private final TransportService transportService; + private final TimeValue retryTimeout; + private final DiscoveryNode targetNode; + + public RetryableTransportClient(TransportService transportService, DiscoveryNode targetNode, TimeValue retryTimeout) { + this.threadPool = transportService.getThreadPool(); + this.transportService = transportService; + this.retryTimeout = retryTimeout; + this.targetNode = targetNode; + } + + /** + * Execute a retryable action. + * @param action {@link String} Action Name. + * @param request {@link TransportRequest} Transport request to execute. + * @param actionListener {@link ActionListener} Listener to complete with the response. + * @param reader {@link Writeable.Reader} Reader to read the response stream. + * @param <T> {@link TransportResponse} type.
+ */ + public <T extends TransportResponse> void executeRetryableAction( + String action, + TransportRequest request, + ActionListener<T> actionListener, + Writeable.Reader<T> reader + ) { + final TransportRequestOptions options = TransportRequestOptions.builder().withTimeout(retryTimeout).build(); + executeRetryableAction(action, request, options, actionListener, reader); + } + + <T extends TransportResponse> void executeRetryableAction( + String action, + TransportRequest request, + TransportRequestOptions options, + ActionListener<T> actionListener, + Writeable.Reader<T> reader + ) { + final Object key = new Object(); + final ActionListener<T> removeListener = ActionListener.runBefore(actionListener, () -> onGoingRetryableActions.remove(key)); + final TimeValue initialDelay = TimeValue.timeValueMillis(200); + final RetryableAction<T> retryableAction = new RetryableAction<T>(logger, threadPool, initialDelay, retryTimeout, removeListener) { + + @Override + public void tryAction(ActionListener<T> listener) { + transportService.sendRequest( + targetNode, + action, + request, + options, + new ActionListenerResponseHandler<>(listener, reader, ThreadPool.Names.GENERIC) + ); + } + + @Override + public boolean shouldRetry(Exception e) { + return targetNode.getVersion().onOrAfter(LegacyESVersion.V_7_9_0) && retryableException(e); + } + }; + onGoingRetryableActions.put(key, retryableAction); + retryableAction.run(); + if (isCancelled) { + retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("recovery was cancelled")); + } + } + + public void cancel() { + isCancelled = true; + if (onGoingRetryableActions.isEmpty()) { + return; + } + final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("recovery was cancelled"); + // Dispatch to generic as cancellation calls can come on the cluster state applier thread + threadPool.generic().execute(() -> { + for (RetryableAction<?> action : onGoingRetryableActions.values()) { + action.cancel(exception); + } + onGoingRetryableActions.clear(); + }); + } + + private static boolean retryableException(Exception e) { + if (e instanceof ConnectTransportException) { + return true; + } else if (e instanceof SendRequestTransportException) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + return cause instanceof ConnectTransportException; + } else if (e instanceof RemoteTransportException) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + return cause instanceof CircuitBreakingException || cause instanceof OpenSearchRejectedExecutionException; + } + return false; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java new file mode 100644 index 0000000000000..a73a3b54184da --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/CheckpointInfoResponse.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.Set; + +/** + * Response returned from a {@link SegmentReplicationSource} that includes the file metadata, and SegmentInfos + * associated with a particular {@link ReplicationCheckpoint}. The {@link SegmentReplicationSource} may determine that + * the requested {@link ReplicationCheckpoint} is behind and return a different {@link ReplicationCheckpoint} in this response. + * + * @opensearch.internal + */ +public class CheckpointInfoResponse extends TransportResponse { + + private final ReplicationCheckpoint checkpoint; + private final Store.MetadataSnapshot snapshot; + private final byte[] infosBytes; + // pendingDeleteFiles are segments that have been merged away in the latest in memory SegmentInfos + // but are still referenced by the latest commit point (Segments_N). + private final Set<StoreFileMetadata> pendingDeleteFiles; + + public CheckpointInfoResponse( + final ReplicationCheckpoint checkpoint, + final Store.MetadataSnapshot snapshot, + final byte[] infosBytes, + final Set<StoreFileMetadata> additionalFiles + ) { + this.checkpoint = checkpoint; + this.snapshot = snapshot; + this.infosBytes = infosBytes; + this.pendingDeleteFiles = additionalFiles; + } + + public CheckpointInfoResponse(StreamInput in) throws IOException { + this.checkpoint = new ReplicationCheckpoint(in); + this.snapshot = new Store.MetadataSnapshot(in); + this.infosBytes = in.readByteArray(); + this.pendingDeleteFiles = in.readSet(StoreFileMetadata::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + checkpoint.writeTo(out); + snapshot.writeTo(out); + out.writeByteArray(infosBytes); + out.writeCollection(pendingDeleteFiles); + } + + public ReplicationCheckpoint getCheckpoint() { + return checkpoint; + } + + public Store.MetadataSnapshot getSnapshot() { + return snapshot; + } + + public byte[] getInfosBytes() { + return infosBytes; + } + + public Set<StoreFileMetadata> getPendingDeleteFiles() { + return pendingDeleteFiles; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java new file mode 100644 index 0000000000000..6dc7e293b2c31 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesResponse.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.List; + +/** + * Response from a {@link SegmentReplicationSource} indicating that a replication event has completed.
+ * + * @opensearch.internal + */ +public class GetSegmentFilesResponse extends TransportResponse { + + List<StoreFileMetadata> files; + + public GetSegmentFilesResponse(List<StoreFileMetadata> files) { + this.files = files; + } + + public GetSegmentFilesResponse(StreamInput in) throws IOException { + this.files = in.readList(StoreFileMetadata::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(files); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java new file mode 100644 index 0000000000000..8628a266ea7d0 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.ActionListener; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.util.List; + +/** + * Represents the source of a replication event. + * + * @opensearch.internal + */ +public interface SegmentReplicationSource { + + /** + * Get Metadata for a ReplicationCheckpoint. + * + * @param replicationId {@link long} - ID of the replication event. + * @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for. + * @param listener {@link ActionListener} listener that completes with a {@link CheckpointInfoResponse}. + */ + void getCheckpointMetadata(long replicationId, ReplicationCheckpoint checkpoint, ActionListener<CheckpointInfoResponse> listener); + + /** + * Fetch the requested segment files. Passes a listener that completes when files are stored locally. + * + * @param replicationId {@link long} - ID of the replication event. + * @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for. + * @param filesToFetch {@link List} List of files to fetch. + * @param store {@link Store} Reference to the local store. + * @param listener {@link ActionListener} Listener that completes with the list of files copied. + */ + void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List<StoreFileMetadata> filesToFetch, + Store store, + ActionListener<GetSegmentFilesResponse> listener + ); +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java new file mode 100644 index 0000000000000..3ca31503f176d --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.transport.TransportService; + +/** + * Factory to build {@link SegmentReplicationSource} used by {@link SegmentReplicationTargetService}.
+ * + * @opensearch.internal + */ +public class SegmentReplicationSourceFactory { + + private TransportService transportService; + private RecoverySettings recoverySettings; + private ClusterService clusterService; + + public SegmentReplicationSourceFactory( + TransportService transportService, + RecoverySettings recoverySettings, + ClusterService clusterService + ) { + this.transportService = transportService; + this.recoverySettings = recoverySettings; + this.clusterService = clusterService; + } + + public SegmentReplicationSource get(IndexShard shard) { + // TODO: Default to an implementation that uses the primary shard. + return null; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java new file mode 100644 index 0000000000000..d4a946a2a8428 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.replication.common.ReplicationState; +import org.opensearch.indices.replication.common.ReplicationTimer; + +/** + * ReplicationState implementation to track Segment Replication events. + * + * @opensearch.internal + */ +public class SegmentReplicationState implements ReplicationState { + + /** + * The stage of the recovery state + * + * @opensearch.internal + */ + public enum Stage { + INIT((byte) 0), + + DONE((byte) 1); + + private static final Stage[] STAGES = new Stage[Stage.values().length]; + + static { + for (Stage stage : Stage.values()) { + assert stage.id() < STAGES.length && stage.id() >= 0; + STAGES[stage.id] = stage; + } + } + + private final byte id; + + Stage(byte id) { + this.id = id; + } + + public byte id() { + return id; + } + + public static Stage fromId(byte id) { + if (id < 0 || id >= STAGES.length) { + throw new IllegalArgumentException("No mapping for id [" + id + "]"); + } + return STAGES[id]; + } + } + + public SegmentReplicationState() { + this.stage = Stage.INIT; + } + + private Stage stage; + + @Override + public ReplicationLuceneIndex getIndex() { + // TODO + return null; + } + + @Override + public ReplicationTimer getTimer() { + // TODO + return null; + } + + public Stage getStage() { + return stage; + } + + public void setStage(Stage stage) { + this.stage = stage; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java new file mode 100644 index 0000000000000..7933ea5f0344b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; +import org.opensearch.indices.replication.common.ReplicationState; +import org.opensearch.indices.replication.common.ReplicationTarget; + +import java.io.IOException; + +/** + * Represents the target of a replication event. + * + * @opensearch.internal + */ +public class SegmentReplicationTarget extends ReplicationTarget { + + private final ReplicationCheckpoint checkpoint; + private final SegmentReplicationSource source; + private final SegmentReplicationState state; + + public SegmentReplicationTarget( + ReplicationCheckpoint checkpoint, + IndexShard indexShard, + SegmentReplicationSource source, + SegmentReplicationTargetService.SegmentReplicationListener listener + ) { + super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); + this.checkpoint = checkpoint; + this.source = source; + this.state = new SegmentReplicationState(); + } + + @Override + protected void closeInternal() { + // TODO + } + + @Override + protected String getPrefix() { + // TODO + return null; + } + + @Override + protected void onDone() { + this.state.setStage(SegmentReplicationState.Stage.DONE); + } + + @Override + protected void onCancel(String reason) { + // TODO + } + + @Override + public ReplicationState state() { + return state; + } + + @Override + public ReplicationTarget retryCopy() { + // TODO + return null; + } + + @Override + public String description() { + // TODO + return null; + } + + @Override + public void notifyListener(OpenSearchException e, boolean sendShardFailure) { + listener.onFailure(state(), e, sendShardFailure); + } + + @Override + public boolean reset(CancellableThreads newTargetCancellableThreads) throws IOException { + // TODO + return false; + } + + @Override + public void writeFileChunk( + StoreFileMetadata metadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener<Void> listener + ) { + // TODO + } + + /** + * Start the Replication event. + * @param listener {@link ActionListener} listener. + */ + public void startReplication(ActionListener<Void> listener) { + // TODO + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java new file mode 100644 index 0000000000000..2722b8d5ec193 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -0,0 +1,168 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.Nullable; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.IndexEventListener; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; +import org.opensearch.indices.replication.common.ReplicationListener; +import org.opensearch.indices.replication.common.ReplicationState; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.transport.TransportService; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Service class that orchestrates replication events on replicas. + * + * @opensearch.internal + */ +public class SegmentReplicationTargetService implements IndexEventListener { + + private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); + + private final ThreadPool threadPool; + private final RecoverySettings recoverySettings; + + private final ReplicationCollection<SegmentReplicationTarget> onGoingReplications; + + private final SegmentReplicationSourceFactory sourceFactory; + + /** + * The internal actions + * + * @opensearch.internal + */ + public static class Actions { + public static final String FILE_CHUNK = "internal:index/shard/replication/file_chunk"; + } + + public SegmentReplicationTargetService( + final ThreadPool threadPool, + final RecoverySettings recoverySettings, + final TransportService transportService, + final SegmentReplicationSourceFactory sourceFactory + ) { + this.threadPool = threadPool; + this.recoverySettings = recoverySettings; + this.onGoingReplications = new ReplicationCollection<>(logger, threadPool); + this.sourceFactory = sourceFactory; + + transportService.registerRequestHandler( + Actions.FILE_CHUNK, + ThreadPool.Names.GENERIC, + FileChunkRequest::new, + new FileChunkTransportRequestHandler() + ); + } + + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + if (indexShard != null) { + onGoingReplications.cancelForShard(shardId, "shard closed"); + } + } + + public void startReplication( + final ReplicationCheckpoint checkpoint, + final IndexShard indexShard, + final SegmentReplicationListener listener + ) { + startReplication(new SegmentReplicationTarget(checkpoint, indexShard, sourceFactory.get(indexShard), listener)); + } + + public void startReplication(final SegmentReplicationTarget target) { + final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout()); + logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId)); + threadPool.generic().execute(new ReplicationRunner(replicationId)); + } + + /** + * Listener that runs on changes in Replication state + * + * @opensearch.internal + */ + public interface
SegmentReplicationListener extends ReplicationListener { + + @Override + default void onDone(ReplicationState state) { + onReplicationDone((SegmentReplicationState) state); + } + + @Override + default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + onReplicationFailure((SegmentReplicationState) state, e, sendShardFailure); + } + + void onReplicationDone(SegmentReplicationState state); + + void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure); + } + + private class ReplicationRunner implements Runnable { + + final long replicationId; + + public ReplicationRunner(long replicationId) { + this.replicationId = replicationId; + } + + @Override + public void run() { + start(replicationId); + } + } + + private void start(final long replicationId) { + try (ReplicationRef<SegmentReplicationTarget> replicationRef = onGoingReplications.get(replicationId)) { + replicationRef.get().startReplication(new ActionListener<>() { + @Override + public void onResponse(Void o) { + onGoingReplications.markAsDone(replicationId); + } + + @Override + public void onFailure(Exception e) { + onGoingReplications.fail(replicationId, new OpenSearchException("Segment Replication failed", e), true); + } + }); + } + } + + private class FileChunkTransportRequestHandler implements TransportRequestHandler<FileChunkRequest> { + + // How many bytes we've copied since we last called RateLimiter.pause + final AtomicLong bytesSinceLastPause = new AtomicLong(); + + @Override + public void messageReceived(FileChunkRequest request, TransportChannel channel, Task task) throws Exception { + try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) { + final SegmentReplicationTarget target = ref.get(); + final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request); + target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index 609825eb5227b..b8295f0685a7f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -133,7 +133,7 @@ public T reset(final long id, final TimeValue activityTimeout) { } catch (Exception e) { // fail shard to be safe assert oldTarget != null; - oldTarget.notifyListener(e, true); + oldTarget.notifyListener(new OpenSearchException("Unable to reset target", e), true); return null; } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java index 7942fa8938dd0..029fcb6a3b690 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationState.java @@ -14,5 +14,7 @@ * @opensearch.internal */ public interface ReplicationState { + ReplicationLuceneIndex getIndex(); + ReplicationTimer getTimer(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index
0192270907fd2..c7e585a27e35f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -9,14 +9,24 @@ package org.opensearch.indices.replication.common; import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.RateLimiter; import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.common.CheckedFunction; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.logging.Loggers; import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.AbstractRefCounted; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoveryTransportRequest; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportResponse; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,7 +74,7 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } - public abstract void notifyListener(Exception e, boolean sendShardFailure); + public abstract void notifyListener(OpenSearchException e, boolean sendShardFailure); public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) { super(name); @@ -172,4 +182,83 @@ protected void ensureRefCount() { } } + public ActionListener<Void> createOrFinishListener( + final TransportChannel channel, + final String action, + final RecoveryTransportRequest request + ) { + return createOrFinishListener(channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE); + } + + public ActionListener<Void> createOrFinishListener( + final TransportChannel channel, + final String action, + final RecoveryTransportRequest request, + final CheckedFunction<Void, TransportResponse, Exception> responseFn + ) { + final ActionListener<TransportResponse> channelListener = new ChannelActionListener<>(channel, action, request); + final ActionListener<Void> voidListener = ActionListener.map(channelListener, responseFn); + + final long requestSeqNo = request.requestSeqNo(); + final ActionListener<Void> listener; + if (requestSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { + listener = markRequestReceivedAndCreateListener(requestSeqNo, voidListener); + } else { + listener = voidListener; + } + + return listener; + } + + /** + * Handle a FileChunkRequest for a {@link ReplicationTarget}. + * + * @param request {@link FileChunkRequest} Request containing the file chunk. + * @param replicationTarget {@link ReplicationTarget} Target that will consume the chunk. + * @param bytesSinceLastPause {@link AtomicLong} Bytes since the last pause. + * @param rateLimiter {@link RateLimiter} Rate limiter. + * @param listener {@link ActionListener} Listener completed once the chunk has been written. + * @throws IOException When there is an issue pausing the rate limiter.
+     */
+    public void handleFileChunk(
+        final FileChunkRequest request,
+        final ReplicationTarget replicationTarget,
+        final AtomicLong bytesSinceLastPause,
+        final RateLimiter rateLimiter,
+        final ActionListener<Void> listener
+    ) throws IOException {
+
+        if (listener == null) {
+            return;
+        }
+        final ReplicationLuceneIndex indexState = replicationTarget.state().getIndex();
+        if (request.sourceThrottleTimeInNanos() != ReplicationLuceneIndex.UNKNOWN) {
+            indexState.addSourceThrottling(request.sourceThrottleTimeInNanos());
+        }
+        if (rateLimiter != null) {
+            long bytes = bytesSinceLastPause.addAndGet(request.content().length());
+            if (bytes > rateLimiter.getMinPauseCheckBytes()) {
+                // Time to pause
+                bytesSinceLastPause.addAndGet(-bytes);
+                long throttleTimeInNanos = rateLimiter.pause(bytes);
+                indexState.addTargetThrottling(throttleTimeInNanos);
+                replicationTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos);
+            }
+        }
+        writeFileChunk(
+            request.metadata(),
+            request.position(),
+            request.content(),
+            request.lastChunk(),
+            request.totalTranslogOps(),
+            listener
+        );
+    }
+
+    public abstract void writeFileChunk(
+        StoreFileMetadata metadata,
+        long position,
+        BytesReference content,
+        boolean lastChunk,
+        int totalTranslogOps,
+        ActionListener<Void> listener
+    );
 }
diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index e54f06937cad3..bda2a910d922e 100644
--- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -105,7 +105,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
             receiveFileInfoFuture
         );
         receiveFileInfoFuture.actionGet();
-        List<RecoveryFileChunkRequest> requests = new ArrayList<>();
+        List<FileChunkRequest> requests = new ArrayList<>();
         long seqNo = 0;
         for (StoreFileMetadata md : mdFiles) {
             try (IndexInput in = sourceShard.store().directory().openInput(md.name(), IOContext.READONCE)) {
@@ -115,7 +115,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
                 byte[] buffer = new byte[length];
                 in.readBytes(buffer, 0, length);
                 requests.add(
-                    new RecoveryFileChunkRequest(
+                    new FileChunkRequest(
                         0,
                         seqNo++,
                         sourceShard.shardId(),
@@ -132,7 +132,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
             }
         }
         Randomness.shuffle(requests);
-        BlockingQueue<RecoveryFileChunkRequest> queue = new ArrayBlockingQueue<>(requests.size());
+        BlockingQueue<FileChunkRequest> queue = new ArrayBlockingQueue<>(requests.size());
         queue.addAll(requests);
         Thread[] senders = new Thread[between(1, 4)];
         CyclicBarrier barrier = new CyclicBarrier(senders.length);
@@ -140,7 +140,7 @@ public void testWriteFileChunksConcurrently() throws Exception {
             senders[i] = new Thread(() -> {
                 try {
                     barrier.await();
-                    RecoveryFileChunkRequest r;
+                    FileChunkRequest r;
                     while ((r = queue.poll()) != null) {
                         recoveryTarget.writeFileChunk(
                             r.metadata(),
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
new file mode 100644
index 0000000000000..753528afd0a07
--- /dev/null
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
@@ -0,0 +1,149 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.opensearch.OpenSearchException;
+import org.opensearch.action.ActionListener;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.index.shard.IndexShard;
+import org.opensearch.index.shard.IndexShardTestCase;
+import org.opensearch.indices.recovery.RecoverySettings;
+import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
+import org.opensearch.transport.TransportService;
+
+import java.io.IOException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
+
+    public void testTargetReturnsSuccess_listenerCompletes() throws IOException {
+        Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
+        final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
+        final TransportService transportService = mock(TransportService.class);
+        final IndexShard indexShard = newShard(false, settings);
+        ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
+        SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
+        final SegmentReplicationSource replicationSource = mock(SegmentReplicationSource.class);
+        when(replicationSourceFactory.get(indexShard)).thenReturn(replicationSource);
+
+        SegmentReplicationTargetService sut = new SegmentReplicationTargetService(
+            threadPool,
+            recoverySettings,
+            transportService,
+            replicationSourceFactory
+        );
+
+        final SegmentReplicationTarget target = new SegmentReplicationTarget(
+            checkpoint,
+            indexShard,
+            replicationSource,
+            new SegmentReplicationTargetService.SegmentReplicationListener() {
+                @Override
+                public void onReplicationDone(SegmentReplicationState state) {
+                    assertEquals(SegmentReplicationState.Stage.DONE, state.getStage());
+                }
+
+                @Override
+                public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
+                    Assert.fail();
+                }
+            }
+        );
+        final SegmentReplicationTarget spy = Mockito.spy(target);
+        doAnswer(invocation -> {
+            final ActionListener<Void> listener = invocation.getArgument(0);
+            listener.onResponse(null);
+            return null;
+        }).when(spy).startReplication(any());
+        sut.startReplication(spy);
+        closeShards(indexShard);
+    }
+
+    public void testTargetThrowsException() throws IOException {
+        Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
+        final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
+        final TransportService transportService = mock(TransportService.class);
+        final IndexShard indexShard = newShard(false, settings);
+        ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
+        SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
+        SegmentReplicationSource replicationSource = mock(SegmentReplicationSource.class);
+
+        SegmentReplicationTargetService sut = new SegmentReplicationTargetService(
+            threadPool,
+            recoverySettings,
+            transportService,
+            replicationSourceFactory
+        );
+
+        final OpenSearchException expectedError = new OpenSearchException("Fail");
+        final SegmentReplicationTarget target = new SegmentReplicationTarget(
+            checkpoint,
+            indexShard,
+            replicationSource,
+            new SegmentReplicationTargetService.SegmentReplicationListener() {
+                @Override
+                public void onReplicationDone(SegmentReplicationState state) {
+                    Assert.fail();
+                }
+
+                @Override
+                public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
+                    assertEquals(SegmentReplicationState.Stage.INIT, state.getStage());
+                    assertEquals(expectedError, e.getCause());
+                    assertTrue(sendShardFailure);
+                }
+            }
+        );
+        final SegmentReplicationTarget spy = Mockito.spy(target);
+        doAnswer(invocation -> {
+            final ActionListener<Void> listener = invocation.getArgument(0);
+            listener.onFailure(expectedError);
+            return null;
+        }).when(spy).startReplication(any());
+        sut.startReplication(spy);
+        closeShards(indexShard);
+    }
+
+    public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException {
+        Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
+        final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
+        final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
+        final TransportService transportService = mock(TransportService.class);
+        final IndexShard indexShard = newShard(false, settings);
+        ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
+        SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
+        SegmentReplicationSource replicationSource = mock(SegmentReplicationSource.class);
+
+        SegmentReplicationTargetService sut = new SegmentReplicationTargetService(
+            threadPool,
+            recoverySettings,
+            transportService,
+            replicationSourceFactory
+        );
+
+        final SegmentReplicationTarget target = new SegmentReplicationTarget(
+            checkpoint,
+            indexShard,
+            replicationSource,
+            mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
+        );
+        final SegmentReplicationTarget spy = Mockito.spy(target);
+        sut.startReplication(spy);
+        sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, settings);
+        Mockito.verify(spy, times(1)).cancel(any());
+        closeShards(indexShard);
+    }
+}

From 4c3e74d568deadbbbee6063db3eb69baf2c99d77 Mon Sep 17 00:00:00 2001
From: Marc Handalian
Date: Thu, 26 May 2022 13:51:16 -0700
Subject: [PATCH 2/3] Code cleanup.
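
This cleanup threads the owning component's Logger through
RetryableTransportClient so retry and cancellation logs are attributed to the
caller, generalizes the cancellation message from "recovery was cancelled" to
"retryable action was cancelled", documents ReplicationRunner, marks the
listener-producing methods @Nullable, and moves duplicated test setup into
setUp()/tearDown().

As a rough illustration of the new constructor shape (a sketch based on the
diff below, not an exact call site from this patch):

    // Callers now pass their own Logger as the fourth argument, so log output
    // is attributed to the owning component rather than to the client itself.
    RetryableTransportClient client = new RetryableTransportClient(
        transportService,
        targetNode,
        recoverySettings.internalActionRetryTimeout(),
        logger // e.g. LogManager.getLogger(RemoteRecoveryTargetHandler.class)
    );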
Signed-off-by: Marc Handalian
---
 .../recovery/PeerRecoveryTargetService.java   |  2 +-
 .../recovery/RemoteRecoveryTargetHandler.java | 10 ++-
 .../recovery/RetryableTransportClient.java    | 12 ++--
 .../replication/SegmentReplicationState.java  |  4 +-
 .../SegmentReplicationTargetService.java      |  6 +-
 .../replication/common/ReplicationTarget.java |  5 ++
 .../SegmentReplicationTargetServiceTests.java | 66 +++++++------------
 7 files changed, 49 insertions(+), 56 deletions(-)

diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
index 8db8e68e191ab..85141556657f3 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java
@@ -525,7 +525,7 @@ class FileChunkTransportRequestHandler implements TransportRequestHandler<FileC
         final AtomicLong bytesSinceLastPause = new AtomicLong();
 
         @Override
-        public void messageReceived(FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
+        public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
             try (RecoveryRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
                 final RecoveryTarget recoveryTarget = recoveryRef.get();
                 final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java
index b5e23619e761d..ab6466feb11f8 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java
@@ -32,6 +32,8 @@
 
 package org.opensearch.indices.recovery;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.RateLimiter;
 import org.opensearch.OpenSearchException;
 import org.opensearch.action.ActionListener;
@@ -61,6 +63,8 @@
  */
 public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
 
+    private static final Logger logger = LogManager.getLogger(RemoteRecoveryTargetHandler.class);
+
     private final TransportService transportService;
     private final long recoveryId;
     private final ShardId shardId;
@@ -85,10 +89,14 @@ public RemoteRecoveryTargetHandler(
         Consumer<Long> onSourceThrottle
     ) {
         this.transportService = transportService;
+        // It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler
+        // is created per recovery. Any change to RecoverySettings will be applied on the next
+        // recovery.
         this.retryableTransportClient = new RetryableTransportClient(
             transportService,
             targetNode,
-            recoverySettings.internalActionRetryTimeout()
+            recoverySettings.internalActionRetryTimeout(),
+            logger
         );
         this.recoveryId = recoveryId;
         this.shardId = shardId;
diff --git a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java
index b78fc994e9b13..bc10cc80b7fdc 100644
--- a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java
+++ b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java
@@ -8,7 +8,6 @@
 
 package org.opensearch.indices.recovery;
 
-import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.LegacyESVersion;
@@ -40,8 +39,6 @@
  */
 public final class RetryableTransportClient {
 
-    private static final Logger logger = LogManager.getLogger(RetryableTransportClient.class);
-
     private final ThreadPool threadPool;
     private final Map<Object, RetryableAction<?>> onGoingRetryableActions = ConcurrentCollections.newConcurrentMap();
     private volatile boolean isCancelled = false;
@@ -49,11 +46,14 @@ public final class RetryableTransportClient {
     private final TimeValue retryTimeout;
     private final DiscoveryNode targetNode;
 
-    public RetryableTransportClient(TransportService transportService, DiscoveryNode targetNode, TimeValue retryTimeout) {
+    private final Logger logger;
+
+    public RetryableTransportClient(TransportService transportService, DiscoveryNode targetNode, TimeValue retryTimeout, Logger logger) {
         this.threadPool = transportService.getThreadPool();
         this.transportService = transportService;
         this.retryTimeout = retryTimeout;
         this.targetNode = targetNode;
+        this.logger = logger;
     }
 
     /**
@@ -105,7 +105,7 @@ public boolean shouldRetry(Exception e) {
         onGoingRetryableActions.put(key, retryableAction);
         retryableAction.run();
         if (isCancelled) {
-            retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("recovery was cancelled"));
+            retryableAction.cancel(new CancellableThreads.ExecutionCancelledException("retryable action was cancelled"));
         }
     }
 
@@ -114,7 +114,7 @@ public void cancel() {
         if (onGoingRetryableActions.isEmpty()) {
             return;
         }
-        final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("recovery was cancelled");
+        final RuntimeException exception = new CancellableThreads.ExecutionCancelledException("retryable action was cancelled");
         // Dispatch to generic as cancellation calls can come on the cluster state applier thread
         threadPool.generic().execute(() -> {
             for (RetryableAction<?> action : onGoingRetryableActions.values()) {
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java
index d4a946a2a8428..b01016d2a1e62 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java
@@ -25,9 +25,9 @@ public class SegmentReplicationState implements ReplicationState {
      * @opensearch.internal
      */
     public enum Stage {
-        INIT((byte) 0),
+        DONE((byte) 0),
 
-        DONE((byte) 1);
+        INIT((byte) 1);
 
         private static final Stage[] STAGES = new Stage[Stage.values().length];
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
index 2722b8d5ec193..1e064d483a4bb 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -120,6 +120,9 @@ default void onFailure(ReplicationState state, OpenSearchException e, boolean se
         void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure);
     }
 
+    /**
+     * Runnable implementation to trigger a replication event.
+     */
     private class ReplicationRunner implements Runnable {
 
         final long replicationId;
@@ -156,8 +159,7 @@ private class FileChunkTransportRequestHandler implements TransportRequestHandle
         final AtomicLong bytesSinceLastPause = new AtomicLong();
 
         @Override
-        public void messageReceived(FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
-            // How many bytes we've copied since we last called RateLimiter.pause
+        public void messageReceived(final FileChunkRequest request, TransportChannel channel, Task task) throws Exception {
             try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
                 final SegmentReplicationTarget target = ref.get();
                 final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java
index c7e585a27e35f..f8dc07f122c02 100644
--- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java
@@ -15,6 +15,7 @@
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.support.ChannelActionListener;
 import org.opensearch.common.CheckedFunction;
+import org.opensearch.common.Nullable;
 import org.opensearch.common.bytes.BytesReference;
 import org.opensearch.common.logging.Loggers;
 import org.opensearch.common.util.CancellableThreads;
@@ -108,6 +109,7 @@ public void setLastAccessTime() {
         lastAccessTime = System.nanoTime();
     }
 
+    @Nullable
     public ActionListener<Void> markRequestReceivedAndCreateListener(long requestSeqNo, ActionListener<Void> listener) {
         return requestTracker.markReceivedAndCreateListener(requestSeqNo, listener);
     }
@@ -182,6 +184,7 @@ protected void ensureRefCount() {
         }
     }
 
+    @Nullable
     public ActionListener<Void> createOrFinishListener(
         final TransportChannel channel,
         final String action,
@@ -190,6 +193,7 @@ public ActionListener<Void> createOrFinishListener(
         return createOrFinishListener(channel, action, request, nullVal -> TransportResponse.Empty.INSTANCE);
     }
 
+    @Nullable
     public ActionListener<Void> createOrFinishListener(
         final TransportChannel channel,
         final String action,
@@ -216,6 +220,7 @@ public ActionListener<Void> createOrFinishListener(
      * @param request {@link FileChunkRequest} Request containing the file chunk.
      * @param bytesSinceLastPause {@link AtomicLong} Bytes since the last pause.
      * @param rateLimiter {@link RateLimiter} Rate limiter.
+     * @param listener {@link ActionListener} listener that completes when the chunk has been written.
      * @throws IOException When there is an issue pausing the rate limiter.
      */
     public void handleFileChunk(
diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
index 753528afd0a07..aa17dec5767da 100644
--- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
+++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java
@@ -27,24 +27,34 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
 
-    public void testTargetReturnsSuccess_listenerCompletes() throws IOException {
-        Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
+    private IndexShard indexShard;
+    private ReplicationCheckpoint checkpoint;
+    private SegmentReplicationSource replicationSource;
+    private SegmentReplicationTargetService sut;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
         final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
         final TransportService transportService = mock(TransportService.class);
-        final IndexShard indexShard = newShard(false, settings);
-        ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
+        indexShard = newShard(false, settings);
+        checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
         SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
-        final SegmentReplicationSource replicationSource = mock(SegmentReplicationSource.class);
+        replicationSource = mock(SegmentReplicationSource.class);
         when(replicationSourceFactory.get(indexShard)).thenReturn(replicationSource);
 
-        SegmentReplicationTargetService sut = new SegmentReplicationTargetService(
-            threadPool,
-            recoverySettings,
-            transportService,
-            replicationSourceFactory
-        );
+        sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        closeShards(indexShard);
+        super.tearDown();
+    }
 
+    public void testTargetReturnsSuccess_listenerCompletes() throws IOException {
         final SegmentReplicationTarget target = new SegmentReplicationTarget(
             checkpoint,
             indexShard,
@@ -72,22 +82,6 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept
     }
 
     public void testTargetThrowsException() throws IOException {
-        Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
-        final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
-        final TransportService transportService = mock(TransportService.class);
-        final IndexShard indexShard = newShard(false, settings);
-        ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
-        SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
-        SegmentReplicationSource replicationSource = mock(SegmentReplicationSource.class);
-
-        SegmentReplicationTargetService sut = new SegmentReplicationTargetService(
-            threadPool,
-            recoverySettings,
-            transportService,
-            replicationSourceFactory
-        );
-
         final OpenSearchException expectedError = new OpenSearchException("Fail");
         final SegmentReplicationTarget target = new SegmentReplicationTarget(
             checkpoint,
             indexShard,
@@ -118,22 +112,6 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept
     }
 
     public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException {
-        Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build();
-        final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
-        final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
-        final TransportService transportService = mock(TransportService.class);
-        final IndexShard indexShard = newShard(false, settings);
-        ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L);
-        SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
-        SegmentReplicationSource replicationSource = mock(SegmentReplicationSource.class);
-
-        SegmentReplicationTargetService sut = new SegmentReplicationTargetService(
-            threadPool,
-            recoverySettings,
-            transportService,
-            replicationSourceFactory
-        );
-
         final SegmentReplicationTarget target = new SegmentReplicationTarget(
             checkpoint,
             indexShard,
@@ -142,7 +120,7 @@ public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOExc
         );
         final SegmentReplicationTarget spy = Mockito.spy(target);
         sut.startReplication(spy);
-        sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, settings);
+        sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY);
         Mockito.verify(spy, times(1)).cancel(any());
         closeShards(indexShard);
     }

From d07ee74ef0974dd94eb8032c9fd33315a6169837 Mon Sep 17 00:00:00 2001
From: Marc Handalian
Date: Fri, 27 May 2022 10:36:30 -0700
Subject: [PATCH 3/3] Make SegmentReplicationTargetService component final so
 that it cannot be extended by plugins.

Signed-off-by: Marc Handalian
---
 .../indices/replication/SegmentReplicationTargetService.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
index 1e064d483a4bb..1c6053a72a4c5 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java
@@ -38,7 +38,7 @@
  *
  * @opensearch.internal
  */
-public class SegmentReplicationTargetService implements IndexEventListener {
+public final class SegmentReplicationTargetService implements IndexEventListener {
 
     private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class);
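
With the service now final, plugins and other components interact with segment
replication through the SegmentReplicationListener contract rather than by
subclassing. A minimal sketch of driving a replication event, derived from the
tests in PATCH 1/3 (targetService, checkpoint, indexShard, and
replicationSource are assumed to be wired up elsewhere):

    // Observe the replication lifecycle through the listener, then hand the
    // target to the service; sendShardFailure indicates the shard should be
    // failed locally on error.
    SegmentReplicationTarget target = new SegmentReplicationTarget(
        checkpoint,
        indexShard,
        replicationSource,
        new SegmentReplicationTargetService.SegmentReplicationListener() {
            @Override
            public void onReplicationDone(SegmentReplicationState state) {
                assert state.getStage() == SegmentReplicationState.Stage.DONE;
            }

            @Override
            public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) {
                // handle the failure, e.g. by failing the shard when requested
            }
        }
    );
    targetService.startReplication(target);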