Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests cleanup #389

Merged
merged 1 commit into from
Sep 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public final class MembershipConfig implements Cloneable {
private int syncInterval = DEFAULT_SYNC_INTERVAL;
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
private int suspicionMult = DEFAULT_SUSPICION_MULT;
private int removedMembersHistorySize = 42;
private String namespace = "default";

public MembershipConfig() {}
Expand Down Expand Up @@ -158,22 +157,6 @@ public MembershipConfig namespace(String namespace) {
return m;
}

public int removedMembersHistorySize() {
return removedMembersHistorySize;
}

/**
* Setter for {@code removedMembersHistorySize}.
*
* @param removedMembersHistorySize history size for remove members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig removedMembersHistorySize(int removedMembersHistorySize) {
MembershipConfig m = clone();
m.removedMembersHistorySize = removedMembersHistorySize;
return m;
}

@Override
public MembershipConfig clone() {
try {
Expand All @@ -191,7 +174,6 @@ public String toString() {
.add("syncTimeout=" + syncTimeout)
.add("suspicionMult=" + suspicionMult)
.add("namespace='" + namespace + "'")
.add("removedMembersHistorySize=" + removedMembersHistorySize)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,13 +417,4 @@ private boolean isTransitPingAck(Message message) {
return PING_ACK.equals(message.qualifier())
&& message.<PingData>data().getOriginalIssuer() != null;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return transport
*/
Transport getTransport() {
return transport;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,22 +365,4 @@ private Set<String> getGossipsThatMostLikelyDisseminated(long period) {
.map(gossipState -> gossipState.gossip().gossipId())
.collect(Collectors.toSet());
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return transport
*/
Transport getTransport() {
return transport;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return local member
*/
Member getMember() {
return localMember;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand All @@ -50,6 +48,7 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

@SuppressWarnings({"FieldCanBeLocal", "unused"})
public final class MembershipProtocolImpl implements MembershipProtocol {

private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class);
Expand Down Expand Up @@ -84,7 +83,6 @@ private enum MembershipUpdateReason {

private final Map<String, MembershipRecord> membershipTable = new HashMap<>();
private final Map<String, Member> members = new HashMap<>();
private final List<MembershipEvent> removedMembersHistory = new CopyOnWriteArrayList<>();
private final Set<String> aliveEmittedSet = new HashSet<>();

// Subject
Expand Down Expand Up @@ -157,10 +155,8 @@ public MembershipProtocolImpl(
.publishOn(scheduler)
.subscribe(
this::onMembershipGossip,
ex -> LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex)),
listen() // Listen removed members for monitoring
.filter(MembershipEvent::isRemoved)
.subscribe(this::onMemberRemoved)));
ex ->
LOGGER.error("[{}][onMembershipGossip][error] cause:", localMember, ex))));
}

