Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Jun 26, 2024
1 parent 9973fd4 commit 0cbac52
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
import io.scalecube.cluster2.sbe.PingRequestDecoder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayListUtil;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.broadcast.BroadcastTransmitter;
import org.agrona.concurrent.broadcast.CopyBroadcastReceiver;
Expand All @@ -33,8 +33,8 @@ public class FailureDetector extends AbstractAgent {
private final FailureDetectorCodec codec = new FailureDetectorCodec();
private final MemberCodec memberCodec = new MemberCodec();
private final String roleName;
private final List<Member> pingMembers = new ArrayList<>();
private final List<Member> pingReqMembers = new ArrayList<>();
private final ArrayList<Member> pingMembers = new ArrayList<>();
private final ArrayList<Member> pingReqMembers = new ArrayList<>();

public FailureDetector(
Transport transport,
Expand Down Expand Up @@ -111,30 +111,26 @@ private void nextPingReqMembers(Member pingMember) {

for (int i = 0, limit = demand < size ? demand : size - 1; i < limit; ) {
final Member member = nextPingMember();
if (member != pingMember && !pingReqMembers.contains(member)) {
if (!pingMember.equals(member) && !pingReqMembers.contains(member)) {
pingReqMembers.add(member);
i++;
}
}
}

List<Member> pingMembers() {
return pingMembers;
}

List<Member> pingReqMembers() {
return pingReqMembers;
}

private void doPingRequest(Member pingMember) {
for (int i = 0, n = pingReqMembers.size(); i < n; i++) {
for (int n = pingReqMembers.size() - 1, i = n; i >= 0; i--) {
final Member member = pingReqMembers.get(i);
ArrayListUtil.fastUnorderedRemove(pingReqMembers, i);

final long cid = nextCid();

transport.send(
member.address(),
codec.encodePingRequest(cid, localMember, pingMember),
0,
codec.encodedLength());

addCallback(
cid,
config.pingTimeout(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.agrona.concurrent.broadcast.CopyBroadcastReceiver;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class FailureDetectorTest {

Expand Down Expand Up @@ -78,8 +76,6 @@ void testOnMembershipEventLocalMemberWillBeFiltered() {
emitMembershipEvent(MembershipEventType.ADDED, localMember);
failureDetector.doWork();

assertEquals(0, failureDetector.pingMembers().size(), "failureDetector.pingMembers");

epochClock.advance(1);
failureDetector.doWork();
verify(transport, never()).send(any(), any(), anyInt(), anyInt());
Expand All @@ -94,13 +90,9 @@ void testOnMembershipEventAddedThenRemoved() {
failureDetector.doWork();
failureDetector.doWork();

assertEquals(1, failureDetector.pingMembers().size(), "failureDetector.pingMembers");

emitMembershipEvent(MembershipEventType.REMOVED, fooMember);
failureDetector.doWork();

assertEquals(0, failureDetector.pingMembers().size(), "failureDetector.pingMembers");

epochClock.advance(1);
failureDetector.doWork();
verify(transport, never()).send(any(), any(), anyInt(), anyInt());
Expand All @@ -115,13 +107,9 @@ void testOnMembershipEventAddedThenLeaving() {
failureDetector.doWork();
failureDetector.doWork();

assertEquals(1, failureDetector.pingMembers().size(), "failureDetector.pingMembers");

emitMembershipEvent(MembershipEventType.LEAVING, fooMember);
failureDetector.doWork();

assertEquals(0, failureDetector.pingMembers().size(), "failureDetector.pingMembers");

epochClock.advance(1);
failureDetector.doWork();
verify(transport, never()).send(any(), any(), anyInt(), anyInt());
Expand Down Expand Up @@ -190,37 +178,6 @@ void testPingThenTimeout() {
});
}

@ParameterizedTest(name = "demand: {0}, size: 4")
@ValueSource(ints = {10, 4, 3, 2, 1})
void testPingRequestMembersLessThenDemand(int demand) {
config = new FailureDetectorConfig().pingReqMembers(demand);
failureDetector =
new FailureDetector(
transport, messageTx, messageRxSupplier, epochClock, config, localMember);

emitMembershipEvent(MembershipEventType.ADDED, fooMember);
emitMembershipEvent(MembershipEventType.ADDED, barMember);
emitMembershipEvent(MembershipEventType.ADDED, aliceMember);
emitMembershipEvent(MembershipEventType.ADDED, bobMember);
failureDetector.doWork();
failureDetector.doWork();
failureDetector.doWork();
failureDetector.doWork();

final int size = failureDetector.pingMembers().size();
assertEquals(4, size, "pingMembers");

epochClock.advance(1);
failureDetector.doWork();
verify(transport).send(any(), any(), anyInt(), anyInt());

epochClock.advance(config.pingTimeout() + 1);
failureDetector.doWork();

final int limit = demand < size ? demand : size - 1;
assertEquals(limit, failureDetector.pingReqMembers().size(), "pingReqMembers");
}

@Test
void testPingThenTimeoutThenPingRequestThenAck() {
emitMembershipEvent(MembershipEventType.ADDED, fooMember);
Expand Down

0 comments on commit 0cbac52

Please sign in to comment.