Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Aug 31, 2023
1 parent c38a23a commit 95ea168
Showing 1 changed file with 44 additions and 16 deletions.
60 changes: 44 additions & 16 deletions cluster/src/main/java/io/scalecube/cluster/TransportWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,49 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class TransportWrapper {

private final Transport transport;

private final Map<Member, Address> connections = new ConcurrentHashMap<>();
private final Map<Member, Mono<Result>> connections = new ConcurrentHashMap<>();

public TransportWrapper(Transport transport) {
this.transport = transport;
}

public Mono<Message> requestResponse(Member member, Message request) {
// return requestResponse(transport, addresses, 0, request);
connections.computeIfAbsent(member, m -> requestResponse0(m, request));
return connections
.compute(
member,
(m, resultMono) -> {
if (resultMono == null) {
return requestResponse(member.addresses(), request);
}
return resultMono.flatMap(
result ->
transport
.requestResponse(result.address, request)
.map(message -> new Result(result.address, message)));
})
.map(result -> result.message);
}

private Address requestResponse0(Member member, Message request) {
return null;
}

private Mono<Message> requestResponse(
List<Address> addresses, int currentIndex, Message request) {
if (currentIndex >= addresses.size()) {
return Mono.error(new RuntimeException("All addresses have been tried and failed"));
}

return transport
.requestResponse(addresses.get(currentIndex), request)
.onErrorResume(th -> requestResponse(addresses, currentIndex + 1, request));
private Mono<Result> requestResponse(List<Address> addresses, Message request) {
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
() -> {
final int index = currentIndex.getAndIncrement();
return transport.requestResponse(addresses.get(index), request);
})
.retry(addresses.size() - 1)
.map(
message -> {
final int index = currentIndex.get();
return new Result(addresses.get(index), message);
});
}

public static Mono<Void> send(Transport transport, List<Address> addresses, Message request) {
Expand All @@ -52,4 +65,19 @@ private static Mono<Void> send(
.send(addresses.get(currentIndex), request)
.onErrorResume(th -> send(transport, addresses, currentIndex + 1, request));
}

private static class Result {

private final Address address;
private final Message message;

private Result(Address address) {
this(address, null);
}

private Result(Address address, Message message) {
this.address = address;
this.message = message;
}
}
}

0 comments on commit 95ea168

Please sign in to comment.