diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index 019d3a2cb42cd..4ebcd9622ce38 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -67,6 +67,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableMap; +import static org.opensearch.Version.V_2_1_0; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName; @@ -1594,6 +1595,12 @@ private enum OpenSearchExceptionHandle { org.opensearch.transport.NoSeedNodeLeftException::new, 160, LegacyESVersion.V_7_10_0 + ), + REPLICATION_FAILED_EXCEPTION( + org.opensearch.indices.replication.common.ReplicationFailedException.class, + org.opensearch.indices.replication.common.ReplicationFailedException::new, + 161, + V_2_1_0 ); final Class exceptionClass; diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 1dd31b37efc12..2c8d171526d58 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -84,7 +84,6 @@ import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogStats; import org.opensearch.index.translog.TranslogDeletionPolicy; -import org.opensearch.index.translog.TranslogStats; import org.opensearch.search.suggest.completion.CompletionStats; import java.io.Closeable; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 2ba06c0f16927..a8b00c9ed8504 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2285,7 +2285,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { } @Override - public SegmentInfos getLatestSegmentInfos() { + protected SegmentInfos getLatestSegmentInfos() { OpenSearchDirectoryReader reader = null; try { reader = internalReaderManager.acquire(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5dd4cf154539f..442cbeb913d93 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -162,9 +162,9 @@ import org.opensearch.indices.recovery.RecoveryListener; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; -import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1361,6 +1361,12 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } + public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { + if (getReplicationEngine().isPresent()) { + getReplicationEngine().get().updateSegments(infos, seqNo); + } + } + /** * Snapshots the most recent safe index commit from the currently running engine. * All index files referenced by this index commit won't be freed until the commit/snapshot is closed. @@ -1379,15 +1385,60 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti * Returns the lastest Replication Checkpoint that shard received */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { - return new ReplicationCheckpoint(shardId, 0, 0, 0, 0); + try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { + return Optional.ofNullable(snapshot.get()) + .map( + segmentInfos -> new ReplicationCheckpoint( + this.shardId, + getOperationPrimaryTerm(), + segmentInfos.getGeneration(), + getProcessedLocalCheckpoint(), + segmentInfos.getVersion() + ) + ) + .orElse( + new ReplicationCheckpoint( + shardId, + getOperationPrimaryTerm(), + SequenceNumbers.NO_OPS_PERFORMED, + getProcessedLocalCheckpoint(), + SequenceNumbers.NO_OPS_PERFORMED + ) + ); + } catch (IOException ex) { + throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex); + } } /** - * Invoked when a new checkpoint is received from a primary shard. Starts the copy process. + * Checks if checkpoint should be processed + * + * @param requestCheckpoint received checkpoint that is checked for processing + * @return true if checkpoint should be processed */ - public synchronized void onNewCheckpoint(final PublishCheckpointRequest request) { - assert shardRouting.primary() == false; - // TODO + public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + if (state().equals(IndexShardState.STARTED) == false) { + logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state())); + return false; + } + ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint(); + if (localCheckpoint.isAheadOf(requestCheckpoint)) { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - Shard is already on checkpoint {} that is ahead of {}", + localCheckpoint, + requestCheckpoint + ) + ); + return false; + } + if (localCheckpoint.equals(requestCheckpoint)) { + logger.trace( + () -> new ParameterizedMessage("Ignoring new replication checkpoint - Shard is already on checkpoint {}", requestCheckpoint) + ); + return false; + } + return true; } /** @@ -2657,7 +2708,6 @@ public long getLocalCheckpoint() { * Also see {@link #getLocalCheckpoint()}. */ public long getProcessedLocalCheckpoint() { - assert indexSettings.isSegRepEnabled(); // Returns checkpoint only if the current engine is an instance of NRTReplicationEngine or InternalEngine return getReplicationEngine().map(NRTReplicationEngine::getProcessedLocalCheckpoint).orElseGet(() -> { final Engine engine = getEngine(); diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index f818456c3a2c8..2309004c0777d 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -64,6 +64,7 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.Streams; @@ -706,6 +707,51 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } + /** + * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. + * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. + * @param reason the reason for this cleanup operation logged for each deleted file + * @param localSnapshot The local snapshot from in memory SegmentInfos. + * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. + */ + public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException { + // fetch a snapshot from the latest on disk Segments_N file. This can be behind + // the passed in local in memory snapshot, so we want to ensure files it references are not removed. + metadataLock.writeLock().lock(); + try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo())); + } finally { + metadataLock.writeLock().unlock(); + } + } + + private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot) + throws IOException { + assert metadataLock.isWriteLockedByCurrentThread(); + for (String existingFile : directory.listAll()) { + if (Store.isAutogenerated(existingFile) + || localSnapshot.contains(existingFile) + || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) { + // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete + // checksum) + continue; + } + try { + directory.deleteFile(reason, existingFile); + } catch (IOException ex) { + if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { + // TODO do we need to also fail this if we can't delete the pending commit file? + // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit + // point around? + throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); + } + logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); + // ignore, we don't really care, will get deleted later on + } + } + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); diff --git a/server/src/main/java/org/opensearch/indices/IndicesModule.java b/server/src/main/java/org/opensearch/indices/IndicesModule.java index c728952a9dcbf..e413a73940011 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesModule.java +++ b/server/src/main/java/org/opensearch/indices/IndicesModule.java @@ -284,6 +284,8 @@ protected void configure() { bind(RetentionLeaseSyncer.class).asEagerSingleton(); if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) { bind(SegmentReplicationCheckpointPublisher.class).asEagerSingleton(); + } else { + bind(SegmentReplicationCheckpointPublisher.class).toInstance(SegmentReplicationCheckpointPublisher.EMPTY); } } diff --git a/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java b/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java new file mode 100644 index 0000000000000..29cac1601dc67 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/RunUnderPrimaryPermit.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices; + +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.FutureUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardRelocatedException; +import org.opensearch.threadpool.ThreadPool; + +import java.util.concurrent.CompletableFuture; + +/** + * Execute a Runnable after acquiring the primary's operation permit. + * + * @opensearch.internal + */ +public final class RunUnderPrimaryPermit { + + public static void run( + CancellableThreads.Interruptible runnable, + String reason, + IndexShard primary, + CancellableThreads cancellableThreads, + Logger logger + ) { + cancellableThreads.execute(() -> { + CompletableFuture permit = new CompletableFuture<>(); + final ActionListener onAcquired = new ActionListener<>() { + @Override + public void onResponse(Releasable releasable) { + if (permit.complete(releasable) == false) { + releasable.close(); + } + } + + @Override + public void onFailure(Exception e) { + permit.completeExceptionally(e); + } + }; + primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); + try (Releasable ignored = FutureUtils.get(permit)) { + // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent + // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated() + if (primary.isRelocatedPrimary()) { + throw new IndexShardRelocatedException(primary.shardId()); + } + runnable.run(); + } finally { + // just in case we got an exception (likely interrupted) while waiting for the get + permit.whenComplete((r, e) -> { + if (r != null) { + r.close(); + } + if (e != null) { + logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); + } + }); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java new file mode 100644 index 0000000000000..cb43af3b82e09 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/recovery/FileChunkWriter.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.recovery; + +import org.opensearch.action.ActionListener; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.index.store.StoreFileMetadata; + +/** + * Writes a partial file chunk to the target store. + * + * @opensearch.internal + */ +@FunctionalInterface +public interface FileChunkWriter { + + void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ); +} diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 0870fd4ca9295..9e219db5a4c96 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -33,17 +33,13 @@ package org.opensearch.indices.recovery; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.SetOnce; -import org.opensearch.ExceptionsHelper; import org.opensearch.LegacyESVersion; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; @@ -55,13 +51,10 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.CheckedRunnable; import org.opensearch.common.StopWatch; -import org.opensearch.common.bytes.BytesArray; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; -import org.opensearch.common.lucene.store.InputStreamIndexInput; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.CancellableThreads; @@ -77,26 +70,22 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; -import org.opensearch.index.shard.IndexShardRelocatedException; import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.indices.replication.SegmentFileTransferHandler; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.RemoteTransportException; import org.opensearch.transport.Transports; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.Deque; import java.util.List; import java.util.Locale; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -128,13 +117,13 @@ public class RecoverySourceHandler { private final StartRecoveryRequest request; private final int chunkSizeInBytes; private final RecoveryTargetHandler recoveryTarget; - private final int maxConcurrentFileChunks; private final int maxConcurrentOperations; private final ThreadPool threadPool; private final CancellableThreads cancellableThreads = new CancellableThreads(); private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); public static final String PEER_RECOVERY_NAME = "peer-recovery"; + private final SegmentFileTransferHandler transferHandler; public RecoverySourceHandler( IndexShard shard, @@ -145,15 +134,24 @@ public RecoverySourceHandler( int maxConcurrentFileChunks, int maxConcurrentOperations ) { + this.logger = Loggers.getLogger(RecoverySourceHandler.class, request.shardId(), "recover to " + request.targetNode().getName()); + this.transferHandler = new SegmentFileTransferHandler( + shard, + request.targetNode(), + recoveryTarget, + logger, + threadPool, + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); this.shard = shard; - this.recoveryTarget = recoveryTarget; this.threadPool = threadPool; this.request = request; + this.recoveryTarget = recoveryTarget; this.shardId = this.request.shardId().id(); - this.logger = Loggers.getLogger(getClass(), request.shardId(), "recover to " + request.targetNode().getName()); this.chunkSizeInBytes = fileChunkSizeInBytes; // if the target is on an old version, it won't be able to handle out-of-order file chunks. - this.maxConcurrentFileChunks = maxConcurrentFileChunks; this.maxConcurrentOperations = maxConcurrentOperations; } @@ -192,7 +190,7 @@ public void recoverToTarget(ActionListener listener) { final SetOnce retentionLeaseRef = new SetOnce<>(); - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId()); if (targetShardRouting == null) { @@ -286,7 +284,7 @@ && isTargetSameHistory() }); final StepListener deleteRetentionLeaseStep = new StepListener<>(); - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { try { // If the target previously had a copy of this shard then a file-based recovery might move its global // checkpoint backwards. We must therefore remove any existing retention lease so that we can create a @@ -332,7 +330,7 @@ && isTargetSameHistory() * make sure to do this before sampling the max sequence number in the next step, to ensure that we send * all documents up to maxSeqNo in phase2. */ - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.initiateTracking(request.targetAllocationId()), shardId + " initiating tracking of " + request.targetAllocationId(), shard, @@ -420,50 +418,6 @@ private int countNumberOfHistoryOperations(long startingSeqNo) throws IOExceptio return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE); } - static void runUnderPrimaryPermit( - CancellableThreads.Interruptible runnable, - String reason, - IndexShard primary, - CancellableThreads cancellableThreads, - Logger logger - ) { - cancellableThreads.execute(() -> { - CompletableFuture permit = new CompletableFuture<>(); - final ActionListener onAcquired = new ActionListener() { - @Override - public void onResponse(Releasable releasable) { - if (permit.complete(releasable) == false) { - releasable.close(); - } - } - - @Override - public void onFailure(Exception e) { - permit.completeExceptionally(e); - } - }; - primary.acquirePrimaryOperationPermit(onAcquired, ThreadPool.Names.SAME, reason); - try (Releasable ignored = FutureUtils.get(permit)) { - // check that the IndexShard still has the primary authority. This needs to be checked under operation permit to prevent - // races, as IndexShard will switch its authority only when it holds all operation permits, see IndexShard.relocated() - if (primary.isRelocatedPrimary()) { - throw new IndexShardRelocatedException(primary.shardId()); - } - runnable.run(); - } finally { - // just in case we got an exception (likely interrupted) while waiting for the get - permit.whenComplete((r, e) -> { - if (r != null) { - r.close(); - } - if (e != null) { - logger.trace("suppressing exception on completion (it was already bubbled up or the operation was aborted)", e); - } - }); - } - }); - } - /** * Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool. * We must never release the store using an interruptible thread as we can risk invalidating the node lock. @@ -708,8 +662,19 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A } } + void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { + final MultiChunkTransfer transfer = transferHandler.createTransfer( + store, + files, + translogOps, + listener + ); + resources.add(transfer); + transfer.start(); + } + void createRetentionLease(final long startingSeqNo, ActionListener listener) { - runUnderPrimaryPermit(() -> { + RunUnderPrimaryPermit.run(() -> { // Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the the local // checkpoint of the safe commit we're creating and this lease's retained seqno with the retention lock, and by cloning an // existing lease we (approximately) know that all our peers are also retaining history as requested by the cloned lease. If @@ -983,7 +948,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire * the permit then the state of the shard will be relocated and this recovery will fail. */ - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), shardId + " marking " + request.targetAllocationId() + " as in sync", shard, @@ -995,7 +960,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis cancellableThreads.checkForCancel(); recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener); finalizeListener.whenComplete(r -> { - runUnderPrimaryPermit( + RunUnderPrimaryPermit.run( () -> shard.updateGlobalCheckpointForShard(request.targetAllocationId(), globalCheckpoint), shardId + " updating " + request.targetAllocationId() + "'s global checkpoint", shard, @@ -1056,121 +1021,6 @@ public String toString() { + '}'; } - /** - * A file chunk from the recovery source - * - * @opensearch.internal - */ - private static class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable { - final StoreFileMetadata md; - final BytesReference content; - final long position; - final boolean lastChunk; - final Releasable onClose; - - FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) { - this.md = md; - this.content = content; - this.position = position; - this.lastChunk = lastChunk; - this.onClose = onClose; - } - - @Override - public boolean lastChunk() { - return lastChunk; - } - - @Override - public void close() { - onClose.close(); - } - } - - void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener listener) { - ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first - - final MultiChunkTransfer multiFileSender = new MultiChunkTransfer( - logger, - threadPool.getThreadContext(), - listener, - maxConcurrentFileChunks, - Arrays.asList(files) - ) { - - final Deque buffers = new ConcurrentLinkedDeque<>(); - InputStreamIndexInput currentInput = null; - long offset = 0; - - @Override - protected void onNewResource(StoreFileMetadata md) throws IOException { - offset = 0; - IOUtils.close(currentInput, () -> currentInput = null); - final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); - currentInput = new InputStreamIndexInput(indexInput, md.length()) { - @Override - public void close() throws IOException { - IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop - } - }; - } - - private byte[] acquireBuffer() { - final byte[] buffer = buffers.pollFirst(); - if (buffer != null) { - return buffer; - } - return new byte[chunkSizeInBytes]; - } - - @Override - protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { - assert Transports.assertNotTransportThread("read file chunk"); - cancellableThreads.checkForCancel(); - final byte[] buffer = acquireBuffer(); - final int bytesRead = currentInput.read(buffer); - if (bytesRead == -1) { - throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name()); - } - final boolean lastChunk = offset + bytesRead == md.length(); - final FileChunk chunk = new FileChunk( - md, - new BytesArray(buffer, 0, bytesRead), - offset, - lastChunk, - () -> buffers.addFirst(buffer) - ); - offset += bytesRead; - return chunk; - } - - @Override - protected void executeChunkRequest(FileChunk request, ActionListener listener) { - cancellableThreads.checkForCancel(); - recoveryTarget.writeFileChunk( - request.md, - request.position, - request.content, - request.lastChunk, - translogOps.getAsInt(), - ActionListener.runBefore(listener, request::close) - ); - } - - @Override - protected void handleError(StoreFileMetadata md, Exception e) throws Exception { - handleErrorOnSendFiles(store, e, new StoreFileMetadata[] { md }); - } - - @Override - public void close() throws IOException { - IOUtils.close(currentInput, () -> currentInput = null); - } - }; - resources.add(multiFileSender); - multiFileSender.start(); - } - private void cleanFiles( Store store, Store.MetadataSnapshot sourceMetadata, @@ -1194,52 +1044,9 @@ private void cleanFiles( ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> { StoreFileMetadata[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetadata[]::new); ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetadata::length)); // check small files first - handleErrorOnSendFiles(store, e, mds); + transferHandler.handleErrorOnSendFiles(store, e, mds); throw e; })) ); } - - private void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception { - final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); - assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[handle error on send/clean files]"); - if (corruptIndexException != null) { - Exception localException = null; - for (StoreFileMetadata md : mds) { - cancellableThreads.checkForCancel(); - logger.debug("checking integrity for file {} after remove corruption exception", md); - if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! - logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md); - if (localException == null) { - localException = corruptIndexException; - } - failEngine(corruptIndexException); - } - } - if (localException != null) { - throw localException; - } else { // corruption has happened on the way to replica - RemoteTransportException remoteException = new RemoteTransportException( - "File corruption occurred on recovery but checksums are ok", - null - ); - remoteException.addSuppressed(e); - logger.warn( - () -> new ParameterizedMessage( - "{} Remote file corruption on node {}, recovering {}. local checksum OK", - shardId, - request.targetNode(), - mds - ), - corruptIndexException - ); - throw remoteException; - } - } - throw e; - } - - protected void failEngine(IOException cause) { - shard.failShard("recovery", cause); - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 1735bb015c90c..426409f7a5b65 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -77,9 +77,7 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH private static final String RECOVERY_PREFIX = "recovery."; private final DiscoveryNode sourceNode; - private final CancellableThreads cancellableThreads; protected final MultiFileWriter multiFileWriter; - protected final Store store; // latch that can be used to blockingly wait for RecoveryTarget to be closed private final CountDownLatch closedLatch = new CountDownLatch(1); @@ -93,13 +91,10 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH */ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) { super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener); - this.cancellableThreads = new CancellableThreads(); this.sourceNode = sourceNode; indexShard.recoveryStats().incCurrentAsTarget(); - this.store = indexShard.store(); final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + "."; this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount); - store.incRef(); } /** @@ -132,11 +127,6 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } - public Store store() { - ensureRefCount(); - return store; - } - public String description() { return "recovery from " + source(); } @@ -258,14 +248,6 @@ protected void onDone() { indexShard.postRecovery("peer recovery done"); } - /** - * if {@link #cancellableThreads()} was used, the threads will be interrupted. - */ - @Override - protected void onCancel(String reason) { - cancellableThreads.cancel(reason); - } - /*** Implementation of {@link RecoveryTargetHandler } */ @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java index 84b6ec170d3f7..c750c0e88364b 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTargetHandler.java @@ -32,11 +32,9 @@ package org.opensearch.indices.recovery; import org.opensearch.action.ActionListener; -import org.opensearch.common.bytes.BytesReference; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLeases; import org.opensearch.index.store.Store; -import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; import java.util.List; @@ -46,7 +44,7 @@ * * @opensearch.internal */ -public interface RecoveryTargetHandler { +public interface RecoveryTargetHandler extends FileChunkWriter { /** * Prepares the target to receive translog operations, after all file have been copied @@ -123,15 +121,5 @@ void receiveFileInfo( */ void cleanFiles(int totalTranslogOps, long globalCheckpoint, Store.MetadataSnapshot sourceMetadata, ActionListener listener); - /** writes a partial file chunk to the target store */ - void writeFileChunk( - StoreFileMetadata fileMetadata, - long position, - BytesReference content, - boolean lastChunk, - int totalTranslogOps, - ActionListener listener - ); - default void cancel() {} } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index ab6466feb11f8..e7ae62c1bee7d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -34,8 +34,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.store.RateLimiter; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.bytes.BytesReference; @@ -46,12 +44,12 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.RemoteSegmentFileChunkWriter; import org.opensearch.transport.EmptyTransportResponseHandler; import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportResponse; import org.opensearch.transport.TransportService; -import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -72,13 +70,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { private final RecoverySettings recoverySettings; private final TransportRequestOptions translogOpsRequestOptions; - private final TransportRequestOptions fileChunkRequestOptions; - private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final AtomicLong requestSeqNoGenerator = new AtomicLong(0); - - private final Consumer onSourceThrottle; private final RetryableTransportClient retryableTransportClient; + private final RemoteSegmentFileChunkWriter fileChunkWriter; public RemoteRecoveryTargetHandler( long recoveryId, @@ -102,15 +97,19 @@ public RemoteRecoveryTargetHandler( this.shardId = shardId; this.targetNode = targetNode; this.recoverySettings = recoverySettings; - this.onSourceThrottle = onSourceThrottle; this.translogOpsRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionLongTimeout()) .build(); - this.fileChunkRequestOptions = TransportRequestOptions.builder() - .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); + this.fileChunkWriter = new RemoteSegmentFileChunkWriter( + recoveryId, + recoverySettings, + retryableTransportClient, + shardId, + PeerRecoveryTargetService.Actions.FILE_CHUNK, + requestSeqNoGenerator, + onSourceThrottle + ); } public DiscoveryNode targetNode() { @@ -235,6 +234,11 @@ public void cleanFiles( retryableTransportClient.executeRetryableAction(action, request, responseListener, reader); } + @Override + public void cancel() { + retryableTransportClient.cancel(); + } + @Override public void writeFileChunk( StoreFileMetadata fileMetadata, @@ -244,57 +248,6 @@ public void writeFileChunk( int totalTranslogOps, ActionListener listener ) { - // Pause using the rate limiter, if desired, to throttle the recovery - final long throttleTimeInNanos; - // always fetch the ratelimiter - it might be updated in real-time on the recovery settings - final RateLimiter rl = recoverySettings.rateLimiter(); - if (rl != null) { - long bytes = bytesSinceLastPause.addAndGet(content.length()); - if (bytes > rl.getMinPauseCheckBytes()) { - // Time to pause - bytesSinceLastPause.addAndGet(-bytes); - try { - throttleTimeInNanos = rl.pause(bytes); - onSourceThrottle.accept(throttleTimeInNanos); - } catch (IOException e) { - throw new OpenSearchException("failed to pause recovery", e); - } - } else { - throttleTimeInNanos = 0; - } - } else { - throttleTimeInNanos = 0; - } - - final String action = PeerRecoveryTargetService.Actions.FILE_CHUNK; - final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); - /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can - * see how many translog ops we accumulate while copying files across the network. A future optimization - * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. - */ - final FileChunkRequest request = new FileChunkRequest( - recoveryId, - requestSeqNo, - shardId, - fileMetadata, - position, - content, - lastChunk, - totalTranslogOps, - throttleTimeInNanos - ); - final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; - retryableTransportClient.executeRetryableAction( - action, - request, - fileChunkRequestOptions, - ActionListener.map(listener, r -> null), - reader - ); - } - - @Override - public void cancel() { - retryableTransportClient.cancel(); + fileChunkWriter.writeFileChunk(fileMetadata, position, content, lastChunk, totalTranslogOps, listener); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java index bc10cc80b7fdc..a7113fb3fee5c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RetryableTransportClient.java @@ -74,7 +74,7 @@ public void executeRetryableAction( executeRetryableAction(action, request, options, actionListener, reader); } - void executeRetryableAction( + public void executeRetryableAction( String action, TransportRequest request, TransportRequestOptions options, diff --git a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java index 21749d3fe7d8a..daad33ed93f28 100644 --- a/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/GetSegmentFilesRequest.java @@ -57,4 +57,8 @@ public void writeTo(StreamOutput out) throws IOException { public ReplicationCheckpoint getCheckpoint() { return checkpoint; } + + public List getFilesToFetch() { + return filesToFetch; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java new file mode 100644 index 0000000000000..6302d364fc6d1 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java @@ -0,0 +1,230 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Manages references to ongoing segrep events on a node. + * Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication. + * CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments. + * + * @opensearch.internal + */ +class OngoingSegmentReplications { + + private final RecoverySettings recoverySettings; + private final IndicesService indicesService; + private final Map copyStateMap; + private final Map nodesToHandlers; + + /** + * Constructor. + * + * @param indicesService {@link IndicesService} + * @param recoverySettings {@link RecoverySettings} + */ + OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) { + this.indicesService = indicesService; + this.recoverySettings = recoverySettings; + this.copyStateMap = Collections.synchronizedMap(new HashMap<>()); + this.nodesToHandlers = ConcurrentCollections.newConcurrentMap(); + } + + /** + * Operations on the {@link #copyStateMap} member. + */ + + /** + * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key + * and returns the cached value if one is present. If the key is not present, a {@link CopyState} + * object is constructed and stored in the map before being returned. + */ + synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { + if (isInCopyStateMap(checkpoint)) { + final CopyState copyState = fetchFromCopyStateMap(checkpoint); + // we incref the copyState for every replica that is using this checkpoint. + // decref will happen when copy completes. + copyState.incRef(); + return copyState; + } else { + // From the checkpoint's shard ID, fetch the IndexShard + ShardId shardId = checkpoint.getShardId(); + final IndexService indexService = indicesService.indexService(shardId.getIndex()); + final IndexShard indexShard = indexService.getShard(shardId.id()); + // build the CopyState object and cache it before returning + final CopyState copyState = new CopyState(checkpoint, indexShard); + + /** + * Use the checkpoint from the request as the key in the map, rather than + * the checkpoint from the created CopyState. This maximizes cache hits + * if replication targets make a request with an older checkpoint. + * Replication targets are expected to fetch the checkpoint in the response + * CopyState to bring themselves up to date. + */ + addToCopyStateMap(checkpoint, copyState); + return copyState; + } + } + + /** + * Start sending files to the replica. + * + * @param request {@link GetSegmentFilesRequest} + * @param listener {@link ActionListener} that resolves when sending files is complete. + */ + void startSegmentCopy(GetSegmentFilesRequest request, ActionListener listener) { + final DiscoveryNode node = request.getTargetNode(); + final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node); + if (handler != null) { + if (handler.isReplicating()) { + throw new OpenSearchException( + "Replication to shard {}, on node {} has already started", + request.getCheckpoint().getShardId(), + request.getTargetNode() + ); + } + // update the given listener to release the CopyState before it resolves. + final ActionListener wrappedListener = ActionListener.runBefore(listener, () -> { + final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node); + if (sourceHandler != null) { + removeCopyState(sourceHandler.getCopyState()); + } + }); + handler.sendFiles(request, wrappedListener); + } else { + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } + } + + /** + * Cancel any ongoing replications for a given {@link DiscoveryNode} + * + * @param node {@link DiscoveryNode} node for which to cancel replication events. + */ + void cancelReplication(DiscoveryNode node) { + final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node); + if (handler != null) { + handler.cancel("Cancel on node left"); + removeCopyState(handler.getCopyState()); + } + } + + /** + * Prepare for a Replication event. This method constructs a {@link CopyState} holding files to be sent off of the current + * nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its + * local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas + * with the list of required files. + * + * @param request {@link CheckpointInfoRequest} + * @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer. + * @return {@link CopyState} the built CopyState for this replication event. + * @throws IOException - When there is an IO error building CopyState. + */ + CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException { + final CopyState copyState = getCachedCopyState(request.getCheckpoint()); + if (nodesToHandlers.putIfAbsent( + request.getTargetNode(), + createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter) + ) != null) { + throw new OpenSearchException( + "Shard copy {} on node {} already replicating", + request.getCheckpoint().getShardId(), + request.getTargetNode() + ); + } + return copyState; + } + + /** + * Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down. + * + * @param shard {@link IndexShard} + * @param reason {@link String} - Reason for the cancel + */ + synchronized void cancel(IndexShard shard, String reason) { + for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) { + if (entry.getCopyState().getShard().equals(shard)) { + entry.cancel(reason); + } + } + copyStateMap.clear(); + } + + /** + * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} + * as a key by invoking {@link Map#containsKey(Object)}. + */ + boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.containsKey(replicationCheckpoint); + } + + int size() { + return nodesToHandlers.size(); + } + + int cachedCopyStateSize() { + return copyStateMap.size(); + } + + private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) { + return new SegmentReplicationSourceHandler( + node, + fileChunkWriter, + copyState.getShard().getThreadPool(), + copyState, + Math.toIntExact(recoverySettings.getChunkSize().getBytes()), + recoverySettings.getMaxConcurrentFileChunks() + ); + } + + /** + * Adds the input {@link CopyState} object to {@link #copyStateMap}. + * The key is the CopyState's {@link ReplicationCheckpoint} object. + */ + private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { + copyStateMap.putIfAbsent(checkpoint, copyState); + } + + /** + * Given a {@link ReplicationCheckpoint}, return the corresponding + * {@link CopyState} object, if any, from {@link #copyStateMap}. + */ + private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { + return copyStateMap.get(replicationCheckpoint); + } + + /** + * Remove a CopyState. Intended to be called after a replication event completes. + * This method will remove a copyState from the copyStateMap only if its refCount hits 0. + * + * @param copyState {@link CopyState} + */ + private synchronized void removeCopyState(CopyState copyState) { + if (copyState.decRef() == true) { + copyStateMap.remove(copyState.getRequestedReplicationCheckpoint()); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java new file mode 100644 index 0000000000000..05f1c9d757e5c --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.store.RateLimiter; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.transport.TransportRequestOptions; +import org.opensearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * This class handles sending file chunks over the transport layer to a target shard. + * + * @opensearch.internal + */ +public final class RemoteSegmentFileChunkWriter implements FileChunkWriter { + + private final AtomicLong requestSeqNoGenerator; + private final RetryableTransportClient retryableTransportClient; + private final ShardId shardId; + private final RecoverySettings recoverySettings; + private final long replicationId; + private final AtomicLong bytesSinceLastPause = new AtomicLong(); + private final TransportRequestOptions fileChunkRequestOptions; + private final Consumer onSourceThrottle; + private final String action; + + public RemoteSegmentFileChunkWriter( + long replicationId, + RecoverySettings recoverySettings, + RetryableTransportClient retryableTransportClient, + ShardId shardId, + String action, + AtomicLong requestSeqNoGenerator, + Consumer onSourceThrottle + ) { + this.replicationId = replicationId; + this.recoverySettings = recoverySettings; + this.retryableTransportClient = retryableTransportClient; + this.shardId = shardId; + this.requestSeqNoGenerator = requestSeqNoGenerator; + this.onSourceThrottle = onSourceThrottle; + this.fileChunkRequestOptions = TransportRequestOptions.builder() + .withType(TransportRequestOptions.Type.RECOVERY) + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); + + this.action = action; + } + + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + // Pause using the rate limiter, if desired, to throttle the recovery + final long throttleTimeInNanos; + // always fetch the ratelimiter - it might be updated in real-time on the recovery settings + final RateLimiter rl = recoverySettings.rateLimiter(); + if (rl != null) { + long bytes = bytesSinceLastPause.addAndGet(content.length()); + if (bytes > rl.getMinPauseCheckBytes()) { + // Time to pause + bytesSinceLastPause.addAndGet(-bytes); + try { + throttleTimeInNanos = rl.pause(bytes); + onSourceThrottle.accept(throttleTimeInNanos); + } catch (IOException e) { + throw new OpenSearchException("failed to pause recovery", e); + } + } else { + throttleTimeInNanos = 0; + } + } else { + throttleTimeInNanos = 0; + } + + final long requestSeqNo = requestSeqNoGenerator.getAndIncrement(); + /* we send estimateTotalOperations with every request since we collect stats on the target and that way we can + * see how many translog ops we accumulate while copying files across the network. A future optimization + * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. + */ + final FileChunkRequest request = new FileChunkRequest( + replicationId, + requestSeqNo, + shardId, + fileMetadata, + position, + content, + lastChunk, + totalTranslogOps, + throttleTimeInNanos + ); + final Writeable.Reader reader = in -> TransportResponse.Empty.INSTANCE; + retryableTransportClient.executeRetryableAction( + action, + request, + fileChunkRequestOptions, + ActionListener.map(listener, r -> null), + reader + ); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java new file mode 100644 index 0000000000000..e95c2c6470b4b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentFileTransferHandler.java @@ -0,0 +1,239 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.ArrayUtil; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.lease.Releasable; +import org.opensearch.common.lucene.store.InputStreamIndexInput; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteTransportException; +import org.opensearch.transport.Transports; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.IntSupplier; + +/** + * SegmentFileSender handles building and starting a {@link MultiChunkTransfer} to orchestrate sending chunks to a given targetNode. + * This class delegates to a {@link FileChunkWriter} to handle the transport of chunks. + * + * @opensearch.internal + * // TODO: make this package-private after combining recovery and replication into single package. + */ +public final class SegmentFileTransferHandler { + + private final Logger logger; + private final IndexShard shard; + private final FileChunkWriter chunkWriter; + private final ThreadPool threadPool; + private final int chunkSizeInBytes; + private final int maxConcurrentFileChunks; + private final DiscoveryNode targetNode; + private final CancellableThreads cancellableThreads; + + public SegmentFileTransferHandler( + IndexShard shard, + DiscoveryNode targetNode, + FileChunkWriter chunkWriter, + Logger logger, + ThreadPool threadPool, + CancellableThreads cancellableThreads, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks + ) { + this.shard = shard; + this.targetNode = targetNode; + this.chunkWriter = chunkWriter; + this.logger = logger; + this.threadPool = threadPool; + this.cancellableThreads = cancellableThreads; + this.chunkSizeInBytes = fileChunkSizeInBytes; + // if the target is on an old version, it won't be able to handle out-of-order file chunks. + this.maxConcurrentFileChunks = maxConcurrentFileChunks; + } + + /** + * Returns a closeable {@link MultiChunkTransfer} to initiate sending a list of files. + * Callers are responsible for starting the transfer and closing the resource. + * @param store {@link Store} + * @param files {@link StoreFileMetadata[]} + * @param translogOps {@link IntSupplier} + * @param listener {@link ActionListener} + * @return {@link MultiChunkTransfer} + */ + public MultiChunkTransfer createTransfer( + Store store, + StoreFileMetadata[] files, + IntSupplier translogOps, + ActionListener listener + ) { + ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first + return new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) { + + final Deque buffers = new ConcurrentLinkedDeque<>(); + InputStreamIndexInput currentInput = null; + long offset = 0; + + @Override + protected void onNewResource(StoreFileMetadata md) throws IOException { + offset = 0; + IOUtils.close(currentInput, () -> currentInput = null); + final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE); + currentInput = new InputStreamIndexInput(indexInput, md.length()) { + @Override + public void close() throws IOException { + IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop + } + }; + } + + private byte[] acquireBuffer() { + final byte[] buffer = buffers.pollFirst(); + if (buffer != null) { + return buffer; + } + return new byte[chunkSizeInBytes]; + } + + @Override + protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException { + assert Transports.assertNotTransportThread("read file chunk"); + cancellableThreads.checkForCancel(); + final byte[] buffer = acquireBuffer(); + final int bytesRead = currentInput.read(buffer); + if (bytesRead == -1) { + throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name()); + } + final boolean lastChunk = offset + bytesRead == md.length(); + final FileChunk chunk = new FileChunk( + md, + new BytesArray(buffer, 0, bytesRead), + offset, + lastChunk, + () -> buffers.addFirst(buffer) + ); + offset += bytesRead; + return chunk; + } + + @Override + protected void executeChunkRequest(FileChunk request, ActionListener listener1) { + cancellableThreads.checkForCancel(); + chunkWriter.writeFileChunk( + request.md, + request.position, + request.content, + request.lastChunk, + translogOps.getAsInt(), + ActionListener.runBefore(listener1, request::close) + ); + } + + @Override + protected void handleError(StoreFileMetadata md, Exception e) throws Exception { + handleErrorOnSendFiles(store, e, new StoreFileMetadata[] { md }); + } + + @Override + public void close() throws IOException { + IOUtils.close(currentInput, () -> currentInput = null); + } + }; + } + + public void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception { + final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e); + assert Transports.assertNotTransportThread(this + "[handle error on send/clean files]"); + if (corruptIndexException != null) { + Exception localException = null; + for (StoreFileMetadata md : mds) { + cancellableThreads.checkForCancel(); + logger.debug("checking integrity for file {} after remove corruption exception", md); + if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! + logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md); + if (localException == null) { + localException = corruptIndexException; + } + shard.failShard("error sending files", corruptIndexException); + } + } + if (localException != null) { + throw localException; + } else { // corruption has happened on the way to replica + RemoteTransportException remoteException = new RemoteTransportException( + "File corruption occurred on recovery but checksums are ok", + null + ); + remoteException.addSuppressed(e); + logger.warn( + () -> new ParameterizedMessage( + "{} Remote file corruption on node {}, recovering {}. local checksum OK", + shard.shardId(), + targetNode, + mds + ), + corruptIndexException + ); + throw remoteException; + } + } + throw e; + } + + /** + * A file chunk from the recovery source + * + * @opensearch.internal + */ + public static final class FileChunk implements MultiChunkTransfer.ChunkRequest, Releasable { + final StoreFileMetadata md; + final BytesReference content; + final long position; + final boolean lastChunk; + final Releasable onClose; + + FileChunk(StoreFileMetadata md, BytesReference content, long position, boolean lastChunk, Releasable onClose) { + this.md = md; + this.content = content; + this.position = position; + this.lastChunk = lastChunk; + this.onClose = onClose; + } + + @Override + public boolean lastChunk() { + return lastChunk; + } + + @Override + public void close() { + onClose.close(); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java new file mode 100644 index 0000000000000..fdabd48c62929 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java @@ -0,0 +1,170 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.logging.Loggers; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.common.util.concurrent.ListenableFuture; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.RunUnderPrimaryPermit; +import org.opensearch.indices.recovery.DelayRecoveryException; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.Transports; + +import java.io.Closeable; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * Orchestrates sending requested segment files to a target shard. + * + * @opensearch.internal + */ +class SegmentReplicationSourceHandler { + + private final IndexShard shard; + private final CopyState copyState; + private final SegmentFileTransferHandler segmentFileTransferHandler; + private final CancellableThreads cancellableThreads = new CancellableThreads(); + private final ListenableFuture future = new ListenableFuture<>(); + private final List resources = new CopyOnWriteArrayList<>(); + private final Logger logger; + private final AtomicBoolean isReplicating = new AtomicBoolean(); + + /** + * Constructor. + * + * @param targetNode - {@link DiscoveryNode} target node where files should be sent. + * @param writer {@link FileChunkWriter} implementation that sends file chunks over the transport layer. + * @param threadPool {@link ThreadPool} Thread pool. + * @param copyState {@link CopyState} CopyState holding segment file metadata. + * @param fileChunkSizeInBytes {@link Integer} + * @param maxConcurrentFileChunks {@link Integer} + */ + SegmentReplicationSourceHandler( + DiscoveryNode targetNode, + FileChunkWriter writer, + ThreadPool threadPool, + CopyState copyState, + int fileChunkSizeInBytes, + int maxConcurrentFileChunks + ) { + this.shard = copyState.getShard(); + this.logger = Loggers.getLogger( + SegmentReplicationSourceHandler.class, + copyState.getShard().shardId(), + "sending segments to " + targetNode.getName() + ); + this.segmentFileTransferHandler = new SegmentFileTransferHandler( + copyState.getShard(), + targetNode, + writer, + logger, + threadPool, + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + this.copyState = copyState; + } + + /** + * Sends Segment files from the local node to the given target. + * + * @param request {@link GetSegmentFilesRequest} request object containing list of files to be sent. + * @param listener {@link ActionListener} that completes with the list of files sent. + */ + public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener listener) { + if (isReplicating.compareAndSet(false, true) == false) { + throw new OpenSearchException("Replication to {} is already running.", shard.shardId()); + } + future.addListener(listener, OpenSearchExecutors.newDirectExecutorService()); + final Closeable releaseResources = () -> IOUtils.close(resources); + try { + + final Consumer onFailure = e -> { + assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]"); + IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); + }; + + RunUnderPrimaryPermit.run(() -> { + final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable(); + ShardRouting targetShardRouting = routingTable.getByAllocationId(request.getTargetAllocationId()); + if (targetShardRouting == null) { + logger.debug( + "delaying replication of {} as it is not listed as assigned to target node {}", + shard.shardId(), + request.getTargetNode() + ); + throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node"); + } + }, + shard.shardId() + " validating recovery target [" + request.getTargetAllocationId() + "] registered ", + shard, + cancellableThreads, + logger + ); + + final StepListener sendFileStep = new StepListener<>(); + Set storeFiles = new HashSet<>(Arrays.asList(shard.store().directory().listAll())); + final StoreFileMetadata[] storeFileMetadata = request.getFilesToFetch() + .stream() + .filter(file -> storeFiles.contains(file.name())) + .toArray(StoreFileMetadata[]::new); + + final MultiChunkTransfer transfer = segmentFileTransferHandler + .createTransfer(shard.store(), storeFileMetadata, () -> 0, sendFileStep); + resources.add(transfer); + transfer.start(); + + sendFileStep.whenComplete(r -> { + try { + future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata))); + } finally { + IOUtils.close(resources); + } + }, onFailure); + } catch (Exception e) { + IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e)); + } + } + + /** + * Cancels the recovery and interrupts all eligible threads. + */ + public void cancel(String reason) { + cancellableThreads.cancel(reason); + } + + CopyState getCopyState() { + return copyState; + } + + public boolean isReplicating() { + return isReplicating.get(); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index 9f70120dedd6c..d428459884f97 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -10,11 +10,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.index.IndexService; +import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Nullable; +import org.opensearch.common.component.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.recovery.RetryableTransportClient; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -23,9 +32,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** * Service class that handles segment replication requests from replica shards. @@ -33,9 +40,12 @@ * * @opensearch.internal */ -public class SegmentReplicationSourceService { +public final class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class); + private final RecoverySettings recoverySettings; + private final TransportService transportService; + private final IndicesService indicesService; /** * Internal actions used by the segment replication source service on the primary shard @@ -43,20 +53,21 @@ public class SegmentReplicationSourceService { * @opensearch.internal */ public static class Actions { + public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info"; public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files"; } - private final Map copyStateMap; - private final TransportService transportService; - private final IndicesService indicesService; + private final OngoingSegmentReplications ongoingSegmentReplications; - // TODO mark this as injected and bind in Node - public SegmentReplicationSourceService(TransportService transportService, IndicesService indicesService) { - copyStateMap = Collections.synchronizedMap(new HashMap<>()); + public SegmentReplicationSourceService( + IndicesService indicesService, + TransportService transportService, + RecoverySettings recoverySettings + ) { this.transportService = transportService; this.indicesService = indicesService; - + this.recoverySettings = recoverySettings; transportService.registerRequestHandler( Actions.GET_CHECKPOINT_INFO, ThreadPool.Names.GENERIC, @@ -69,14 +80,27 @@ public SegmentReplicationSourceService(TransportService transportService, Indice GetSegmentFilesRequest::new, new GetSegmentFilesRequestHandler() ); + this.ongoingSegmentReplications = new OngoingSegmentReplications(indicesService, recoverySettings); } private class CheckpointInfoRequestHandler implements TransportRequestHandler { @Override public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception { - final ReplicationCheckpoint checkpoint = request.getCheckpoint(); - logger.trace("Received request for checkpoint {}", checkpoint); - final CopyState copyState = getCachedCopyState(checkpoint); + final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter( + request.getReplicationId(), + recoverySettings, + new RetryableTransportClient( + transportService, + request.getTargetNode(), + recoverySettings.internalActionRetryTimeout(), + logger + ), + request.getCheckpoint().getShardId(), + SegmentReplicationTargetService.Actions.FILE_CHUNK, + new AtomicLong(0), + (throttleTime) -> {} + ); + final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); channel.sendResponse( new CheckpointInfoResponse( copyState.getCheckpoint(), @@ -88,73 +112,47 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan } } - class GetSegmentFilesRequestHandler implements TransportRequestHandler { + private class GetSegmentFilesRequestHandler implements TransportRequestHandler { @Override public void messageReceived(GetSegmentFilesRequest request, TransportChannel channel, Task task) throws Exception { - if (isInCopyStateMap(request.getCheckpoint())) { - // TODO send files - } else { - // Return an empty list of files - channel.sendResponse(new GetSegmentFilesResponse(Collections.emptyList())); - } + ongoingSegmentReplications.startSegmentCopy(request, new ChannelActionListener<>(channel, Actions.GET_SEGMENT_FILES, request)); } } - /** - * Operations on the {@link #copyStateMap} member. - */ + @Override + public void clusterChanged(ClusterChangedEvent event) { + if (event.nodesRemoved()) { + for (DiscoveryNode removedNode : event.nodesDelta().removedNodes()) { + ongoingSegmentReplications.cancelReplication(removedNode); + } + } + } - /** - * A synchronized method that checks {@link #copyStateMap} for the given {@link ReplicationCheckpoint} key - * and returns the cached value if one is present. If the key is not present, a {@link CopyState} - * object is constructed and stored in the map before being returned. - */ - private synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) throws IOException { - if (isInCopyStateMap(checkpoint)) { - final CopyState copyState = fetchFromCopyStateMap(checkpoint); - copyState.incRef(); - return copyState; - } else { - // From the checkpoint's shard ID, fetch the IndexShard - ShardId shardId = checkpoint.getShardId(); - final IndexService indexService = indicesService.indexService(shardId.getIndex()); - final IndexShard indexShard = indexService.getShard(shardId.id()); - // build the CopyState object and cache it before returning - final CopyState copyState = new CopyState(indexShard); - - /** - * Use the checkpoint from the request as the key in the map, rather than - * the checkpoint from the created CopyState. This maximizes cache hits - * if replication targets make a request with an older checkpoint. - * Replication targets are expected to fetch the checkpoint in the response - * CopyState to bring themselves up to date. - */ - addToCopyStateMap(checkpoint, copyState); - return copyState; + @Override + protected void doStart() { + final ClusterService clusterService = indicesService.clusterService(); + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + clusterService.addListener(this); } } - /** - * Adds the input {@link CopyState} object to {@link #copyStateMap}. - * The key is the CopyState's {@link ReplicationCheckpoint} object. - */ - private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) { - copyStateMap.putIfAbsent(checkpoint, copyState); + @Override + protected void doStop() { + final ClusterService clusterService = indicesService.clusterService(); + if (DiscoveryNode.isDataNode(clusterService.getSettings())) { + indicesService.clusterService().removeListener(this); + } } - /** - * Given a {@link ReplicationCheckpoint}, return the corresponding - * {@link CopyState} object, if any, from {@link #copyStateMap}. - */ - private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.get(replicationCheckpoint); + @Override + protected void doClose() throws IOException { + } - /** - * Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint} - * as a key by invoking {@link Map#containsKey(Object)}. - */ - private boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) { - return copyStateMap.containsKey(replicationCheckpoint); + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + if (indexShard != null) { + ongoingSegmentReplications.cancel(indexShard, "shard is closed"); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index b01016d2a1e62..838c06a4785ef 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -27,7 +27,9 @@ public class SegmentReplicationState implements ReplicationState { public enum Stage { DONE((byte) 0), - INIT((byte) 1); + INIT((byte) 1), + + REPLICATING((byte) 2); private static final Stage[] STAGES = new Stage[Stage.values().length]; @@ -56,29 +58,58 @@ public static Stage fromId(byte id) { } } - public SegmentReplicationState() { - this.stage = Stage.INIT; - } - private Stage stage; + private final ReplicationLuceneIndex index; + private final ReplicationTimer timer; + + public SegmentReplicationState(ReplicationLuceneIndex index) { + stage = Stage.INIT; + this.index = index; + timer = new ReplicationTimer(); + timer.start(); + } @Override public ReplicationLuceneIndex getIndex() { - // TODO - return null; + return index; } @Override public ReplicationTimer getTimer() { - // TODO - return null; + return timer; } public Stage getStage() { return stage; } + protected void validateAndSetStage(Stage expected, Stage next) { + if (stage != expected) { + assert false : "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"; + throw new IllegalStateException( + "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])" + ); + } + stage = next; + } + public void setStage(Stage stage) { - this.stage = stage; + switch (stage) { + case INIT: + this.stage = Stage.INIT; + getIndex().reset(); + break; + case REPLICATING: + validateAndSetStage(Stage.INIT, stage); + getIndex().start(); + break; + case DONE: + validateAndSetStage(Stage.REPLICATING, stage); + getIndex().stop(); + getTimer().stop(); + break; + default: + throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]"); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7933ea5f0344b..fb68e59f3b2ef 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -8,18 +8,40 @@ package org.opensearch.indices.replication; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.MultiFileWriter; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; -import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationTarget; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * Represents the target of a replication event. @@ -31,55 +53,52 @@ public class SegmentReplicationTarget extends ReplicationTarget { private final ReplicationCheckpoint checkpoint; private final SegmentReplicationSource source; private final SegmentReplicationState state; + protected final MultiFileWriter multiFileWriter; public SegmentReplicationTarget( ReplicationCheckpoint checkpoint, IndexShard indexShard, SegmentReplicationSource source, - SegmentReplicationTargetService.SegmentReplicationListener listener + ReplicationListener listener ) { super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); this.checkpoint = checkpoint; this.source = source; - this.state = new SegmentReplicationState(); + this.state = new SegmentReplicationState(stateIndex); + this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount); } @Override protected void closeInternal() { - // TODO + try { + multiFileWriter.close(); + } finally { + super.closeInternal(); + } } @Override protected String getPrefix() { - // TODO - return null; + return "replication." + UUIDs.randomBase64UUID() + "."; } @Override protected void onDone() { - this.state.setStage(SegmentReplicationState.Stage.DONE); + state.setStage(SegmentReplicationState.Stage.DONE); } @Override - protected void onCancel(String reason) { - // TODO - } - - @Override - public ReplicationState state() { + public SegmentReplicationState state() { return state; } - @Override - public ReplicationTarget retryCopy() { - // TODO - return null; + public SegmentReplicationTarget retryCopy() { + return new SegmentReplicationTarget(checkpoint, indexShard, source, listener); } @Override public String description() { - // TODO - return null; + return "Segment replication from " + source.toString(); } @Override @@ -102,7 +121,12 @@ public void writeFileChunk( int totalTranslogOps, ActionListener listener ) { - // TODO + try { + multiFileWriter.writeFileChunk(metadata, position, content, lastChunk); + listener.onResponse(null); + } catch (Exception e) { + listener.onFailure(e); + } } /** @@ -110,6 +134,127 @@ public void writeFileChunk( * @param listener {@link ActionListener} listener. */ public void startReplication(ActionListener listener) { - // TODO + state.setStage(SegmentReplicationState.Stage.REPLICATING); + final StepListener checkpointInfoListener = new StepListener<>(); + final StepListener getFilesListener = new StepListener<>(); + final StepListener finalizeListener = new StepListener<>(); + + // Get list of files to copy from this checkpoint. + source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); + + checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure); + getFilesListener.whenComplete( + response -> finalizeReplication(checkpointInfoListener.result(), finalizeListener), + listener::onFailure + ); + finalizeListener.whenComplete(r -> listener.onResponse(null), listener::onFailure); + } + + private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener getFilesListener) + throws IOException { + final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot(); + Store.MetadataSnapshot localMetadata = getMetadataSnapshot(); + final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata); + logger.debug("Replication diff {}", diff); + // Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot + // from + // source that means the local copy of the segment has been corrupted/changed in some way and we throw an IllegalStateException to + // fail the shard + if (diff.different.isEmpty() == false) { + getFilesListener.onFailure( + new IllegalStateException( + new ParameterizedMessage("Shard {} has local copies of segments that differ from the primary", indexShard.shardId()) + .getFormattedMessage() + ) + ); + } + final List filesToFetch = new ArrayList(diff.missing); + + Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); + final Set pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles() + .stream() + .filter(f -> storeFiles.contains(f.name()) == false) + .collect(Collectors.toSet()); + + filesToFetch.addAll(pendingDeleteFiles); + + for (StoreFileMetadata file : filesToFetch) { + state.getIndex().addFileDetail(file.name(), file.length(), false); + } + if (filesToFetch.isEmpty()) { + getFilesListener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } else { + source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener); + } + } + + private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + multiFileWriter.renameAllTempFiles(); + final Store store = store(); + store.incRef(); + try { + // Deserialize the new SegmentInfos object sent from the primary. + final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); + SegmentInfos infos = SegmentInfos.readCommit( + store.directory(), + toIndexInput(checkpointInfoResponse.getInfosBytes()), + responseCheckpoint.getSegmentsGen() + ); + indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); + store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos)); + } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { + // this is a fatal exception at this stage. + // this means we transferred files from the remote that have not be checksummed and they are + // broken. We have to clean up this shard entirely, remove all files and bubble it up to the + // source shard since this index might be broken there as well? The Source can handle this and checks + // its content on disk if possible. + try { + try { + store.removeCorruptionMarker(); + } finally { + Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files + } + } catch (Exception e) { + logger.debug("Failed to clean lucene index", e); + ex.addSuppressed(e); + } + ReplicationFailedException rfe = new ReplicationFailedException( + indexShard.shardId(), + "failed to clean after replication", + ex + ); + fail(rfe, true); + throw rfe; + } catch (Exception ex) { + ReplicationFailedException rfe = new ReplicationFailedException( + indexShard.shardId(), + "failed to clean after replication", + ex + ); + fail(rfe, true); + throw rfe; + } finally { + store.decRef(); + } + return null; + }); + } + + /** + * This method formats our byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be + * passed to SegmentInfos.readCommit + */ + private ChecksumIndexInput toIndexInput(byte[] input) { + return new BufferedChecksumIndexInput( + new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") + ); + } + + Store.MetadataSnapshot getMetadataSnapshot() throws IOException { + if (indexShard.getSegmentInfosSnapshot() == null) { + return Store.MetadataSnapshot.EMPTY; + } + return store.getMetadata(indexShard.getSegmentInfosSnapshot().get()); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1c6053a72a4c5..0c3350f224b11 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -38,7 +38,7 @@ * * @opensearch.internal */ -public final class SegmentReplicationTargetService implements IndexEventListener { +public class SegmentReplicationTargetService implements IndexEventListener { private static final Logger logger = LogManager.getLogger(SegmentReplicationTargetService.class); @@ -84,6 +84,39 @@ public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexSh } } + /** + * Invoked when a new checkpoint is received from a primary shard. + * It checks if a new checkpoint should be processed or not and starts replication if needed. + * @param receivedCheckpoint received checkpoint that is checked for processing + * @param replicaShard replica shard on which checkpoint is received + */ + public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedCheckpoint, final IndexShard replicaShard) { + if (onGoingReplications.isShardReplicating(replicaShard.shardId())) { + logger.trace( + () -> new ParameterizedMessage( + "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", + replicaShard.getLatestReplicationCheckpoint() + ) + ); + return; + } + if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { + startReplication(receivedCheckpoint, replicaShard, new SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) {} + + @Override + public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + if (sendShardFailure == true) { + logger.error("replication failure", e); + replicaShard.failShard("replication failure", e); + } + } + }); + + } + } + public void startReplication( final ReplicationCheckpoint checkpoint, final IndexShard indexShard, @@ -139,6 +172,11 @@ public void run() { private void start(final long replicationId) { try (ReplicationRef replicationRef = onGoingReplications.get(replicationId)) { + // This check is for handling edge cases where the reference is removed before the ReplicationRunner is started by the + // threadpool. + if (replicationRef == null) { + return; + } replicationRef.get().startReplication(new ActionListener<>() { @Override public void onResponse(Void o) { diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java index b74a69971ebd5..8093b6aee88f9 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointAction.java @@ -28,6 +28,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardClosedException; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -52,6 +53,8 @@ public class PublishCheckpointAction extends TransportReplicationAction< public static final String ACTION_NAME = "indices:admin/publishCheckpoint"; protected static Logger logger = LogManager.getLogger(PublishCheckpointAction.class); + private final SegmentReplicationTargetService replicationService; + @Inject public PublishCheckpointAction( Settings settings, @@ -60,7 +63,8 @@ public PublishCheckpointAction( IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - ActionFilters actionFilters + ActionFilters actionFilters, + SegmentReplicationTargetService targetService ) { super( settings, @@ -75,6 +79,7 @@ public PublishCheckpointAction( PublishCheckpointRequest::new, ThreadPool.Names.REFRESH ); + this.replicationService = targetService; } @Override @@ -165,7 +170,7 @@ protected void shardOperationOnReplica(PublishCheckpointRequest request, IndexSh ActionListener.completeWith(listener, () -> { logger.trace("Checkpoint received on replica {}", request); if (request.getCheckpoint().getShardId().equals(replica.shardId())) { - replica.onNewCheckpoint(request); + replicationService.onNewCheckpoint(request.getCheckpoint(), replica); } return new ReplicaResult(); }); diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java index 98ab9cc4c1708..f84a65206190b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/ReplicationCheckpoint.java @@ -115,7 +115,7 @@ public int hashCode() { * Checks if other is aheadof current replication point by comparing segmentInfosVersion. Returns true for null */ public boolean isAheadOf(@Nullable ReplicationCheckpoint other) { - return other == null || segmentInfosVersion > other.getSegmentInfosVersion(); + return other == null || segmentInfosVersion > other.getSegmentInfosVersion() || primaryTerm > other.getPrimaryTerm(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index 2b09901a947fe..6be524cea140e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -22,6 +22,7 @@ public class SegmentReplicationCheckpointPublisher { private final PublishAction publishAction; + // This Component is behind feature flag so we are manually binding this in IndicesModule. @Inject public SegmentReplicationCheckpointPublisher(PublishCheckpointAction publishAction) { this(publishAction::publish); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 250df3481435a..c0e0b4dee2b3f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -33,14 +33,20 @@ public class CopyState extends AbstractRefCounted { private final GatedCloseable segmentInfosRef; + /** ReplicationCheckpoint requested */ + private final ReplicationCheckpoint requestedReplicationCheckpoint; + /** Actual ReplicationCheckpoint returned by the shard */ private final ReplicationCheckpoint replicationCheckpoint; private final Store.MetadataSnapshot metadataSnapshot; private final HashSet pendingDeleteFiles; private final byte[] infosBytes; private GatedCloseable commitRef; + private final IndexShard shard; - public CopyState(IndexShard shard) throws IOException { + public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShard shard) throws IOException { super("CopyState-" + shard.shardId()); + this.requestedReplicationCheckpoint = requestedReplicationCheckpoint; + this.shard = shard; this.segmentInfosRef = shard.getSegmentInfosSnapshot(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); this.metadataSnapshot = shard.store().getMetadata(segmentInfos); @@ -100,4 +106,12 @@ public byte[] getInfosBytes() { public Set getPendingDeleteFiles() { return pendingDeleteFiles; } + + public IndexShard getShard() { + return shard; + } + + public ReplicationCheckpoint getRequestedReplicationCheckpoint() { + return requestedReplicationCheckpoint; + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index b8295f0685a7f..d648ca6041ff8 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -235,6 +235,16 @@ public boolean cancelForShard(ShardId shardId, String reason) { return cancelled; } + /** + * check if a shard is currently replicating + * + * @param shardId shardId for which to check if replicating + * @return true if shard is currently replicating + */ + public boolean isShardReplicating(ShardId shardId) { + return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId)); + } + /** * a reference to {@link ReplicationTarget}, which implements {@link AutoCloseable}. closing the reference * causes {@link ReplicationTarget#decRef()} to be called. This makes sure that the underlying resources diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java new file mode 100644 index 0000000000000..afdd0ce466f9b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * Exception thrown if replication fails + * + * @opensearch.internal + */ +public class ReplicationFailedException extends OpenSearchException { + + public ReplicationFailedException(IndexShard shard, Throwable cause) { + this(shard, null, cause); + } + + public ReplicationFailedException(IndexShard shard, @Nullable String extraInfo, Throwable cause) { + this(shard.shardId(), extraInfo, cause); + } + + public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, Throwable cause) { + super(shardId + ": Replication failed on " + (extraInfo == null ? "" : " (" + extraInfo + ")"), cause); + } + + public ReplicationFailedException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index f8dc07f122c02..27e23ceafb15e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -23,6 +23,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.RecoveryTransportRequest; @@ -50,6 +51,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted { protected final AtomicBoolean finished = new AtomicBoolean(); private final ShardId shardId; protected final IndexShard indexShard; + protected final Store store; protected final ReplicationListener listener; protected final Logger logger; protected final CancellableThreads cancellableThreads; @@ -59,7 +61,9 @@ public abstract class ReplicationTarget extends AbstractRefCounted { protected abstract void onDone(); - protected abstract void onCancel(String reason); + protected void onCancel(String reason) { + cancellableThreads.cancel(reason); + } public abstract ReplicationState state(); @@ -84,9 +88,11 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn this.id = ID_GENERATOR.incrementAndGet(); this.stateIndex = stateIndex; this.indexShard = indexShard; + this.store = indexShard.store(); this.shardId = indexShard.shardId(); // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); + store.incRef(); } public long getId() { @@ -119,6 +125,11 @@ public IndexShard indexShard() { return indexShard; } + public Store store() { + ensureRefCount(); + return store; + } + public ShardId shardId() { return shardId; } @@ -266,4 +277,8 @@ public abstract void writeFileChunk( int totalTranslogOps, ActionListener listener ); + + protected void closeInternal() { + store.decRef(); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java index db8206d131c13..09b14fb1b5333 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/SegmentReplicationTransportRequest.java @@ -46,4 +46,29 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(this.targetAllocationId); targetNode.writeTo(out); } + + public long getReplicationId() { + return replicationId; + } + + public String getTargetAllocationId() { + return targetAllocationId; + } + + public DiscoveryNode getTargetNode() { + return targetNode; + } + + @Override + public String toString() { + return "SegmentReplicationTransportRequest{" + + "replicationId=" + + replicationId + + ", targetAllocationId='" + + targetAllocationId + + '\'' + + ", targetNode=" + + targetNode + + '}'; + } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7a8f7ecb87dcc..346bff9afe296 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -40,6 +40,10 @@ import org.opensearch.index.IndexingPressureService; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.threadpool.RunnableTaskExecutionListener; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.SegmentReplicationSourceFactory; +import org.opensearch.indices.replication.SegmentReplicationTargetService; +import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.watcher.ResourceWatcherService; import org.opensearch.Assertions; import org.opensearch.Build; @@ -223,6 +227,7 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.opensearch.common.util.FeatureFlags.REPLICATION_TYPE; import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY; /** @@ -944,6 +949,19 @@ protected Node( .toInstance(new PeerRecoverySourceService(transportService, indicesService, recoverySettings)); b.bind(PeerRecoveryTargetService.class) .toInstance(new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService)); + if (FeatureFlags.isEnabled(REPLICATION_TYPE)) { + b.bind(SegmentReplicationTargetService.class) + .toInstance( + new SegmentReplicationTargetService( + threadPool, + recoverySettings, + transportService, + new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService) + ) + ); + b.bind(SegmentReplicationSourceService.class) + .toInstance(new SegmentReplicationSourceService(indicesService, transportService, recoverySettings)); + } } b.bind(HttpServerTransport.class).toInstance(httpServerTransport); pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p)); diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index cd4e065985d91..5a93d7c0bd86e 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -86,6 +86,7 @@ import org.opensearch.indices.InvalidIndexTemplateException; import org.opensearch.indices.recovery.PeerRecoveryNotFound; import org.opensearch.indices.recovery.RecoverFilesRecoveryException; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.ingest.IngestProcessorException; import org.opensearch.cluster.coordination.NodeHealthCheckFailureException; import org.opensearch.repositories.RepositoryException; @@ -856,6 +857,7 @@ public void testIds() { ids.put(158, PeerRecoveryNotFound.class); ids.put(159, NodeHealthCheckFailureException.class); ids.put(160, NoSeedNodeLeftException.class); + ids.put(161, ReplicationFailedException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index d99bde4764adf..b6bced9f038c0 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -31,7 +31,6 @@ package org.opensearch.index.store; -import org.apache.lucene.tests.analysis.MockAnalyzer; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -51,7 +50,6 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; -import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; @@ -60,9 +58,12 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.NIOFSDirectory; -import org.apache.lucene.util.BytesRef; +import org.apache.lucene.tests.analysis.MockAnalyzer; +import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; +import org.hamcrest.Matchers; import org.opensearch.ExceptionsHelper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; @@ -81,9 +82,8 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.test.DummyShardLock; -import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.IndexSettingsModule; -import org.hamcrest.Matchers; +import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -102,7 +102,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.unmodifiableMap; -import static org.opensearch.test.VersionUtils.randomVersion; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -114,6 +113,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.opensearch.test.VersionUtils.randomVersion; public class StoreTests extends OpenSearchTestCase { @@ -1149,4 +1149,43 @@ public void testGetMetadataWithSegmentInfos() throws IOException { assertEquals(segmentInfos.getSegmentsFileName(), metadataSnapshot.getSegmentsFile().name()); store.close(); } + + public void testcleanupAndPreserveLatestCommitPoint() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId)); + IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec( + TestUtil.getDefaultCodec() + ); + indexWriterConfig.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); + IndexWriter writer = new IndexWriter(store.directory(), indexWriterConfig); + int docs = 1 + random().nextInt(100); + writer.commit(); + Document doc = new Document(); + doc.add(new TextField("id", "" + docs++, random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add( + new TextField( + "body", + TestUtil.randomRealisticUnicodeString(random()), + random().nextBoolean() ? Field.Store.YES : Field.Store.NO + ) + ); + doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random())))); + writer.addDocument(doc); + writer.commit(); + writer.close(); + + Store.MetadataSnapshot commitMetadata = store.getMetadata(); + + Store.MetadataSnapshot refreshMetadata = Store.MetadataSnapshot.EMPTY; + + store.cleanupAndPreserveLatestCommitPoint("test", refreshMetadata); + + // we want to ensure commitMetadata files are preserved after calling cleanup + for (String existingFile : store.directory().listAll()) { + assert (commitMetadata.contains(existingFile) == true); + } + + deleteContent(store.directory()); + IOUtils.close(store); + } } diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java index fc5c429d74b16..2b5550b71a627 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySourceHandlerTests.java @@ -94,6 +94,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.RunUnderPrimaryPermit; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.test.CorruptionUtils; import org.opensearch.test.DummyShardLock; @@ -544,21 +545,22 @@ public void writeFileChunk( }); } }; + IndexShard mockShard = mock(IndexShard.class); + when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0)); + doAnswer(invocation -> { + assertFalse(failedEngine.get()); + failedEngine.set(true); + return null; + }).when(mockShard).failShard(any(), any()); RecoverySourceHandler handler = new RecoverySourceHandler( - null, + mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 8), between(1, 8) - ) { - @Override - protected void failEngine(IOException cause) { - assertFalse(failedEngine.get()); - failedEngine.set(true); - } - }; + ); SetOnce sendFilesError = new SetOnce<>(); CountDownLatch latch = new CountDownLatch(1); handler.sendFiles( @@ -570,6 +572,7 @@ protected void failEngine(IOException cause) { latch.await(); assertThat(sendFilesError.get(), instanceOf(IOException.class)); assertNotNull(ExceptionsHelper.unwrapCorruption(sendFilesError.get())); + failedEngine.get(); assertTrue(failedEngine.get()); // ensure all chunk requests have been completed; otherwise some files on the target are left open. IOUtils.close(() -> terminate(threadPool), () -> threadPool = null); @@ -617,21 +620,22 @@ public void writeFileChunk( } } }; + IndexShard mockShard = mock(IndexShard.class); + when(mockShard.shardId()).thenReturn(new ShardId("testIndex", "testUUID", 0)); + doAnswer(invocation -> { + assertFalse(failedEngine.get()); + failedEngine.set(true); + return null; + }).when(mockShard).failShard(any(), any()); RecoverySourceHandler handler = new RecoverySourceHandler( - null, + mockShard, new AsyncRecoveryTarget(target, recoveryExecutor), threadPool, request, Math.toIntExact(recoverySettings.getChunkSize().getBytes()), between(1, 10), between(1, 4) - ) { - @Override - protected void failEngine(IOException cause) { - assertFalse(failedEngine.get()); - failedEngine.set(true); - } - }; + ); PlainActionFuture sendFilesFuture = new PlainActionFuture<>(); handler.sendFiles(store, metas.toArray(new StoreFileMetadata[0]), () -> 0, sendFilesFuture); Exception ex = expectThrows(Exception.class, sendFilesFuture::actionGet); @@ -747,7 +751,7 @@ public void testCancellationsDoesNotLeakPrimaryPermits() throws Exception { Thread cancelingThread = new Thread(() -> cancellableThreads.cancel("test")); cancelingThread.start(); try { - RecoverySourceHandler.runUnderPrimaryPermit(() -> {}, "test", shard, cancellableThreads, logger); + RunUnderPrimaryPermit.run(() -> {}, "test", shard, cancellableThreads, logger); } catch (CancellableThreads.ExecutionCancelledException e) { // expected. } diff --git a/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java new file mode 100644 index 0000000000000..260f6a13b5010 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java @@ -0,0 +1,231 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.junit.Assert; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class OngoingSegmentReplicationsTests extends IndexShardTestCase { + + private final IndicesService mockIndicesService = mock(IndicesService.class); + private ReplicationCheckpoint testCheckpoint; + private DiscoveryNode primaryDiscoveryNode; + private DiscoveryNode replicaDiscoveryNode; + private IndexShard primary; + private IndexShard replica; + + private GetSegmentFilesRequest getSegmentFilesRequest; + + final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); + + @Override + public void setUp() throws Exception { + super.setUp(); + primary = newStartedShard(true); + replica = newShard(primary.shardId(), false); + recoverReplica(replica, primary, true); + replicaDiscoveryNode = replica.recoveryState().getTargetNode(); + primaryDiscoveryNode = replica.recoveryState().getSourceNode(); + + ShardId testShardId = primary.shardId(); + + // This mirrors the creation of the ReplicationCheckpoint inside CopyState + testCheckpoint = new ReplicationCheckpoint( + testShardId, + primary.getOperationPrimaryTerm(), + 0L, + primary.getProcessedLocalCheckpoint(), + 0L + ); + IndexService mockIndexService = mock(IndexService.class); + when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); + when(mockIndexService.getShard(testShardId.id())).thenReturn(primary); + + TransportService transportService = mock(TransportService.class); + when(transportService.getThreadPool()).thenReturn(threadPool); + } + + @Override + public void tearDown() throws Exception { + closeShards(primary, replica); + super.tearDown(); + } + + public void testPrepareAndSendSegments() throws IOException { + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertTrue(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(1, replications.size()); + assertEquals(1, copyState.refCount()); + + getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + new ArrayList<>(copyState.getMetadataSnapshot().asMap().values()), + testCheckpoint + ); + + final Collection expectedFiles = List.copyOf(primary.store().getMetadata().asMap().values()); + replications.startSegmentCopy(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertEquals(1, getSegmentFilesResponse.files.size()); + assertEquals(1, expectedFiles.size()); + assertTrue(expectedFiles.stream().findFirst().get().isSame(getSegmentFilesResponse.files.get(0))); + assertEquals(0, copyState.refCount()); + assertFalse(replications.isInCopyStateMap(request.getCheckpoint())); + assertEquals(0, replications.size()); + } + + @Override + public void onFailure(Exception e) { + logger.error("Unexpected failure", e); + Assert.fail(); + } + }); + } + + public void testCancelReplication() throws IOException { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + primaryDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + // this shouldn't be called in this test. + Assert.fail(); + }; + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertEquals(1, replications.size()); + assertEquals(1, replications.cachedCopyStateSize()); + + replications.cancelReplication(primaryDiscoveryNode); + assertEquals(0, copyState.refCount()); + assertEquals(0, replications.size()); + assertEquals(0, replications.cachedCopyStateSize()); + } + + public void testMultipleReplicasUseSameCheckpoint() throws IOException { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + primaryDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + // this shouldn't be called in this test. + Assert.fail(); + }; + + final CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertEquals(1, copyState.refCount()); + + final CheckpointInfoRequest secondRequest = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + replications.prepareForReplication(secondRequest, segmentSegmentFileChunkWriter); + + assertEquals(2, copyState.refCount()); + assertEquals(2, replications.size()); + assertEquals(1, replications.cachedCopyStateSize()); + + replications.cancelReplication(primaryDiscoveryNode); + replications.cancelReplication(replicaDiscoveryNode); + assertEquals(0, copyState.refCount()); + assertEquals(0, replications.size()); + assertEquals(0, replications.cachedCopyStateSize()); + } + + public void testStartCopyWithoutPrepareStep() { + OngoingSegmentReplications replications = new OngoingSegmentReplications(mockIndicesService, recoverySettings); + final ActionListener listener = spy(new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertTrue(getSegmentFilesResponse.files.isEmpty()); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + + getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + testCheckpoint + ); + + replications.startSegmentCopy(getSegmentFilesRequest, listener); + verify(listener, times(1)).onResponse(any()); + } + + public void testShardAlreadyReplicatingToNode() throws IOException { + OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings)); + final CheckpointInfoRequest request = new CheckpointInfoRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + testCheckpoint + ); + final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> { + listener.onResponse(null); + }; + replications.prepareForReplication(request, segmentSegmentFileChunkWriter); + assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); }); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java new file mode 100644 index 0000000000000..63dab4f8883e8 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentFileTransferHandlerTests.java @@ -0,0 +1,261 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFileNames; +import org.junit.Assert; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.util.CancellableThreads; +import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.recovery.MultiChunkTransfer; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.IntSupplier; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verifyNoInteractions; + +public class SegmentFileTransferHandlerTests extends IndexShardTestCase { + + private IndexShard shard; + private StoreFileMetadata[] filesToSend; + private final DiscoveryNode targetNode = new DiscoveryNode( + "foo", + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + Version.CURRENT + ); + + final int fileChunkSizeInBytes = 5000; + final int maxConcurrentFileChunks = 1; + private CancellableThreads cancellableThreads; + final IntSupplier translogOps = () -> 0; + + @Override + public void setUp() throws Exception { + super.setUp(); + cancellableThreads = new CancellableThreads(); + shard = spy(newStartedShard(true)); + filesToSend = getFilestoSend(shard); + // we should only have a Segments_N file at this point. + assertEquals(1, filesToSend.length); + } + + private StoreFileMetadata[] getFilestoSend(IndexShard shard) throws IOException { + final Store.MetadataSnapshot metadata = shard.store().getMetadata(); + return metadata.asMap().values().toArray(StoreFileMetadata[]::new); + } + + @Override + public void tearDown() throws Exception { + closeShards(shard); + super.tearDown(); + } + + public void testSendFiles_invokesChunkWriter() throws IOException, InterruptedException { + // use counDownLatch and countDown when our chunkWriter is invoked. + final CountDownLatch countDownLatch = new CountDownLatch(1); + final FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + assertTrue(filesToSend[0].isSame(fileMetadata)); + assertTrue(lastChunk); + countDownLatch.countDown(); + } + }); + + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + final MultiChunkTransfer transfer = handler.createTransfer( + shard.store(), + filesToSend, + translogOps, + mock(ActionListener.class) + ); + + // start the transfer + transfer.start(); + countDownLatch.await(5, TimeUnit.SECONDS); + verify(chunkWriter, times(1)).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); + IOUtils.close(transfer); + } + + public void testSendFiles_cancelThreads_beforeStart() throws IOException, InterruptedException { + final FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + Assert.fail(); + } + }); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + + final MultiChunkTransfer transfer = handler.createTransfer( + shard.store(), + filesToSend, + translogOps, + mock(ActionListener.class) + ); + + // start the transfer + cancellableThreads.cancel("test"); + transfer.start(); + verifyNoInteractions(chunkWriter); + IOUtils.close(transfer); + } + + public void testSendFiles_cancelThreads_afterStart() throws IOException, InterruptedException { + // index a doc a flush so we have more than 1 file to send. + indexDoc(shard, "_doc", "test"); + flushShard(shard, true); + filesToSend = getFilestoSend(shard); + + // we should have 4 files to send now - + // [_0.cfe, _0.si, _0.cfs, segments_3] + assertEquals(4, filesToSend.length); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + FileChunkWriter chunkWriter = spy(new FileChunkWriter() { + @Override + public void writeFileChunk( + StoreFileMetadata fileMetadata, + long position, + BytesReference content, + boolean lastChunk, + int totalTranslogOps, + ActionListener listener + ) { + // cancel the threads at this point, we'll ensure this is not invoked more than once. + cancellableThreads.cancel("test"); + listener.onResponse(null); + } + }); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + chunkWriter, + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + + final MultiChunkTransfer transfer = handler.createTransfer( + shard.store(), + filesToSend, + translogOps, + new ActionListener() { + @Override + public void onResponse(Void unused) { + // do nothing here, we will just resolve in test. + } + + @Override + public void onFailure(Exception e) { + assertEquals(CancellableThreads.ExecutionCancelledException.class, e.getClass()); + countDownLatch.countDown(); + } + } + ); + + // start the transfer + transfer.start(); + countDownLatch.await(30, TimeUnit.SECONDS); + verify(chunkWriter, times(1)).writeFileChunk(any(), anyLong(), any(), anyBoolean(), anyInt(), any()); + IOUtils.close(transfer); + } + + public void testSendFiles_CorruptIndexException() throws Exception { + final CancellableThreads cancellableThreads = new CancellableThreads(); + SegmentFileTransferHandler handler = new SegmentFileTransferHandler( + shard, + targetNode, + mock(FileChunkWriter.class), + logger, + shard.getThreadPool(), + cancellableThreads, + fileChunkSizeInBytes, + maxConcurrentFileChunks + ); + final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata( + IndexFileNames.SEGMENTS, + 1L, + "0", + org.apache.lucene.util.Version.LATEST + ); + + doNothing().when(shard).failShard(anyString(), any()); + assertThrows( + CorruptIndexException.class, + () -> { + handler.handleErrorOnSendFiles( + shard.store(), + new CorruptIndexException("test", "test"), + new StoreFileMetadata[] { SEGMENTS_FILE } + ); + } + ); + + verify(shard, times(1)).failShard(any(), any()); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java new file mode 100644 index 0000000000000..70061c54d0da2 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceHandlerTests.java @@ -0,0 +1,193 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.Assert; +import org.mockito.Mockito; +import org.opensearch.OpenSearchException; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.FileChunkWriter; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.CopyState; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class SegmentReplicationSourceHandlerTests extends IndexShardTestCase { + + private final DiscoveryNode localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); + private DiscoveryNode replicaDiscoveryNode; + private IndexShard primary; + private IndexShard replica; + + private FileChunkWriter chunkWriter; + + @Override + public void setUp() throws Exception { + super.setUp(); + primary = newStartedShard(true); + replica = newShard(primary.shardId(), false); + recoverReplica(replica, primary, true); + replicaDiscoveryNode = replica.recoveryState().getTargetNode(); + } + + @Override + public void tearDown() throws Exception { + closeShards(primary, replica); + super.tearDown(); + } + + public void testSendFiles() throws IOException { + chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onResponse(null); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + expectedFiles, + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + MatcherAssert.assertThat(getSegmentFilesResponse.files, Matchers.containsInAnyOrder(expectedFiles.toArray())); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + } + + public void testSendFiles_emptyRequest() throws IOException { + chunkWriter = mock(FileChunkWriter.class); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + assertTrue(getSegmentFilesResponse.files.isEmpty()); + Mockito.verifyNoInteractions(chunkWriter); + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + } + + public void testSendFileFails() throws IOException { + chunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> listener.onFailure( + new OpenSearchException("Test") + ); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final List expectedFiles = List.copyOf(copyState.getMetadataSnapshot().asMap().values()); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + expectedFiles, + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(e.getClass(), OpenSearchException.class); + } + }); + } + + public void testReplicationAlreadyRunning() throws IOException { + chunkWriter = mock(FileChunkWriter.class); + + final ReplicationCheckpoint latestReplicationCheckpoint = primary.getLatestReplicationCheckpoint(); + final CopyState copyState = new CopyState(latestReplicationCheckpoint, primary); + SegmentReplicationSourceHandler handler = new SegmentReplicationSourceHandler( + localNode, + chunkWriter, + threadPool, + copyState, + 5000, + 1 + ); + + final GetSegmentFilesRequest getSegmentFilesRequest = new GetSegmentFilesRequest( + 1L, + replica.routingEntry().allocationId().getId(), + replicaDiscoveryNode, + Collections.emptyList(), + latestReplicationCheckpoint + ); + + handler.sendFiles(getSegmentFilesRequest, mock(ActionListener.class)); + Assert.assertThrows(OpenSearchException.class, () -> { handler.sendFiles(getSegmentFilesRequest, mock(ActionListener.class)); }); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java index 67c867d360e70..4bfdd81d50a1e 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationSourceServiceTests.java @@ -9,13 +9,16 @@ package org.opensearch.indices.replication; import org.opensearch.Version; +import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexService; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyStateTests; import org.opensearch.test.OpenSearchTestCase; @@ -35,25 +38,19 @@ public class SegmentReplicationSourceServiceTests extends OpenSearchTestCase { - private ShardId testShardId; private ReplicationCheckpoint testCheckpoint; - private IndicesService mockIndicesService; - private IndexService mockIndexService; - private IndexShard mockIndexShard; private TestThreadPool testThreadPool; - private CapturingTransport transport; private TransportService transportService; private DiscoveryNode localNode; - private SegmentReplicationSourceService segmentReplicationSourceService; @Override public void setUp() throws Exception { super.setUp(); // setup mocks - mockIndexShard = CopyStateTests.createMockIndexShard(); - testShardId = mockIndexShard.shardId(); - mockIndicesService = mock(IndicesService.class); - mockIndexService = mock(IndexService.class); + IndexShard mockIndexShard = CopyStateTests.createMockIndexShard(); + ShardId testShardId = mockIndexShard.shardId(); + IndicesService mockIndicesService = mock(IndicesService.class); + IndexService mockIndexService = mock(IndexService.class); when(mockIndicesService.indexService(testShardId.getIndex())).thenReturn(mockIndexService); when(mockIndexService.getShard(testShardId.id())).thenReturn(mockIndexShard); @@ -66,7 +63,7 @@ public void setUp() throws Exception { 0L ); testThreadPool = new TestThreadPool("test", Settings.EMPTY); - transport = new CapturingTransport(); + CapturingTransport transport = new CapturingTransport(); localNode = new DiscoveryNode("local", buildNewFakeTransportAddress(), Version.CURRENT); transportService = transport.createTransportService( Settings.EMPTY, @@ -78,7 +75,16 @@ public void setUp() throws Exception { ); transportService.start(); transportService.acceptIncomingRequests(); - segmentReplicationSourceService = new SegmentReplicationSourceService(transportService, mockIndicesService); + + final Settings settings = Settings.builder().put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName()).build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); + + SegmentReplicationSourceService segmentReplicationSourceService = new SegmentReplicationSourceService( + mockIndicesService, + transportService, + recoverySettings + ); } @Override @@ -88,7 +94,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testGetSegmentFiles_EmptyResponse() { + public void testGetSegmentFiles() { final GetSegmentFilesRequest request = new GetSegmentFilesRequest( 1, "allocationId", @@ -96,19 +102,52 @@ public void testGetSegmentFiles_EmptyResponse() { Collections.emptyList(), testCheckpoint ); + executeGetSegmentFiles(request, new ActionListener<>() { + @Override + public void onResponse(GetSegmentFilesResponse response) { + assertEquals(0, response.files.size()); + } + + @Override + public void onFailure(Exception e) { + fail("unexpected exception: " + e); + } + }); + } + + public void testCheckpointInfo() { + executeGetCheckpointInfo(new ActionListener<>() { + @Override + public void onResponse(CheckpointInfoResponse response) { + assertEquals(testCheckpoint, response.getCheckpoint()); + assertNotNull(response.getInfosBytes()); + // CopyStateTests sets up one pending delete file and one committed segments file + assertEquals(1, response.getPendingDeleteFiles().size()); + assertEquals(1, response.getSnapshot().size()); + } + + @Override + public void onFailure(Exception e) { + fail("unexpected exception: " + e); + } + }); + } + + private void executeGetCheckpointInfo(ActionListener listener) { + final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint); transportService.sendRequest( localNode, - SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, + SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, request, - new TransportResponseHandler() { + new TransportResponseHandler() { @Override - public void handleResponse(GetSegmentFilesResponse response) { - assertEquals(0, response.files.size()); + public void handleResponse(CheckpointInfoResponse response) { + listener.onResponse(response); } @Override public void handleException(TransportException e) { - fail("unexpected exception: " + e); + listener.onFailure(e); } @Override @@ -117,32 +156,27 @@ public String executor() { } @Override - public GetSegmentFilesResponse read(StreamInput in) throws IOException { - return new GetSegmentFilesResponse(in); + public CheckpointInfoResponse read(StreamInput in) throws IOException { + return new CheckpointInfoResponse(in); } } ); } - public void testCheckpointInfo() { - final CheckpointInfoRequest request = new CheckpointInfoRequest(1L, "testAllocationId", localNode, testCheckpoint); + private void executeGetSegmentFiles(GetSegmentFilesRequest request, ActionListener listener) { transportService.sendRequest( localNode, - SegmentReplicationSourceService.Actions.GET_CHECKPOINT_INFO, + SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES, request, - new TransportResponseHandler() { + new TransportResponseHandler() { @Override - public void handleResponse(CheckpointInfoResponse response) { - assertEquals(testCheckpoint, response.getCheckpoint()); - assertNotNull(response.getInfosBytes()); - // CopyStateTests sets up one pending delete file and one committed segments file - assertEquals(1, response.getPendingDeleteFiles().size()); - assertEquals(1, response.getSnapshot().size()); + public void handleResponse(GetSegmentFilesResponse response) { + listener.onResponse(response); } @Override public void handleException(TransportException e) { - fail("unexpected exception: " + e); + listener.onFailure(e); } @Override @@ -151,11 +185,10 @@ public String executor() { } @Override - public CheckpointInfoResponse read(StreamInput in) throws IOException { - return new CheckpointInfoResponse(in); + public GetSegmentFilesResponse read(StreamInput in) throws IOException { + return new GetSegmentFilesResponse(in); } } ); } - } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 7b23a03917f93..1d089c78159a6 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.junit.Assert; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -18,15 +19,22 @@ import org.opensearch.index.shard.IndexShardTestCase; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.transport.TransportService; import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.eq; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -35,6 +43,9 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private SegmentReplicationSource replicationSource; private SegmentReplicationTargetService sut; + private ReplicationCheckpoint initialCheckpoint; + private ReplicationCheckpoint aheadCheckpoint; + @Override public void setUp() throws Exception { super.setUp(); @@ -42,13 +53,21 @@ public void setUp() throws Exception { final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); final TransportService transportService = mock(TransportService.class); - indexShard = newShard(false, settings); + indexShard = newStartedShard(false, settings); checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 0L, 0L, 0L, 0L); SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class); replicationSource = mock(SegmentReplicationSource.class); when(replicationSourceFactory.get(indexShard)).thenReturn(replicationSource); sut = new SegmentReplicationTargetService(threadPool, recoverySettings, transportService, replicationSourceFactory); + initialCheckpoint = indexShard.getLatestReplicationCheckpoint(); + aheadCheckpoint = new ReplicationCheckpoint( + initialCheckpoint.getShardId(), + initialCheckpoint.getPrimaryTerm(), + initialCheckpoint.getSegmentsGen(), + initialCheckpoint.getSeqNo(), + initialCheckpoint.getSegmentInfosVersion() + 1 + ); } @Override @@ -57,7 +76,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testTargetReturnsSuccess_listenerCompletes() throws IOException { + public void testTargetReturnsSuccess_listenerCompletes() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -76,15 +95,16 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept ); final SegmentReplicationTarget spy = Mockito.spy(target); doAnswer(invocation -> { + // setting stage to REPLICATING so transition in markAsDone succeeds on listener completion + target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final ActionListener listener = invocation.getArgument(0); listener.onResponse(null); return null; }).when(spy).startReplication(any()); sut.startReplication(spy); - closeShards(indexShard); } - public void testTargetThrowsException() throws IOException { + public void testTargetThrowsException() { final OpenSearchException expectedError = new OpenSearchException("Fail"); final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, @@ -98,7 +118,7 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { - assertEquals(SegmentReplicationState.Stage.INIT, state.getStage()); + assertEquals(SegmentReplicationState.Stage.REPLICATING, state.getStage()); assertEquals(expectedError, e.getCause()); assertTrue(sendShardFailure); } @@ -106,15 +126,84 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept ); final SegmentReplicationTarget spy = Mockito.spy(target); doAnswer(invocation -> { + // setting stage to REPLICATING so transition in markAsDone succeeds on listener completion + target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final ActionListener listener = invocation.getArgument(0); listener.onFailure(expectedError); return null; }).when(spy).startReplication(any()); sut.startReplication(spy); - closeShards(indexShard); } - public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOException { + public void testAlreadyOnNewCheckpoint() { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(indexShard.getLatestReplicationCheckpoint(), indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + } + + public void testShardAlreadyReplicating() throws InterruptedException { + // Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it. + SegmentReplicationTargetService serviceSpy = spy(sut); + final SegmentReplicationTarget target = new SegmentReplicationTarget( + checkpoint, + indexShard, + replicationSource, + mock(SegmentReplicationTargetService.SegmentReplicationListener.class) + ); + // Create a Mockito spy of target to stub response of few method calls. + final SegmentReplicationTarget targetSpy = Mockito.spy(target); + CountDownLatch latch = new CountDownLatch(1); + // Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown + // of latch. + doAnswer(invocation -> { + final ActionListener listener = invocation.getArgument(0); + // a new checkpoint arrives before we've completed. + serviceSpy.onNewCheckpoint(aheadCheckpoint, indexShard); + listener.onResponse(null); + latch.countDown(); + return null; + }).when(targetSpy).startReplication(any()); + doNothing().when(targetSpy).onDone(); + + // start replication of this shard the first time. + serviceSpy.startReplication(targetSpy); + + // wait for the new checkpoint to arrive, before the listener completes. + latch.await(30, TimeUnit.SECONDS); + verify(serviceSpy, times(0)).startReplication(eq(aheadCheckpoint), eq(indexShard), any()); + } + + public void testNewCheckpointBehindCurrentCheckpoint() { + SegmentReplicationTargetService spy = spy(sut); + spy.onNewCheckpoint(checkpoint, indexShard); + verify(spy, times(0)).startReplication(any(), any(), any()); + } + + public void testShardNotStarted() throws IOException { + SegmentReplicationTargetService spy = spy(sut); + IndexShard shard = newShard(false); + spy.onNewCheckpoint(checkpoint, shard); + verify(spy, times(0)).startReplication(any(), any(), any()); + closeShards(shard); + } + + public void testNewCheckpoint_validationPassesAndReplicationFails() throws IOException { + allowShardFailures(); + SegmentReplicationTargetService spy = spy(sut); + IndexShard spyShard = spy(indexShard); + ArgumentCaptor captor = ArgumentCaptor.forClass( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + doNothing().when(spy).startReplication(any(), any(), any()); + spy.onNewCheckpoint(aheadCheckpoint, spyShard); + verify(spy, times(1)).startReplication(any(), any(), captor.capture()); + SegmentReplicationTargetService.SegmentReplicationListener listener = captor.getValue(); + listener.onFailure(new SegmentReplicationState(new ReplicationLuceneIndex()), new OpenSearchException("testing"), true); + verify(spyShard).failShard(any(), any()); + closeShard(indexShard, false); + } + + public void testBeforeIndexShardClosed_CancelsOngoingReplications() { final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, @@ -124,7 +213,6 @@ public void testBeforeIndexShardClosed_CancelsOngoingReplications() throws IOExc final SegmentReplicationTarget spy = Mockito.spy(target); sut.startReplication(spy); sut.beforeIndexShardClosed(indexShard.shardId(), indexShard, Settings.EMPTY); - Mockito.verify(spy, times(1)).cancel(any()); - closeShards(indexShard); + verify(spy, times(1)).cancel(any()); } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java new file mode 100644 index 0000000000000..a0944ee249859 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -0,0 +1,370 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.util.Version; +import org.junit.Assert; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SegmentReplicationTargetTests extends IndexShardTestCase { + + private SegmentReplicationTarget segrepTarget; + private IndexShard indexShard, spyIndexShard; + private ReplicationCheckpoint repCheckpoint; + private ByteBuffersDataOutput buffer; + + private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata( + IndexFileNames.SEGMENTS, + 5L, + "different", + Version.LATEST + ); + private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); + + private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( + Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), + null, + 0 + ); + + private static final Store.MetadataSnapshot SI_SNAPSHOT_DIFFERENT = new Store.MetadataSnapshot( + Map.of(SEGMENTS_FILE_DIFF.name(), SEGMENTS_FILE_DIFF), + null, + 0 + ); + + SegmentInfos testSegmentInfos; + + @Override + public void setUp() throws Exception { + + super.setUp(); + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); + spyIndexShard = spy(indexShard); + + Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class), anyLong()); + testSegmentInfos = spyIndexShard.store().readLastCommittedSegmentsInfo(); + buffer = new ByteBuffersDataOutput(); + try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { + testSegmentInfos.write(indexOutput); + } + repCheckpoint = new ReplicationCheckpoint( + spyIndexShard.shardId(), + spyIndexShard.getPendingPrimaryTerm(), + testSegmentInfos.getGeneration(), + spyIndexShard.seqNoStats().getLocalCheckpoint(), + testSegmentInfos.version + ); + } + + public void testSuccessfulResponse_startReplication() { + + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + assertEquals(filesToFetch.size(), 2); + assert (filesToFetch.contains(SEGMENTS_FILE)); + assert (filesToFetch.contains(PENDING_DELETE_FILE)); + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + try { + verify(spyIndexShard, times(1)).finalizeReplication(any(), anyLong()); + } catch (IOException ex) { + Assert.fail(); + } + } + + @Override + public void onFailure(Exception e) { + logger.error("Unexpected test error", e); + Assert.fail(); + } + }); + } + + public void testFailureResponse_getCheckpointMetadata() { + + Exception exception = new Exception("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onFailure(exception); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause().getCause()); + } + }); + } + + public void testFailureResponse_getSegmentFiles() { + + Exception exception = new Exception("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onFailure(exception); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause().getCause()); + } + }); + } + + public void testFailure_finalizeReplication_IOException() throws IOException { + + IOException exception = new IOException("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause()); + } + }); + } + + public void testFailure_finalizeReplication_IndexFormatException() throws IOException { + + IndexFormatTooNewException exception = new IndexFormatTooNewException("string", 1, 2, 1); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause()); + } + }); + } + + public void testFailure_differentSegmentFiles() throws IOException { + + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); + when(segrepTarget.getMetadataSnapshot()).thenReturn(SI_SNAPSHOT_DIFFERENT); + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assert (e instanceof IllegalStateException); + } + }); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + segrepTarget.markAsDone(); + closeShards(spyIndexShard, indexShard); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java index b54237a130431..f1f55c67bfca6 100644 --- a/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/checkpoint/PublishCheckpointActionTests.java @@ -22,7 +22,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndicesService; -import org.opensearch.indices.recovery.RecoverySettings; +import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.transport.CapturingTransport; import org.opensearch.threadpool.TestThreadPool; @@ -36,6 +36,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; import static org.opensearch.test.ClusterServiceUtils.createClusterService; public class PublishCheckpointActionTests extends OpenSearchTestCase { @@ -75,7 +76,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testPublishCheckpointActionOnPrimary() throws InterruptedException { + public void testPublishCheckpointActionOnPrimary() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -89,7 +90,7 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -98,7 +99,8 @@ public void testPublishCheckpointActionOnPrimary() throws InterruptedException { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -118,7 +120,6 @@ public void testPublishCheckpointActionOnReplica() { final Index index = new Index("index", "uuid"); final IndexService indexService = mock(IndexService.class); when(indicesService.indexServiceSafe(index)).thenReturn(indexService); - final int id = randomIntBetween(0, 4); final IndexShard indexShard = mock(IndexShard.class); when(indexService.getShard(id)).thenReturn(indexShard); @@ -126,7 +127,7 @@ public void testPublishCheckpointActionOnReplica() { final ShardId shardId = new ShardId(index, id); when(indexShard.shardId()).thenReturn(shardId); - final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, clusterService.getClusterSettings()); + final SegmentReplicationTargetService mockTargetService = mock(SegmentReplicationTargetService.class); final PublishCheckpointAction action = new PublishCheckpointAction( Settings.EMPTY, @@ -135,7 +136,8 @@ public void testPublishCheckpointActionOnReplica() { indicesService, threadPool, shardStateAction, - new ActionFilters(Collections.emptySet()) + new ActionFilters(Collections.emptySet()), + mockTargetService ); final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint(indexShard.shardId(), 1111, 111, 11, 1); @@ -147,7 +149,7 @@ public void testPublishCheckpointActionOnReplica() { final TransportReplicationAction.ReplicaResult result = listener.actionGet(); // onNewCheckpoint should be called on shard with checkpoint request - verify(indexShard).onNewCheckpoint(request); + verify(mockTargetService, times(1)).onNewCheckpoint(checkpoint, indexShard); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index afa38afb0cf2f..a6f0cf7e98411 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -47,7 +47,15 @@ public class CopyStateTests extends IndexShardTestCase { ); public void testCopyStateCreation() throws IOException { - CopyState copyState = new CopyState(createMockIndexShard()); + final IndexShard mockIndexShard = createMockIndexShard(); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( + mockIndexShard.shardId(), + mockIndexShard.getOperationPrimaryTerm(), + 0L, + mockIndexShard.getProcessedLocalCheckpoint(), + 0L + ); + CopyState copyState = new CopyState(testCheckpoint, mockIndexShard); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero