From ad9db196429a13e0ac9327341dcd5dec825063fe Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 1 Sep 2023 12:02:09 +0300 Subject: [PATCH] WIP --- .../scalecube/cluster/TransportWrapper.java | 19 ++++++++++++++++--- .../cluster/TransportWrapperTest.java | 14 ++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java index 9ae21faf..f207c994 100644 --- a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java +++ b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java @@ -4,6 +4,8 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import reactor.core.publisher.Mono; @@ -11,6 +13,8 @@ public class TransportWrapper { private final Transport transport; + private final Map addressIndexByMember = new ConcurrentHashMap<>(); + public TransportWrapper(Transport transport) { this.transport = transport; } @@ -26,13 +30,22 @@ public Mono requestResponse(Member member, Message request) { return Mono.defer( () -> { final List
addresses = member.addresses(); - final AtomicInteger currentIndex = new AtomicInteger(); + final int numRetries = addresses.size() - 1; + final Integer index = addressIndexByMember.getOrDefault(member, 0); + final AtomicInteger currentIndex = new AtomicInteger(index); + return Mono.defer( () -> { - final Address address = addresses.get(currentIndex.getAndIncrement()); + int increment = currentIndex.getAndIncrement(); + + if (increment == addresses.size()) { + currentIndex.set(increment = 0); + } + + final Address address = addresses.get(increment); return transport.requestResponse(address, request); }) - .retry(addresses.size() - 1); + .retry(numRetries); }); } diff --git a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java index e4f181cf..8bd56214 100644 --- a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java @@ -50,6 +50,20 @@ public TransportWrapperTest() { @Nested class RequestResponseTests { + @Test + void requestResponseShouldWorkIndex2() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); + + when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response)); + + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .assertNext(message -> Assertions.assertSame(response, message, "response")) + .thenCancel() + .verify(); + + } + @Test void requestResponseShouldWork() { final List
addresses = Collections.singletonList(Address.from("test:0"));