Skip to content

Commit

Permalink
Test failures
Browse files Browse the repository at this point in the history
Signed-off-by: Suraj Singh <surajrider@gmail.com>
  • Loading branch information
dreamer-89 committed Sep 1, 2022
1 parent 0e11422 commit 9753add
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.transport.TransportService;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -152,17 +151,14 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
} else {
latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
}
Optional<SegmentReplicationTarget> ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(
replicaShard.shardId()
);
if (ongoingReplicationTarget.isPresent()) {
final SegmentReplicationTarget target = ongoingReplicationTarget.get();
if (target.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
if (ongoingReplicationTarget != null) {
if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
logger.trace(
"Cancelling ongoing replication from old primary with primary term {}",
target.getCheckpoint().getPrimaryTerm()
ongoingReplicationTarget.getCheckpoint().getPrimaryTerm()
);
onGoingReplications.cancel(target.getId(), "Cancelling stuck target after new primary");
onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary");
} else {
logger.trace(
() -> new ParameterizedMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;
import java.util.stream.Collectors;

/**
* This class holds a collection of all on going replication events on the current node (i.e., the node is the target node
Expand Down Expand Up @@ -241,12 +240,15 @@ public boolean cancelForShard(ShardId shardId, String reason) {
* Get target for shard
*
* @param shardId shardId
* @return Optional ReplicationTarget for input shardId
* @return ReplicationTarget for input shardId
*/
public Optional<T> getOngoingReplicationTarget(ShardId shardId) {
final Stream<T> replicationTargets = onGoingTargetEvents.values().stream().filter(t -> t.indexShard.shardId().equals(shardId));
assert replicationTargets.count() == 1 : "More than one on-going replication targets";
return replicationTargets.findAny();
public T getOngoingReplicationTarget(ShardId shardId) {
final List<T> replicationTargetList = onGoingTargetEvents.values()
.stream()
.filter(t -> t.indexShard.shardId().equals(shardId))
.collect(Collectors.toList());
assert replicationTargetList.size() <= 1 : "More than one on-going replication targets";
return replicationTargetList.size() > 0 ? replicationTargetList.get(0) : null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public void setUp() throws Exception {
initialCheckpoint.getSeqNo(),
initialCheckpoint.getSegmentInfosVersion() + 1
);

newPrimaryCheckpoint = new ReplicationCheckpoint(
initialCheckpoint.getShardId(),
initialCheckpoint.getPrimaryTerm() + 1,
Expand Down

0 comments on commit 9753add

Please sign in to comment.