// Remove duplicates and local address(es)
Expand Down Expand Up @@ -863,87 +859,4 @@ private Mono<Void> spreadMembershipGossip(MembershipRecord r) {
.then();
});
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return failure detector
*/
FailureDetector getFailureDetector() {
return failureDetector;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return gossip
*/
GossipProtocol getGossipProtocol() {
return gossipProtocol;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return transport
*/
Transport getTransport() {
return transport;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return metadataStore
*/
MetadataStore getMetadataStore() {
return metadataStore;
}

/**
* <b>NOTE:</b> this method is for testing purpose only.
*
* @return transport
*/
List<MembershipRecord> getMembershipRecords() {
return Collections.unmodifiableList(new ArrayList<>(membershipTable.values()));
}

// ===============================================================
// ============== Helper Methods for Monitoring ==================
// ===============================================================

private int getIncarnation() {
return membershipTable.get(localMember.id()).incarnation();
}

private List<Member> getAliveMembers() {
return findRecordsByCondition(MembershipRecord::isAlive);
}

private List<Member> getSuspectedMembers() {
return findRecordsByCondition(MembershipRecord::isSuspect);
}

private List<Member> getRemovedMembers() {
return removedMembersHistory.stream().map(MembershipEvent::member).collect(Collectors.toList());
}

private List<Member> findRecordsByCondition(Predicate<MembershipRecord> condition) {
return getMembershipRecords().stream()
.filter(condition)
.map(MembershipRecord::member)
.collect(Collectors.toList());
}

private void onMemberRemoved(MembershipEvent event) {
int s = membershipConfig.removedMembersHistorySize();
if (s <= 0) {
return;
}
removedMembersHistory.add(event);
if (removedMembersHistory.size() > s) {
removedMembersHistory.remove(0);
}
}
}
22 changes: 17 additions & 5 deletions cluster/src/test/java/io/scalecube/cluster/BaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.cluster.utils.NetworkEmulatorTransport;
import io.scalecube.transport.netty.tcp.TcpTransportFactory;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -30,32 +31,43 @@ public final void baseTearDown(TestInfo testInfo) {
LOGGER.info("***** Test finished : " + testInfo.getDisplayName() + " *****");
}

protected void awaitSeconds(long seconds) {
public static <T> T getField(Object obj, String fieldName) {
try {
final Field field = obj.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
//noinspection unchecked
return (T) field.get(obj);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

public static void awaitSeconds(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
throw Exceptions.propagate(e);
}
}

protected void awaitSuspicion(int clusterSize) {
public static void awaitSuspicion(int clusterSize) {
int defaultSuspicionMult = MembershipConfig.DEFAULT_SUSPICION_MULT;
int pingInterval = MembershipProtocolTest.PING_INTERVAL;
long suspicionTimeoutSec =
ClusterMath.suspicionTimeout(defaultSuspicionMult, clusterSize, pingInterval) / 1000;
awaitSeconds(suspicionTimeoutSec + 2);
}

protected NetworkEmulatorTransport createTransport() {
public static NetworkEmulatorTransport createTransport() {
return createTransport(TransportConfig.defaultConfig());
}

protected NetworkEmulatorTransport createTransport(TransportConfig transportConfig) {
public static NetworkEmulatorTransport createTransport(TransportConfig transportConfig) {
return new NetworkEmulatorTransport(
Transport.bindAwait(transportConfig.transportFactory(new TcpTransportFactory())));
}

protected void destroyTransport(Transport transport) {
public static void destroyTransport(Transport transport) {
if (transport == null || transport.isStopped()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,22 +421,22 @@ private FailureDetectorImpl createFd(
return new FailureDetectorImpl(localMember, transport, membershipFlux, config, scheduler);
}

private void start(List<FailureDetectorImpl> fdetectors) {
private static void start(List<FailureDetectorImpl> fdetectors) {
for (FailureDetectorImpl fd : fdetectors) {
fd.start();
}
}

private void stop(List<FailureDetectorImpl> fdetectors) {
private static void stop(List<FailureDetectorImpl> fdetectors) {
for (FailureDetectorImpl fd : fdetectors) {
fd.stop();
}
for (FailureDetectorImpl fd : fdetectors) {
destroyTransport(fd.getTransport());
destroyTransport(BaseTest.getField(fd, "transport"));
}
}

private void assertStatus(
private static void assertStatus(
Address address,
MemberStatus status,
Collection<FailureDetectorEvent> events,
Expand All @@ -461,10 +461,11 @@ private void assertStatus(
}
}

private Future<List<FailureDetectorEvent>> listenNextEventFor(
private static Future<List<FailureDetectorEvent>> listenNextEventFor(
FailureDetectorImpl fd, List<Address> addresses) {
final Transport transport = BaseTest.getField(fd, "transport");
addresses = new ArrayList<>(addresses);
addresses.remove(fd.getTransport().address()); // exclude self
addresses.remove(transport.address()); // exclude self
if (addresses.isEmpty()) {
throw new IllegalArgumentException();
}
Expand All @@ -479,15 +480,16 @@ private Future<List<FailureDetectorEvent>> listenNextEventFor(
return allOf(resultFuture);
}

private Collection<FailureDetectorEvent> awaitEvents(Future<List<FailureDetectorEvent>> events) {
private static Collection<FailureDetectorEvent> awaitEvents(
Future<List<FailureDetectorEvent>> events) {
try {
return events.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
private static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
CompletableFuture<Void> allFuturesResult =
CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
return allFuturesResult.thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,20 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E
final CountDownLatch latch = new CountDownLatch(membersNum - 1);
final Map<Member, Member> receivers = new ConcurrentHashMap<>();
final AtomicBoolean doubleDelivery = new AtomicBoolean(false);
for (final GossipProtocolImpl protocol : gossipProtocols) {
protocol
for (final GossipProtocolImpl gossipProtocol : gossipProtocols) {
gossipProtocol
.listen()
.subscribe(
gossip -> {
final Member localMember = BaseTest.getField(gossipProtocol, "localMember");
final Transport transport = BaseTest.getField(gossipProtocol, "transport");

if (gossipData.equals(gossip.data())) {
boolean firstTimeAdded =
receivers.put(protocol.getMember(), protocol.getMember()) == null;
boolean firstTimeAdded = receivers.put(localMember, localMember) == null;
if (firstTimeAdded) {
latch.countDown();
} else {
LOGGER.error(
"Delivered gossip twice to: {}", protocol.getTransport().address());
LOGGER.error("Delivered gossip twice to: {}", transport.address());
doubleDelivery.set(true);
}
}
Expand Down Expand Up @@ -212,7 +213,7 @@ void testGossipProtocol(int membersNum, int lossPercent, int meanDelay) throws E
private LongSummaryStatistics computeMessageSentStats(List<GossipProtocolImpl> gossipProtocols) {
List<Long> messageSentPerNode = new ArrayList<>(gossipProtocols.size());
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
NetworkEmulatorTransport transport = (NetworkEmulatorTransport) gossipProtocol.getTransport();
final NetworkEmulatorTransport transport = BaseTest.getField(gossipProtocol, "transport");
messageSentPerNode.add(transport.networkEmulator().totalMessageSentCount());
}
return messageSentPerNode.stream().mapToLong(v -> v).summaryStatistics();
Expand All @@ -221,7 +222,7 @@ private LongSummaryStatistics computeMessageSentStats(List<GossipProtocolImpl> g
private LongSummaryStatistics computeMessageLostStats(List<GossipProtocolImpl> gossipProtocols) {
List<Long> messageLostPerNode = new ArrayList<>(gossipProtocols.size());
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
NetworkEmulatorTransport transport = (NetworkEmulatorTransport) gossipProtocol.getTransport();
final NetworkEmulatorTransport transport = BaseTest.getField(gossipProtocol, "transport");
messageLostPerNode.add(transport.networkEmulator().totalOutboundMessageLostCount());
}
return messageLostPerNode.stream().mapToLong(v -> v).summaryStatistics();
Expand Down Expand Up @@ -272,7 +273,7 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List<Address>
return gossipProtocol;
}

private void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
private static void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
// Stop all gossip protocols
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
gossipProtocol.stop();
Expand All @@ -281,7 +282,7 @@ private void destroyGossipProtocols(List<GossipProtocolImpl> gossipProtocols) {
// Stop all transports
List<Mono<Void>> futures = new ArrayList<>();
for (GossipProtocolImpl gossipProtocol : gossipProtocols) {
futures.add(gossipProtocol.getTransport().stop());
futures.add(BaseTest.<Transport>getField(gossipProtocol, "transport").stop());
}

try {
Expand Down
Loading
Loading