From 43c0ea79243d10c39719bb36074a1042264a1137 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 1 Sep 2023 13:35:04 +0300 Subject: [PATCH] Done with TransportWrapperTest.java --- .../scalecube/cluster/TransportWrapper.java | 70 ++++++++----------- .../cluster/TransportWrapperTest.java | 13 ++-- 2 files changed, 36 insertions(+), 47 deletions(-) diff --git a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java index 16d46490..c37bc4e7 100644 --- a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java +++ b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java @@ -13,7 +13,7 @@ public class TransportWrapper { private final Transport transport; - private final Map addressIndexByMember = new ConcurrentHashMap<>(); + private final Map addressIndexByMember = new ConcurrentHashMap<>(); public TransportWrapper(Transport transport) { this.transport = transport; @@ -27,27 +27,21 @@ public TransportWrapper(Transport transport) { * @return mono result */ public Mono requestResponse(Member member, Message request) { + final List
addresses = member.addresses(); + final AtomicInteger currentIndex = + addressIndexByMember.computeIfAbsent(member, m -> new AtomicInteger()); return Mono.defer( - () -> { - final List
addresses = member.addresses(); - final int numRetries = addresses.size() - 1; - final Integer index = addressIndexByMember.getOrDefault(member, 0); - final AtomicInteger currentIndex = new AtomicInteger(index); - - return Mono.defer( - () -> { - int increment = currentIndex.getAndIncrement(); - - if (increment == addresses.size()) { - increment = 0; - currentIndex.set(1); - } - - final Address address = addresses.get(increment); - return transport.requestResponse(address, request); - }) - .retry(numRetries); - }); + () -> { + synchronized (this) { + if (currentIndex.get() == addresses.size()) { + currentIndex.set(0); + } + final Address address = addresses.get(currentIndex.getAndIncrement()); + return transport.requestResponse(address, request); + } + }) + .retry(addresses.size() - 1) + .doOnError(throwable -> addressIndexByMember.remove(member, currentIndex)); } /** @@ -58,26 +52,20 @@ public Mono requestResponse(Member member, Message request) { * @return mono result */ public Mono send(Member member, Message request) { + final List
addresses = member.addresses(); + final AtomicInteger currentIndex = + addressIndexByMember.computeIfAbsent(member, m -> new AtomicInteger()); return Mono.defer( - () -> { - final List
addresses = member.addresses(); - final int numRetries = addresses.size() - 1; - final Integer index = addressIndexByMember.getOrDefault(member, 0); - final AtomicInteger currentIndex = new AtomicInteger(index); - - return Mono.defer( - () -> { - int increment = currentIndex.getAndIncrement(); - - if (increment == addresses.size()) { - increment = 0; - currentIndex.set(1); - } - - final Address address = addresses.get(increment); - return transport.send(address, request); - }) - .retry(numRetries); - }); + () -> { + synchronized (this) { + if (currentIndex.get() == addresses.size()) { + currentIndex.set(0); + } + final Address address = addresses.get(currentIndex.getAndIncrement()); + return transport.send(address, request); + } + }) + .retry(addresses.size() - 1) + .doOnError(throwable -> addressIndexByMember.remove(member, currentIndex)); } } diff --git a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java index b7e111d3..eb0882bc 100644 --- a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java @@ -11,6 +11,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import java.util.stream.Stream.Builder; import org.junit.jupiter.api.Assertions; @@ -75,12 +76,12 @@ static void populateBuilder(Builder builder, int size) { } } - private Map addressIndexByMember() + private Map addressIndexByMember() throws NoSuchFieldException, IllegalAccessException { final Field field = TransportWrapper.class.getDeclaredField("addressIndexByMember"); field.setAccessible(true); //noinspection unchecked - return (Map) field.get(transportWrapper); + return (Map) field.get(transportWrapper); } @ParameterizedTest @@ -94,7 +95,7 @@ void requestResponseShouldWorkByRoundRobin(int size, int startIndex, int success } if (startIndex > 0) { - addressIndexByMember().put(member, startIndex); + addressIndexByMember().put(member, new AtomicInteger(startIndex)); } for (int i = 0; i < size; i++) { @@ -162,7 +163,7 @@ void requestResponseShouldFailByRoundRobin(int size, int startIndex, int ignore) } if (startIndex > 0) { - addressIndexByMember().put(member, startIndex); + addressIndexByMember().put(member, new AtomicInteger(startIndex)); } for (int i = 0; i < size; i++) { @@ -186,7 +187,7 @@ void sendShouldWorkByRoundRobin(int size, int startIndex, int successIndex) thro } if (startIndex > 0) { - addressIndexByMember().put(member, startIndex); + addressIndexByMember().put(member, new AtomicInteger(startIndex)); } for (int i = 0; i < size; i++) { @@ -212,7 +213,7 @@ void sendShouldFailByRoundRobin(int size, int startIndex, int ignore) throws Exc } if (startIndex > 0) { - addressIndexByMember().put(member, startIndex); + addressIndexByMember().put(member, new AtomicInteger(startIndex)); } for (int i = 0; i < size; i++) {