Skip to content

Commit

Permalink
Done with TransportWrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Aug 31, 2023
1 parent 95ea168 commit 76d1576
Showing 1 changed file with 31 additions and 13 deletions.
44 changes: 31 additions & 13 deletions cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ public Mono<Message> requestResponse(Member member, Message request) {
.map(result -> result.message);
}

public Mono<Void> send(Member member, Message request) {
return connections
.compute(
member,
(m, resultMono) -> {
if (resultMono == null) {
return send(member.addresses(), request);
}
return resultMono.flatMap(
result ->
transport
.send(result.address, request)
.thenReturn(new Result(result.address)));
})
.then();
}

private Mono<Result> requestResponse(List<Address> addresses, Message request) {
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
Expand All @@ -51,19 +68,20 @@ private Mono<Result> requestResponse(List<Address> addresses, Message request) {
});
}

public static Mono<Void> send(Transport transport, List<Address> addresses, Message request) {
return send(transport, addresses, 0, request);
}

private static Mono<Void> send(
Transport transport, List<Address> addresses, int currentIndex, Message request) {
if (currentIndex >= addresses.size()) {
return Mono.error(new RuntimeException("All addresses have been tried and failed."));
}

return transport
.send(addresses.get(currentIndex), request)
.onErrorResume(th -> send(transport, addresses, currentIndex + 1, request));
private Mono<Result> send(List<Address> addresses, Message request) {
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
() -> {
final int index = currentIndex.getAndIncrement();
return transport.send(addresses.get(index), request);
})
.retry(addresses.size() - 1)
.then(
Mono.fromCallable(
() -> {
final int index = currentIndex.get();
return new Result(addresses.get(index));
}));
}

private static class Result {
Expand Down

0 comments on commit 76d1576

Please sign in to comment.