Skip to content

Commit

Permalink
Reverted TransportWrapper to its simplest version, fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Sep 1, 2023
1 parent 837d09e commit 93cf26a
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 82 deletions.
74 changes: 6 additions & 68 deletions cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,13 @@
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class TransportWrapper {

private final Transport transport;

private final Map<Member, Mono<Result>> connections = new ConcurrentHashMap<>();

public TransportWrapper(Transport transport) {
this.transport = transport;
}
Expand All @@ -27,37 +23,16 @@ public TransportWrapper(Transport transport) {
* @return mono result
*/
public Mono<Message> requestResponse(Member member, Message request) {
return connections
.compute(
member,
(m, resultMono) -> {
if (resultMono == null) {
return requestResponse(member.addresses(), request);
}
return resultMono.flatMap(
result ->
transport
.requestResponse(result.address, request)
.map(message -> new Result(result.address, message)));
})
.map(result -> result.message);
}

private Mono<Result> requestResponse(List<Address> addresses, Message request) {
return Mono.defer(
() -> {
final List<Address> addresses = member.addresses();
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
() -> {
final int index = currentIndex.getAndIncrement();
return transport.requestResponse(addresses.get(index), request);
final Address address = addresses.get(currentIndex.getAndIncrement());
return transport.requestResponse(address, request);
})
.retry(addresses.size() - 1)
.map(
message -> {
final int index = currentIndex.get() - 1;
return new Result(addresses.get(index), message);
});
.retry(addresses.size() - 1);
});
}

Expand All @@ -69,53 +44,16 @@ private Mono<Result> requestResponse(List<Address> addresses, Message request) {
* @return mono result
*/
public Mono<Void> send(Member member, Message request) {
return connections
.compute(
member,
(m, resultMono) -> {
if (resultMono == null) {
return send(member.addresses(), request);
}
return resultMono.flatMap(
result ->
transport
.send(result.address, request)
.thenReturn(new Result(result.address)));
})
.then();
}

private Mono<Result> send(List<Address> addresses, Message request) {
return Mono.defer(
() -> {
final List<Address> addresses = member.addresses();
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
() -> {
final int index = currentIndex.getAndIncrement();
return transport.send(addresses.get(index), request);
})
.retry(addresses.size() - 1)
.then(
Mono.fromCallable(
() -> {
final int index = currentIndex.get() - 1;
return new Result(addresses.get(index));
}));
.retry(addresses.size() - 1);
});
}

private static class Result {

private final Address address;
private final Message message;

private Result(Address address) {
this(address, null);
}

private Result(Address address, Message message) {
this.address = address;
this.message = message;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void requestResponseShouldWork() {

when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response));

StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2))
StepVerifier.create(transportWrapper.requestResponse(member, request))
.assertNext(message -> Assertions.assertSame(response, message, "response"))
.thenCancel()
.verify();
Expand Down Expand Up @@ -89,7 +89,7 @@ void requestResponseShouldWorkMemberSingleAddress() {

when(transport.requestResponse(addresses.get(0), request)).thenReturn(Mono.just(response));

StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2))
StepVerifier.create(transportWrapper.requestResponse(member, request))
.assertNext(message -> Assertions.assertSame(response, message, "response"))
.thenCancel()
.verify();
Expand All @@ -104,7 +104,7 @@ void requestResponseShouldWorkMemberTwoAddresses() {
.thenReturn(Mono.error(new RuntimeException("Error")));
when(transport.requestResponse(addresses.get(1), request)).thenReturn(Mono.just(response));

StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2))
StepVerifier.create(transportWrapper.requestResponse(member, request))
.assertNext(message -> Assertions.assertSame(response, message, "response"))
.thenCancel()
.verify();
Expand All @@ -122,7 +122,7 @@ void requestResponseShouldWorkMemberThreeAddresses() {
.thenReturn(Mono.error(new RuntimeException("Error")));
when(transport.requestResponse(addresses.get(2), request)).thenReturn(Mono.just(response));

StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2))
StepVerifier.create(transportWrapper.requestResponse(member, request))
.assertNext(message -> Assertions.assertSame(response, message, "response"))
.thenCancel()
.verify();
Expand All @@ -136,7 +136,7 @@ void requestResponseShouldFailMemberSingleAddress() {
when(transport.requestResponse(addresses.get(0), request))
.thenReturn(Mono.error(new RuntimeException("Error")));

StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2))
StepVerifier.create(transportWrapper.requestResponse(member, request))
.verifyErrorSatisfies(
throwable -> Assertions.assertEquals("Error", throwable.getMessage()));
}
Expand All @@ -151,7 +151,7 @@ void requestResponseShouldFailMemberTwoAddresses() {
when(transport.requestResponse(addresses.get(1), request))
.thenReturn(Mono.error(new RuntimeException("Error - 1")));

StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2))
StepVerifier.create(transportWrapper.requestResponse(member, request))
.verifyErrorSatisfies(
throwable -> Assertions.assertEquals("Error - 1", throwable.getMessage()));
}
Expand All @@ -169,7 +169,7 @@ void requestResponseShouldFailMemberThreeAddresses() {
when(transport.requestResponse(addresses.get(2), request))
.thenReturn(Mono.error(new RuntimeException("Error - 2")));

StepVerifier.create(transportWrapper.requestResponse(member, request).retry(2))
StepVerifier.create(transportWrapper.requestResponse(member, request))
.verifyErrorSatisfies(
throwable -> Assertions.assertEquals("Error - 2", throwable.getMessage()));
}
Expand All @@ -185,7 +185,7 @@ void sendShouldWork() {

when(transport.send(addresses.get(0), request)).thenReturn(Mono.empty());

StepVerifier.create(transportWrapper.send(member, request).retry(2)).verifyComplete();
StepVerifier.create(transportWrapper.send(member, request)).verifyComplete();
}

