diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index ad48b383..e7d8707a 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -37,6 +37,7 @@ import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -349,7 +350,7 @@ private void doSync() { Message message = prepareSyncDataMsg(SYNC, null); LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses); - send(transport, addresses, 0, message) + send(transport, addresses, message) .subscribe( null, ex -> @@ -890,14 +891,14 @@ private Mono spreadMembershipGossip(MembershipRecord record) { }); } - private static Mono send( - Transport transport, List
addresses, int currentIndex, Message request) { - if (currentIndex >= addresses.size()) { - return Mono.error(new RuntimeException("All addresses have been tried and failed")); - } - - return transport - .send(addresses.get(currentIndex), request) - .onErrorResume(th -> send(transport, addresses, currentIndex + 1, request)); + private static Mono send(Transport transport, List
addresses, Message request) { + final AtomicInteger currentIndex = new AtomicInteger(); + return Mono.defer( + () -> { + final Address address = addresses.get(currentIndex.get()); + return transport.send(address, request); + }) + .doOnError(ex -> currentIndex.incrementAndGet()) + .retry(addresses.size() - 1); } }