Change RecoverySourceHandler to use SegmentFileTransferHandler as a component over inheritance.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Jun 8, 2022
1 parent 2387cd2 commit 45f8f7e
Showing 4 changed files with 78 additions and 38 deletions.
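
The refactor in a nutshell: RecoverySourceHandler previously inherited the logger, CancellableThreads, resource list, and an overridable failEngine() from SegmentFileTransferHandler; it now owns those itself and delegates file transfer to a SegmentFileTransferHandler field, injecting failEngine as a callback. This lets SegmentReplicationSourceHandler reuse the same component. A minimal sketch of the pattern, using simplified stand-in types rather than the real OpenSearch classes:

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Stand-in for SegmentFileTransferHandler: a self-contained file-transfer component.
final class TransferComponent {
    private final Consumer<IOException> onCorruptException; // injected; replaces an overridable failEngine()

    TransferComponent(Consumer<IOException> onCorruptException) {
        this.onCorruptException = onCorruptException;
    }

    // Returns the transfer instead of starting it, leaving the lifecycle to the owner.
    Closeable sendFiles(String[] files) {
        return () -> { /* release chunk buffers */ };
    }

    void handleError(IOException corruption) {
        onCorruptException.accept(corruption); // the owner decides how to fail the shard
    }
}

// Stand-in for RecoverySourceHandler: composes the component instead of extending it.
class RecoveryOwner {
    private final List<Closeable> resources = new CopyOnWriteArrayList<>();
    private final TransferComponent transferHandler = new TransferComponent(this::failEngine);

    void sendFiles(String[] files) {
        Closeable transfer = transferHandler.sendFiles(files);
        resources.add(transfer); // tracked by the owner, closed on cancellation
    }

    private void failEngine(IOException cause) {
        // the real code calls shard.failShard("recovery", cause)
    }
}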
RecoverySourceHandler.java
@@ -32,6 +32,7 @@

package org.opensearch.indices.recovery;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
@@ -85,6 +86,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -105,8 +107,9 @@
*
* @opensearch.internal
*/
public class RecoverySourceHandler extends SegmentFileTransferHandler {
public class RecoverySourceHandler {

protected final Logger logger;
// Shard that is going to be recovered (the "source")
private final IndexShard shard;
private final int shardId;
@@ -116,8 +119,11 @@ public class RecoverySourceHandler extends SegmentFileTransferHandler {
private final RecoveryTargetHandler recoveryTarget;
private final int maxConcurrentOperations;
private final ThreadPool threadPool;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
public static final String PEER_RECOVERY_NAME = "peer-recovery";
private final SegmentFileTransferHandler transferHandler;

public RecoverySourceHandler(
IndexShard shard,
@@ -128,12 +134,15 @@ public RecoverySourceHandler(
int maxConcurrentFileChunks,
int maxConcurrentOperations
) {
super(
this.logger = Loggers.getLogger(RecoverySourceHandler.class, request.shardId(), "recover to " + request.targetNode().getName());
this.transferHandler = new SegmentFileTransferHandler(
shard,
request.targetNode(),
recoveryTarget,
Loggers.getLogger(RecoverySourceHandler.class, request.shardId(), "recover to " + request.targetNode().getName()),
logger,
threadPool,
cancellableThreads,
this::failEngine,
fileChunkSizeInBytes,
maxConcurrentFileChunks
);
@@ -654,6 +663,17 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
}
}

void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener<Void> listener) {
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = transferHandler.sendFiles(
store,
files,
translogOps,
listener
);
resources.add(transfer);
transfer.start();
}

void createRetentionLease(final long startingSeqNo, ActionListener<RetentionLease> listener) {
RunUnderPrimaryPermit.run(() -> {
// Clone the peer recovery retention lease belonging to the source shard. We are retaining history between the local
@@ -1025,9 +1045,13 @@ private void cleanFiles(
ActionListener.delegateResponse(listener, (l, e) -> ActionListener.completeWith(l, () -> {
StoreFileMetadata[] mds = StreamSupport.stream(sourceMetadata.spliterator(), false).toArray(StoreFileMetadata[]::new);
ArrayUtil.timSort(mds, Comparator.comparingLong(StoreFileMetadata::length)); // check small files first
handleErrorOnSendFiles(store, e, mds);
transferHandler.handleErrorOnSendFiles(store, e, mds);
throw e;
}))
);
}

protected void failEngine(IOException cause) {
shard.failShard("recovery", cause);
}
}
SegmentFileTransferHandler.java
@@ -26,20 +26,18 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.MultiChunkTransfer;
import org.opensearch.indices.recovery.FileChunkWriter;
import org.opensearch.indices.recovery.MultiChunkTransfer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.RemoteTransportException;
import org.opensearch.transport.Transports;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.IntSupplier;

/**
@@ -49,24 +47,26 @@
* @opensearch.internal
* // TODO: make this package-private after combining recovery and replication into single package.
*/
public class SegmentFileTransferHandler {
public final class SegmentFileTransferHandler {

protected final Logger logger;
protected final CancellableThreads cancellableThreads = new CancellableThreads();
protected final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final Logger logger;
private final IndexShard shard;
private final FileChunkWriter chunkWriter;
private final ThreadPool threadPool;
private final int chunkSizeInBytes;
private final int maxConcurrentFileChunks;
private final DiscoveryNode targetNode;
private final CancellableThreads cancellableThreads;
private final Consumer<IOException> onCorruptException;

public SegmentFileTransferHandler(
IndexShard shard,
DiscoveryNode targetNode,
FileChunkWriter chunkWriter,
Logger logger,
ThreadPool threadPool,
CancellableThreads cancellableThreads,
Consumer<IOException> onCorruptException,
int fileChunkSizeInBytes,
int maxConcurrentFileChunks
) {
@@ -75,21 +75,30 @@ public SegmentFileTransferHandler(
this.chunkWriter = chunkWriter;
this.logger = logger;
this.threadPool = threadPool;
this.cancellableThreads = cancellableThreads;
this.onCorruptException = onCorruptException;
this.chunkSizeInBytes = fileChunkSizeInBytes;
// if the target is on an old version, it won't be able to handle out-of-order file chunks.
this.maxConcurrentFileChunks = maxConcurrentFileChunks;
}

public final void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener<Void> listener) {
/**
 * Returns a closeable {@link MultiChunkTransfer} that sends the given files to the target.
 * Callers are responsible for starting the transfer and for closing it to release its resources.
 * @param store the {@link Store} to read the files from
 * @param files the {@link StoreFileMetadata} of the files to send
 * @param translogOps an {@link IntSupplier} for the number of translog operations reported with each chunk
 * @param listener an {@link ActionListener} notified when the transfer completes
 * @return a {@link MultiChunkTransfer} that the caller must start and close
 */
public MultiChunkTransfer<StoreFileMetadata, FileChunk> sendFiles(
Store store,
StoreFileMetadata[] files,
IntSupplier translogOps,
ActionListener<Void> listener
) {
ArrayUtil.timSort(files, Comparator.comparingLong(StoreFileMetadata::length)); // send smallest first

final MultiChunkTransfer<StoreFileMetadata, FileChunk> multiFileSender = new MultiChunkTransfer<>(
logger,
threadPool.getThreadContext(),
listener,
maxConcurrentFileChunks,
Arrays.asList(files)
) {
return new MultiChunkTransfer<>(logger, threadPool.getThreadContext(), listener, maxConcurrentFileChunks, Arrays.asList(files)) {

final Deque<byte[]> buffers = new ConcurrentLinkedDeque<>();
InputStreamIndexInput currentInput = null;
@@ -138,15 +147,15 @@ protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
}

@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener) {
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener1) {
cancellableThreads.checkForCancel();
chunkWriter.writeFileChunk(
request.md,
request.position,
request.content,
request.lastChunk,
translogOps.getAsInt(),
ActionListener.runBefore(listener, request::close)
ActionListener.runBefore(listener1, request::close)
);
}

@@ -160,11 +169,9 @@ public void close() throws IOException {
IOUtils.close(currentInput, () -> currentInput = null);
}
};
resources.add(multiFileSender);
multiFileSender.start();
}
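
The javadoc contract above moves the transfer lifecycle to the caller. A hedged sketch of the expected call pattern, mirroring RecoverySourceHandler.sendFiles and SegmentReplicationSourceHandler.sendFiles in this commit (store, files, translogOps, listener, and resources are assumed fields or arguments of the enclosing class):

final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer =
    transferHandler.sendFiles(store, files, translogOps, listener);
resources.add(transfer); // close the transfer if the operation is cancelled or fails
transfer.start();        // chunks are only sent once the caller starts the transfer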

public final void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception {
public void handleErrorOnSendFiles(Store store, Exception e, StoreFileMetadata[] mds) throws Exception {
final IOException corruptIndexException = ExceptionsHelper.unwrapCorruption(e);
assert Transports.assertNotTransportThread(this + "[handle error on send/clean files]");
if (corruptIndexException != null) {
@@ -177,7 +184,7 @@ public final void handleErrorOnSendFiles(Store store, Exception e, StoreFileMeta
if (localException == null) {
localException = corruptIndexException;
}
failEngine(corruptIndexException);
onCorruptException.accept(corruptIndexException);
}
}
if (localException != null) {
@@ -203,10 +210,6 @@ public final void handleErrorOnSendFiles(Store store, Exception e, StoreFileMeta
throw e;
}

protected void failEngine(IOException cause) {
shard.failShard("recovery", cause);
}

/**
* A file chunk from the recovery source
*
SegmentReplicationSourceHandler.java
@@ -15,6 +15,7 @@
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.util.concurrent.ListenableFuture;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.internal.io.IOUtils;
@@ -23,11 +24,13 @@
import org.opensearch.indices.RunUnderPrimaryPermit;
import org.opensearch.indices.recovery.DelayRecoveryException;
import org.opensearch.indices.recovery.FileChunkWriter;
import org.opensearch.indices.recovery.MultiChunkTransfer;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -42,11 +45,12 @@
*/
class SegmentReplicationSourceHandler {

protected final IndexShard shard;
private final IndexShard shard;
private final CopyState copyState;
private final SegmentFileTransferHandler segmentFileTransferHandler;
protected final ListenableFuture<GetSegmentFilesResponse> future = new ListenableFuture<>();
protected final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final ListenableFuture<GetSegmentFilesResponse> future = new ListenableFuture<>();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final Logger logger;

/**
@@ -79,6 +83,8 @@ class SegmentReplicationSourceHandler {
writer,
logger,
threadPool,
cancellableThreads,
this::failEngine,
fileChunkSizeInBytes,
maxConcurrentFileChunks
);
@@ -115,7 +121,7 @@ public void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentF
},
shard.shardId() + " validating recovery target [" + request.getTargetAllocationId() + "] registered ",
shard,
segmentFileTransferHandler.cancellableThreads,
cancellableThreads,
logger
);

@@ -126,8 +132,10 @@ public void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentF
.filter(file -> storeFiles.contains(file.name()))
.toArray(StoreFileMetadata[]::new);

segmentFileTransferHandler.sendFiles(shard.store(), storeFileMetadata, () -> 0, sendFileStep);
resources.addAll(segmentFileTransferHandler.resources);
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = segmentFileTransferHandler
.sendFiles(shard.store(), storeFileMetadata, () -> 0, sendFileStep);
resources.add(transfer);
transfer.start();

sendFileStep.whenComplete(r -> {
try {
@@ -145,7 +153,11 @@ public void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentF
* Cancels the recovery and interrupts all eligible threads.
*/
public void cancel(String reason) {
segmentFileTransferHandler.cancellableThreads.cancel(reason);
cancellableThreads.cancel(reason);
}

private void failEngine(IOException e) {
shard.failShard("Failed Replication", e);
}
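
Both owners now create their own CancellableThreads and inject it into the shared SegmentFileTransferHandler, so a single cancel() also stops in-flight chunk sends (executeChunkRequest calls cancellableThreads.checkForCancel()). A hedged sketch of the wiring, with argument names assumed from the enclosing class:

private final CancellableThreads cancellableThreads = new CancellableThreads();
private final SegmentFileTransferHandler segmentFileTransferHandler = new SegmentFileTransferHandler(
    shard, targetNode, writer, logger, threadPool,
    cancellableThreads,  // shared with the owner instead of being created inside the handler
    this::failEngine,    // injected callback replaces the subclass override
    fileChunkSizeInBytes, maxConcurrentFileChunks
);

public void cancel(String reason) {
    cancellableThreads.cancel(reason); // chunk sends observe this via checkForCancel()
}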

CopyState getCopyState() {
RecoverySourceHandlerTests.java
@@ -573,6 +573,7 @@ protected void failEngine(IOException cause) {
latch.await();
assertThat(sendFilesError.get(), instanceOf(IOException.class));
assertNotNull(ExceptionsHelper.unwrapCorruption(sendFilesError.get()));
failedEngine.get();
assertTrue(failedEngine.get());
// ensure all chunk requests have been completed; otherwise some files on the target are left open.
IOUtils.close(() -> terminate(threadPool), () -> threadPool = null);
