Address PR feedback:
Correct synchronization in OngoingSegmentReplications.
Throw when already replicating to a specific replica.

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Jun 15, 2022
1 parent de6b554 commit 990246d
Showing 8 changed files with 157 additions and 76 deletions.
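At a high level, the change replaces the synchronized map of per-node replication handlers with a concurrent map and fails fast when a second replication is requested for a replica that already has one in flight. The following is a minimal, standalone sketch of that per-node guard; the class and method names are illustrative and are not the OpenSearch APIs touched by this commit.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch (not OpenSearch code) of the per-node admission guard:
// at most one in-flight replication handler per target node, tracked in a
// concurrent map, with an explicit failure when a second replication is
// requested for a node that already has one.
public class PerNodeReplicationGuard {

    // Stand-in for SegmentReplicationSourceHandler; the real handler also
    // carries the CopyState and the file chunk writer for the event.
    static final class Handler {
        final String nodeId;

        Handler(String nodeId) {
            this.nodeId = nodeId;
        }
    }

    private final Map<String, Handler> handlersByNode = new ConcurrentHashMap<>();

    // Register a handler for a node, failing fast if one is already present.
    Handler prepare(String nodeId) {
        final Handler handler = new Handler(nodeId);
        if (handlersByNode.putIfAbsent(nodeId, handler) != null) {
            throw new IllegalStateException("Node " + nodeId + " is already replicating");
        }
        return handler;
    }

    // Drop the handler when replication finishes or the node leaves.
    Handler release(String nodeId) {
        return handlersByNode.remove(nodeId);
    }

    public static void main(String[] args) {
        PerNodeReplicationGuard guard = new PerNodeReplicationGuard();
        guard.prepare("replica-1");
        try {
            guard.prepare("replica-1"); // a second prepare for the same node is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        guard.release("replica-1");
        guard.prepare("replica-1"); // allowed again once the first event is released
    }
}

The diff below applies the same idea with ConcurrentCollections.newConcurrentMap(), an OpenSearchException, and an isActive() check on the handler itself.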
@@ -663,7 +663,7 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A
}

void sendFiles(Store store, StoreFileMetadata[] files, IntSupplier translogOps, ActionListener<Void> listener) {
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = transferHandler.sendFiles(
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = transferHandler.createTransfer(
store,
files,
translogOps,
@@ -8,8 +8,10 @@

package org.opensearch.indices.replication;

import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.ShardId;
@@ -26,6 +28,8 @@

/**
* Manages references to ongoing segrep events on a node.
* Each replica will have a new {@link SegmentReplicationSourceHandler} created when starting replication.
* CopyStates will be cached for reuse between replicas and only released when all replicas have finished copying segments.
*
* @opensearch.internal
*/
@@ -38,14 +42,15 @@ class OngoingSegmentReplications {

/**
* Constructor.
* @param indicesService {@link IndicesService}
*
* @param indicesService {@link IndicesService}
* @param recoverySettings {@link RecoverySettings}
*/
OngoingSegmentReplications(IndicesService indicesService, RecoverySettings recoverySettings) {
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
this.copyStateMap = Collections.synchronizedMap(new HashMap<>());
this.nodesToHandlers = Collections.synchronizedMap(new HashMap<>());
this.nodesToHandlers = ConcurrentCollections.newConcurrentMap();
}

/**
@@ -84,57 +89,29 @@ synchronized CopyState getCachedCopyState(ReplicationCheckpoint checkpoint) thro
}
}

void cancelReplication(DiscoveryNode node) {
if (nodesToHandlers.containsKey(node)) {
final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node);
handler.cancel("Cancel on node left");
removeCopyState(handler.getCopyState());
}
}

SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) {
return new SegmentReplicationSourceHandler(
node,
fileChunkWriter,
copyState.getShard().getThreadPool(),
copyState,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks()
);
}

/**
* Adds the input {@link CopyState} object to {@link #copyStateMap}.
* The key is the CopyState's {@link ReplicationCheckpoint} object.
*/
private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) {
copyStateMap.putIfAbsent(checkpoint, copyState);
}

/**
* Given a {@link ReplicationCheckpoint}, return the corresponding
* {@link CopyState} object, if any, from {@link #copyStateMap}.
*/
private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
return copyStateMap.get(replicationCheckpoint);
}

/**
* Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint}
* as a key by invoking {@link Map#containsKey(Object)}.
* Start sending files to the replica.
*
* @param request {@link GetSegmentFilesRequest}
* @param listener {@link ActionListener} that resolves when sending files is complete.
*/
boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
return copyStateMap.containsKey(replicationCheckpoint);
}

void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
final DiscoveryNode node = request.getTargetNode();
if (nodesToHandlers.containsKey(node)) {
final SegmentReplicationSourceHandler handler = nodesToHandlers.get(node);
if (handler != null) {
if (handler.isActive()) {
throw new OpenSearchException(
"Replication to shard {}, on node {} has already started",
request.getCheckpoint().getShardId(),
request.getTargetNode()
);
}
// update the given listener to release the CopyState before it resolves.
final ActionListener<GetSegmentFilesResponse> wrappedListener = ActionListener.runBefore(listener, () -> {
final SegmentReplicationSourceHandler sourceHandler = nodesToHandlers.remove(node);
removeCopyState(sourceHandler.getCopyState());
if (sourceHandler != null) {
removeCopyState(sourceHandler.getCopyState());
}
});
handler.sendFiles(request, wrappedListener);
} else {
@@ -143,14 +120,15 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
}

/**
* Remove a CopyState. Intended to be called after a replication event completes.
* This method will remove a copyState from the copyStateMap only if its refCount hits 0.
* @param copyState {@link CopyState}
* Cancel any ongoing replications for a given {@link DiscoveryNode}
*
* @param node {@link DiscoveryNode} node for which to cancel replication events.
*/
private synchronized void removeCopyState(CopyState copyState) {
copyState.decRef();
if (copyState.refCount() <= 0) {
copyStateMap.remove(copyState.getRequestedReplicationCheckpoint());
void cancelReplication(DiscoveryNode node) {
final SegmentReplicationSourceHandler handler = nodesToHandlers.remove(node);
if (handler != null) {
handler.cancel("Cancel on node left");
removeCopyState(handler.getCopyState());
}
}

@@ -159,18 +137,48 @@ private synchronized void removeCopyState(CopyState copyState) {
* nodes's store. This state is intended to be sent back to Replicas before copy is initiated so the replica can perform a diff against its
* local store. It will then build a handler to orchestrate the segment copy that will be stored locally and started on a subsequent request from replicas
* with the list of required files.
* @param request {@link CheckpointInfoRequest}
*
* @param request {@link CheckpointInfoRequest}
* @param fileChunkWriter {@link FileChunkWriter} writer to handle sending files over the transport layer.
* @return {@link CopyState} the built CopyState for this replication event.
* @throws IOException - When there is an IO error building CopyState.
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
final SegmentReplicationSourceHandler handler = createTargetHandler(request.getTargetNode(), copyState, fileChunkWriter);
nodesToHandlers.putIfAbsent(request.getTargetNode(), handler);
if (nodesToHandlers.containsKey(request.getTargetNode())) {
throw new OpenSearchException(
"Shard copy {} on node {} already replicating",
request.getCheckpoint().getShardId(),
request.getTargetNode()
);
}
nodesToHandlers.computeIfAbsent(request.getTargetNode(), node -> createTargetHandler(node, copyState, fileChunkWriter));
return copyState;
}

/**
* Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down.
*
* @param shard {@link IndexShard}
* @param reason {@link String} - Reason for the cancel
*/
synchronized void cancel(IndexShard shard, String reason) {
for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) {
if (entry.getCopyState().getShard().equals(shard)) {
entry.cancel(reason);
}
}
copyStateMap.clear();
}

/**
* Checks if the {@link #copyStateMap} has the input {@link ReplicationCheckpoint}
* as a key by invoking {@link Map#containsKey(Object)}.
*/
boolean isInCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
return copyStateMap.containsKey(replicationCheckpoint);
}

int size() {
return nodesToHandlers.size();
}
@@ -179,17 +187,43 @@ int cachedCopyStateSize() {
return copyStateMap.size();
}

private SegmentReplicationSourceHandler createTargetHandler(DiscoveryNode node, CopyState copyState, FileChunkWriter fileChunkWriter) {
return new SegmentReplicationSourceHandler(
node,
fileChunkWriter,
copyState.getShard().getThreadPool(),
copyState,
Math.toIntExact(recoverySettings.getChunkSize().getBytes()),
recoverySettings.getMaxConcurrentFileChunks()
);
}

/**
* Cancel all Replication events for the given shard, intended to be called when the current primary is shutting down.
* @param shard {@link IndexShard}
* @param reason {@link String} - Reason for the cancel
* Adds the input {@link CopyState} object to {@link #copyStateMap}.
* The key is the CopyState's {@link ReplicationCheckpoint} object.
*/
public void cancel(IndexShard shard, String reason) {
for (SegmentReplicationSourceHandler entry : nodesToHandlers.values()) {
if (entry.getCopyState().getShard().equals(shard)) {
entry.cancel(reason);
}
private void addToCopyStateMap(ReplicationCheckpoint checkpoint, CopyState copyState) {
copyStateMap.putIfAbsent(checkpoint, copyState);
}

/**
* Given a {@link ReplicationCheckpoint}, return the corresponding
* {@link CopyState} object, if any, from {@link #copyStateMap}.
*/
private CopyState fetchFromCopyStateMap(ReplicationCheckpoint replicationCheckpoint) {
return copyStateMap.get(replicationCheckpoint);
}

/**
* Remove a CopyState. Intended to be called after a replication event completes.
* This method will remove a copyState from the copyStateMap only if its refCount hits 0.
*
* @param copyState {@link CopyState}
*/
private synchronized void removeCopyState(CopyState copyState) {
copyState.decRef();
if (copyState.refCount() <= 0) {
copyStateMap.remove(copyState.getRequestedReplicationCheckpoint());
}
copyStateMap.clear();
}
}
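The class Javadoc above notes that CopyStates are cached per checkpoint and released only once every replica has finished copying, which is what removeCopyState enforces through reference counting. A hedged sketch of that ref-counted caching pattern, with illustrative names in place of the actual CopyState and ReplicationCheckpoint types:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of a ref-counted cache: each replica sharing a checkpoint bumps the
// count, and the cached value is evicted only when the last replica releases it.
class RefCountedCache<K, V> {

    private static final class Entry<V> {
        final V value;
        int refCount;

        Entry(V value) {
            this.value = value;
        }
    }

    private final Map<K, Entry<V>> cache = new HashMap<>();

    // Fetch an existing entry (bumping its count) or build and cache a new one.
    synchronized V acquire(K key, Function<K, V> factory) {
        final Entry<V> entry = cache.computeIfAbsent(key, k -> new Entry<>(factory.apply(k)));
        entry.refCount++;
        return entry.value;
    }

    // Drop one reference; evict only when no replica still holds the value.
    synchronized void release(K key) {
        final Entry<V> entry = cache.get(key);
        if (entry == null) {
            return;
        }
        entry.refCount--;
        if (entry.refCount <= 0) {
            cache.remove(key);
        }
    }

    synchronized int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        RefCountedCache<String, String> cache = new RefCountedCache<>();
        cache.acquire("checkpoint-1", k -> "copy-state-for-" + k); // first replica builds the state
        cache.acquire("checkpoint-1", k -> "copy-state-for-" + k); // second replica reuses it
        cache.release("checkpoint-1");
        System.out.println(cache.size()); // 1: still cached for the remaining replica
        cache.release("checkpoint-1");
        System.out.println(cache.size()); // 0: last reference released, entry evicted
    }
}

In the diff above, the count lives on CopyState itself (decRef()/refCount()), and removeCopyState evicts the map entry once the count reaches zero.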
@@ -87,7 +87,7 @@ public SegmentFileTransferHandler(
* @param listener {@link ActionListener}
* @return {@link MultiChunkTransfer}
*/
public MultiChunkTransfer<StoreFileMetadata, FileChunk> sendFiles(
public MultiChunkTransfer<StoreFileMetadata, FileChunk> createTransfer(
Store store,
StoreFileMetadata[] files,
IntSupplier translogOps,
@@ -9,6 +9,7 @@
package org.opensearch.indices.replication;

import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.node.DiscoveryNode;
@@ -35,6 +36,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/**
@@ -51,6 +53,7 @@ class SegmentReplicationSourceHandler {
private final ListenableFuture<GetSegmentFilesResponse> future = new ListenableFuture<>();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final Logger logger;
private final AtomicBoolean active = new AtomicBoolean();

/**
* Constructor.
@@ -95,7 +98,10 @@ class SegmentReplicationSourceHandler {
* @param request {@link GetSegmentFilesRequest} request object containing list of files to be sent.
* @param listener {@link ActionListener} that completes with the list of files sent.
*/
public void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentFilesResponse> listener) {
if (active.compareAndSet(false, true) == false) {
throw new OpenSearchException("Replication to {} is already running.", shard.shardId());
}
future.addListener(listener, OpenSearchExecutors.newDirectExecutorService());
final Closeable releaseResources = () -> IOUtils.close(resources);
try {
@@ -131,7 +137,7 @@ public void sendFiles(GetSegmentFilesRequest request, ActionListener<GetSegmentF
.toArray(StoreFileMetadata[]::new);

final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = segmentFileTransferHandler
.sendFiles(shard.store(), storeFileMetadata, () -> 0, sendFileStep);
.createTransfer(shard.store(), storeFileMetadata, () -> 0, sendFileStep);
resources.add(transfer);
transfer.start();

Expand All @@ -157,4 +163,8 @@ public void cancel(String reason) {
CopyState getCopyState() {
return copyState;
}

public boolean isActive() {
return active.get();
}
}
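The handler above now carries an AtomicBoolean so that a second sendFiles call fails instead of racing the first. A small sketch of that compare-and-set, single-use guard, again with illustrative names rather than the OpenSearch types:

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a single-use guard: compareAndSet flips the flag exactly once, so
// a second attempt to start the same transfer is rejected rather than raced.
class SingleUseGuard {

    private final AtomicBoolean active = new AtomicBoolean();

    // Run the task at most once per guard instance; throw if already started.
    void runOnce(Runnable task) {
        if (active.compareAndSet(false, true) == false) {
            throw new IllegalStateException("already running");
        }
        task.run();
    }

    boolean isActive() {
        return active.get();
    }

    public static void main(String[] args) {
        SingleUseGuard guard = new SingleUseGuard();
        guard.runOnce(() -> System.out.println("first call proceeds"));
        try {
            guard.runOnce(() -> System.out.println("never reached"));
        } catch (IllegalStateException e) {
            System.out.println("second call rejected: " + e.getMessage());
        }
        System.out.println("isActive: " + guard.isActive()); // true
    }
}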
@@ -24,7 +24,6 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RetryableTransportClient;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.CopyState;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
@@ -88,9 +87,6 @@ public SegmentReplicationSourceService(
private class CheckpointInfoRequestHandler implements TransportRequestHandler<CheckpointInfoRequest> {
@Override
public void messageReceived(CheckpointInfoRequest request, TransportChannel channel, Task task) throws Exception {
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received request for checkpoint {}", checkpoint);

final RemoteSegmentFileChunkWriter segmentSegmentFileChunkWriter = new RemoteSegmentFileChunkWriter(
request.getReplicationId(),
recoverySettings,
@@ -9,6 +9,7 @@
package org.opensearch.indices.replication;

import org.junit.Assert;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
@@ -213,4 +214,18 @@ public void onFailure(Exception e) {
verify(listener, times(1)).onResponse(any());
}

public void testShardAlreadyReplicatingToNode() throws IOException {
OngoingSegmentReplications replications = spy(new OngoingSegmentReplications(mockIndicesService, recoverySettings));
final CheckpointInfoRequest request = new CheckpointInfoRequest(
1L,
replica.routingEntry().allocationId().getId(),
replicaDiscoveryNode,
testCheckpoint
);
final FileChunkWriter segmentSegmentFileChunkWriter = (fileMetadata, position, content, lastChunk, totalTranslogOps, listener) -> {
listener.onResponse(null);
};
replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); });
}
}
@@ -100,7 +100,7 @@ public void writeFileChunk(
fileChunkSizeInBytes,
maxConcurrentFileChunks
);
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = handler.sendFiles(
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = handler.createTransfer(
shard.store(),
filesToSend,
translogOps,
@@ -139,7 +139,7 @@ public void writeFileChunk(
maxConcurrentFileChunks
);

final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = handler.sendFiles(
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = handler.createTransfer(
shard.store(),
filesToSend,
translogOps,
@@ -190,7 +190,7 @@ public void writeFileChunk(
maxConcurrentFileChunks
);

final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = handler.sendFiles(
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = handler.createTransfer(
shard.store(),
filesToSend,
translogOps,