From 0f16819d8d00013388ee03d191c5a44e7d583e35 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 1 Sep 2023 13:03:48 +0300 Subject: [PATCH] WIP --- .../scalecube/cluster/TransportWrapper.java | 21 +- .../cluster/TransportWrapperTest.java | 366 ++++++++---------- 2 files changed, 176 insertions(+), 211 deletions(-) diff --git a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java index f207c994..16d46490 100644 --- a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java +++ b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java @@ -39,7 +39,8 @@ public Mono requestResponse(Member member, Message request) { int increment = currentIndex.getAndIncrement(); if (increment == addresses.size()) { - currentIndex.set(increment = 0); + increment = 0; + currentIndex.set(1); } final Address address = addresses.get(increment); @@ -60,13 +61,23 @@ public Mono send(Member member, Message request) { return Mono.defer( () -> { final List
addresses = member.addresses(); - final AtomicInteger currentIndex = new AtomicInteger(); + final int numRetries = addresses.size() - 1; + final Integer index = addressIndexByMember.getOrDefault(member, 0); + final AtomicInteger currentIndex = new AtomicInteger(index); + return Mono.defer( () -> { - final int index = currentIndex.getAndIncrement(); - return transport.send(addresses.get(index), request); + int increment = currentIndex.getAndIncrement(); + + if (increment == addresses.size()) { + increment = 0; + currentIndex.set(1); + } + + final Address address = addresses.get(increment); + return transport.send(address, request); }) - .retry(addresses.size() - 1); + .retry(numRetries); }); } } diff --git a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java index 8bd56214..b7e111d3 100644 --- a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java @@ -6,12 +6,18 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; -import java.util.Arrays; +import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.stream.Stream; +import java.util.stream.Stream.Builder; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -47,257 +53,205 @@ public TransportWrapperTest() { transportWrapper = new TransportWrapper(transport); } - @Nested - class RequestResponseTests { + static Stream methodSource() { + final Builder builder = Stream.builder(); - @Test - void requestResponseShouldWorkIndex2() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response)); - - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .assertNext(message -> Assertions.assertSame(response, message, "response")) - .thenCancel() - .verify(); + // int size, int startIndex, int successIndex + for (int size = 0; size < 5; size++) { + populateBuilder(builder, size); } - @Test - void requestResponseShouldWork() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); + return builder.build(); + } - when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response)); + static void populateBuilder(Builder builder, int size) { + // int startIndex, int successIndex - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .assertNext(message -> Assertions.assertSame(response, message, "response")) - .thenCancel() - .verify(); + for (int startIndex = 0; startIndex < size; startIndex++) { + for (int successIndex = 0; successIndex < size; successIndex++) { + builder.add(Arguments.of(size, startIndex, successIndex)); + } } + } - @Test - void requestResponseShouldWorkThenFail() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.requestResponse(addresses.get(0), request)) - .thenReturn(Mono.just(response)) - .thenReturn(Mono.error(new RuntimeException("Error"))); - - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .assertNext(message -> Assertions.assertSame(response, message, "response")) - .thenCancel() - .verify(); + private Map addressIndexByMember() + throws NoSuchFieldException, IllegalAccessException { + final Field field = TransportWrapper.class.getDeclaredField("addressIndexByMember"); + field.setAccessible(true); + //noinspection unchecked + return (Map) field.get(transportWrapper); + } - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .verifyErrorSatisfies( - throwable -> Assertions.assertEquals("Error", throwable.getMessage())); + @ParameterizedTest + @MethodSource("methodSource") + void requestResponseShouldWorkByRoundRobin(int size, int startIndex, int successIndex) + throws Exception { + final List
addresses = new ArrayList<>(); + final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + addresses.add(Address.from("test:" + i)); } - @Test - void requestResponseShouldWorkMemberSingleAddress() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response)); - - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .assertNext(message -> Assertions.assertSame(response, message, "response")) - .thenCancel() - .verify(); + if (startIndex > 0) { + addressIndexByMember().put(member, startIndex); } - @Test - void requestResponseShouldWorkMemberTwoAddresses() { - final List
addresses = Arrays.asList(Address.from("test:0"), Address.from("test:1")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.requestResponse(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error"))); - when(transport.requestResponse(addresses.get(1), request)).thenReturn(Mono.just(response)); - - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .assertNext(message -> Assertions.assertSame(response, message, "response")) - .thenCancel() - .verify(); + for (int i = 0; i < size; i++) { + final Address address = addresses.get(i); + if (i == successIndex) { + when(transport.requestResponse(address, request)).thenReturn(Mono.just(response)); + } else { + when(transport.requestResponse(address, request)) + .thenReturn(Mono.error(new RuntimeException("Error - " + i))); + } } - @Test - void requestResponseShouldWorkMemberThreeAddresses() { - final List
addresses = - Arrays.asList(Address.from("test:0"), Address.from("test:1"), Address.from("test:2")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.requestResponse(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error"))); - when(transport.requestResponse(addresses.get(1), request)) - .thenReturn(Mono.error(new RuntimeException("Error"))); - when(transport.requestResponse(addresses.get(2), request)).thenReturn(Mono.just(response)); + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .assertNext(message -> Assertions.assertSame(response, message, "response")) + .thenCancel() + .verify(); + } - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .assertNext(message -> Assertions.assertSame(response, message, "response")) - .thenCancel() - .verify(); - } + @Test + void requestResponseShouldWorkThenFail() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); - @Test - void requestResponseShouldFailMemberSingleAddress() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); + when(transport.requestResponse(addresses.get(0), request)) + .thenReturn(Mono.just(response)) + .thenReturn(Mono.error(new RuntimeException("Error"))); - when(transport.requestResponse(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error"))); + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .assertNext(message -> Assertions.assertSame(response, message, "response")) + .thenCancel() + .verify(); - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .verifyErrorSatisfies( - throwable -> Assertions.assertEquals("Error", throwable.getMessage())); - } + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .verifyErrorSatisfies( + throwable -> Assertions.assertEquals("Error", throwable.getMessage())); + } - @Test - void requestResponseShouldFailMemberTwoAddresses() { - final List
addresses = Arrays.asList(Address.from("test:0"), Address.from("test:1")); - final Member member = new Member("test", null, addresses, "namespace"); + @Test + void requestResponseShouldFailThenWork() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); - when(transport.requestResponse(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 0"))); - when(transport.requestResponse(addresses.get(1), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 1"))); + when(transport.requestResponse(addresses.get(0), request)) + .thenReturn(Mono.error(new RuntimeException("Error"))) + .thenReturn(Mono.just(response)); - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .verifyErrorSatisfies( - throwable -> Assertions.assertEquals("Error - 1", throwable.getMessage())); - } + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .verifyErrorSatisfies( + throwable -> Assertions.assertEquals("Error", throwable.getMessage())); - @Test - void requestResponseShouldFailMemberThreeAddresses() { - final List
addresses = - Arrays.asList(Address.from("test:0"), Address.from("test:1"), Address.from("test:2")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.requestResponse(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 0"))); - when(transport.requestResponse(addresses.get(1), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 1"))); - when(transport.requestResponse(addresses.get(2), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 2"))); - - StepVerifier.create(transportWrapper.requestResponse(member, request)) - .verifyErrorSatisfies( - throwable -> Assertions.assertEquals("Error - 2", throwable.getMessage())); - } + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .assertNext(message -> Assertions.assertSame(response, message, "response")) + .thenCancel() + .verify(); } - @Nested - class SendTests { - - @Test - void sendShouldWork() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.send(addresses.get(0), request)).thenReturn(Mono.empty()); - - StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + @ParameterizedTest + @MethodSource("methodSource") + void requestResponseShouldFailByRoundRobin(int size, int startIndex, int ignore) + throws Exception { + final List
addresses = new ArrayList<>(); + final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + addresses.add(Address.from("test:" + i)); } - @Test - void sendShouldWorkThenFail() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); + if (startIndex > 0) { + addressIndexByMember().put(member, startIndex); + } - when(transport.send(addresses.get(0), request)) - .thenReturn(Mono.empty()) + for (int i = 0; i < size; i++) { + final Address address = addresses.get(i); + when(transport.requestResponse(address, request)) .thenReturn(Mono.error(new RuntimeException("Error"))); - - StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); - StepVerifier.create(transportWrapper.send(member, request)) - .verifyErrorSatisfies( - throwable -> Assertions.assertEquals("Error", throwable.getMessage())); } - @Test - void sendShouldWorkMemberSingleAddress() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); + StepVerifier.create(transportWrapper.requestResponse(member, request)) + .verifyErrorSatisfies( + throwable -> Assertions.assertEquals("Error", throwable.getMessage())); + } - when(transport.send(addresses.get(0), request)).thenReturn(Mono.empty()); + @ParameterizedTest + @MethodSource("methodSource") + void sendShouldWorkByRoundRobin(int size, int startIndex, int successIndex) throws Exception { + final List
addresses = new ArrayList<>(); + final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + addresses.add(Address.from("test:" + i)); + } - StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + if (startIndex > 0) { + addressIndexByMember().put(member, startIndex); } - @Test - void sendShouldWorkMemberTwoAddresses() { - final List
addresses = Arrays.asList(Address.from("test:0"), Address.from("test:1")); - final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + final Address address = addresses.get(i); + if (i == successIndex) { + when(transport.send(address, request)).thenReturn(Mono.empty()); + } else { + when(transport.send(address, request)) + .thenReturn(Mono.error(new RuntimeException("Error - " + i))); + } + } - when(transport.send(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error"))); - when(transport.send(addresses.get(1), request)).thenReturn(Mono.empty()); + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + } - StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + @ParameterizedTest + @MethodSource("methodSource") + void sendShouldFailByRoundRobin(int size, int startIndex, int ignore) throws Exception { + final List
addresses = new ArrayList<>(); + final Member member = new Member("test", null, addresses, "namespace"); + for (int i = 0; i < size; i++) { + addresses.add(Address.from("test:" + i)); } - @Test - void sendShouldWorkMemberThreeAddresses() { - final List
addresses = - Arrays.asList(Address.from("test:0"), Address.from("test:1"), Address.from("test:2")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.send(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error"))); - when(transport.send(addresses.get(1), request)) - .thenReturn(Mono.error(new RuntimeException("Error"))); - when(transport.send(addresses.get(2), request)).thenReturn(Mono.empty()); + if (startIndex > 0) { + addressIndexByMember().put(member, startIndex); + } - StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + for (int i = 0; i < size; i++) { + final Address address = addresses.get(i); + when(transport.send(address, request)).thenReturn(Mono.error(new RuntimeException("Error"))); } - @Test - void sendShouldFailMemberSingleAddress() { - final List
addresses = Collections.singletonList(Address.from("test:0")); - final Member member = new Member("test", null, addresses, "namespace"); + StepVerifier.create(transportWrapper.send(member, request)) + .verifyErrorSatisfies( + throwable -> Assertions.assertEquals("Error", throwable.getMessage())); + } - when(transport.send(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error"))); + @Test + void sendShouldWorkThenFail() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); - StepVerifier.create(transportWrapper.send(member, request)) - .verifyErrorSatisfies( - throwable -> Assertions.assertEquals("Error", throwable.getMessage())); - } + when(transport.send(addresses.get(0), request)) + .thenReturn(Mono.empty()) + .thenReturn(Mono.error(new RuntimeException("Error"))); - @Test - void sendShouldFailMemberTwoAddresses() { - final List
addresses = Arrays.asList(Address.from("test:0"), Address.from("test:1")); - final Member member = new Member("test", null, addresses, "namespace"); + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); + StepVerifier.create(transportWrapper.send(member, request)) + .verifyErrorSatisfies( + throwable -> Assertions.assertEquals("Error", throwable.getMessage())); + } - when(transport.send(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 0"))); - when(transport.send(addresses.get(1), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 1"))); + @Test + void sendShouldFailThenWork() { + final List
addresses = Collections.singletonList(Address.from("test:0")); + final Member member = new Member("test", null, addresses, "namespace"); - StepVerifier.create(transportWrapper.send(member, request)) - .verifyErrorSatisfies( - throwable -> Assertions.assertEquals("Error - 1", throwable.getMessage())); - } + when(transport.send(addresses.get(0), request)) + .thenReturn(Mono.error(new RuntimeException("Error"))) + .thenReturn(Mono.empty()); - @Test - void sendShouldFailMemberThreeAddresses() { - final List
addresses = - Arrays.asList(Address.from("test:0"), Address.from("test:1"), Address.from("test:2")); - final Member member = new Member("test", null, addresses, "namespace"); - - when(transport.send(addresses.get(0), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 0"))); - when(transport.send(addresses.get(1), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 1"))); - when(transport.send(addresses.get(2), request)) - .thenReturn(Mono.error(new RuntimeException("Error - 2"))); - - StepVerifier.create(transportWrapper.send(member, request)) - .verifyErrorSatisfies( - throwable -> Assertions.assertEquals("Error - 2", throwable.getMessage())); - } + StepVerifier.create(transportWrapper.send(member, request)) + .verifyErrorSatisfies( + throwable -> Assertions.assertEquals("Error", throwable.getMessage())); + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); } }