Skip to content

Commit

Permalink
ci: refactor NeonBeeTestExecutionListener into a StaleVertx/ThreadChe…
Browse files Browse the repository at this point in the history
…cker
  • Loading branch information
kristian committed Oct 4, 2021
1 parent 007b8ce commit 1b38619
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 55 deletions.
51 changes: 0 additions & 51 deletions src/test/java/io/neonbee/NeonBeeTestExecutionListener.java

This file was deleted.

3 changes: 2 additions & 1 deletion src/test/java/io/neonbee/data/DataVerticleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.google.common.truth.Truth.assertThat;
import static io.vertx.core.Future.succeededFuture;
import static java.lang.Boolean.parseBoolean;

import java.util.Locale;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -158,7 +159,7 @@ public String getName() {

@Override
public Future<String> retrieveData(DataQuery query, DataMap require, DataContext context) {
if ("true".equalsIgnoreCase(query.getParameter("ping"))) {
if (parseBoolean(query.getParameter("ping"))) {
return succeededFuture("Pong");
}
throw new DataException(400, "Bad Request");
Expand Down
9 changes: 7 additions & 2 deletions src/test/java/io/neonbee/test/base/NeonBeeTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.neonbee.test.helper.DummyVerticleHelper.DummyEntityVerticleFactory;
import io.neonbee.test.helper.FileSystemHelper;
import io.neonbee.test.helper.WorkingDirectoryBuilder;
import io.neonbee.test.listeners.StaleVertxChecker;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Verticle;
Expand Down Expand Up @@ -74,8 +75,12 @@ public class NeonBeeTestBase {

@BeforeEach
@Timeout(value = 5, timeUnit = TimeUnit.SECONDS)
public void setUp(TestInfo testInfo, Vertx vertx, VertxTestContext testContext) throws Exception {
// Build working directory
public void setUp(Vertx vertx, VertxTestContext testContext, TestInfo testInfo) throws Exception {
// associate the Vert.x instance to the current test (unfortunately the only "identifier" that is shared between
// TestInfo and TestIdentifier is the display name)
StaleVertxChecker.VERTX_TEST_MAP.put(vertx, testInfo.getDisplayName());

// build working directory
workingDirPath = FileSystemHelper.createTempDirectory();
provideWorkingDirectoryBuilder(testInfo, testContext).build(workingDirPath);

Expand Down
82 changes: 82 additions & 0 deletions src/test/java/io/neonbee/test/listeners/StaleThreadChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package io.neonbee.test.listeners;

import static java.lang.Boolean.parseBoolean;

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.junit.platform.engine.TestExecutionResult;
import org.junit.platform.launcher.TestExecutionListener;
import org.junit.platform.launcher.TestIdentifier;
import org.junit.platform.launcher.TestPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import io.vertx.core.Vertx;

/**
* The {@link StaleThreadChecker} checks for any stale threads after test execution. Generally after a test finishes to
* execute it must clean up all resources. If not this listener will print an error to the logs.
*/
public class StaleThreadChecker implements TestExecutionListener {
public static final SetMultimap<Vertx, String> VERTX_TEST_MAP = HashMultimap.create();

static final String VERTX_THREAD_NAME_PREFIX = "vert.x-";

private static final Logger LOGGER = LoggerFactory.getLogger(StaleThreadChecker.class);

protected boolean parallelExecution;

@Override
public void testPlanExecutionStarted(TestPlan testPlan) {
parallelExecution = parseBoolean(System.getProperty("junit.jupiter.execution.parallel.enabled"));
if (parallelExecution) {
LOGGER.warn("Cannot check for stale threads when running JUnit in parallel execution mode");
}
}

@Override
public void testPlanExecutionFinished(TestPlan testPlan) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Checking for non-daemon threads after test plan execution finished");
Thread.getAllStackTraces().keySet().stream().filter(Thread::isAlive).filter(Predicate.not(Thread::isDaemon))
.forEach(thread -> LOGGER.debug("Non-daemon thread {} still is still alive", thread));
}
}

@Override
public void executionFinished(TestIdentifier testIdentifier, TestExecutionResult testExecutionResult) {
if (!parallelExecution) {
checkForStaleThreads("Vert.x", VERTX_THREAD_NAME_PREFIX);
checkForStaleThreads("Hazelcast", "hz.");
checkForStaleThreads("WatchService", "FileSystemWatch");
}
}

private static void checkForStaleThreads(String name, String namePrefix) {
LOGGER.info("Checking for stale {} threads with '{}' prefix", name, namePrefix);
List<Thread> staleThreads = findStaleThreads(namePrefix).collect(Collectors.toList());
if (!staleThreads.isEmpty()) {
if (LOGGER.isErrorEnabled()) {
LOGGER.error(
"Stale {} thread(s) detected!! Not closing the thread {} "
+ "could result in the test runner not signaling completion",
name, staleThreads.get(0));
}

if (LOGGER.isDebugEnabled()) {
staleThreads.forEach(staleThread -> {
LOGGER.debug("Stale thread info: {} ({})", staleThread.getName(), staleThread.getId());
});
}
}
}

protected static Stream<Thread> findStaleThreads(String namePrefix) {
return Thread.getAllStackTraces().keySet().stream().filter(thread -> thread.getName().startsWith(namePrefix));
}
}
159 changes: 159 additions & 0 deletions src/test/java/io/neonbee/test/listeners/StaleVertxChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package io.neonbee.test.listeners;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.junit.platform.engine.TestExecutionResult;
import org.junit.platform.launcher.TestIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;

import io.vertx.core.Context;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxThread;

/**
* The {@link StaleVertxChecker} checks for any stale Vert.x threads after test execution. Generally after a test
* finishes to execute it must clean up all Vert.x resources. If not this listener will print an error to the logs.
*/
public class StaleVertxChecker extends StaleThreadChecker {
public static final SetMultimap<Vertx, String> VERTX_TEST_MAP = HashMultimap.create();

private static final Logger LOGGER = LoggerFactory.getLogger(StaleVertxChecker.class);

private static final Method CONTEXT_METHOD;

static {
Method contextMethod = null;
try {
(contextMethod = VertxThread.class.getDeclaredMethod("context")).setAccessible(true);
} catch (NoSuchMethodException | SecurityException e) {
LOGGER.warn("Cannot set context method of VertxThread accessible, checking for stale threads is limited");
} finally {
CONTEXT_METHOD = contextMethod;
}
}

@Override
public void executionFinished(TestIdentifier testIdentifier, TestExecutionResult testExecutionResult) {
super.executionFinished(testIdentifier, testExecutionResult);
if (CONTEXT_METHOD != null) {
checkForStaleVertxInstances(testIdentifier);
}
}

@SuppressWarnings({ "PMD.EmptyIfStmt", "checkstyle:MultipleVariableDeclarations" })
private void checkForStaleVertxInstances(TestIdentifier testIdentifier) {
LOGGER.info("Checking for stale Vert.x instances");

// first try to determine all Vert.x instances that are currently available (and running?)
Set<Vertx> vertxInstances = findStaleThreads(VERTX_THREAD_NAME_PREFIX).filter(VertxThread.class::isInstance)
.map(VertxThread.class::cast).map(thread -> {
try {
Context context = (Context) CONTEXT_METHOD.invoke(thread);
if (context == null) {
LOGGER.debug("Vert.x thread {} is current not associated to any context", thread);
return null;
}

Vertx vertx = context.owner();
if (vertx == null) {
LOGGER.debug("Vert.x thread {} has a context {} with no owner, is this a bug?!", context,
thread);
return null;
}

return vertx;
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
LOGGER.debug("Failed to determine Vert.x context for thread {}", thread);
return null;
}
}).collect(Collectors.toSet());

if (vertxInstances.isEmpty()) {
// perfect! we don't need to bother ourself!
LOGGER.info("No stale Vert.x instances found");
return;
} else if (vertxInstances.contains(null)) {
LOGGER.warn("Could not determine Vert.x instance for all Vert.x threads");
}

// try to find instances associated to this test or any instances that we cannot associate to any test
Set<Vertx> unassociatedVertxInstances = new HashSet<>(), associatedVertxInstances = new HashSet<>(),
notAssociatedVertxInstances = new HashSet<>();
vertxInstances.stream().filter(Objects::nonNull).forEach(vertx -> {
Set<String> associatedToTests = VERTX_TEST_MAP.get(vertx);
if (associatedToTests.isEmpty()) {
// this Vert.x instance was never associated to any test, thus we don't know if it is ours
unassociatedVertxInstances.add(vertx);
} else if (associatedToTests.contains(testIdentifier.getDisplayName())) {
// unfortunately the display name is the only "identifier" shared between TestInfo and TestIdentifier
associatedVertxInstances.add(vertx);
} else {
// this Vert.x instance is owned by a test, but it's not us! phew...
notAssociatedVertxInstances.add(vertx);
}
});

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Vert.x instance overview:\n\nUnassociated: {}\nAssociated: {}\nNot Associated: {}",
unassociatedVertxInstances, associatedVertxInstances, notAssociatedVertxInstances);
}

// in case there are any associated Vert.x instances running, log them out to console
if (!associatedVertxInstances.isEmpty()) {
logStaleVertxInstances(associatedVertxInstances);
} else if (!parallelExecution && (!unassociatedVertxInstances.isEmpty() || vertxInstances.contains(null))) {
// we can only make sense of this, in case we are NOT in parallel execution, if we find a unassociated
// instance or an instance that we cannot determine the Vert.x instance for in parallel execution mode it
// could likely be from another test, that is currently being executed, thus better don't log anything.
// however if we are NOT in parallel execution, the first test which this log message appears leaks:
if (!unassociatedVertxInstances.isEmpty()) {
logStaleVertxInstances(unassociatedVertxInstances);
} else if (vertxInstances.contains(null)) {
// there are Vert.x threads, that we were unable to determine the Vert.x instance for, do not deal with
// this here, because we have the StaleThreadChecker backing us up in such cases
}
}
}

private void logStaleVertxInstances(Collection<Vertx> vertxInstances) {
Optional<Vertx> notRunningVertx =
vertxInstances.stream().filter(Predicate.not(StaleVertxChecker::probeVertxRunning)).findAny();
if (!notRunningVertx.isPresent()) {
if (LOGGER.isErrorEnabled()) {
LOGGER.error(
"Stale and running Vert.x instance found!! Not closing Vert.x instance {} "
+ "could result in the test runner not signaling completion",
vertxInstances.stream().findAny());
}
} else {
LOGGER.error("Stale closed (!) Vert.x instance {} with running threads found!! This is a bug!",
notRunningVertx.get());
}
}

private static boolean probeVertxRunning(Vertx vertx) {
try {
vertx.deployVerticle(() -> (Verticle) null, new DeploymentOptions().setInstances(0));
return false; // if it is not running the deployVerticles call will immediately return with a failed future
} catch (IllegalArgumentException e) {
// we do probe for an illegal argument exception, as if Vert.x is closed, the deployVerticle call will
// actually return a failed future instead, if it is not closed however, the DeploymentManager throws
// the exception instead, which indicates to us, Vert.x was not closed properly!
return true;
}
}
}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
io.neonbee.NeonBeeTestExecutionListener
io.neonbee.test.listeners.StaleVertxChecker

0 comments on commit 1b38619

Please sign in to comment.