diff --git a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java index cbbc9185..9ae21faf 100644 --- a/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java +++ b/cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java @@ -4,8 +4,6 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import reactor.core.publisher.Mono; @@ -13,8 +11,6 @@ public class TransportWrapper { private final Transport transport; - private final Map> connections = new ConcurrentHashMap<>(); - public TransportWrapper(Transport transport) { this.transport = transport; } @@ -27,37 +23,16 @@ public TransportWrapper(Transport transport) { * @return mono result */ public Mono requestResponse(Member member, Message request) { - return connections - .compute( - member, - (m, resultMono) -> { - if (resultMono == null) { - return requestResponse(member.addresses(), request); - } - return resultMono.flatMap( - result -> - transport - .requestResponse(result.address, request) - .map(message -> new Result(result.address, message))); - }) - .map(result -> result.message); - } - - private Mono requestResponse(List
addresses, Message request) { return Mono.defer( () -> { + final List
addresses = member.addresses(); final AtomicInteger currentIndex = new AtomicInteger(); return Mono.defer( () -> { - final int index = currentIndex.getAndIncrement(); - return transport.requestResponse(addresses.get(index), request); + final Address address = addresses.get(currentIndex.getAndIncrement()); + return transport.requestResponse(address, request); }) - .retry(addresses.size() - 1) - .map( - message -> { - final int index = currentIndex.get() - 1; - return new Result(addresses.get(index), message); - }); + .retry(addresses.size() - 1); }); } @@ -69,53 +44,16 @@ private Mono requestResponse(List
addresses, Message request) { * @return mono result */ public Mono send(Member member, Message request) { - return connections - .compute( - member, - (m, resultMono) -> { - if (resultMono == null) { - return send(member.addresses(), request); - } - return resultMono.flatMap( - result -> - transport - .send(result.address, request) - .thenReturn(new Result(result.address))); - }) - .then(); - } - - private Mono send(List
addresses, Message request) { return Mono.defer( () -> { + final List
addresses = member.addresses(); final AtomicInteger currentIndex = new AtomicInteger(); return Mono.defer( () -> { final int index = currentIndex.getAndIncrement(); return transport.send(addresses.get(index), request); }) - .retry(addresses.size() - 1) - .then( - Mono.fromCallable( - () -> { - final int index = currentIndex.get() - 1; - return new Result(addresses.get(index)); - })); + .retry(addresses.size() - 1); }); } - - private static class Result { - - private final Address address; - private final Message message; - - private Result(Address address) { - this(address, null); - } - - private Result(Address address, Message message) { - this.address = address; - this.message = message; - } - } } diff --git a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java index b3876d2f..e4f181cf 100644 --- a/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/TransportWrapperTest.java @@ -57,7 +57,7 @@ void requestResponseShouldWork() { when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response)); - StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2)) + StepVerifier.create(transportWrapper.requestResponse(member, request)) .assertNext(message -> Assertions.assertSame(response, message, "response")) .thenCancel() .verify(); @@ -89,7 +89,7 @@ void requestResponseShouldWorkMemberSingleAddress() { when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response)); - StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2)) + StepVerifier.create(transportWrapper.requestResponse(member, request)) .assertNext(message -> Assertions.assertSame(response, message, "response")) .thenCancel() .verify(); @@ -104,7 +104,7 @@ void requestResponseShouldWorkMemberTwoAddresses() { .thenReturn(Mono.error(new RuntimeException("Error"))); when(transport.requestResponse(addresses.get(1), request)).thenReturn(Mono.just(response)); - StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2)) + StepVerifier.create(transportWrapper.requestResponse(member, request)) .assertNext(message -> Assertions.assertSame(response, message, "response")) .thenCancel() .verify(); @@ -122,7 +122,7 @@ void requestResponseShouldWorkMemberThreeAddresses() { .thenReturn(Mono.error(new RuntimeException("Error"))); when(transport.requestResponse(addresses.get(2), request)).thenReturn(Mono.just(response)); - StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2)) + StepVerifier.create(transportWrapper.requestResponse(member, request)) .assertNext(message -> Assertions.assertSame(response, message, "response")) .thenCancel() .verify(); @@ -136,7 +136,7 @@ void requestResponseShouldFailMemberSingleAddress() { when(transport.requestResponse(addresses.get(0), request)) .thenReturn(Mono.error(new RuntimeException("Error"))); - StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2)) + StepVerifier.create(transportWrapper.requestResponse(member, request)) .verifyErrorSatisfies( throwable -> Assertions.assertEquals("Error", throwable.getMessage())); } @@ -151,7 +151,7 @@ void requestResponseShouldFailMemberTwoAddresses() { when(transport.requestResponse(addresses.get(1), request)) .thenReturn(Mono.error(new RuntimeException("Error - 1"))); - StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2)) + StepVerifier.create(transportWrapper.requestResponse(member, request)) .verifyErrorSatisfies( throwable -> Assertions.assertEquals("Error - 1", throwable.getMessage())); } @@ -169,7 +169,7 @@ void requestResponseShouldFailMemberThreeAddresses() { when(transport.requestResponse(addresses.get(2), request)) .thenReturn(Mono.error(new RuntimeException("Error - 2"))); - StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2)) + StepVerifier.create(transportWrapper.requestResponse(member, request)) .verifyErrorSatisfies( throwable -> Assertions.assertEquals("Error - 2", throwable.getMessage())); } @@ -185,7 +185,7 @@ void sendShouldWork() { when(transport.send(addresses.get(0), request)).thenReturn(Mono.empty()); - StepVerifier.create(transportWrapper.send(member, request).retry(2)).verifyComplete(); + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); } @Test @@ -210,7 +210,7 @@ void sendShouldWorkMemberSingleAddress() { when(transport.send(addresses.get(0), request)).thenReturn(Mono.empty()); - StepVerifier.create(transportWrapper.send(member, request).retry(2)).verifyComplete(); + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); } @Test @@ -222,7 +222,7 @@ void sendShouldWorkMemberTwoAddresses() { .thenReturn(Mono.error(new RuntimeException("Error"))); when(transport.send(addresses.get(1), request)).thenReturn(Mono.empty()); - StepVerifier.create(transportWrapper.send(member, request).retry(2)).verifyComplete(); + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); } @Test @@ -237,7 +237,7 @@ void sendShouldWorkMemberThreeAddresses() { .thenReturn(Mono.error(new RuntimeException("Error"))); when(transport.send(addresses.get(2), request)).thenReturn(Mono.empty()); - StepVerifier.create(transportWrapper.send(member, request).retry(2)).verifyComplete(); + StepVerifier.create(transportWrapper.send(member, request)).verifyComplete(); } @Test @@ -248,7 +248,7 @@ void sendShouldFailMemberSingleAddress() { when(transport.send(addresses.get(0), request)) .thenReturn(Mono.error(new RuntimeException("Error"))); - StepVerifier.create(transportWrapper.send(member, request).retry(2)) + StepVerifier.create(transportWrapper.send(member, request)) .verifyErrorSatisfies( throwable -> Assertions.assertEquals("Error", throwable.getMessage())); } @@ -263,7 +263,7 @@ void sendShouldFailMemberTwoAddresses() { when(transport.send(addresses.get(1), request)) .thenReturn(Mono.error(new RuntimeException("Error - 1"))); - StepVerifier.create(transportWrapper.send(member, request).retry(2)) + StepVerifier.create(transportWrapper.send(member, request)) .verifyErrorSatisfies( throwable -> Assertions.assertEquals("Error - 1", throwable.getMessage())); } @@ -281,7 +281,7 @@ void sendShouldFailMemberThreeAddresses() { when(transport.send(addresses.get(2), request)) .thenReturn(Mono.error(new RuntimeException("Error - 2"))); - StepVerifier.create(transportWrapper.send(member, request).retry(2)) + StepVerifier.create(transportWrapper.send(member, request)) .verifyErrorSatisfies( throwable -> Assertions.assertEquals("Error - 2", throwable.getMessage())); } diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index 0678bd45..3e126fc6 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -122,6 +122,7 @@ public void testLeaveClusterCameBeforeAlive() { final Message leavingMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(leavingRecord) .build(); @@ -131,6 +132,7 @@ public void testLeaveClusterCameBeforeAlive() { final Message addedMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(addedRecord) .build(); @@ -165,6 +167,7 @@ public void testLeaveClusterOnly() { final Message leavingMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(leavingRecord) .build(); @@ -197,6 +200,7 @@ public void testLeaveClusterOnSuspectedNode() { final Message suspectMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(suspectedNode) .build(); @@ -208,6 +212,7 @@ public void testLeaveClusterOnSuspectedNode() { final Message leavingMessage = Message.builder() .qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP) + .sender(anotherMember) .data(leavingRecord) .build();