Skip to content

Commit

Permalink
Added exposeAddress TransportConfig setting
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Sep 2, 2023
1 parent c540c88 commit 77e8552
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

@SuppressWarnings({"FieldCanBeLocal", "unused"})
public final class FailureDetectorImpl implements FailureDetector {

private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetector.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

@SuppressWarnings({"FieldCanBeLocal", "unused"})
public final class GossipProtocolImpl implements GossipProtocol {

private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocol.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private void doSync() {

Message message = prepareSyncDataMsg(SYNC, null);
LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses);
send(transport, addresses, message)
send(transport, addresses, 0, message)
.subscribe(
null,
ex ->
Expand Down Expand Up @@ -889,4 +889,15 @@ private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
.then();
});
}

private static Mono<Void> send(
Transport transport, List<Address> addresses, int currentIndex, Message request) {
if (currentIndex >= addresses.size()) {
return Mono.error(new RuntimeException("All addresses have been tried and failed"));
}

return transport
.send(addresses.get(currentIndex), request)
.onErrorResume(th -> send(transport, addresses, currentIndex + 1, request));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testMessageDelayMoreThanGossipSweepTime() throws InterruptedExceptio
gossipProtocol3.listen().subscribe(message -> protocol3GossipCounter.incrementAndGet());

for (int i = 0; i < 3; i++) {
final Member member = gossipProtocol1.getMember();
final Member member = BaseTest.getField(gossipProtocol1, "localMember");
gossipProtocol1
.spread(Message.builder().sender(member).data("message: " + i).build())
.subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E
// Spread gossip, measure and verify delivery metrics
long start = System.currentTimeMillis();
final GossipProtocolImpl gossipProtocol = gossipProtocols.get(0);
final Member member = gossipProtocol.getMember();
final Member member = BaseTest.getField(gossipProtocol, "localMember");
gossipProtocol.spread(Message.builder().sender(member).data(gossipData).build()).subscribe();
latch.await(2 * gossipTimeout, TimeUnit.MILLISECONDS); // Await for double gossip timeout
disseminationTime = System.currentTimeMillis() - start;
Expand Down

0 comments on commit 77e8552

Please sign in to comment.