From bf9ed57ad0ebedf44c33e944da0e44dd389bcc93 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 6 Jun 2022 23:02:38 -0700 Subject: [PATCH 01/11] Add components for segment replication to perform file copy. This change adds the required components to SegmentReplicationSourceService to initiate copy and react to lifecycle events. Along with new components it refactors common file copy code from RecoverySourceHandler into reusable pieces. Signed-off-by: Marc Handalian --- .../indices/RunUnderPrimaryPermit.java | 72 +++++ .../indices/recovery/FileChunkWriter.java | 30 +++ .../recovery/RecoverySourceHandler.java | 250 ++---------------- .../recovery/RecoveryTargetHandler.java | 14 +- .../recovery/RemoteRecoveryTargetHandler.java | 80 ++---- .../recovery/RetryableTransportClient.java | 2 +- .../replication/GetSegmentFilesRequest.java | 4 + .../OngoingSegmentReplications.java | 203 ++++++++++++++ .../RemoteSegmentFileChunkWriter.java | 123 +++++++++ .../SegmentFileTransferHandler.java | 240 +++++++++++++++++ .../SegmentReplicationSourceHandler.java | 154 +++++++++++ .../SegmentReplicationSourceService.java | 135 +++++----- .../indices/replication/common/CopyState.java | 16 +- .../SegmentReplicationTransportRequest.java | 25 ++ .../recovery/RecoverySourceHandlerTests.java | 7 +- .../OngoingSegmentReplicationsTests.java | 217 +++++++++++++++ .../SegmentReplicationSourceHandlerTests.java | 167 ++++++++++++ .../SegmentReplicationSourceServiceTests.java | 104 +++++--- .../replication/common/CopyStateTests.java | 10 +- 19 files changed, 1437 insertions(+), 416 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java create mode 100644 server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java diff --git a/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java b/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java new file mode 100644 index 0000000000000..29cac1601dc67 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.FutureUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardRelocatedException; +import org.opensearch.threadpool.ThreadPool; + +import java.util.concurrent.CompletableFuture; + +/** + * Execute a Runnable after acquiring the primary's operation permit. 
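+ * <p>
+ * Illustrative usage sketch (the shard, allocation id, and logger below are placeholders supplied
+ * by the caller, not values prescribed by this class); callers such as {@code RecoverySourceHandler}
+ * pass their own {@link CancellableThreads} so the permit acquisition can be cancelled:
+ * <pre>
+ * CancellableThreads cancellableThreads = new CancellableThreads();
+ * RunUnderPrimaryPermit.run(
+ *     () -> primary.initiateTracking(targetAllocationId),   // work guarded by the permit
+ *     "initiating tracking of " + targetAllocationId,       // reason recorded for the acquisition
+ *     primary,                                              // IndexShard that must still be primary
+ *     cancellableThreads,
+ *     logger
+ * );
+ * </pre>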
+ * + * @opensearch.internal + */ +public final class RunUnderPrimaryPermit { + + public static void run( + CancellableThreads.Interruptible runnable, + String reason, + IndexShard primary, + CancellableThreads cancellableThreads, + Logger logger + ) { + cancellableThreads.execute(() -> { + CompletableFuture permit = new CompletableFuture<>(); + final ActionListener onAcquired = new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + if (permit.complete(releasable) == false) { + releasable.close(); + } + } + + @Override + public void onFailure(Exception e) { + permit.completeExceptionally(e); + } + }; + primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); + try (Releasable ignored = FutureUtils.get(permit)) { + // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent + // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated() + if (primary.isRelocatedPrimary()) { + throw new IndexShardRelocatedException(primary.shardId()); + } + runnable.run(); + } finally { + // just in case we got an exception (likely interrupted) while waiting for the get + permit.whenComplete((r, e) -> { + if (r != null) { + r.close(); + } + if (e != null) { + logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); + } + }); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java new file mode 100644 index 0000000000000..073442e9d58ae --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.opensearch.action.ActionListener; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.index.store.StoreFileMetadata; + +/** + * Writes a partial file chunk to the target store. 
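+ * <p>
+ * In this change, {@code RecoveryTargetHandler} extends this interface and
+ * {@code RemoteSegmentFileChunkWriter} supplies the transport-based implementation used for
+ * both peer recovery and segment replication file copy.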
+ * + * @opensearch.internal + */ +public interface FileChunkWriter { + + void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ); +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 0870fd4ca9295..64b29a86a0c24 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -32,18 +32,13 @@ package org.opensearch.indices.recovery; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.SetOnce; -import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; @@ -55,13 +50,10 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.StopWatch; -import org.opensearch.common.bytes.BytesArray; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; -import org.opensearch.common.lucene.store.InputStreamIndexInput; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CancellableThreads; @@ -77,27 +69,22 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; -import org.opensearch.index.shard.IndexShardRelocatedException; import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.indices.replication.SegmentFileTransferHandler; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.Transports; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.Deque; import java.util.List; import java.util.Locale; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -118,9 +105,8 @@ * * @opensearch.internal */ -public class RecoverySourceHandler { +public class RecoverySourceHandler extends SegmentFileTransferHandler { - protected final Logger logger; // Shard that is going to be recovered (the 
"source") private final IndexShard shard; private final int shardId; @@ -128,11 +114,8 @@ public class RecoverySourceHandler { private final StartRecoveryRequest request; private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; - private final int maxConcurrentFileChunks; private final int maxConcurrentOperations; private final ThreadPool threadPool; - private final CancellableThreads cancellableThreads = new CancellableThreads(); - private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); public static final String PEER_RECOVERY_NAME = "peer-recovery"; @@ -145,15 +128,22 @@ public RecoverySourceHandler( int maxConcurrentFileChunks, int maxConcurrentOperations ) { + super( + shard, + request.targetNode(), + recoveryTarget, + Loggers.getLogger(RecoverySourceHandler.class, request.shardId(), "recover to " + request.targetNode().getName()), + threadPool, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); this.shard = shard; - this.recoveryTarget = recoveryTarget; this.threadPool = threadPool; this.request = request; + this.recoveryTarget = recoveryTarget; this.shardId = this.request.shardId().id(); - this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName()); this.chunkSizeInBytes = fileChunkSizeInBytes; // if the target is on an old version, it won't be able to handle out-of-order file chunks. - this.maxConcurrentFileChunks = maxConcurrentFileChunks; this.maxConcurrentOperations = maxConcurrentOperations; } @@ -192,7 +182,7 @@ public void recoverToTarget(ActionListener listener) { final SetOnce retentionLeaseRef = new SetOnce<>(); - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); if (targetShardRouting == null) { @@ -286,7 +276,7 @@ && isTargetSameHistory() }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { try { // If the target previously had a copy of this shard then a file-based recovery might move its global // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a @@ -332,7 +322,7 @@ && isTargetSameHistory() * make sure to do this before sampling the max sequence number in the next step, to ensure that we send * all documents up to maxSeqNo in phase2. 
*/ - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.initiateTracking(request.targetAllocationId()), shardId + " initiating tracking of " + request.targetAllocationId(), shard, @@ -420,50 +410,6 @@ private int countNumberOfHistoryOperations(long startingSeqNo) throws IOExceptio return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE); } - static void runUnderPrimaryPermit( - CancellableThreads.Interruptible runnable, - String reason, - IndexShard primary, - CancellableThreads cancellableThreads, - Logger logger - ) { - cancellableThreads.execute(() -> { - CompletableFuture permit = new CompletableFuture<>(); - final ActionListener onAcquired = new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - if (permit.complete(releasable) == false) { - releasable.close(); - } - } - - @Override - public void onFailure(Exception e) { - permit.completeExceptionally(e); - } - }; - primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); - try (Releasable ignored = FutureUtils.get(permit)) { - // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent - // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated() - if (primary.isRelocatedPrimary()) { - throw new IndexShardRelocatedException(primary.shardId()); - } - runnable.run(); - } finally { - // just in case we got an exception (likely interrupted) while waiting for the get - permit.whenComplete((r, e) -> { - if (r != null) { - r.close(); - } - if (e != null) { - logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); - } - }); - } - }); - } - /** * Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool. * We must never release the store using an interruptible thread as we can risk invalidating the node lock. @@ -709,7 +655,7 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } void createRetentionLease(final long startingSeqNo, ActionListener listener) { - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the the local // checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an // existing lease we (approximately) know that all our peers are also retaining history as requested by the cloned lease. If @@ -983,7 +929,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire * the permit then the state of the shard will be relocated and this recovery will fail. 
*/ - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), shardId + " marking " + request.targetAllocationId() + " as in sync", shard, @@ -995,7 +941,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis cancellableThreads.checkForCancel(); recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener); finalizeListener.whenComplete(r -> { - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, @@ -1056,121 +1002,6 @@ public String toString() { + '}'; } - /** - * A file chunk from the recovery source - * - * @opensearch.internal - */ - private static class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable { - final StoreFileMetadata md; - final BytesReference content; - final long position; - final boolean lastChunk; - final Releasable onClose; - - FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) { - this.md = md; - this.content = content; - this.position = position; - this.lastChunk = lastChunk; - this.onClose = onClose; - } - - @Override - public boolean lastChunk() { - return lastChunk; - } - - @Override - public void close() { - onClose.close(); - } - } - - void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { - ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first - - final MultiChunkTransfer multiFileSender = new MultiChunkTransfer( - logger, - threadPool.getThreadContext(), - listener, - maxConcurrentFileChunks, - Arrays.asList(files) - ) { - - final Deque buffers = new ConcurrentLinkedDeque<>(); - InputStreamIndexInput currentInput = null; - long offset = 0; - - @Override - protected void onNewResource(StoreFileMetadata md) throws IOException { - offset = 0; - IOUtils.close(currentInput, () -> currentInput = null); - final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); - currentInput = new InputStreamIndexInput(indexInput, md.length()) { - @Override - public void close() throws IOException { - IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop - } - }; - } - - private byte[] acquireBuffer() { - final byte[] buffer = buffers.pollFirst(); - if (buffer != null) { - return buffer; - } - return new byte[chunkSizeInBytes]; - } - - @Override - protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { - assert Transports.assertNotTransportThread("read file chunk"); - cancellableThreads.checkForCancel(); - final byte[] buffer = acquireBuffer(); - final int bytesRead = currentInput.read(buffer); - if (bytesRead == -1) { - throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name()); - } - final boolean lastChunk = offset + bytesRead == md.length(); - final FileChunk chunk = new FileChunk( - md, - new BytesArray(buffer, 0, bytesRead), - offset, - lastChunk, - () -> buffers.addFirst(buffer) - ); - offset += bytesRead; - return chunk; - } - - @Override - protected void executeChunkRequest(FileChunk request, ActionListener listener) { - cancellableThreads.checkForCancel(); - recoveryTarget.writeFileChunk( - request.md, - request.position, - request.content, - 
request.lastChunk, - translogOps.getAsInt(), - ActionListener.runBefore(listener, request::close) - ); - } - - @Override - protected void handleError(StoreFileMetadata md, Exception e) throws Exception { - handleErrorOnSendFiles(store, e, new StoreFileMetadata[] { md }); - } - - @Override - public void close() throws IOException { - IOUtils.close(currentInput, () -> currentInput = null); - } - }; - resources.add(multiFileSender); - multiFileSender.start(); - } - private void cleanFiles( Store store, Store.MetadataSnapshot sourceMetadata, @@ -1199,47 +1030,4 @@ private void cleanFiles( })) ); } - - private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception { - final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[handle error on send/clean files]"); - if (corruptIndexException != null) { - Exception localException = null; - for (StoreFileMetadata md : mds) { - cancellableThreads.checkForCancel(); - logger.debug("checking integrity for file {} after remove corruption exception", md); - if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! - logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); - if (localException == null) { - localException = corruptIndexException; - } - failEngine(corruptIndexException); - } - } - if (localException != null) { - throw localException; - } else { // corruption has happened on the way to replica - RemoteTransportException remoteException = new RemoteTransportException( - "File corruption occurred on recovery but checksums are ok", - null - ); - remoteException.addSuppressed(e); - logger.warn( - () -> new ParameterizedMessage( - "{} Remote file corruption on node {}, recovering {}. 
local checksum OK", - shardId, - request.targetNode(), - mds - ), - corruptIndexException - ); - throw remoteException; - } - } - throw e; - } - - protected void failEngine(IOException cause) { - shard.failShard("recovery", cause); - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java index 84b6ec170d3f7..c750c0e88364b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java @@ -32,11 +32,9 @@ package org.opensearch.indices.recovery; import org.opensearch.action.ActionListener; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.store.Store; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import java.util.List; @@ -46,7 +44,7 @@ * * @opensearch.internal */ -public interface RecoveryTargetHandler { +public interface RecoveryTargetHandler extends FileChunkWriter { /** * Prepares the target to receive translog operations, after all file have been copied @@ -123,15 +121,5 @@ void receiveFileInfo( */ void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata, ActionListener listener); - /** writes a partial file chunk to the target store */ - void writeFileChunk( - StoreFileMetadata fileMetadata, - long position, - BytesReference content, - boolean lastChunk, - int totalTranslogOps, - ActionListener listener - ); - default void cancel() {} } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index ab6466feb11f8..7b7844265b067 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -34,8 +34,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.RateLimiter; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesReference; @@ -46,12 +44,12 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.RemoteSegmentFileChunkWriter; import org.opensearch.transport.EmptyTransportResponseHandler; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; -import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -72,13 +70,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final RecoverySettings recoverySettings; private final TransportRequestOptions translogOpsRequestOptions; - private final TransportRequestOptions fileChunkRequestOptions; - private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final AtomicLong requestSeqNoGenerator = new AtomicLong(0); - - private final Consumer onSourceThrottle; private final 
RetryableTransportClient retryableTransportClient; + private final RemoteSegmentFileChunkWriter fileChunkWriter; public RemoteRecoveryTargetHandler( long recoveryId, @@ -102,15 +97,18 @@ public RemoteRecoveryTargetHandler( this.shardId = shardId; this.targetNode = targetNode; this.recoverySettings = recoverySettings; - this.onSourceThrottle = onSourceThrottle; this.translogOpsRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionLongTimeout()) .build(); - this.fileChunkRequestOptions = TransportRequestOptions.builder() - .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); + this.fileChunkWriter = new RemoteSegmentFileChunkWriter( + recoveryId, + recoverySettings, + retryableTransportClient, + shardId, + PeerRecoveryTargetService.Actions.FILE_CHUNK, + onSourceThrottle + ); } public DiscoveryNode targetNode() { @@ -235,6 +233,11 @@ public void cleanFiles( retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } + @Override + public void cancel() { + retryableTransportClient.cancel(); + } + @Override public void writeFileChunk( StoreFileMetadata fileMetadata, @@ -244,57 +247,6 @@ public void writeFileChunk( int totalTranslogOps, ActionListener listener ) { - // Pause using the rate limiter, if desired, to throttle the recovery - final long throttleTimeInNanos; - // always fetch the ratelimiter - it might be updated in real-time on the recovery settings - final RateLimiter rl = recoverySettings.rateLimiter(); - if (rl != null) { - long bytes = bytesSinceLastPause.addAndGet(content.length()); - if (bytes > rl.getMinPauseCheckBytes()) { - // Time to pause - bytesSinceLastPause.addAndGet(-bytes); - try { - throttleTimeInNanos = rl.pause(bytes); - onSourceThrottle.accept(throttleTimeInNanos); - } catch (IOException e) { - throw new OpenSearchException("failed to pause recovery", e); - } - } else { - throttleTimeInNanos = 0; - } - } else { - throttleTimeInNanos = 0; - } - - final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK; - final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); - /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can - * see how many translog ops we accumulate while copying files across the network. A future optimization - * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. 
- */ - final FileChunkRequest request = new FileChunkRequest( - recoveryId, - requestSeqNo, - shardId, - fileMetadata, - position, - content, - lastChunk, - totalTranslogOps, - throttleTimeInNanos - ); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - retryableTransportClient.executeRetryableAction( - action, - request, - fileChunkRequestOptions, - ActionListener.map(listener, r -> null), - reader - ); - } - - @Override - public void cancel() { - retryableTransportClient.cancel(); + fileChunkWriter.writeFileChunk(fileMetadata, position, content, lastChunk, totalTranslogOps, listener); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java index bc10cc80b7fdc..a7113fb3fee5c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java @@ -74,7 +74,7 @@ public void executeRetryableAction( executeRetryableAction(action, request, options, actionListener, reader); } - void executeRetryableAction( + public void executeRetryableAction( String action, TransportRequest request, TransportRequestOptions options, diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java index 21749d3fe7d8a..daad33ed93f28 100644 --- a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java @@ -57,4 +57,8 @@ public void writeTo(StreamOutput out) throws IOException { public ReplicationCheckpoint getCheckpoint() { return checkpoint; } + + public List getFilesToFetch() { + return filesToFetch; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java new file mode 100644 index 0000000000000..fe4cbc255ec1e --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Manages references to ongoing segrep events on a node. + * + * @opensearch.internal + */ +class OngoingSegmentReplications { + + private final RecoverySettings recoverySettings; + private final IndicesService indicesService; + private final Map copyStateMap; + private final Map nodesToHandlers; + + /** + * Constructor. 
+ * @param indicesService {@link IndicesService} + * @param recoverySettings {@link RecoverySettings} + */ + OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) { + this.indicesService = indicesService; + this.recoverySettings = recoverySettings; + this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); + this.nodesToHandlers = Collections.synchronizedMap(new HashMap<>()); + } + + /** + * Operations on the {@link #copyStateMap} member. + */ + + /** + * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key + * and returns the cached value if one is present. If the key is not present, a {@link CopyState} + * object is constructed and stored in the map before being returned. + */ + synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { + if (isInCopyStateMap(checkpoint)) { + final CopyState copyState = fetchFromCopyStateMap(checkpoint); + // we incref the copyState for every replica that is using this checkpoint. + // decref will happen when copy completes. + copyState.incRef(); + return copyState; + } else { + // From the checkpoint's shard ID, fetch the IndexShard + ShardId shardId = checkpoint.getShardId(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + // build the CopyState object and cache it before returning + final CopyState copyState = new CopyState(checkpoint, indexShard); + + /** + * Use the checkpoint from the request as the key in the map, rather than + * the checkpoint from the created CopyState. This maximizes cache hits + * if replication targets make a request with an older checkpoint. + * Replication targets are expected to fetch the checkpoint in the response + * CopyState to bring themselves up to date. + */ + addToCopyStateMap(checkpoint, copyState); + return copyState; + } + } + + void cancelReplication(DiscoveryNode node) { + if (nodesToHandlers.containsKey(node)) { + final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node); + handler.cancel("Cancel on node left"); + removeCopyState(handler.getCopyState()); + } + } + + SegmentReplicationSourceHandler createTargetHandler( + DiscoveryNode node, + CopyState copyState, + RemoteSegmentFileChunkWriter segmentFileChunkWriter + ) { + return new SegmentReplicationSourceHandler( + node, + segmentFileChunkWriter, + copyState.getShard().getThreadPool(), + copyState, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks() + ); + } + + /** + * Adds the input {@link CopyState} object to {@link #copyStateMap}. + * The key is the CopyState's {@link ReplicationCheckpoint} object. + */ + private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { + copyStateMap.putIfAbsent(checkpoint, copyState); + } + + /** + * Given a {@link ReplicationCheckpoint}, return the corresponding + * {@link CopyState} object, if any, from {@link #copyStateMap}. + */ + private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.get(replicationCheckpoint); + } + + /** + * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} + * as a key by invoking {@link Map#containsKey(Object)}. 
+ */ + boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.containsKey(replicationCheckpoint); + } + + void startSegmentCopy(GetSegmentFilesRequest request, ActionListener listener) { + final DiscoveryNode node = request.getTargetNode(); + if (nodesToHandlers.containsKey(node)) { + final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node); + // update the given listener to release the CopyState before it resolves. + final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { + final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node); + removeCopyState(sourceHandler.getCopyState()); + }); + handler.sendFiles(request, wrappedListener); + } else { + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } + } + + /** + * Remove a CopyState. Intended to be called after a replication event completes. + * This method will remove a copyState from the copyStateMap only if its refCount hits 0. + * @param copyState {@link CopyState} + */ + private synchronized void removeCopyState(CopyState copyState) { + copyState.decRef(); + if (copyState.refCount() <= 0) { + copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); + } + } + + /** + * Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current + * nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its + * local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas + * with the list of required files. + * @param request {@link CheckpointInfoRequest} + * @param segmentSegmentFileChunkWriter {@link RemoteSegmentFileChunkWriter} writer to handle sending files over the transport layer. + * @return {@link CopyState} the built CopyState for this replication event. + * @throws IOException - When there is an IO error building CopyState. + */ + CopyState prepareForReplication(CheckpointInfoRequest request, RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter) + throws IOException { + final CopyState copyState = getCachedCopyState(request.getCheckpoint()); + final SegmentReplicationSourceHandler handler = createTargetHandler( + request.getTargetNode(), + copyState, + segmentSegmentFileChunkWriter + ); + nodesToHandlers.putIfAbsent(request.getTargetNode(), handler); + return copyState; + } + + int size() { + return nodesToHandlers.size(); + } + + int cachedCopyStateSize() { + return copyStateMap.size(); + } + + /** + * Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down. 
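+     * <p>
+     * {@code SegmentReplicationSourceService#beforeIndexShardClosed} invokes this with the closing
+     * shard so handlers replicating from that shard are cancelled and the cached CopyState map is cleared.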
+ * @param shard {@link IndexShard} + * @param reason {@link String} - Reason for the cancel + */ + public void cancel(IndexShard shard, String reason) { + for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) { + if (entry.getCopyState().getShard().equals(shard)) { + entry.cancel(reason); + } + } + copyStateMap.clear(); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java new file mode 100644 index 0000000000000..d4c8ec0ea6dff --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.store.RateLimiter; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.transport.TransportRequestOptions; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * This class handles sending file chunks over the transport layer to a target shard. 
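+ * <p>
+ * Construction sketch mirroring how {@code RemoteRecoveryTargetHandler} wires this up for
+ * peer recovery (the variables shown are placeholders for values owned by the caller):
+ * <pre>
+ * FileChunkWriter chunkWriter = new RemoteSegmentFileChunkWriter(
+ *     recoveryId,                                     // replication/recovery id echoed in each FileChunkRequest
+ *     recoverySettings,                               // supplies the chunk timeout and rate limiter
+ *     retryableTransportClient,                       // client used to execute the chunk transport action
+ *     shardId,
+ *     PeerRecoveryTargetService.Actions.FILE_CHUNK,   // transport action each chunk is sent over
+ *     throttleTimeInNanos -> {}                       // callback invoked when the rate limiter pauses
+ * );
+ * </pre>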
+ * + * @opensearch.internal + */ +public class RemoteSegmentFileChunkWriter implements FileChunkWriter { + + protected final AtomicLong requestSeqNoGenerator = new AtomicLong(0); + protected final RetryableTransportClient retryableTransportClient; + protected final ShardId shardId; + protected final RecoverySettings recoverySettings; + private final long replicationId; + private final AtomicLong bytesSinceLastPause = new AtomicLong(); + private final TransportRequestOptions fileChunkRequestOptions; + private final Consumer onSourceThrottle; + private final String action; + + public RemoteSegmentFileChunkWriter( + long replicationId, + RecoverySettings recoverySettings, + RetryableTransportClient retryableTransportClient, + ShardId shardId, + String action, + Consumer onSourceThrottle + ) { + this.replicationId = replicationId; + this.recoverySettings = recoverySettings; + this.retryableTransportClient = retryableTransportClient; + this.shardId = shardId; + this.onSourceThrottle = onSourceThrottle; + this.fileChunkRequestOptions = TransportRequestOptions.builder() + .withType(TransportRequestOptions.Type.RECOVERY) + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); + + this.action = action; + } + + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + // Pause using the rate limiter, if desired, to throttle the recovery + final long throttleTimeInNanos; + // always fetch the ratelimiter - it might be updated in real-time on the recovery settings + final RateLimiter rl = recoverySettings.rateLimiter(); + if (rl != null) { + long bytes = bytesSinceLastPause.addAndGet(content.length()); + if (bytes > rl.getMinPauseCheckBytes()) { + // Time to pause + bytesSinceLastPause.addAndGet(-bytes); + try { + throttleTimeInNanos = rl.pause(bytes); + onSourceThrottle.accept(throttleTimeInNanos); + } catch (IOException e) { + throw new OpenSearchException("failed to pause recovery", e); + } + } else { + throttleTimeInNanos = 0; + } + } else { + throttleTimeInNanos = 0; + } + + final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); + /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can + * see how many translog ops we accumulate while copying files across the network. A future optimization + * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. 
+ */ + final FileChunkRequest request = new FileChunkRequest( + replicationId, + requestSeqNo, + shardId, + fileMetadata, + position, + content, + lastChunk, + totalTranslogOps, + throttleTimeInNanos + ); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + retryableTransportClient.executeRetryableAction( + action, + request, + fileChunkRequestOptions, + ActionListener.map(listener, r -> null), + reader + ); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java new file mode 100644 index 0000000000000..24dbb9c8205d8 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java @@ -0,0 +1,240 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.ArrayUtil; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.Transports; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Deque; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.IntSupplier; + +/** + * SegmentFileSender handles building and starting a {@link MultiChunkTransfer} to orchestrate sending chunks to a given targetNode. + * This class delegates to a {@link FileChunkWriter} to handle the transport of chunks. + * + * @opensearch.internal + * // TODO: make this package-private after combining recovery and replication into single package. 
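+ *
+ * <p>
+ * Usage sketch based on how {@code SegmentReplicationSourceHandler} drives it (the metadata
+ * array and listener are placeholders supplied by the caller):
+ * <pre>
+ * SegmentFileTransferHandler transferHandler = new SegmentFileTransferHandler(
+ *     shard, targetNode, chunkWriter, logger, threadPool, fileChunkSizeInBytes, maxConcurrentFileChunks
+ * );
+ * // streams the files smallest-first, chunking each one through the supplied FileChunkWriter
+ * transferHandler.sendFiles(shard.store(), storeFileMetadata, () -> 0, sendFileListener);
+ * </pre>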
+ */ +public class SegmentFileTransferHandler { + + protected final Logger logger; + protected final CancellableThreads cancellableThreads = new CancellableThreads(); + protected final List resources = new CopyOnWriteArrayList<>(); + private final IndexShard shard; + private final FileChunkWriter chunkWriter; + private final ThreadPool threadPool; + private final int chunkSizeInBytes; + private final int maxConcurrentFileChunks; + private final DiscoveryNode targetNode; + + public SegmentFileTransferHandler( + IndexShard shard, + DiscoveryNode targetNode, + FileChunkWriter chunkWriter, + Logger logger, + ThreadPool threadPool, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks + ) { + this.shard = shard; + this.targetNode = targetNode; + this.chunkWriter = chunkWriter; + this.logger = logger; + this.threadPool = threadPool; + this.chunkSizeInBytes = fileChunkSizeInBytes; + // if the target is on an old version, it won't be able to handle out-of-order file chunks. + this.maxConcurrentFileChunks = maxConcurrentFileChunks; + } + + public final void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { + ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first + + final MultiChunkTransfer multiFileSender = new MultiChunkTransfer<>( + logger, + threadPool.getThreadContext(), + listener, + maxConcurrentFileChunks, + Arrays.asList(files) + ) { + + final Deque buffers = new ConcurrentLinkedDeque<>(); + InputStreamIndexInput currentInput = null; + long offset = 0; + + @Override + protected void onNewResource(StoreFileMetadata md) throws IOException { + offset = 0; + IOUtils.close(currentInput, () -> currentInput = null); + final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); + currentInput = new InputStreamIndexInput(indexInput, md.length()) { + @Override + public void close() throws IOException { + IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop + } + }; + } + + private byte[] acquireBuffer() { + final byte[] buffer = buffers.pollFirst(); + if (buffer != null) { + return buffer; + } + return new byte[chunkSizeInBytes]; + } + + @Override + protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { + assert Transports.assertNotTransportThread("read file chunk"); + cancellableThreads.checkForCancel(); + final byte[] buffer = acquireBuffer(); + final int bytesRead = currentInput.read(buffer); + if (bytesRead == -1) { + throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name()); + } + final boolean lastChunk = offset + bytesRead == md.length(); + final FileChunk chunk = new FileChunk( + md, + new BytesArray(buffer, 0, bytesRead), + offset, + lastChunk, + () -> buffers.addFirst(buffer) + ); + offset += bytesRead; + return chunk; + } + + @Override + protected void executeChunkRequest(FileChunk request, ActionListener listener) { + cancellableThreads.checkForCancel(); + chunkWriter.writeFileChunk( + request.md, + request.position, + request.content, + request.lastChunk, + translogOps.getAsInt(), + ActionListener.runBefore(listener, request::close) + ); + } + + @Override + protected void handleError(StoreFileMetadata md, Exception e) throws Exception { + handleErrorOnSendFiles(store, e, new StoreFileMetadata[] { md }); + } + + @Override + public void close() throws IOException { + IOUtils.close(currentInput, () -> currentInput = null); + } + }; + 
resources.add(multiFileSender); + multiFileSender.start(); + } + + public final void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception { + final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); + assert Transports.assertNotTransportThread(this + "[handle error on send/clean files]"); + if (corruptIndexException != null) { + Exception localException = null; + for (StoreFileMetadata md : mds) { + cancellableThreads.checkForCancel(); + logger.debug("checking integrity for file {} after remove corruption exception", md); + if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! + logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md); + if (localException == null) { + localException = corruptIndexException; + } + failEngine(corruptIndexException); + } + } + if (localException != null) { + throw localException; + } else { // corruption has happened on the way to replica + RemoteTransportException remoteException = new RemoteTransportException( + "File corruption occurred on recovery but checksums are ok", + null + ); + remoteException.addSuppressed(e); + logger.warn( + () -> new ParameterizedMessage( + "{} Remote file corruption on node {}, recovering {}. local checksum OK", + shard.shardId(), + targetNode, + mds + ), + corruptIndexException + ); + throw remoteException; + } + } + throw e; + } + + protected void failEngine(IOException cause) { + shard.failShard("recovery", cause); + } + + /** + * A file chunk from the recovery source + * + * @opensearch.internal + */ + public static final class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable { + final StoreFileMetadata md; + final BytesReference content; + final long position; + final boolean lastChunk; + final Releasable onClose; + + FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) { + this.md = md; + this.content = content; + this.position = position; + this.lastChunk = lastChunk; + this.onClose = onClose; + } + + @Override + public boolean lastChunk() { + return lastChunk; + } + + @Override + public void close() { + onClose.close(); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java new file mode 100644 index 0000000000000..b7c558b1b584f --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -0,0 +1,154 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.concurrent.ListenableFuture; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.indices.recovery.DelayRecoveryException; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.Transports; + +import java.io.Closeable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; + +/** + * Orchestrates sending requested segment files to a target shard. + * + * @opensearch.internal + */ +class SegmentReplicationSourceHandler { + + protected final IndexShard shard; + private final CopyState copyState; + private final SegmentFileTransferHandler segmentFileTransferHandler; + protected final ListenableFuture future = new ListenableFuture<>(); + protected final List resources = new CopyOnWriteArrayList<>(); + private final Logger logger; + + /** + * Constructor. + * + * @param targetNode - {@link DiscoveryNode} target node where files should be sent. + * @param writer {@link FileChunkWriter} implementation that sends file chunks over the transport layer. + * @param threadPool {@link ThreadPool} Thread pool. + * @param copyState {@link CopyState} CopyState holding segment file metadata. + * @param fileChunkSizeInBytes {@link Integer} + * @param maxConcurrentFileChunks {@link Integer} + */ + SegmentReplicationSourceHandler( + DiscoveryNode targetNode, + FileChunkWriter writer, + ThreadPool threadPool, + CopyState copyState, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks + ) { + this.shard = copyState.getShard(); + this.logger = Loggers.getLogger( + SegmentReplicationSourceHandler.class, + copyState.getShard().shardId(), + "sending segments to " + targetNode.getName() + ); + this.segmentFileTransferHandler = new SegmentFileTransferHandler( + copyState.getShard(), + targetNode, + writer, + logger, + threadPool, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + this.copyState = copyState; + } + + /** + * Sends Segment files from the local node to the given target. + * + * @param request {@link GetSegmentFilesRequest} request object containing list of files to be sent. + * @param listener {@link ActionListener} that completes with the list of files sent. 
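+     * <p>
+     * {@code OngoingSegmentReplications#startSegmentCopy} is the expected caller; it wraps the
+     * listener so the associated {@link CopyState} reference is released when the copy completes.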
+ */ + public void sendFiles(GetSegmentFilesRequest request, ActionListener listener) { + future.addListener(listener, OpenSearchExecutors.newDirectExecutorService()); + final Closeable releaseResources = () -> IOUtils.close(resources); + try { + + final Consumer onFailure = e -> { + assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]"); + IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); + }; + + RunUnderPrimaryPermit.run(() -> { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); + if (targetShardRouting == null) { + logger.debug( + "delaying replication of {} as it is not listed as assigned to target node {}", + shard.shardId(), + request.getTargetNode() + ); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } + }, + shard.shardId() + " validating recovery target [" + request.getTargetAllocationId() + "] registered ", + shard, + segmentFileTransferHandler.cancellableThreads, + logger + ); + + final StepListener sendFileStep = new StepListener<>(); + Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); + final StoreFileMetadata[] storeFileMetadata = request.getFilesToFetch() + .stream() + .filter(file -> storeFiles.contains(file.name())) + .toArray(StoreFileMetadata[]::new); + + segmentFileTransferHandler.sendFiles(shard.store(), storeFileMetadata, () -> 0, sendFileStep); + resources.addAll(segmentFileTransferHandler.resources); + + sendFileStep.whenComplete(r -> { + try { + future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); + } finally { + IOUtils.close(resources); + } + }, onFailure); + } catch (Exception e) { + IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); + } + } + + /** + * Cancels the recovery and interrupts all eligible threads. 
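+     * Invoked by {@code OngoingSegmentReplications} when the target node leaves the cluster or the
+     * source shard is closed.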
+ */ + public void cancel(String reason) { + segmentFileTransferHandler.cancellableThreads.cancel(reason); + } + + CopyState getCopyState() { + return copyState; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 9f70120dedd6c..e02c8ab0799bf 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -10,10 +10,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.index.IndexService; +import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; +import org.opensearch.common.component.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.tasks.Task; @@ -23,9 +33,6 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; /** * Service class that handles segment replication requests from replica shards. 
@@ -33,9 +40,12 @@ * * @opensearch.internal */ -public class SegmentReplicationSourceService { +public final class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class); + private final RecoverySettings recoverySettings; + private final TransportService transportService; + private final IndicesService indicesService; /** * Internal actions used by the segment replication source service on the primary shard @@ -43,20 +53,22 @@ public class SegmentReplicationSourceService { * @opensearch.internal */ public static class Actions { + public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info"; public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files"; } - private final Map copyStateMap; - private final TransportService transportService; - private final IndicesService indicesService; + private final OngoingSegmentReplications ongoingSegmentReplications; // TODO mark this as injected and bind in Node - public SegmentReplicationSourceService(TransportService transportService, IndicesService indicesService) { - copyStateMap = Collections.synchronizedMap(new HashMap<>()); + public SegmentReplicationSourceService( + IndicesService indicesService, + TransportService transportService, + RecoverySettings recoverySettings + ) { this.transportService = transportService; this.indicesService = indicesService; - + this.recoverySettings = recoverySettings; transportService.registerRequestHandler( Actions.GET_CHECKPOINT_INFO, ThreadPool.Names.GENERIC, @@ -69,6 +81,7 @@ public SegmentReplicationSourceService(TransportService transportService, Indice GetSegmentFilesRequest::new, new GetSegmentFilesRequestHandler() ); + this.ongoingSegmentReplications = new OngoingSegmentReplications(indicesService, recoverySettings); } private class CheckpointInfoRequestHandler implements TransportRequestHandler { @@ -76,7 +89,21 @@ private class CheckpointInfoRequestHandler implements TransportRequestHandler {} + ); + final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); channel.sendResponse( new CheckpointInfoResponse( copyState.getCheckpoint(), @@ -88,73 +115,47 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan } } - class GetSegmentFilesRequestHandler implements TransportRequestHandler { + private class GetSegmentFilesRequestHandler implements TransportRequestHandler { @Override public void messageReceived(GetSegmentFilesRequest request, TransportChannel channel, Task task) throws Exception { - if (isInCopyStateMap(request.getCheckpoint())) { - // TODO send files - } else { - // Return an empty list of files - channel.sendResponse(new GetSegmentFilesResponse(Collections.emptyList())); - } + ongoingSegmentReplications.startSegmentCopy(request, new ChannelActionListener<>(channel, Actions.GET_SEGMENT_FILES, request)); } } - /** - * Operations on the {@link #copyStateMap} member. 
- */ + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.nodesRemoved()) { + for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { + ongoingSegmentReplications.cancelReplication(removedNode); + } + } + } - /** - * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key - * and returns the cached value if one is present. If the key is not present, a {@link CopyState} - * object is constructed and stored in the map before being returned. - */ - private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { - if (isInCopyStateMap(checkpoint)) { - final CopyState copyState = fetchFromCopyStateMap(checkpoint); - copyState.incRef(); - return copyState; - } else { - // From the checkpoint's shard ID, fetch the IndexShard - ShardId shardId = checkpoint.getShardId(); - final IndexService indexService = indicesService.indexService(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.id()); - // build the CopyState object and cache it before returning - final CopyState copyState = new CopyState(indexShard); - - /** - * Use the checkpoint from the request as the key in the map, rather than - * the checkpoint from the created CopyState. This maximizes cache hits - * if replication targets make a request with an older checkpoint. - * Replication targets are expected to fetch the checkpoint in the response - * CopyState to bring themselves up to date. - */ - addToCopyStateMap(checkpoint, copyState); - return copyState; + @Override + protected void doStart() { + final ClusterService clusterService = indicesService.clusterService(); + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + clusterService.addListener(this); } } - /** - * Adds the input {@link CopyState} object to {@link #copyStateMap}. - * The key is the CopyState's {@link ReplicationCheckpoint} object. - */ - private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { - copyStateMap.putIfAbsent(checkpoint, copyState); + @Override + protected void doStop() { + final ClusterService clusterService = indicesService.clusterService(); + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + indicesService.clusterService().removeListener(this); + } } - /** - * Given a {@link ReplicationCheckpoint}, return the corresponding - * {@link CopyState} object, if any, from {@link #copyStateMap}. - */ - private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.get(replicationCheckpoint); + @Override + protected void doClose() throws IOException { + } - /** - * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} - * as a key by invoking {@link Map#containsKey(Object)}. 
- */ - private boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.containsKey(replicationCheckpoint); + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + if (indexShard != null) { + ongoingSegmentReplications.cancel(indexShard, "shard is closed"); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 250df3481435a..c0e0b4dee2b3f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -33,14 +33,20 @@ public class CopyState extends AbstractRefCounted { private final GatedCloseable segmentInfosRef; + /** ReplicationCheckpoint requested */ + private final ReplicationCheckpoint requestedReplicationCheckpoint; + /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; private final Store.MetadataSnapshot metadataSnapshot; private final HashSet pendingDeleteFiles; private final byte[] infosBytes; private GatedCloseable commitRef; + private final IndexShard shard; - public CopyState(IndexShard shard) throws IOException { + public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { super("CopyState-" + shard.shardId()); + this.requestedReplicationCheckpoint = requestedReplicationCheckpoint; + this.shard = shard; this.segmentInfosRef = shard.getSegmentInfosSnapshot(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); this.metadataSnapshot = shard.store().getMetadata(segmentInfos); @@ -100,4 +106,12 @@ public byte[] getInfosBytes() { public Set getPendingDeleteFiles() { return pendingDeleteFiles; } + + public IndexShard getShard() { + return shard; + } + + public ReplicationCheckpoint getRequestedReplicationCheckpoint() { + return requestedReplicationCheckpoint; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java index db8206d131c13..09b14fb1b5333 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java @@ -46,4 +46,29 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(this.targetAllocationId); targetNode.writeTo(out); } + + public long getReplicationId() { + return replicationId; + } + + public String getTargetAllocationId() { + return targetAllocationId; + } + + public DiscoveryNode getTargetNode() { + return targetNode; + } + + @Override + public String toString() { + return "SegmentReplicationTransportRequest{" + + "replicationId=" + + replicationId + + ", targetAllocationId='" + + targetAllocationId + + '\'' + + ", targetNode=" + + targetNode + + '}'; + } } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index fc5c429d74b16..db017630c9d15 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ 
b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -94,6 +94,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.DummyShardLock; @@ -544,8 +545,10 @@ public void writeFileChunk( }); } }; + IndexShard mockShard = mock(IndexShard.class); + when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0)); RecoverySourceHandler handler = new RecoverySourceHandler( - null, + mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, request, @@ -747,7 +750,7 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test")); cancelingThread.start(); try { - RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads, logger); + RunUnderPrimaryPermit.run(() -> {}, "test", shard, cancellableThreads, logger); } catch (CancellableThreads.ExecutionCancelledException e) { // expected. } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java new file mode 100644 index 0000000000000..df81a91add6e4 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -0,0 +1,217 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.replication; + +import org.junit.Assert; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class OngoingSegmentReplicationsTests extends IndexShardTestCase { + + private final IndicesService mockIndicesService = mock(IndicesService.class); + private ReplicationCheckpoint testCheckpoint; + private DiscoveryNode primaryDiscoveryNode; + private DiscoveryNode replicaDiscoveryNode; + private IndexShard primary; + private IndexShard replica; + + private GetSegmentFilesRequest getSegmentFilesRequest; + + final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); + + @Override + public void setUp() throws Exception { + super.setUp(); + primary = newStartedShard(true); + replica = newShard(primary.shardId(), false); + recoverReplica(replica, primary, true); + replicaDiscoveryNode = replica.recoveryState().getTargetNode(); + primaryDiscoveryNode = replica.recoveryState().getSourceNode(); + + ShardId testShardId = primary.shardId(); + + // This mirrors the creation of the ReplicationCheckpoint inside CopyState + testCheckpoint = new ReplicationCheckpoint( + testShardId, + primary.getOperationPrimaryTerm(), + 0L, + primary.getProcessedLocalCheckpoint(), + 0L + ); + IndexService mockIndexService = mock(IndexService.class); + when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); + when(mockIndexService.getShard(testShardId.id())).thenReturn(primary); + + TransportService transportService = mock(TransportService.class); + when(transportService.getThreadPool()).thenReturn(threadPool); + } + + @Override + public void tearDown() throws Exception { + closeShards(primary, replica); + super.tearDown(); + } + + public void testPrepareAndSendSegments() throws IOException { + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + 
replicaDiscoveryNode, + testCheckpoint + ); + final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(1, replications.size()); + assertEquals(1, copyState.refCount()); + + doAnswer((invocation -> { + final ActionListener listener = invocation.getArgument(5); + listener.onResponse(null); + return null; + })).when(segmentSegmentFileChunkWriter).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); + + getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + testCheckpoint + ); + + final Collection expectedFiles = List.copyOf(primary.store().getMetadata().asMap().values()); + replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertEquals(1, getSegmentFilesResponse.files.size()); + assertEquals(1, expectedFiles.size()); + assertTrue(expectedFiles.stream().findFirst().get().isSame(getSegmentFilesResponse.files.get(0))); + assertEquals(0, copyState.refCount()); + assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(0, replications.size()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Unexpected failure", e); + Assert.fail(); + } + }); + } + + public void testCancelReplication() throws IOException { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + primaryDiscoveryNode, + testCheckpoint + ); + final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertEquals(1, replications.size()); + assertEquals(1, replications.cachedCopyStateSize()); + + replications.cancelReplication(primaryDiscoveryNode); + assertEquals(0, copyState.refCount()); + assertEquals(0, replications.size()); + assertEquals(0, replications.cachedCopyStateSize()); + } + + public void testMultipleReplicasUseSameCheckpoint() throws IOException { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + primaryDiscoveryNode, + testCheckpoint + ); + final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertEquals(1, copyState.refCount()); + + final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + replications.prepareForReplication(secondRequest, segmentSegmentFileChunkWriter); + + assertEquals(2, copyState.refCount()); + assertEquals(2, replications.size()); + assertEquals(1, replications.cachedCopyStateSize()); + + 
replications.cancelReplication(primaryDiscoveryNode); + replications.cancelReplication(replicaDiscoveryNode); + assertEquals(0, copyState.refCount()); + assertEquals(0, replications.size()); + assertEquals(0, replications.cachedCopyStateSize()); + } + + public void testStartCopyWithoutPrepareStep() { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final ActionListener listener = spy(new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertTrue(getSegmentFilesResponse.files.isEmpty()); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + + getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + testCheckpoint + ); + + replications.startSegmentCopy(getSegmentFilesRequest, listener); + verify(listener, times(1)).onResponse(any()); + } + +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java new file mode 100644 index 0000000000000..baa8d4fa29aff --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.mockito.Mockito; +import org.opensearch.OpenSearchException; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase { + + private final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + private DiscoveryNode replicaDiscoveryNode; + private IndexShard primary; + private IndexShard replica; + + private FileChunkWriter chunkWriter; + + @Override + public void setUp() throws Exception { + super.setUp(); + primary = newStartedShard(true); + replica = newShard(primary.shardId(), false); + recoverReplica(replica, primary, true); + replicaDiscoveryNode = replica.recoveryState().getTargetNode(); + } + + @Override + public void tearDown() throws Exception { + closeShards(primary, replica); + super.tearDown(); + } + + public void testSendFiles() throws IOException { + chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onResponse(null); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + 
SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + expectedFiles, + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + MatcherAssert.assertThat(getSegmentFilesResponse.files, Matchers.containsInAnyOrder(expectedFiles.toArray())); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + } + + public void testSendFiles_emptyRequest() throws IOException { + chunkWriter = mock(FileChunkWriter.class); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertTrue(getSegmentFilesResponse.files.isEmpty()); + Mockito.verifyNoInteractions(chunkWriter); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + } + + public void testSendFileFails() throws IOException { + chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onFailure( + new OpenSearchException("Test") + ); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + expectedFiles, + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(e.getClass(), OpenSearchException.class); + } + }); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 67c867d360e70..8d2ca9ff63f3d 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -9,13 +9,16 @@ package org.opensearch.indices.replication; import 
org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyStateTests; import org.opensearch.test.OpenSearchTestCase; @@ -30,30 +33,23 @@ import java.util.Collections; import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; public class SegmentReplicationSourceServiceTests extends OpenSearchTestCase { - private ShardId testShardId; private ReplicationCheckpoint testCheckpoint; - private IndicesService mockIndicesService; - private IndexService mockIndexService; - private IndexShard mockIndexShard; private TestThreadPool testThreadPool; - private CapturingTransport transport; private TransportService transportService; private DiscoveryNode localNode; - private SegmentReplicationSourceService segmentReplicationSourceService; @Override public void setUp() throws Exception { super.setUp(); // setup mocks - mockIndexShard = CopyStateTests.createMockIndexShard(); - testShardId = mockIndexShard.shardId(); - mockIndicesService = mock(IndicesService.class); - mockIndexService = mock(IndexService.class); + IndexShard mockIndexShard = CopyStateTests.createMockIndexShard(); + ShardId testShardId = mockIndexShard.shardId(); + IndicesService mockIndicesService = mock(IndicesService.class); + IndexService mockIndexService = mock(IndexService.class); when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard); @@ -66,7 +62,7 @@ public void setUp() throws Exception { 0L ); testThreadPool = new TestThreadPool("test", Settings.EMPTY); - transport = new CapturingTransport(); + CapturingTransport transport = new CapturingTransport(); localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); transportService = transport.createTransportService( Settings.EMPTY, @@ -78,7 +74,16 @@ public void setUp() throws Exception { ); transportService.start(); transportService.acceptIncomingRequests(); - segmentReplicationSourceService = new SegmentReplicationSourceService(transportService, mockIndicesService); + + final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); + + SegmentReplicationSourceService segmentReplicationSourceService = new SegmentReplicationSourceService( + mockIndicesService, + transportService, + recoverySettings + ); } @Override @@ -88,7 +93,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testGetSegmentFiles_EmptyResponse() { + public void testGetSegmentFiles() { final GetSegmentFilesRequest request = new GetSegmentFilesRequest( 1, "allocationId", @@ -96,19 +101,52 @@ public void 
testGetSegmentFiles_EmptyResponse() { Collections.emptyList(), testCheckpoint ); + executeGetSegmentFiles(request, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse response) { + assertEquals(0, response.files.size()); + } + + @Override + public void onFailure(Exception e) { + fail("unexpected exception: " + e); + } + }); + } + + public void testCheckpointInfo() { + executeGetCheckpointInfo(new ActionListener<>() { + @Override + public void onResponse(CheckpointInfoResponse response) { + assertEquals(testCheckpoint, response.getCheckpoint()); + assertNotNull(response.getInfosBytes()); + // CopyStateTests sets up one pending delete file and one committed segments file + assertEquals(1, response.getPendingDeleteFiles().size()); + assertEquals(1, response.getSnapshot().size()); + } + + @Override + public void onFailure(Exception e) { + fail("unexpected exception: " + e); + } + }); + } + + private void executeGetCheckpointInfo(ActionListener listener) { + final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint); transportService.sendRequest( localNode, - SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, + SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, request, - new TransportResponseHandler() { + new TransportResponseHandler() { @Override - public void handleResponse(GetSegmentFilesResponse response) { - assertEquals(0, response.files.size()); + public void handleResponse(CheckpointInfoResponse response) { + listener.onResponse(response); } @Override public void handleException(TransportException e) { - fail("unexpected exception: " + e); + listener.onFailure(e); } @Override @@ -117,32 +155,27 @@ public String executor() { } @Override - public GetSegmentFilesResponse read(StreamInput in) throws IOException { - return new GetSegmentFilesResponse(in); + public CheckpointInfoResponse read(StreamInput in) throws IOException { + return new CheckpointInfoResponse(in); } } ); } - public void testCheckpointInfo() { - final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint); + private void executeGetSegmentFiles(GetSegmentFilesRequest request, ActionListener listener) { transportService.sendRequest( localNode, - SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, + SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, request, - new TransportResponseHandler() { + new TransportResponseHandler() { @Override - public void handleResponse(CheckpointInfoResponse response) { - assertEquals(testCheckpoint, response.getCheckpoint()); - assertNotNull(response.getInfosBytes()); - // CopyStateTests sets up one pending delete file and one committed segments file - assertEquals(1, response.getPendingDeleteFiles().size()); - assertEquals(1, response.getSnapshot().size()); + public void handleResponse(GetSegmentFilesResponse response) { + listener.onResponse(response); } @Override public void handleException(TransportException e) { - fail("unexpected exception: " + e); + listener.onFailure(e); } @Override @@ -151,11 +184,10 @@ public String executor() { } @Override - public CheckpointInfoResponse read(StreamInput in) throws IOException { - return new CheckpointInfoResponse(in); + public GetSegmentFilesResponse read(StreamInput in) throws IOException { + return new GetSegmentFilesResponse(in); } } ); } - } diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java 
b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index afa38afb0cf2f..a6f0cf7e98411 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -47,7 +47,15 @@ public class CopyStateTests extends IndexShardTestCase { ); public void testCopyStateCreation() throws IOException { - CopyState copyState = new CopyState(createMockIndexShard()); + final IndexShard mockIndexShard = createMockIndexShard(); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( + mockIndexShard.shardId(), + mockIndexShard.getOperationPrimaryTerm(), + 0L, + mockIndexShard.getProcessedLocalCheckpoint(), + 0L + ); + CopyState copyState = new CopyState(testCheckpoint, mockIndexShard); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero From 6d7657e65ccee9a41b23a4216e783f4482634cab Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 7 Jun 2022 12:51:45 -0700 Subject: [PATCH 02/11] Fix requestSeqNo to be generated correctly during recovery. Signed-off-by: Marc Handalian --- .../indices/recovery/RemoteRecoveryTargetHandler.java | 1 + .../indices/replication/RemoteSegmentFileChunkWriter.java | 4 +++- .../indices/replication/SegmentReplicationSourceService.java | 2 ++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index 7b7844265b067..e7ae62c1bee7d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -107,6 +107,7 @@ public RemoteRecoveryTargetHandler( retryableTransportClient, shardId, PeerRecoveryTargetService.Actions.FILE_CHUNK, + requestSeqNoGenerator, onSourceThrottle ); } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index d4c8ec0ea6dff..f563f7359ab5a 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -33,7 +33,7 @@ */ public class RemoteSegmentFileChunkWriter implements FileChunkWriter { - protected final AtomicLong requestSeqNoGenerator = new AtomicLong(0); + protected final AtomicLong requestSeqNoGenerator; protected final RetryableTransportClient retryableTransportClient; protected final ShardId shardId; protected final RecoverySettings recoverySettings; @@ -49,12 +49,14 @@ public RemoteSegmentFileChunkWriter( RetryableTransportClient retryableTransportClient, ShardId shardId, String action, + AtomicLong requestSeqNoGenerator, Consumer onSourceThrottle ) { this.replicationId = replicationId; this.recoverySettings = recoverySettings; this.retryableTransportClient = retryableTransportClient; this.shardId = shardId; + this.requestSeqNoGenerator = requestSeqNoGenerator; this.onSourceThrottle = onSourceThrottle; this.fileChunkRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.RECOVERY) diff --git 
a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index e02c8ab0799bf..6b1a4b1563e37 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -33,6 +33,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; /** * Service class that handles segment replication requests from replica shards. @@ -101,6 +102,7 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan ), request.getCheckpoint().getShardId(), SegmentReplicationTargetService.Actions.FILE_CHUNK, + new AtomicLong(0), (throttleTime) -> {} ); final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); From 2387cd2d1cbb6c5d9f397ff3dff667ecedcd613e Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 7 Jun 2022 16:08:02 -0700 Subject: [PATCH 03/11] Fix RecoverySourceHandlerTests that pass null for indexShard. Signed-off-by: Marc Handalian --- .../indices/recovery/RecoverySourceHandlerTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index db017630c9d15..4de298b06d339 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -620,8 +620,10 @@ public void writeFileChunk( } } }; + IndexShard mockShard = mock(IndexShard.class); + when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0)); RecoverySourceHandler handler = new RecoverySourceHandler( - null, + mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, request, From 45f8f7e96b6981b8bd05e517fa87726c84db0a3f Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 7 Jun 2022 19:48:58 -0700 Subject: [PATCH 04/11] Change RecoverySourceHandler to use SegmentFileTransferHandler as a component over inheritence. 
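
The refactor keeps the file copy logic in a single reusable SegmentFileTransferHandler that RecoverySourceHandler now holds as a field and delegates to, instead of extending. A minimal sketch of that delegation pattern, using simplified hypothetical stand-in types (TransferHandler, Transfer, SourceHandler) rather than the real OpenSearch classes:

    import java.util.List;

    final class TransferHandler {                       // stand-in for SegmentFileTransferHandler
        Transfer createTransfer(List<String> files) {   // analogous to sendFiles(...) returning a MultiChunkTransfer
            return new Transfer(files);
        }
    }

    final class Transfer implements AutoCloseable {     // stand-in for the closeable MultiChunkTransfer
        private final List<String> files;
        Transfer(List<String> files) { this.files = files; }
        void start() { files.forEach(f -> System.out.println("sending " + f)); }
        @Override public void close() {}
    }

    class SourceHandler {                               // stand-in for RecoverySourceHandler
        // Composition: the handler owns the transfer component instead of inheriting from it.
        private final TransferHandler transferHandler = new TransferHandler();

        void sendFiles(List<String> files) {
            try (Transfer transfer = transferHandler.createTransfer(files)) {
                transfer.start();                       // the caller starts and closes the transfer, as in this patch
            }
        }
    }

With composition, both RecoverySourceHandler and SegmentReplicationSourceHandler construct the same component with their own CancellableThreads and failure callback, rather than duplicating transfer logic through an inheritance hierarchy.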
Signed-off-by: Marc Handalian --- .../recovery/RecoverySourceHandler.java | 32 +++++++++-- .../SegmentFileTransferHandler.java | 57 ++++++++++--------- .../SegmentReplicationSourceHandler.java | 26 ++++++--- .../recovery/RecoverySourceHandlerTests.java | 1 + 4 files changed, 78 insertions(+), 38 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 64b29a86a0c24..cd6445cac9682 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -32,6 +32,7 @@ package org.opensearch.indices.recovery; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; @@ -85,6 +86,7 @@ import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -105,8 +107,9 @@ * * @opensearch.internal */ -public class RecoverySourceHandler extends SegmentFileTransferHandler { +public class RecoverySourceHandler { + protected final Logger logger; // Shard that is going to be recovered (the "source") private final IndexShard shard; private final int shardId; @@ -116,8 +119,11 @@ public class RecoverySourceHandler extends SegmentFileTransferHandler { private final RecoveryTargetHandler recoveryTarget; private final int maxConcurrentOperations; private final ThreadPool threadPool; + private final CancellableThreads cancellableThreads = new CancellableThreads(); + private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); public static final String PEER_RECOVERY_NAME = "peer-recovery"; + private final SegmentFileTransferHandler transferHandler; public RecoverySourceHandler( IndexShard shard, @@ -128,12 +134,15 @@ public RecoverySourceHandler( int maxConcurrentFileChunks, int maxConcurrentOperations ) { - super( + this.logger = Loggers.getLogger(RecoverySourceHandler.class, request.shardId(), "recover to " + request.targetNode().getName()); + this.transferHandler = new SegmentFileTransferHandler( shard, request.targetNode(), recoveryTarget, - Loggers.getLogger(RecoverySourceHandler.class, request.shardId(), "recover to " + request.targetNode().getName()), + logger, threadPool, + cancellableThreads, + this::failEngine, fileChunkSizeInBytes, maxConcurrentFileChunks ); @@ -654,6 +663,17 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } } + void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { + final MultiChunkTransfer transfer = transferHandler.sendFiles( + store, + files, + translogOps, + listener + ); + resources.add(transfer); + transfer.start(); + } + void createRetentionLease(final long startingSeqNo, ActionListener listener) { RunUnderPrimaryPermit.run(() -> { // Clone the peer recovery retention lease belonging to the source shard. 
We are retaining history between the the local @@ -1025,9 +1045,13 @@ private void cleanFiles( ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> { StoreFileMetadata[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetadata[]::new); ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetadata::length)); // check small files first - handleErrorOnSendFiles(store, e, mds); + transferHandler.handleErrorOnSendFiles(store, e, mds); throw e; })) ); } + + protected void failEngine(IOException cause) { + shard.failShard("recovery", cause); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java index 24dbb9c8205d8..fdb569c638081 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java @@ -26,20 +26,18 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.recovery.MultiChunkTransfer; import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.Transports; -import java.io.Closeable; import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.Deque; -import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; import java.util.function.IntSupplier; /** @@ -49,17 +47,17 @@ * @opensearch.internal * // TODO: make this package-private after combining recovery and replication into single package. */ -public class SegmentFileTransferHandler { +public final class SegmentFileTransferHandler { - protected final Logger logger; - protected final CancellableThreads cancellableThreads = new CancellableThreads(); - protected final List resources = new CopyOnWriteArrayList<>(); + private final Logger logger; private final IndexShard shard; private final FileChunkWriter chunkWriter; private final ThreadPool threadPool; private final int chunkSizeInBytes; private final int maxConcurrentFileChunks; private final DiscoveryNode targetNode; + private final CancellableThreads cancellableThreads; + private final Consumer onCorruptException; public SegmentFileTransferHandler( IndexShard shard, @@ -67,6 +65,8 @@ public SegmentFileTransferHandler( FileChunkWriter chunkWriter, Logger logger, ThreadPool threadPool, + CancellableThreads cancellableThreads, + Consumer onCorruptException, int fileChunkSizeInBytes, int maxConcurrentFileChunks ) { @@ -75,21 +75,30 @@ public SegmentFileTransferHandler( this.chunkWriter = chunkWriter; this.logger = logger; this.threadPool = threadPool; + this.cancellableThreads = cancellableThreads; + this.onCorruptException = onCorruptException; this.chunkSizeInBytes = fileChunkSizeInBytes; // if the target is on an old version, it won't be able to handle out-of-order file chunks. 
this.maxConcurrentFileChunks = maxConcurrentFileChunks; } - public final void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { + /** + * Returns a closeable {@link MultiChunkTransfer} to initiate sending a list of files. + * Callers are responsible for starting the transfer and closing the resource. + * @param store {@link Store} + * @param files {@link StoreFileMetadata[]} + * @param translogOps {@link IntSupplier} + * @param listener {@link ActionListener} + * @return {@link MultiChunkTransfer} + */ + public MultiChunkTransfer sendFiles( + Store store, + StoreFileMetadata[] files, + IntSupplier translogOps, + ActionListener listener + ) { ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first - - final MultiChunkTransfer multiFileSender = new MultiChunkTransfer<>( - logger, - threadPool.getThreadContext(), - listener, - maxConcurrentFileChunks, - Arrays.asList(files) - ) { + return new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) { final Deque buffers = new ConcurrentLinkedDeque<>(); InputStreamIndexInput currentInput = null; @@ -138,7 +147,7 @@ protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { } @Override - protected void executeChunkRequest(FileChunk request, ActionListener listener) { + protected void executeChunkRequest(FileChunk request, ActionListener listener1) { cancellableThreads.checkForCancel(); chunkWriter.writeFileChunk( request.md, @@ -146,7 +155,7 @@ protected void executeChunkRequest(FileChunk request, ActionListener liste request.content, request.lastChunk, translogOps.getAsInt(), - ActionListener.runBefore(listener, request::close) + ActionListener.runBefore(listener1, request::close) ); } @@ -160,11 +169,9 @@ public void close() throws IOException { IOUtils.close(currentInput, () -> currentInput = null); } }; - resources.add(multiFileSender); - multiFileSender.start(); } - public final void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception { + public void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception { final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); assert Transports.assertNotTransportThread(this + "[handle error on send/clean files]"); if (corruptIndexException != null) { @@ -177,7 +184,7 @@ public final void handleErrorOnSendFiles(Store store, Exception e, StoreFileMeta if (localException == null) { localException = corruptIndexException; } - failEngine(corruptIndexException); + onCorruptException.accept(corruptIndexException); } } if (localException != null) { @@ -203,10 +210,6 @@ public final void handleErrorOnSendFiles(Store store, Exception e, StoreFileMeta throw e; } - protected void failEngine(IOException cause) { - shard.failShard("recovery", cause); - } - /** * A file chunk from the recovery source * diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index b7c558b1b584f..61cf3d713cc2b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import 
org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.CancellableThreads; import org.opensearch.common.util.concurrent.ListenableFuture; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.internal.io.IOUtils; @@ -23,11 +24,13 @@ import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.recovery.DelayRecoveryException; import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.Transports; import java.io.Closeable; +import java.io.IOException; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -42,11 +45,12 @@ */ class SegmentReplicationSourceHandler { - protected final IndexShard shard; + private final IndexShard shard; private final CopyState copyState; private final SegmentFileTransferHandler segmentFileTransferHandler; - protected final ListenableFuture future = new ListenableFuture<>(); - protected final List resources = new CopyOnWriteArrayList<>(); + private final CancellableThreads cancellableThreads = new CancellableThreads(); + private final ListenableFuture future = new ListenableFuture<>(); + private final List resources = new CopyOnWriteArrayList<>(); private final Logger logger; /** @@ -79,6 +83,8 @@ class SegmentReplicationSourceHandler { writer, logger, threadPool, + cancellableThreads, + this::failEngine, fileChunkSizeInBytes, maxConcurrentFileChunks ); @@ -115,7 +121,7 @@ public void sendFiles(GetSegmentFilesRequest request, ActionListener storeFiles.contains(file.name())) .toArray(StoreFileMetadata[]::new); - segmentFileTransferHandler.sendFiles(shard.store(), storeFileMetadata, () -> 0, sendFileStep); - resources.addAll(segmentFileTransferHandler.resources); + final MultiChunkTransfer transfer = segmentFileTransferHandler + .sendFiles(shard.store(), storeFileMetadata, () -> 0, sendFileStep); + resources.add(transfer); + transfer.start(); sendFileStep.whenComplete(r -> { try { @@ -145,7 +153,11 @@ public void sendFiles(GetSegmentFilesRequest request, ActionListener terminate(threadPool), () -> threadPool = null); From 2bfbf4a57a3200590e8c8557f136557030913752 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 7 Jun 2022 22:25:17 -0700 Subject: [PATCH 05/11] Make RemoteSegmentFileChunkWriter class final. 
Signed-off-by: Marc Handalian --- .../OngoingSegmentReplications.java | 20 +++++---------- .../RemoteSegmentFileChunkWriter.java | 10 ++++---- .../OngoingSegmentReplicationsTests.java | 25 +++++++++---------- 3 files changed, 23 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index fe4cbc255ec1e..8a1cf84b5327f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -14,6 +14,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; @@ -91,14 +92,10 @@ void cancelReplication(DiscoveryNode node) { } } - SegmentReplicationSourceHandler createTargetHandler( - DiscoveryNode node, - CopyState copyState, - RemoteSegmentFileChunkWriter segmentFileChunkWriter - ) { + SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) { return new SegmentReplicationSourceHandler( node, - segmentFileChunkWriter, + fileChunkWriter, copyState.getShard().getThreadPool(), copyState, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), @@ -163,18 +160,13 @@ private synchronized void removeCopyState(CopyState copyState) { * local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas * with the list of required files. * @param request {@link CheckpointInfoRequest} - * @param segmentSegmentFileChunkWriter {@link RemoteSegmentFileChunkWriter} writer to handle sending files over the transport layer. + * @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer. * @return {@link CopyState} the built CopyState for this replication event. * @throws IOException - When there is an IO error building CopyState. 
*/ - CopyState prepareForReplication(CheckpointInfoRequest request, RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter) - throws IOException { + CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException { final CopyState copyState = getCachedCopyState(request.getCheckpoint()); - final SegmentReplicationSourceHandler handler = createTargetHandler( - request.getTargetNode(), - copyState, - segmentSegmentFileChunkWriter - ); + final SegmentReplicationSourceHandler handler = createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter); nodesToHandlers.putIfAbsent(request.getTargetNode(), handler); return copyState; } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index f563f7359ab5a..05f1c9d757e5c 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -31,12 +31,12 @@ * * @opensearch.internal */ -public class RemoteSegmentFileChunkWriter implements FileChunkWriter { +public final class RemoteSegmentFileChunkWriter implements FileChunkWriter { - protected final AtomicLong requestSeqNoGenerator; - protected final RetryableTransportClient retryableTransportClient; - protected final ShardId shardId; - protected final RecoverySettings recoverySettings; + private final AtomicLong requestSeqNoGenerator; + private final RetryableTransportClient retryableTransportClient; + private final ShardId shardId; + private final RecoverySettings recoverySettings; private final long replicationId; private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final TransportRequestOptions fileChunkRequestOptions; diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index df81a91add6e4..9cc74276d2b1a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -19,6 +19,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkWriter; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; @@ -31,10 +32,6 @@ import java.util.List; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -97,18 +94,14 @@ public void testPrepareAndSendSegments() throws IOException { replicaDiscoveryNode, testCheckpoint ); - final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; final CopyState copyState 
= replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); assertEquals(1, replications.size()); assertEquals(1, copyState.refCount()); - doAnswer((invocation -> { - final ActionListener listener = invocation.getArgument(5); - listener.onResponse(null); - return null; - })).when(segmentSegmentFileChunkWriter).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); - getSegmentFilesRequest = new GetSegmentFilesRequest( 1L, replica.routingEntry().allocationId().getId(), @@ -145,7 +138,10 @@ public void testCancelReplication() throws IOException { primaryDiscoveryNode, testCheckpoint ); - final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + // this shouldn't be called in this test. + Assert.fail(); + }; final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, replications.size()); assertEquals(1, replications.cachedCopyStateSize()); @@ -164,7 +160,10 @@ public void testMultipleReplicasUseSameCheckpoint() throws IOException { primaryDiscoveryNode, testCheckpoint ); - final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = mock(RemoteSegmentFileChunkWriter.class); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + // this shouldn't be called in this test. + Assert.fail(); + }; final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); assertEquals(1, copyState.refCount()); From ec397f1f590e8f2f81a43ac40ab6ba1aa2262343 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Sat, 11 Jun 2022 11:25:34 -0700 Subject: [PATCH 06/11] Move shard failure after send failure into SegmentFileTransferHandler. 
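
With the corruption callback removed, SegmentFileTransferHandler fails the shard directly via shard.failShard("error sending files", corruptIndexException), and the tests stub failShard on the mocked IndexShard instead of overriding a protected failEngine hook. A simplified sketch of that test pattern, using a hypothetical Shard stand-in rather than the real IndexShard:

    import static org.mockito.ArgumentMatchers.any;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    class Shard {                                       // hypothetical stand-in for IndexShard
        void failShard(String reason, Exception cause) {}
    }

    class FailShardStubExample {
        void verifyShardIsFailedOnCorruption() {
            AtomicBoolean failedEngine = new AtomicBoolean(false);
            Shard shard = mock(Shard.class);
            doAnswer(invocation -> {
                failedEngine.set(true);                 // record the failure instead of overriding a subclass hook
                return null;
            }).when(shard).failShard(any(), any());

            // The component under test would invoke this when it detects a corrupt file;
            // calling it directly here just demonstrates the stub.
            shard.failShard("error sending files", new IOException("corrupt segment"));
            assert failedEngine.get();
        }
    }

RecoverySourceHandlerTests in this patch uses exactly this doAnswer(...).when(mockShard).failShard(any(), any()) stub in place of the previous failEngine override.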
Signed-off-by: Marc Handalian --- .../recovery/RecoverySourceHandler.java | 5 ---- .../SegmentFileTransferHandler.java | 6 +---- .../SegmentReplicationSourceHandler.java | 6 ----- .../recovery/RecoverySourceHandlerTests.java | 26 +++++++++---------- 4 files changed, 13 insertions(+), 30 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index cd6445cac9682..995fddb1efc54 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -142,7 +142,6 @@ public RecoverySourceHandler( logger, threadPool, cancellableThreads, - this::failEngine, fileChunkSizeInBytes, maxConcurrentFileChunks ); @@ -1050,8 +1049,4 @@ private void cleanFiles( })) ); } - - protected void failEngine(IOException cause) { - shard.failShard("recovery", cause); - } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java index fdb569c638081..23f6d67cf5ab7 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java @@ -37,7 +37,6 @@ import java.util.Comparator; import java.util.Deque; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.function.Consumer; import java.util.function.IntSupplier; /** @@ -57,7 +56,6 @@ public final class SegmentFileTransferHandler { private final int maxConcurrentFileChunks; private final DiscoveryNode targetNode; private final CancellableThreads cancellableThreads; - private final Consumer onCorruptException; public SegmentFileTransferHandler( IndexShard shard, @@ -66,7 +64,6 @@ public SegmentFileTransferHandler( Logger logger, ThreadPool threadPool, CancellableThreads cancellableThreads, - Consumer onCorruptException, int fileChunkSizeInBytes, int maxConcurrentFileChunks ) { @@ -76,7 +73,6 @@ public SegmentFileTransferHandler( this.logger = logger; this.threadPool = threadPool; this.cancellableThreads = cancellableThreads; - this.onCorruptException = onCorruptException; this.chunkSizeInBytes = fileChunkSizeInBytes; // if the target is on an old version, it won't be able to handle out-of-order file chunks. 
this.maxConcurrentFileChunks = maxConcurrentFileChunks; @@ -184,7 +180,7 @@ public void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] if (localException == null) { localException = corruptIndexException; } - onCorruptException.accept(corruptIndexException); + shard.failShard("error sending files", corruptIndexException); } } if (localException != null) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 61cf3d713cc2b..2f433c87158d8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -30,7 +30,6 @@ import org.opensearch.transport.Transports; import java.io.Closeable; -import java.io.IOException; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -84,7 +83,6 @@ class SegmentReplicationSourceHandler { logger, threadPool, cancellableThreads, - this::failEngine, fileChunkSizeInBytes, maxConcurrentFileChunks ); @@ -156,10 +154,6 @@ public void cancel(String reason) { cancellableThreads.cancel(reason); } - private void failEngine(IOException e) { - shard.failShard("Failed Replication", e); - } - CopyState getCopyState() { return copyState; } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index 74d07ebe5b35c..2b5550b71a627 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -547,6 +547,11 @@ public void writeFileChunk( }; IndexShard mockShard = mock(IndexShard.class); when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0)); + doAnswer(invocation -> { + assertFalse(failedEngine.get()); + failedEngine.set(true); + return null; + }).when(mockShard).failShard(any(), any()); RecoverySourceHandler handler = new RecoverySourceHandler( mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), @@ -555,13 +560,7 @@ public void writeFileChunk( Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8), between(1, 8) - ) { - @Override - protected void failEngine(IOException cause) { - assertFalse(failedEngine.get()); - failedEngine.set(true); - } - }; + ); SetOnce sendFilesError = new SetOnce<>(); CountDownLatch latch = new CountDownLatch(1); handler.sendFiles( @@ -623,6 +622,11 @@ public void writeFileChunk( }; IndexShard mockShard = mock(IndexShard.class); when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0)); + doAnswer(invocation -> { + assertFalse(failedEngine.get()); + failedEngine.set(true); + return null; + }).when(mockShard).failShard(any(), any()); RecoverySourceHandler handler = new RecoverySourceHandler( mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), @@ -631,13 +635,7 @@ public void writeFileChunk( Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10), between(1, 4) - ) { - @Override - protected void failEngine(IOException cause) { - assertFalse(failedEngine.get()); - failedEngine.set(true); - } - }; + ); PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); Exception 
ex = expectThrows(Exception.class, sendFilesFuture::actionGet); From de6b554ba3c4fecc03bbb2b903ce375b8162e7d7 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Mon, 13 Jun 2022 16:20:34 -0700 Subject: [PATCH 07/11] Add tests for SegmentFileTransferHandler Signed-off-by: Marc Handalian --- .../SegmentFileTransferHandlerTests.java | 251 ++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java new file mode 100644 index 0000000000000..536a042c8eb10 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java @@ -0,0 +1,251 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFileNames; +import org.junit.Assert; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.IntSupplier; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.Mockito.*; + +public class SegmentFileTransferHandlerTests extends IndexShardTestCase { + + private IndexShard shard; + private StoreFileMetadata[] filesToSend; + private final DiscoveryNode targetNode = new DiscoveryNode( + "foo", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ); + + final int fileChunkSizeInBytes = 5000; + final int maxConcurrentFileChunks = 1; + private CancellableThreads cancellableThreads; + final IntSupplier translogOps = () -> 0; + + @Override + public void setUp() throws Exception { + super.setUp(); + cancellableThreads = new CancellableThreads(); + shard = spy(newStartedShard(true)); + filesToSend = getFilestoSend(shard); + // we should only have a Segments_N file at this point. + assertEquals(1, filesToSend.length); + } + + private StoreFileMetadata[] getFilestoSend(IndexShard shard) throws IOException { + final Store.MetadataSnapshot metadata = shard.store().getMetadata(); + return metadata.asMap().values().toArray(StoreFileMetadata[]::new); + } + + @Override + public void tearDown() throws Exception { + closeShards(shard); + super.tearDown(); + } + + public void testSendFiles_invokesChunkWriter() throws IOException, InterruptedException { + // use counDownLatch and countDown when our chunkWriter is invoked. 
+ final CountDownLatch countDownLatch = new CountDownLatch(1); + final FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + assertTrue(filesToSend[0].isSame(fileMetadata)); + assertTrue(lastChunk); + countDownLatch.countDown(); + } + }); + + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + final MultiChunkTransfer transfer = handler.sendFiles( + shard.store(), + filesToSend, + translogOps, + mock(ActionListener.class) + ); + + // start the transfer + transfer.start(); + countDownLatch.await(5, TimeUnit.SECONDS); + verify(chunkWriter, times(1)).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); + IOUtils.close(transfer); + } + + public void testSendFiles_cancelThreads_beforeStart() throws IOException, InterruptedException { + final FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + Assert.fail(); + } + }); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + + final MultiChunkTransfer transfer = handler.sendFiles( + shard.store(), + filesToSend, + translogOps, + mock(ActionListener.class) + ); + + // start the transfer + cancellableThreads.cancel("test"); + transfer.start(); + verifyNoInteractions(chunkWriter); + IOUtils.close(transfer); + } + + public void testSendFiles_cancelThreads_afterStart() throws IOException, InterruptedException { + // index a doc a flush so we have more than 1 file to send. + indexDoc(shard, "_doc", "test"); + flushShard(shard, true); + filesToSend = getFilestoSend(shard); + + // we should have 4 files to send now - + // [_0.cfe, _0.si, _0.cfs, segments_3] + assertEquals(4, filesToSend.length); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + // cancel the threads at this point, we'll ensure this is not invoked more than once. + cancellableThreads.cancel("test"); + listener.onResponse(null); + } + }); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + + final MultiChunkTransfer transfer = handler.sendFiles( + shard.store(), + filesToSend, + translogOps, + new ActionListener() { + @Override + public void onResponse(Void unused) { + // do nothing here, we will just resolve in test. 
+ } + + @Override + public void onFailure(Exception e) { + assertEquals(CancellableThreads.ExecutionCancelledException.class, e.getClass()); + countDownLatch.countDown(); + } + } + ); + + // start the transfer + transfer.start(); + countDownLatch.await(30, TimeUnit.SECONDS); + verify(chunkWriter, times(1)).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); + IOUtils.close(transfer); + } + + public void testSendFiles_CorruptIndexException() throws Exception { + final CancellableThreads cancellableThreads = new CancellableThreads(); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + mock(FileChunkWriter.class), + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata( + IndexFileNames.SEGMENTS, + 1L, + "0", + org.apache.lucene.util.Version.LATEST + ); + + doNothing().when(shard).failShard(anyString(), any()); + assertThrows( + CorruptIndexException.class, + () -> { + handler.handleErrorOnSendFiles( + shard.store(), + new CorruptIndexException("test", "test"), + new StoreFileMetadata[] { SEGMENTS_FILE } + ); + } + ); + + verify(shard, times(1)).failShard(any(), any()); + } +} From 990246df1de8dc9f254645d43482069fd37edba4 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Tue, 14 Jun 2022 12:53:42 -0700 Subject: [PATCH 08/11] Address PR feedback: Correct synchronization in OngoingSegmentReplications. Throw when already replicating to a specific replica. Signed-off-by: Marc Handalian --- .../recovery/RecoverySourceHandler.java | 2 +- .../OngoingSegmentReplications.java | 164 +++++++++++------- .../SegmentFileTransferHandler.java | 2 +- .../SegmentReplicationSourceHandler.java | 14 +- .../SegmentReplicationSourceService.java | 4 - .../OngoingSegmentReplicationsTests.java | 15 ++ .../SegmentFileTransferHandlerTests.java | 6 +- .../SegmentReplicationSourceHandlerTests.java | 26 +++ 8 files changed, 157 insertions(+), 76 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 995fddb1efc54..9e219db5a4c96 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -663,7 +663,7 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { - final MultiChunkTransfer transfer = transferHandler.sendFiles( + final MultiChunkTransfer transfer = transferHandler.createTransfer( store, files, translogOps, diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 8a1cf84b5327f..f3d2a50d1d943 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -8,8 +8,10 @@ package org.opensearch.indices.replication; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.index.IndexService; import 
org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; @@ -26,6 +28,8 @@ /** * Manages references to ongoing segrep events on a node. + * Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication. + * CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments. * * @opensearch.internal */ @@ -38,14 +42,15 @@ class OngoingSegmentReplications { /** * Constructor. - * @param indicesService {@link IndicesService} + * + * @param indicesService {@link IndicesService} * @param recoverySettings {@link RecoverySettings} */ OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) { this.indicesService = indicesService; this.recoverySettings = recoverySettings; this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); - this.nodesToHandlers = Collections.synchronizedMap(new HashMap<>()); + this.nodesToHandlers = ConcurrentCollections.newConcurrentMap(); } /** @@ -84,57 +89,29 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro } } - void cancelReplication(DiscoveryNode node) { - if (nodesToHandlers.containsKey(node)) { - final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node); - handler.cancel("Cancel on node left"); - removeCopyState(handler.getCopyState()); - } - } - - SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) { - return new SegmentReplicationSourceHandler( - node, - fileChunkWriter, - copyState.getShard().getThreadPool(), - copyState, - Math.toIntExact(recoverySettings.getChunkSize().getBytes()), - recoverySettings.getMaxConcurrentFileChunks() - ); - } - - /** - * Adds the input {@link CopyState} object to {@link #copyStateMap}. - * The key is the CopyState's {@link ReplicationCheckpoint} object. - */ - private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { - copyStateMap.putIfAbsent(checkpoint, copyState); - } - - /** - * Given a {@link ReplicationCheckpoint}, return the corresponding - * {@link CopyState} object, if any, from {@link #copyStateMap}. - */ - private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.get(replicationCheckpoint); - } - /** - * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} - * as a key by invoking {@link Map#containsKey(Object)}. + * Start sending files to the replica. + * + * @param request {@link GetSegmentFilesRequest} + * @param listener {@link ActionListener} that resolves when sending files is complete. */ - boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.containsKey(replicationCheckpoint); - } - void startSegmentCopy(GetSegmentFilesRequest request, ActionListener listener) { final DiscoveryNode node = request.getTargetNode(); - if (nodesToHandlers.containsKey(node)) { - final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node); + final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node); + if (handler != null) { + if (handler.isActive()) { + throw new OpenSearchException( + "Replication to shard {}, on node {} has already started", + request.getCheckpoint().getShardId(), + request.getTargetNode() + ); + } // update the given listener to release the CopyState before it resolves. 
final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node); - removeCopyState(sourceHandler.getCopyState()); + if (sourceHandler != null) { + removeCopyState(sourceHandler.getCopyState()); + } }); handler.sendFiles(request, wrappedListener); } else { @@ -143,14 +120,15 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener createTargetHandler(node, copyState, fileChunkWriter)); return copyState; } + /** + * Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down. + * + * @param shard {@link IndexShard} + * @param reason {@link String} - Reason for the cancel + */ + synchronized void cancel(IndexShard shard, String reason) { + for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) { + if (entry.getCopyState().getShard().equals(shard)) { + entry.cancel(reason); + } + } + copyStateMap.clear(); + } + + /** + * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} + * as a key by invoking {@link Map#containsKey(Object)}. + */ + boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.containsKey(replicationCheckpoint); + } + int size() { return nodesToHandlers.size(); } @@ -179,17 +187,43 @@ int cachedCopyStateSize() { return copyStateMap.size(); } + private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) { + return new SegmentReplicationSourceHandler( + node, + fileChunkWriter, + copyState.getShard().getThreadPool(), + copyState, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks() + ); + } + /** - * Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down. - * @param shard {@link IndexShard} - * @param reason {@link String} - Reason for the cancel + * Adds the input {@link CopyState} object to {@link #copyStateMap}. + * The key is the CopyState's {@link ReplicationCheckpoint} object. */ - public void cancel(IndexShard shard, String reason) { - for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) { - if (entry.getCopyState().getShard().equals(shard)) { - entry.cancel(reason); - } + private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { + copyStateMap.putIfAbsent(checkpoint, copyState); + } + + /** + * Given a {@link ReplicationCheckpoint}, return the corresponding + * {@link CopyState} object, if any, from {@link #copyStateMap}. + */ + private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.get(replicationCheckpoint); + } + + /** + * Remove a CopyState. Intended to be called after a replication event completes. + * This method will remove a copyState from the copyStateMap only if its refCount hits 0. 
+ * + * @param copyState {@link CopyState} + */ + private synchronized void removeCopyState(CopyState copyState) { + copyState.decRef(); + if (copyState.refCount() <= 0) { + copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); } - copyStateMap.clear(); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java index 23f6d67cf5ab7..e95c2c6470b4b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java @@ -87,7 +87,7 @@ public SegmentFileTransferHandler( * @param listener {@link ActionListener} * @return {@link MultiChunkTransfer} */ - public MultiChunkTransfer sendFiles( + public MultiChunkTransfer createTransfer( Store store, StoreFileMetadata[] files, IntSupplier translogOps, diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java index 2f433c87158d8..939b699759579 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.cluster.node.DiscoveryNode; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; /** @@ -51,6 +53,7 @@ class SegmentReplicationSourceHandler { private final ListenableFuture future = new ListenableFuture<>(); private final List resources = new CopyOnWriteArrayList<>(); private final Logger logger; + private final AtomicBoolean active = new AtomicBoolean(); /** * Constructor. @@ -95,7 +98,10 @@ class SegmentReplicationSourceHandler { * @param request {@link GetSegmentFilesRequest} request object containing list of files to be sent. * @param listener {@link ActionListener} that completes with the list of files sent. 
*/ - public void sendFiles(GetSegmentFilesRequest request, ActionListener listener) { + public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener listener) { + if (active.compareAndSet(false, true) == false) { + throw new OpenSearchException("Replication to {} is already running.", shard.shardId()); + } future.addListener(listener, OpenSearchExecutors.newDirectExecutorService()); final Closeable releaseResources = () -> IOUtils.close(resources); try { @@ -131,7 +137,7 @@ public void sendFiles(GetSegmentFilesRequest request, ActionListener transfer = segmentFileTransferHandler - .sendFiles(shard.store(), storeFileMetadata, () -> 0, sendFileStep); + .createTransfer(shard.store(), storeFileMetadata, () -> 0, sendFileStep); resources.add(transfer); transfer.start(); @@ -157,4 +163,8 @@ public void cancel(String reason) { CopyState getCopyState() { return copyState; } + + public boolean isActive() { + return active.get(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 6b1a4b1563e37..227acbc83674d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -24,7 +24,6 @@ import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -88,9 +87,6 @@ public SegmentReplicationSourceService( private class CheckpointInfoRequestHandler implements TransportRequestHandler { @Override public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception { - final ReplicationCheckpoint checkpoint = request.getCheckpoint(); - logger.trace("Received request for checkpoint {}", checkpoint); - final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter( request.getReplicationId(), recoverySettings, diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java index 9cc74276d2b1a..260f6a13b5010 100644 --- a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.junit.Assert; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.ClusterSettings; @@ -213,4 +214,18 @@ public void onFailure(Exception e) { verify(listener, times(1)).onResponse(any()); } + public void testShardAlreadyReplicatingToNode() throws IOException { + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + final 
FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); }); + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java index 536a042c8eb10..5fd8bc1e74625 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java @@ -100,7 +100,7 @@ public void writeFileChunk( fileChunkSizeInBytes, maxConcurrentFileChunks ); - final MultiChunkTransfer transfer = handler.sendFiles( + final MultiChunkTransfer transfer = handler.createTransfer( shard.store(), filesToSend, translogOps, @@ -139,7 +139,7 @@ public void writeFileChunk( maxConcurrentFileChunks ); - final MultiChunkTransfer transfer = handler.sendFiles( + final MultiChunkTransfer transfer = handler.createTransfer( shard.store(), filesToSend, translogOps, @@ -190,7 +190,7 @@ public void writeFileChunk( maxConcurrentFileChunks ); - final MultiChunkTransfer transfer = handler.sendFiles( + final MultiChunkTransfer transfer = handler.createTransfer( shard.store(), filesToSend, translogOps, diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java index baa8d4fa29aff..70061c54d0da2 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -164,4 +164,30 @@ public void onFailure(Exception e) { } }); } + + public void testReplicationAlreadyRunning() throws IOException { + chunkWriter = mock(FileChunkWriter.class); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, mock(ActionListener.class)); + Assert.assertThrows(OpenSearchException.class, () -> { handler.sendFiles(getSegmentFilesRequest, mock(ActionListener.class)); }); + } } From b016ad9e6029693e4e2d4143a2581c7c7056bcb0 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 17 Jun 2022 10:24:52 -0700 Subject: [PATCH 09/11] Rename handler isActive to isReplicating. 
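The renamed flag keeps the same single-use semantics: the first call to sendFiles flips it with compareAndSet and any later call fails fast. A minimal standalone sketch of the idiom (the class name and exception message here are illustrative only, not part of this change):

    import java.util.concurrent.atomic.AtomicBoolean;

    class SingleUseGuard {
        private final AtomicBoolean isReplicating = new AtomicBoolean();

        void start() {
            // compareAndSet(false, true) succeeds for exactly one caller;
            // every later caller sees the flag already set and bails out.
            if (isReplicating.compareAndSet(false, true) == false) {
                throw new IllegalStateException("replication already running");
            }
        }
    }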
Signed-off-by: Marc Handalian --- .../indices/replication/OngoingSegmentReplications.java | 2 +- .../replication/SegmentReplicationSourceHandler.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index f3d2a50d1d943..679c56fbc79c3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -99,7 +99,7 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener future = new ListenableFuture<>(); private final List resources = new CopyOnWriteArrayList<>(); private final Logger logger; - private final AtomicBoolean active = new AtomicBoolean(); + private final AtomicBoolean isReplicating = new AtomicBoolean(); /** * Constructor. @@ -99,7 +99,7 @@ class SegmentReplicationSourceHandler { * @param listener {@link ActionListener} that completes with the list of files sent. */ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener listener) { - if (active.compareAndSet(false, true) == false) { + if (isReplicating.compareAndSet(false, true) == false) { throw new OpenSearchException("Replication to {} is already running.", shard.shardId()); } future.addListener(listener, OpenSearchExecutors.newDirectExecutorService()); @@ -164,7 +164,7 @@ CopyState getCopyState() { return copyState; } - public boolean isActive() { - return active.get(); + public boolean isReplicating() { + return isReplicating.get(); } } From 081195988c50c7d5cd67c9b697afc9e2da957df5 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 17 Jun 2022 12:40:06 -0700 Subject: [PATCH 10/11] PR Feedback: - Make FileChunkWriter a functional interface. - simplify logic in OngoingSegmentReplications.prepareForReplication. - Rename segmentReplicationSourceHandler.isActive to isReplicating. 
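Marking FileChunkWriter as a @FunctionalInterface lets callers and tests supply the writer as a lambda instead of a mock or anonymous class, as the updated OngoingSegmentReplicationsTests already do. For example, a no-op writer that immediately completes its listener (illustrative):

    FileChunkWriter noOpWriter =
        (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onResponse(null);

The prepareForReplication simplification relies on Map.putIfAbsent returning the previous mapping, so a non-null return value is the signal that a handler is already registered for the target node.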
Signed-off-by: Marc Handalian --- .../org/opensearch/indices/recovery/FileChunkWriter.java | 1 + .../indices/replication/OngoingSegmentReplications.java | 9 +++++---- .../replication/SegmentReplicationSourceService.java | 1 - server/src/main/java/org/opensearch/node/Node.java | 3 +++ 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java index 073442e9d58ae..cb43af3b82e09 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java @@ -17,6 +17,7 @@ * * @opensearch.internal */ +@FunctionalInterface public interface FileChunkWriter { void writeFileChunk( diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java index 679c56fbc79c3..6302d364fc6d1 100644 --- a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -145,14 +145,16 @@ void cancelReplication(DiscoveryNode node) { */ CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException { final CopyState copyState = getCachedCopyState(request.getCheckpoint()); - if (nodesToHandlers.containsKey(request.getTargetNode())) { + if (nodesToHandlers.putIfAbsent( + request.getTargetNode(), + createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter) + ) != null) { throw new OpenSearchException( "Shard copy {} on node {} already replicating", request.getCheckpoint().getShardId(), request.getTargetNode() ); } - nodesToHandlers.computeIfAbsent(request.getTargetNode(), node -> createTargetHandler(node, copyState, fileChunkWriter)); return copyState; } @@ -221,8 +223,7 @@ private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpo * @param copyState {@link CopyState} */ private synchronized void removeCopyState(CopyState copyState) { - copyState.decRef(); - if (copyState.refCount() <= 0) { + if (copyState.decRef() == true) { copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 227acbc83674d..d428459884f97 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -60,7 +60,6 @@ public static class Actions { private final OngoingSegmentReplications ongoingSegmentReplications; - // TODO mark this as injected and bind in Node public SegmentReplicationSourceService( IndicesService indicesService, TransportService transportService, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7e205b88e9eb1..b9ac0e610218b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -37,6 +37,7 @@ import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; import org.opensearch.index.IndexingPressureService; +import 
org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -932,6 +933,8 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); + b.bind(SegmentReplicationSourceService.class) + .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); From d94401d820114d158174c7ee2cb6b78a3be732c6 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Fri, 17 Jun 2022 12:58:39 -0700 Subject: [PATCH 11/11] Update wiring of SegmentReplicationSourceService only if feature flag is enabled. Signed-off-by: Marc Handalian --- server/src/main/java/org/opensearch/node/Node.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index b9ac0e610218b..6ecf8b64f464f 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.apache.lucene.util.SetOnce; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexingPressureService; import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.watcher.ResourceWatcherService; @@ -220,6 +221,7 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.opensearch.common.util.FeatureFlags.REPLICATION_TYPE; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; /** @@ -933,8 +935,10 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); - b.bind(SegmentReplicationSourceService.class) - .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); + if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { + b.bind(SegmentReplicationSourceService.class) + .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); + } } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
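With the binding gated on the feature flag, SegmentReplicationSourceService is not registered unless the replication-type flag is enabled, so code that resolves it should be gated the same way. A sketch of the consuming side (illustrative only; "injector" stands for whatever component holds the node's bindings):

    if (FeatureFlags.isEnabled(REPLICATION_TYPE)) {
        SegmentReplicationSourceService sourceService = injector.getInstance(SegmentReplicationSourceService.class);
        // safe to use: the binding above was registered
    }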