@Test
Expand All @@ -210,7 +210,7 @@ void sendShouldWorkMemberSingleAddress() {

when(transport.send(addresses.get(0), request)).thenReturn(Mono.empty());

StepVerifier.create(transportWrapper.send(member, request).retry(2)).verifyComplete();
StepVerifier.create(transportWrapper.send(member, request)).verifyComplete();
}

@Test
Expand All @@ -222,7 +222,7 @@ void sendShouldWorkMemberTwoAddresses() {
.thenReturn(Mono.error(new RuntimeException("Error")));
when(transport.send(addresses.get(1), request)).thenReturn(Mono.empty());

StepVerifier.create(transportWrapper.send(member, request).retry(2)).verifyComplete();
StepVerifier.create(transportWrapper.send(member, request)).verifyComplete();
}

@Test
Expand All @@ -237,7 +237,7 @@ void sendShouldWorkMemberThreeAddresses() {
.thenReturn(Mono.error(new RuntimeException("Error")));
when(transport.send(addresses.get(2), request)).thenReturn(Mono.empty());

StepVerifier.create(transportWrapper.send(member, request).retry(2)).verifyComplete();
StepVerifier.create(transportWrapper.send(member, request)).verifyComplete();
}

@Test
Expand All @@ -248,7 +248,7 @@ void sendShouldFailMemberSingleAddress() {
when(transport.send(addresses.get(0), request))
.thenReturn(Mono.error(new RuntimeException("Error")));

StepVerifier.create(transportWrapper.send(member, request).retry(2))
StepVerifier.create(transportWrapper.send(member, request))
.verifyErrorSatisfies(
throwable -> Assertions.assertEquals("Error", throwable.getMessage()));
}
Expand All @@ -263,7 +263,7 @@ void sendShouldFailMemberTwoAddresses() {
when(transport.send(addresses.get(1), request))
.thenReturn(Mono.error(new RuntimeException("Error - 1")));

StepVerifier.create(transportWrapper.send(member, request).retry(2))
StepVerifier.create(transportWrapper.send(member, request))
.verifyErrorSatisfies(
throwable -> Assertions.assertEquals("Error - 1", throwable.getMessage()));
}
Expand All @@ -281,7 +281,7 @@ void sendShouldFailMemberThreeAddresses() {
when(transport.send(addresses.get(2), request))
.thenReturn(Mono.error(new RuntimeException("Error - 2")));

StepVerifier.create(transportWrapper.send(member, request).retry(2))
StepVerifier.create(transportWrapper.send(member, request))
.verifyErrorSatisfies(
throwable -> Assertions.assertEquals("Error - 2", throwable.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void testLeaveClusterCameBeforeAlive() {
final Message leavingMessage =
Message.builder()
.qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP)
.sender(anotherMember)
.data(leavingRecord)
.build();

Expand All @@ -131,6 +132,7 @@ public void testLeaveClusterCameBeforeAlive() {
final Message addedMessage =
Message.builder()
.qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP)
.sender(anotherMember)
.data(addedRecord)
.build();

Expand Down Expand Up @@ -165,6 +167,7 @@ public void testLeaveClusterOnly() {
final Message leavingMessage =
Message.builder()
.qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP)
.sender(anotherMember)
.data(leavingRecord)
.build();

Expand Down Expand Up @@ -197,6 +200,7 @@ public void testLeaveClusterOnSuspectedNode() {
final Message suspectMessage =
Message.builder()
.qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP)
.sender(anotherMember)
.data(suspectedNode)
.build();

Expand All @@ -208,6 +212,7 @@ public void testLeaveClusterOnSuspectedNode() {
final Message leavingMessage =
Message.builder()
.qualifier(MembershipProtocolImpl.MEMBERSHIP_GOSSIP)
.sender(anotherMember)
.data(leavingRecord)
.build();

Expand Down

0 comments on commit 93cf26a

Please sign in to comment.