Skip to content

Commit

Permalink
Merge pull request #389 from scalecube/remove_removedMembersHistory
Browse files Browse the repository at this point in the history
Tests cleanup
  • Loading branch information
artem-v committed Sep 2, 2023
2 parents 036fd61 + d9f720f commit 71fe298
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public final class MembershipConfig implements Cloneable {
private int syncInterval = DEFAULT_SYNC_INTERVAL;
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
private int suspicionMult = DEFAULT_SUSPICION_MULT;
private int removedMembersHistorySize = 42;
private String namespace = "default";

public MembershipConfig() {}
Expand Down Expand Up @@ -158,22 +157,6 @@ public MembershipConfig namespace(String namespace) {
return m;
}

public int removedMembersHistorySize() {
return removedMembersHistorySize;
}

/**
* Setter for {@code removedMembersHistorySize}.
*
* @param removedMembersHistorySize history size for remove members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig removedMembersHistorySize(int removedMembersHistorySize) {
MembershipConfig m = clone();
m.removedMembersHistorySize = removedMembersHistorySize;
return m;
}

@Override
public MembershipConfig clone() {
try {
Expand All @@ -191,7 +174,6 @@ public String toString() {
.add("syncTimeout=" + syncTimeout)
.add("suspicionMult=" + suspicionMult)
.add("namespace='" + namespace + "'")
.add("removedMembersHistorySize=" + removedMembersHistorySize)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,4 @@ private boolean isTransitPingAck(Message message) {
return PING_ACK.equals(message.qualifier())
&& message.<PingData>data().getOriginalIssuer() != null;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return transport
*/
Transport getTransport() {
return transport;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,22 +365,4 @@ private Set<String> getGossipsThatMostLikelyDisseminated(long period) {
.map(gossipState -> gossipState.gossip().gossipId())
.collect(Collectors.toSet());
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return transport
*/
Transport getTransport() {
return transport;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return local member
*/
Member getMember() {
return localMember;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand All @@ -50,6 +48,7 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

@SuppressWarnings({"FieldCanBeLocal", "unused"})
public final class MembershipProtocolImpl implements MembershipProtocol {

private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class);
Expand Down Expand Up @@ -84,7 +83,6 @@ private enum MembershipUpdateReason {

private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
private final Map<String, Member> members = new HashMap<>();
private final List<MembershipEvent> removedMembersHistory = new CopyOnWriteArrayList<>();
private final Set<String> aliveEmittedSet = new HashSet<>();

// Subject
Expand Down Expand Up @@ -157,10 +155,8 @@ public MembershipProtocolImpl(
.publishOn(scheduler)
.subscribe(
this::onMembershipGossip,
ex -> LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex)),
listen() // Listen removed members for monitoring
.filter(MembershipEvent::isRemoved)
.subscribe(this::onMemberRemoved)));
ex ->
LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex))));
}

