Skip to content

Commit

Permalink
Fix comments, Improve error handler for updateMembership
Browse files Browse the repository at this point in the history
  • Loading branch information
rostyslav.baldovskyi committed Aug 30, 2023
1 parent b757da6 commit 2a94fc5
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public final class FailureDetectorImpl implements FailureDetector {
private final Transport transport;
private final FailureDetectorConfig config;

private final TransportWrapper transportWrapper;

// State

private final List<Member> pingMembers = new ArrayList<>();
Expand Down Expand Up @@ -84,8 +82,6 @@ public FailureDetectorImpl(
this.config = Objects.requireNonNull(config);
this.scheduler = Objects.requireNonNull(scheduler);

this.transportWrapper = new TransportWrapper(this.transport);

// Subscribe
actionsDisposables.addAll(
Arrays.asList(
Expand Down Expand Up @@ -151,8 +147,7 @@ private void doPing() {

LOGGER.debug("[{}][{}] Send Ping to {}", localMember, period, pingMember);
List<Address> addresses = pingMember.addresses();
transportWrapper
.requestResponse(addresses, pingMsg)
TransportWrapper.requestResponse(transport, addresses, pingMsg)
.timeout(Duration.ofMillis(config.pingTimeout()), scheduler)
.publishOn(scheduler)
.subscribe(
Expand Down Expand Up @@ -194,8 +189,7 @@ private void doPingReq(
Duration timeout = Duration.ofMillis(config.pingInterval() - config.pingTimeout());
pingReqMembers.forEach(
member ->
transportWrapper
.requestResponse(member.addresses(), pingReqMsg)
TransportWrapper.requestResponse(transport, member.addresses(), pingReqMsg)
.timeout(timeout, scheduler)
.publishOn(scheduler)
.subscribe(
Expand Down Expand Up @@ -256,8 +250,7 @@ private void onPing(Message message) {
Message.withData(data).qualifier(PING_ACK).correlationId(correlationId).build();
List<Address> addresses = data.getFrom().addresses();
LOGGER.debug("[{}][{}] Send PingAck to {}", localMember, period, addresses);
transportWrapper
.send(addresses, ackMessage)
TransportWrapper.send(transport, addresses, ackMessage)
.subscribe(
null,
ex ->
Expand All @@ -282,8 +275,7 @@ private void onPingReq(Message message) {
Message.withData(pingReqData).qualifier(PING).correlationId(correlationId).build();
List<Address> addresses = target.addresses();
LOGGER.debug("[{}][{}] Send transit Ping to {}", localMember, period, addresses);
transportWrapper
.send(addresses, pingMessage)
TransportWrapper.send(transport, addresses, pingMessage)
.subscribe(
null,
ex ->
Expand Down Expand Up @@ -312,8 +304,7 @@ private void onTransitPingAck(Message message) {
Message.withData(originalAckData).qualifier(PING_ACK).correlationId(correlationId).build();
List<Address> addresses = target.addresses();
LOGGER.debug("[{}][{}] Resend transit PingAck to {}", localMember, period, addresses);
transportWrapper
.send(addresses, originalAckMessage)
TransportWrapper.send(transport, addresses, originalAckMessage)
.subscribe(
null,
ex ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ private enum MembershipUpdateReason {
private final GossipProtocol gossipProtocol;
private final MetadataStore metadataStore;

private final TransportWrapper transportWrapper;

// State

private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
Expand Down Expand Up @@ -129,8 +127,6 @@ public MembershipProtocolImpl(
this.membershipConfig = Objects.requireNonNull(config).membershipConfig();
this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig();

this.transportWrapper = new TransportWrapper(this.transport);

// Prepare seeds
seedMembers = cleanUpSeedMembers(membershipConfig.seedMembers());

Expand Down Expand Up @@ -355,8 +351,7 @@ private void doSync() {

Message message = prepareSyncDataMsg(SYNC, null);
LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses);
transportWrapper
.send(addresses, message)
TransportWrapper.send(transport, addresses, message)
.subscribe(
null,
ex ->
Expand Down Expand Up @@ -413,8 +408,7 @@ private Mono<Void> onSync(Message syncMsg) {
.doOnSuccess(
avoid -> {
Message message = prepareSyncDataMsg(SYNC_ACK, syncMsg.correlationId());
transportWrapper
.send(sender, message)
TransportWrapper.send(transport, sender, message)
.subscribe(
null,
ex ->
Expand Down Expand Up @@ -443,8 +437,7 @@ private void onFailureDetectorEvent(FailureDetectorEvent fdEvent) {
// alive with inc + 1
Message syncMsg = prepareSyncDataMsg(SYNC, null);
List<Address> addresses = fdEvent.member().addresses();
transportWrapper
.send(addresses, syncMsg)
TransportWrapper.send(transport, addresses, syncMsg)
.subscribe(
null,
ex ->
Expand Down Expand Up @@ -532,11 +525,11 @@ private Mono<Void> syncMembership(SyncData syncData, boolean onStart) {
updateMembership(r1, reason)
.doOnError(
ex ->
LOGGER.warn(
LOGGER.error(
"[{}][syncMembership][{}][error] cause: {}",
localMember,
reason,
ex.toString()))
ex))
.onErrorResume(ex -> Mono.empty()))
.toArray(Mono[]::new);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ public class MetadataStoreImpl implements MetadataStore {
private final Transport transport;
private final ClusterConfig config;

private final TransportWrapper transportWrapper;

// State

private final Map<Member, ByteBuffer> membersMetadata = new HashMap<>();
Expand Down Expand Up @@ -73,8 +71,6 @@ public MetadataStoreImpl(
this.config = Objects.requireNonNull(config);
this.scheduler = Objects.requireNonNull(scheduler);
this.localMetadata = localMetadata; // optional

this.transportWrapper = new TransportWrapper(this.transport);
}

@Override
Expand Down Expand Up @@ -164,12 +160,9 @@ public Mono<ByteBuffer> fetchMetadata(Member member) {
.data(new GetMetadataRequest(member))
.build();

// TODO. Make transport abstraction around this logic

List<Address> addresses = member.addresses();

return transportWrapper
.requestResponse(addresses, request)
return TransportWrapper.requestResponse(transport, addresses, request)
.timeout(Duration.ofMillis(config.metadataTimeout()), scheduler)
.publishOn(scheduler)
.doOnSuccess(
Expand Down Expand Up @@ -230,8 +223,7 @@ private void onMetadataRequest(Message message) {
.build();

LOGGER.debug("[{}] Send GetMetadataResp to {}", localMember, sender);
transportWrapper
.send(sender, response)
TransportWrapper.send(transport, sender, response)
.subscribe(
null,
ex ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ public List<Address> sender() {
}

return Arrays.stream(headerValue.split(","))
.map(String::trim) // Removes leading and trailing spaces.
.map(Address::from)
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,6 @@

public class TransportWrapper {

private final Transport transport;

public TransportWrapper(Transport transport) {
this.transport = transport;
}

public Mono<Message> requestResponse(List<Address> addresses, Message request) {
return requestResponse(transport, addresses, 0, request);
}

public static Mono<Message> requestResponse(
Transport transport, List<Address> addresses, Message request) {
return requestResponse(transport, addresses, 0, request);
Expand All @@ -24,18 +14,14 @@ public static Mono<Message> requestResponse(
private static Mono<Message> requestResponse(
Transport transport, List<Address> addresses, int currentIndex, Message request) {
if (currentIndex >= addresses.size()) {
return Mono.error(new RuntimeException("All addresses have been tried and failed."));
return Mono.error(new RuntimeException("All addresses have been tried and failed"));
}

return transport
.requestResponse(addresses.get(currentIndex), request)
.onErrorResume(th -> requestResponse(transport, addresses, currentIndex + 1, request));
}

public Mono<Void> send(List<Address> addresses, Message request) {
return send(transport, addresses, 0, request);
}

public static Mono<Void> send(Transport transport, List<Address> addresses, Message request) {
return send(transport, addresses, 0, request);
}
Expand Down

0 comments on commit 2a94fc5

Please sign in to comment.