From d9f720fcd5bce37eb56999080d1e71a49b052b9a Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Sat, 2 Sep 2023 12:08:14 +0300 Subject: [PATCH] Tests cleanup --- .../cluster/membership/MembershipConfig.java | 18 ---- .../fdetector/FailureDetectorImpl.java | 9 -- .../cluster/gossip/GossipProtocolImpl.java | 18 ---- .../membership/MembershipProtocolImpl.java | 93 +------------------ .../java/io/scalecube/cluster/BaseTest.java | 22 ++++- .../fdetector/FailureDetectorTest.java | 18 ++-- .../cluster/gossip/GossipProtocolTest.java | 21 +++-- .../membership/MembershipProtocolTest.java | 68 +++++++++----- 8 files changed, 85 insertions(+), 182 deletions(-) diff --git a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java index 7ff9397f..01b8114c 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java @@ -28,7 +28,6 @@ public final class MembershipConfig implements Cloneable { private int syncInterval = DEFAULT_SYNC_INTERVAL; private int syncTimeout = DEFAULT_SYNC_TIMEOUT; private int suspicionMult = DEFAULT_SUSPICION_MULT; - private int removedMembersHistorySize = 42; private String namespace = "default"; public MembershipConfig() {} @@ -158,22 +157,6 @@ public MembershipConfig namespace(String namespace) { return m; } - public int removedMembersHistorySize() { - return removedMembersHistorySize; - } - - /** - * Setter for {@code removedMembersHistorySize}. - * - * @param removedMembersHistorySize history size for remove members - * @return new {@code MembershipConfig} instance - */ - public MembershipConfig removedMembersHistorySize(int removedMembersHistorySize) { - MembershipConfig m = clone(); - m.removedMembersHistorySize = removedMembersHistorySize; - return m; - } - @Override public MembershipConfig clone() { try { @@ -191,7 +174,6 @@ public String toString() { .add("syncTimeout=" + syncTimeout) .add("suspicionMult=" + suspicionMult) .add("namespace='" + namespace + "'") - .add("removedMembersHistorySize=" + removedMembersHistorySize) .toString(); } } diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index ac270c4e..c7540bc0 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -417,13 +417,4 @@ private boolean isTransitPingAck(Message message) { return PING_ACK.equals(message.qualifier()) && message.data().getOriginalIssuer() != null; } - - /** - * NOTE: this method is for testing purpose only. - * - * @return transport - */ - Transport getTransport() { - return transport; - } } diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index 2f24beb8..1acfd060 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -365,22 +365,4 @@ private Set getGossipsThatMostLikelyDisseminated(long period) { .map(gossipState -> gossipState.gossip().gossipId()) .collect(Collectors.toSet()); } - - /** - * NOTE: this method is for testing purpose only. - * - * @return transport - */ - Transport getTransport() { - return transport; - } - - /** - * NOTE: this method is for testing purpose only. - * - * @return local member - */ - Member getMember() { - return localMember; - } } diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 53fed376..fad3bca3 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -34,10 +34,8 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -50,6 +48,7 @@ import reactor.core.publisher.Sinks; import reactor.core.scheduler.Scheduler; +@SuppressWarnings({"FieldCanBeLocal", "unused"}) public final class MembershipProtocolImpl implements MembershipProtocol { private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class); @@ -84,7 +83,6 @@ private enum MembershipUpdateReason { private final Map membershipTable = new HashMap<>(); private final Map members = new HashMap<>(); - private final List removedMembersHistory = new CopyOnWriteArrayList<>(); private final Set aliveEmittedSet = new HashSet<>(); // Subject @@ -157,10 +155,8 @@ public MembershipProtocolImpl( .publishOn(scheduler) .subscribe( this::onMembershipGossip, - ex -> LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex)), - listen() // Listen removed members for monitoring - .filter(MembershipEvent::isRemoved) - .subscribe(this::onMemberRemoved))); + ex -> + LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex)))); } // Remove duplicates and local address(es) @@ -863,87 +859,4 @@ private Mono spreadMembershipGossip(MembershipRecord r) { .then(); }); } - - /** - * NOTE: this method is for testing purpose only. - * - * @return failure detector - */ - FailureDetector getFailureDetector() { - return failureDetector; - } - - /** - * NOTE: this method is for testing purpose only. - * - * @return gossip - */ - GossipProtocol getGossipProtocol() { - return gossipProtocol; - } - - /** - * NOTE: this method is for testing purpose only. - * - * @return transport - */ - Transport getTransport() { - return transport; - } - - /** - * NOTE: this method is for testing purpose only. - * - * @return metadataStore - */ - MetadataStore getMetadataStore() { - return metadataStore; - } - - /** - * NOTE: this method is for testing purpose only. - * - * @return transport - */ - List getMembershipRecords() { - return Collections.unmodifiableList(new ArrayList<>(membershipTable.values())); - } - - // =============================================================== - // ============== Helper Methods for Monitoring ================== - // =============================================================== - - private int getIncarnation() { - return membershipTable.get(localMember.id()).incarnation(); - } - - private List getAliveMembers() { - return findRecordsByCondition(MembershipRecord::isAlive); - } - - private List getSuspectedMembers() { - return findRecordsByCondition(MembershipRecord::isSuspect); - } - - private List getRemovedMembers() { - return removedMembersHistory.stream().map(MembershipEvent::member).collect(Collectors.toList()); - } - - private List findRecordsByCondition(Predicate condition) { - return getMembershipRecords().stream() - .filter(condition) - .map(MembershipRecord::member) - .collect(Collectors.toList()); - } - - private void onMemberRemoved(MembershipEvent event) { - int s = membershipConfig.removedMembersHistorySize(); - if (s <= 0) { - return; - } - removedMembersHistory.add(event); - if (removedMembersHistory.size() > s) { - removedMembersHistory.remove(0); - } - } } diff --git a/cluster/src/test/java/io/scalecube/cluster/BaseTest.java b/cluster/src/test/java/io/scalecube/cluster/BaseTest.java index 673928a6..6ba2895d 100644 --- a/cluster/src/test/java/io/scalecube/cluster/BaseTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/BaseTest.java @@ -6,6 +6,7 @@ import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulatorTransport; import io.scalecube.transport.netty.tcp.TcpTransportFactory; +import java.lang.reflect.Field; import java.time.Duration; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; @@ -30,7 +31,18 @@ public final void baseTearDown(TestInfo testInfo) { LOGGER.info("***** Test finished : " + testInfo.getDisplayName() + " *****"); } - protected void awaitSeconds(long seconds) { + public static T getField(Object obj, String fieldName) { + try { + final Field field = obj.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + //noinspection unchecked + return (T) field.get(obj); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public static void awaitSeconds(long seconds) { try { TimeUnit.SECONDS.sleep(seconds); } catch (InterruptedException e) { @@ -38,7 +50,7 @@ protected void awaitSeconds(long seconds) { } } - protected void awaitSuspicion(int clusterSize) { + public static void awaitSuspicion(int clusterSize) { int defaultSuspicionMult = MembershipConfig.DEFAULT_SUSPICION_MULT; int pingInterval = MembershipProtocolTest.PING_INTERVAL; long suspicionTimeoutSec = @@ -46,16 +58,16 @@ protected void awaitSuspicion(int clusterSize) { awaitSeconds(suspicionTimeoutSec + 2); } - protected NetworkEmulatorTransport createTransport() { + public static NetworkEmulatorTransport createTransport() { return createTransport(TransportConfig.defaultConfig()); } - protected NetworkEmulatorTransport createTransport(TransportConfig transportConfig) { + public static NetworkEmulatorTransport createTransport(TransportConfig transportConfig) { return new NetworkEmulatorTransport( Transport.bindAwait(transportConfig.transportFactory(new TcpTransportFactory()))); } - protected void destroyTransport(Transport transport) { + public static void destroyTransport(Transport transport) { if (transport == null || transport.isStopped()) { return; } diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 316f814e..3c2241d8 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -421,22 +421,22 @@ private FailureDetectorImpl createFd( return new FailureDetectorImpl(localMember, transport, membershipFlux, config, scheduler); } - private void start(List fdetectors) { + private static void start(List fdetectors) { for (FailureDetectorImpl fd : fdetectors) { fd.start(); } } - private void stop(List fdetectors) { + private static void stop(List fdetectors) { for (FailureDetectorImpl fd : fdetectors) { fd.stop(); } for (FailureDetectorImpl fd : fdetectors) { - destroyTransport(fd.getTransport()); + destroyTransport(BaseTest.getField(fd, "transport")); } } - private void assertStatus( + private static void assertStatus( Address address, MemberStatus status, Collection events, @@ -461,10 +461,11 @@ private void assertStatus( } } - private Future> listenNextEventFor( + private static Future> listenNextEventFor( FailureDetectorImpl fd, List
addresses) { + final Transport transport = BaseTest.getField(fd, "transport"); addresses = new ArrayList<>(addresses); - addresses.remove(fd.getTransport().address()); // exclude self + addresses.remove(transport.address()); // exclude self if (addresses.isEmpty()) { throw new IllegalArgumentException(); } @@ -479,7 +480,8 @@ private Future> listenNextEventFor( return allOf(resultFuture); } - private Collection awaitEvents(Future> events) { + private static Collection awaitEvents( + Future> events) { try { return events.get(10, TimeUnit.SECONDS); } catch (Exception e) { @@ -487,7 +489,7 @@ private Collection awaitEvents(Future CompletableFuture> allOf(List> futuresList) { + private static CompletableFuture> allOf(List> futuresList) { CompletableFuture allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0])); return allFuturesResult.thenApply( diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index 384c0c23..ae82bca2 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -125,19 +125,20 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E final CountDownLatch latch = new CountDownLatch(membersNum - 1); final Map receivers = new ConcurrentHashMap<>(); final AtomicBoolean doubleDelivery = new AtomicBoolean(false); - for (final GossipProtocolImpl protocol : gossipProtocols) { - protocol + for (final GossipProtocolImpl gossipProtocol : gossipProtocols) { + gossipProtocol .listen() .subscribe( gossip -> { + final Member localMember = BaseTest.getField(gossipProtocol, "localMember"); + final Transport transport = BaseTest.getField(gossipProtocol, "transport"); + if (gossipData.equals(gossip.data())) { - boolean firstTimeAdded = - receivers.put(protocol.getMember(), protocol.getMember()) == null; + boolean firstTimeAdded = receivers.put(localMember, localMember) == null; if (firstTimeAdded) { latch.countDown(); } else { - LOGGER.error( - "Delivered gossip twice to: {}", protocol.getTransport().address()); + LOGGER.error("Delivered gossip twice to: {}", transport.address()); doubleDelivery.set(true); } } @@ -212,7 +213,7 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E private LongSummaryStatistics computeMessageSentStats(List gossipProtocols) { List messageSentPerNode = new ArrayList<>(gossipProtocols.size()); for (GossipProtocolImpl gossipProtocol : gossipProtocols) { - NetworkEmulatorTransport transport = (NetworkEmulatorTransport) gossipProtocol.getTransport(); + final NetworkEmulatorTransport transport = BaseTest.getField(gossipProtocol, "transport"); messageSentPerNode.add(transport.networkEmulator().totalMessageSentCount()); } return messageSentPerNode.stream().mapToLong(v -> v).summaryStatistics(); @@ -221,7 +222,7 @@ private LongSummaryStatistics computeMessageSentStats(List g private LongSummaryStatistics computeMessageLostStats(List gossipProtocols) { List messageLostPerNode = new ArrayList<>(gossipProtocols.size()); for (GossipProtocolImpl gossipProtocol : gossipProtocols) { - NetworkEmulatorTransport transport = (NetworkEmulatorTransport) gossipProtocol.getTransport(); + final NetworkEmulatorTransport transport = BaseTest.getField(gossipProtocol, "transport"); messageLostPerNode.add(transport.networkEmulator().totalOutboundMessageLostCount()); } return messageLostPerNode.stream().mapToLong(v -> v).summaryStatistics(); @@ -272,7 +273,7 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List
return gossipProtocol; } - private void destroyGossipProtocols(List gossipProtocols) { + private static void destroyGossipProtocols(List gossipProtocols) { // Stop all gossip protocols for (GossipProtocolImpl gossipProtocol : gossipProtocols) { gossipProtocol.stop(); @@ -281,7 +282,7 @@ private void destroyGossipProtocols(List gossipProtocols) { // Stop all transports List> futures = new ArrayList<>(); for (GossipProtocolImpl gossipProtocol : gossipProtocols) { - futures.add(gossipProtocol.getTransport().stop()); + futures.add(BaseTest.getField(gossipProtocol, "transport").stop()); } try { diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index ef559256..4fced8e2 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -6,9 +6,12 @@ import io.scalecube.cluster.BaseTest; import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.Member; +import io.scalecube.cluster.fdetector.FailureDetector; import io.scalecube.cluster.fdetector.FailureDetectorImpl; +import io.scalecube.cluster.gossip.GossipProtocol; import io.scalecube.cluster.gossip.GossipProtocolImpl; import io.scalecube.cluster.membership.MembershipEvent.Type; +import io.scalecube.cluster.metadata.MetadataStore; import io.scalecube.cluster.metadata.MetadataStoreImpl; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; @@ -23,6 +26,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -125,7 +129,9 @@ public void testLeaveClusterCameBeforeAlive() { .data(leavingRecord) .build(); - cmB.getGossipProtocol().spread(leavingMessage).block(TIMEOUT); + final GossipProtocol gossipProtocol = getField(cmB, "gossipProtocol"); + + gossipProtocol.spread(leavingMessage).block(TIMEOUT); final MembershipRecord addedRecord = new MembershipRecord(anotherMember, MemberStatus.ALIVE, 4); final Message addedMessage = @@ -134,7 +140,7 @@ public void testLeaveClusterCameBeforeAlive() { .data(addedRecord) .build(); - cmB.getGossipProtocol().spread(addedMessage).block(TIMEOUT); + gossipProtocol.spread(addedMessage).block(TIMEOUT); awaitSeconds(1); awaitSuspicion(3); @@ -168,7 +174,9 @@ public void testLeaveClusterOnly() { .data(leavingRecord) .build(); - cmB.getGossipProtocol().spread(leavingMessage).block(TIMEOUT); + final GossipProtocol gossipProtocol = getField(cmB, "gossipProtocol"); + + gossipProtocol.spread(leavingMessage).block(TIMEOUT); awaitSeconds(1); awaitSuspicion(3); @@ -200,7 +208,10 @@ public void testLeaveClusterOnSuspectedNode() { .data(suspectedNode) .build(); - cmB.getGossipProtocol().spread(suspectMessage).block(TIMEOUT); + final GossipProtocol gossipProtocol = getField(cmB, "gossipProtocol"); + + gossipProtocol.spread(suspectMessage).block(TIMEOUT); + awaitSeconds(3); final MembershipRecord leavingRecord = @@ -211,7 +222,8 @@ public void testLeaveClusterOnSuspectedNode() { .data(leavingRecord) .build(); - cmB.getGossipProtocol().spread(leavingMessage).block(TIMEOUT); + gossipProtocol.spread(leavingMessage).block(TIMEOUT); + awaitSeconds(2); awaitSuspicion(3); @@ -1105,7 +1117,7 @@ public void testNetworkPartitionManyDueNoInboundThenRemovedThenRecover() { } } - private ClusterConfig testConfig(List
seedAddresses) { + private static ClusterConfig testConfig(List
seedAddresses) { // Create faster config for local testing return new ClusterConfig() .membership(opts -> opts.seedMembers(seedAddresses)) @@ -1176,33 +1188,34 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf return membership; } - private String newMemberId() { + private static String newMemberId() { return Long.toHexString(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE); } - private void stopAll(MembershipProtocolImpl... memberships) { + private static void stopAll(MembershipProtocolImpl... memberships) { for (MembershipProtocolImpl membership : memberships) { stop(membership); } } - private void stop(MembershipProtocolImpl membership) { + private static void stop(MembershipProtocolImpl membership) { if (membership == null) { return; } - membership.getMetadataStore().stop(); + + BaseTest.getField(membership, "metadataStore").stop(); membership.stop(); - membership.getGossipProtocol().stop(); - membership.getFailureDetector().stop(); - membership.getTransport().stop().block(); + BaseTest.getField(membership, "gossipProtocol").stop(); + BaseTest.getField(membership, "failureDetector").stop(); + BaseTest.getField(membership, "transport").stop().block(); } - private Mono awaitUntil(Runnable assertAction) { + private static Mono awaitUntil(Runnable assertAction) { return Mono.fromRunnable(assertAction) .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))); } - private void assertTrusted(MembershipProtocolImpl membership, Member... expected) { + private static void assertTrusted(MembershipProtocolImpl membership, Member... expected) { List actual = membersByStatus(membership, MemberStatus.ALIVE); List expectedList = new ArrayList<>(Arrays.asList(expected)); expectedList.add(membership.member()); // add local since he always trusted (alive) @@ -1221,7 +1234,7 @@ private void assertTrusted(MembershipProtocolImpl membership, Member... expected } } - private void assertSuspected(MembershipProtocolImpl membership, Member... expected) { + private static void assertSuspected(MembershipProtocolImpl membership, Member... expected) { List actual = membersByStatus(membership, MemberStatus.SUSPECT); assertEquals( expected.length, @@ -1238,7 +1251,7 @@ private void assertSuspected(MembershipProtocolImpl membership, Member... expect } } - private void assertRemoved(Sinks.Many recording, Member... expected) { + private static void assertRemoved(Sinks.Many recording, Member... expected) { List actual = new ArrayList<>(); recording.asFlux().map(MembershipEvent::member).subscribe(actual::add); assertEquals( @@ -1256,29 +1269,36 @@ private void assertRemoved(Sinks.Many recording, Member... expe } } - private void assertSelfTrusted(MembershipProtocolImpl membership) { + private static void assertSelfTrusted(MembershipProtocolImpl membership) { assertTrusted(membership); } - private void assertMemberAndType( + private static void assertMemberAndType( MembershipEvent membershipEvent, String expectedMemberId, MembershipEvent.Type expectedType) { - assertEquals(expectedMemberId, membershipEvent.member().id()); assertEquals(expectedType, membershipEvent.type()); } - private void assertNoSuspected(MembershipProtocolImpl membership) { + private static void assertNoSuspected(MembershipProtocolImpl membership) { assertSuspected(membership); } - private List membersByStatus(MembershipProtocolImpl membership, MemberStatus status) { - return membership.getMembershipRecords().stream() + private static List membersByStatus( + MembershipProtocolImpl membership, MemberStatus status) { + final Map membershipTable = + BaseTest.getField(membership, "membershipTable"); + + final List membershipRecords = + Collections.unmodifiableList(new ArrayList<>(membershipTable.values())); + + return membershipRecords.stream() .filter(member -> member.status() == status) .map(MembershipRecord::member) .collect(Collectors.toList()); } - private Sinks.Many startRecordingRemoved(MembershipProtocolImpl membership) { + private static Sinks.Many startRecordingRemoved( + MembershipProtocolImpl membership) { Sinks.Many recording = Sinks.many().replay().all(); membership .listen()