Skip to content

Commit

Permalink
[Segment Replication] Override segment replication handler for duplic…
Browse files Browse the repository at this point in the history
…ate request from replica (#6693)

* [Segment Replication] Override segment replication handler for new request from same replica

Signed-off-by: Suraj Singh <surajrider@gmail.com>

* Spotless and use map.compute

Signed-off-by: Suraj Singh <surajrider@gmail.com>

---------

Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 authored Mar 16, 2023
1 parent 0cf543c commit 6bbe31a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,13 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
*/
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
if (allocationIdToHandlers.putIfAbsent(
request.getTargetAllocationId(),
createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter)
) != null) {
throw new OpenSearchException(
"Shard copy {} on node {} already replicating",
request.getCheckpoint().getShardId(),
request.getTargetNode()
);
}
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
if (segrepHandler != null) {
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
cancelHandlers(handler -> handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry");
}
return createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter);
});
return copyState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import org.junit.Assert;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -277,7 +276,8 @@ public void testShardAlreadyReplicatingToNode() throws IOException {
listener.onResponse(null);
};
replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); });
CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
assertEquals(1, copyState.refCount());
}

public void testStartReplicationWithNoFilesToFetch() throws IOException {
Expand Down

0 comments on commit 6bbe31a

Please sign in to comment.