// Remove duplicates and local address(es)
Expand Down Expand Up @@ -863,87 +859,4 @@ private Mono<Void> spreadMembershipGossip(MembershipRecord r) {
.then();
});
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return failure detector
*/
FailureDetector getFailureDetector() {
return failureDetector;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return gossip
*/
GossipProtocol getGossipProtocol() {
return gossipProtocol;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return transport
*/
Transport getTransport() {
return transport;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return metadataStore
*/
MetadataStore getMetadataStore() {
return metadataStore;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return transport
*/
List<MembershipRecord> getMembershipRecords() {
return Collections.unmodifiableList(new ArrayList<>(membershipTable.values()));
}

// ===============================================================
// ============== Helper Methods for Monitoring ==================
// ===============================================================

private int getIncarnation() {
return membershipTable.get(localMember.id()).incarnation();
}

private List<Member> getAliveMembers() {
return findRecordsByCondition(MembershipRecord::isAlive);
}

private List<Member> getSuspectedMembers() {
return findRecordsByCondition(MembershipRecord::isSuspect);
}

private List<Member> getRemovedMembers() {
return removedMembersHistory.stream().map(MembershipEvent::member).collect(Collectors.toList());
}

private List<Member> findRecordsByCondition(Predicate<MembershipRecord> condition) {
return getMembershipRecords().stream()
.filter(condition)
.map(MembershipRecord::member)
.collect(Collectors.toList());
}

private void onMemberRemoved(MembershipEvent event) {
int s = membershipConfig.removedMembersHistorySize();
if (s <= 0) {
return;
}
removedMembersHistory.add(event);
if (removedMembersHistory.size() > s) {
removedMembersHistory.remove(0);
}
}
}
22 changes: 17 additions & 5 deletions cluster/src/test/java/io/scalecube/cluster/BaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.utils.NetworkEmulatorTransport;
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -30,32 +31,43 @@ public final void baseTearDown(TestInfo testInfo) {
LOGGER.info("***** Test finished : " + testInfo.getDisplayName() + " *****");
}

protected void awaitSeconds(long seconds) {
public static <T> T getField(Object obj, String fieldName) {
try {
final Field field = obj.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
//noinspection unchecked
return (T) field.get(obj);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

public static void awaitSeconds(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
}

protected void awaitSuspicion(int clusterSize) {
public static void awaitSuspicion(int clusterSize) {
int defaultSuspicionMult = MembershipConfig.DEFAULT_SUSPICION_MULT;
int pingInterval = MembershipProtocolTest.PING_INTERVAL;
long suspicionTimeoutSec =
ClusterMath.suspicionTimeout(defaultSuspicionMult, clusterSize, pingInterval) / 1000;
awaitSeconds(suspicionTimeoutSec + 2);
}

protected NetworkEmulatorTransport createTransport() {
public static NetworkEmulatorTransport createTransport() {
return createTransport(TransportConfig.defaultConfig());
}

protected NetworkEmulatorTransport createTransport(TransportConfig transportConfig) {
public static NetworkEmulatorTransport createTransport(TransportConfig transportConfig) {
return new NetworkEmulatorTransport(
Transport.bindAwait(transportConfig.transportFactory(new TcpTransportFactory())));
}

protected void destroyTransport(Transport transport) {
public static void destroyTransport(Transport transport) {
if (transport == null || transport.isStopped()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,22 +421,22 @@ private FailureDetectorImpl createFd(
return new FailureDetectorImpl(localMember, transport, membershipFlux, config, scheduler);
}

private void start(List<FailureDetectorImpl> fdetectors) {
private static void start(List<FailureDetectorImpl> fdetectors) {
for (FailureDetectorImpl fd : fdetectors) {
fd.start();
}
}

private void stop(List<FailureDetectorImpl> fdetectors) {
private static void stop(List<FailureDetectorImpl> fdetectors) {
for (FailureDetectorImpl fd : fdetectors) {
fd.stop();
}
for (FailureDetectorImpl fd : fdetectors) {
destroyTransport(fd.getTransport());
destroyTransport(BaseTest.getField(fd, "transport"));
}
}

private void assertStatus(
private static void assertStatus(
Address address,
MemberStatus status,
Collection<FailureDetectorEvent> events,
Expand All @@ -461,10 +461,11 @@ private void assertStatus(
}
}

private Future<List<FailureDetectorEvent>> listenNextEventFor(
private static Future<List<FailureDetectorEvent>> listenNextEventFor(
FailureDetectorImpl fd, List<Address> addresses) {
final Transport transport = BaseTest.getField(fd, "transport");
addresses = new ArrayList<>(addresses);
addresses.remove(fd.getTransport().address()); // exclude self
addresses.remove(transport.address()); // exclude self
if (addresses.isEmpty()) {
throw new IllegalArgumentException();
}
Expand All @@ -479,15 +480,16 @@ private Future<List<FailureDetectorEvent>> listenNextEventFor(
return allOf(resultFuture);
}

private Collection<FailureDetectorEvent> awaitEvents(Future<List<FailureDetectorEvent>> events) {
private static Collection<FailureDetectorEvent> awaitEvents(
Future<List<FailureDetectorEvent>> events) {
try {
return events.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
private static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
CompletableFuture<Void> allFuturesResult =
CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
return allFuturesResult.thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,20 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E
final CountDownLatch latch = new CountDownLatch(membersNum - 1);
final Map<Member, Member> receivers = new ConcurrentHashMap<>();
final AtomicBoolean doubleDelivery = new AtomicBoolean(false);
for (final GossipProtocolImpl protocol : gossipProtocols) {
protocol
for (final GossipProtocolImpl gossipProtocol : gossipProtocols) {
gossipProtocol
.listen()
.subscribe(
gossip -> {
final Member localMember = BaseTest.getField(gossipProtocol, "localMember");
final Transport transport = BaseTest.getField(gossipProtocol, "transport");

if (gossipData.equals(gossip.data())) {
boolean firstTimeAdded =
receivers.put(protocol.getMember(), protocol.getMember()) == null;
boolean firstTimeAdded = receivers.put(localMember, localMember) == null;
if (firstTimeAdded) {
latch.countDown();
} else {
LOGGER.error(
"Delivered gossip twice to: {}", protocol.getTransport().address());
LOGGER.error("Delivered gossip twice to: {}", transport.address());
doubleDelivery.set(true);
}
}
Expand Down Expand Up @@ -212,7 +213,7 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E
private LongSummaryStatistics computeMessageSentStats(List<GossipProtocolImpl> gossipProtocols) {
List<Long> messageSentPerNode = new ArrayList<>(gossipProtocols.size());
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
NetworkEmulatorTransport transport = (NetworkEmulatorTransport) gossipProtocol.getTransport();
final NetworkEmulatorTransport transport = BaseTest.getField(gossipProtocol, "transport");
messageSentPerNode.add(transport.networkEmulator().totalMessageSentCount());
}
return messageSentPerNode.stream().mapToLong(v -> v).summaryStatistics();
Expand All @@ -221,7 +222,7 @@ private LongSummaryStatistics computeMessageSentStats(List<GossipProtocolImpl> g
private LongSummaryStatistics computeMessageLostStats(List<GossipProtocolImpl> gossipProtocols) {
List<Long> messageLostPerNode = new ArrayList<>(gossipProtocols.size());
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
NetworkEmulatorTransport transport = (NetworkEmulatorTransport) gossipProtocol.getTransport();
final NetworkEmulatorTransport transport = BaseTest.getField(gossipProtocol, "transport");
messageLostPerNode.add(transport.networkEmulator().totalOutboundMessageLostCount());
}
return messageLostPerNode.stream().mapToLong(v -> v).summaryStatistics();
Expand Down Expand Up @@ -272,7 +273,7 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List<Address>
return gossipProtocol;
}

private void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
private static void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
// Stop all gossip protocols
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
gossipProtocol.stop();
Expand All @@ -281,7 +282,7 @@ private void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
// Stop all transports
List<Mono<Void>> futures = new ArrayList<>();
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
futures.add(gossipProtocol.getTransport().stop());
futures.add(BaseTest.<Transport>getField(gossipProtocol, "transport").stop());
}

try {
Expand Down
Loading

0 comments on commit 71fe298

Please sign in to comment.