Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Aug 31, 2023
1 parent 5ded407 commit b0f3155
Show file tree
Hide file tree
Showing 3 changed files with 506 additions and 511 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.util.Collections;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -51,7 +50,7 @@ public boolean isStopped() {
public Mono<Void> send(Address address, Message message) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(message))
Mono.just(Message.with(message).build())
.flatMap(msg -> networkEmulator.tryFailOutbound(msg, address))
.flatMap(msg -> networkEmulator.tryDelayOutbound(msg, address))
.flatMap(msg -> transport.send(address, msg)));
Expand All @@ -61,7 +60,7 @@ public Mono<Void> send(Address address, Message message) {
public Mono<Message> requestResponse(Address address, Message request) {
return Mono.defer(
() ->
Mono.just(enhanceWithSender(request))
Mono.just(Message.with(request).build())
.flatMap(msg -> networkEmulator.tryFailOutbound(msg, address))
.flatMap(msg -> networkEmulator.tryDelayOutbound(msg, address))
.flatMap(
Expand All @@ -85,8 +84,4 @@ public Flux<Message> listen() {
.filter(message -> networkEmulator.inboundSettings((Member) message.sender()).shallPass())
.onBackpressureBuffer();
}

private Message enhanceWithSender(Message message) {
return Message.with(message).sender(Collections.singletonList(transport.address())).build();
}
}
Loading

0 comments on commit b0f3155

Please sign in to comment.