diff --git a/testng-core/src/main/java/org/testng/internal/ObjectBag.java b/testng-core/src/main/java/org/testng/internal/ObjectBag.java index 2276b293b..545610ca5 100644 --- a/testng-core/src/main/java/org/testng/internal/ObjectBag.java +++ b/testng-core/src/main/java/org/testng/internal/ObjectBag.java @@ -1,11 +1,10 @@ package org.testng.internal; -import java.io.Closeable; -import java.io.IOException; import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import org.testng.ISuite; import org.testng.log4testng.Logger; @@ -42,16 +41,9 @@ public Object createIfRequired(Class type, Supplier supplier) { public void cleanup() { bag.values().stream() - .filter(it -> it instanceof Closeable) - .map(it -> (Closeable) it) - .forEach( - it -> { - try { - it.close(); - } catch (IOException e) { - logger.debug("Could not clean-up " + it, e); - } - }); + .filter(it -> it instanceof ExecutorService) + .map(it -> (ExecutorService) it) + .forEach(ExecutorService::shutdown); bag.clear(); } } diff --git a/testng-core/src/main/java/org/testng/internal/PoolService.java b/testng-core/src/main/java/org/testng/internal/PoolService.java deleted file mode 100644 index 5ae8fc47e..000000000 --- a/testng-core/src/main/java/org/testng/internal/PoolService.java +++ /dev/null @@ -1,77 +0,0 @@ -package org.testng.internal; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; -import javax.annotation.Nonnull; -import org.testng.TestNGException; -import org.testng.collections.Lists; -import org.testng.internal.thread.ThreadUtil; - -/** Simple wrapper for an ExecutorCompletionService. */ -public class PoolService implements Closeable { - - private final ExecutorCompletionService m_completionService; - private final ExecutorService m_executor; - - private final boolean shutdownAfterExecution; - - public PoolService(int threadPoolSize) { - this(threadPoolSize, true); - } - - public PoolService(int threadPoolSize, boolean shutdownAfterExecution) { - - ThreadFactory threadFactory = - new ThreadFactory() { - - private final AtomicInteger threadNumber = new AtomicInteger(0); - - @Override - public Thread newThread(@Nonnull Runnable r) { - return new Thread( - r, ThreadUtil.THREAD_NAME + "-PoolService-" + threadNumber.getAndIncrement()); - } - }; - m_executor = Executors.newFixedThreadPool(threadPoolSize, threadFactory); - m_completionService = new ExecutorCompletionService<>(m_executor); - this.shutdownAfterExecution = shutdownAfterExecution; - } - - public List submitTasksAndWait(List> tasks) { - - List> takes = Lists.newArrayList(tasks.size()); - for (Callable callable : tasks) { - takes.add(m_completionService.submit(callable)); - } - - List result = Lists.newArrayList(takes.size()); - for (Future take : takes) { - try { - result.add(take.get()); - } catch (InterruptedException | ExecutionException e) { // NOSONAR - throw new TestNGException(e); - } - } - - if (shutdownAfterExecution) { - m_executor.shutdown(); - } - return result; - } - - @Override - public void close() throws IOException { - if (!shutdownAfterExecution) { - m_executor.shutdown(); - } - } -} diff --git a/testng-core/src/main/java/org/testng/internal/invokers/MethodRunner.java b/testng-core/src/main/java/org/testng/internal/invokers/MethodRunner.java index ddd8ec556..abda06720 100644 --- a/testng-core/src/main/java/org/testng/internal/invokers/MethodRunner.java +++ b/testng-core/src/main/java/org/testng/internal/invokers/MethodRunner.java @@ -1,17 +1,27 @@ package org.testng.internal.invokers; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.supplyAsync; + +import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.testng.ITestContext; import org.testng.ITestResult; import org.testng.collections.CollectionUtils; import org.testng.collections.Lists; import org.testng.internal.ObjectBag; import org.testng.internal.Parameters; -import org.testng.internal.PoolService; import org.testng.internal.invokers.ITestInvoker.FailureContext; import org.testng.internal.invokers.TestMethodArguments.Builder; +import org.testng.internal.thread.ThreadUtil; import org.testng.xml.XmlSuite; public class MethodRunner implements IMethodRunner { @@ -95,11 +105,13 @@ public List runInParallel( Iterator allParamValues, boolean skipFailedInvocationCounts) { XmlSuite suite = context.getSuite().getXmlSuite(); - List result = Lists.newArrayList(); - List workers = Lists.newArrayList(); int parametersIndex = 0; - Iterable allParameterValues = CollectionUtils.asIterable(allParamValues); - for (Object[] next : allParameterValues) { + ObjectBag objectBag = ObjectBag.getInstance(context.getSuite()); + boolean reUse = suite.isShareThreadPoolForDataProviders(); + + ExecutorService service = getOrCreate(reUse, suite.getDataProviderThreadCount(), objectBag); + List>> all = new ArrayList<>(); + for (Object[] next : CollectionUtils.asIterable(allParamValues)) { if (next == null) { // skipped value parametersIndex += 1; @@ -126,26 +138,36 @@ public List runInParallel( invocationCount.get(), failure.count.get(), testInvoker.getNotifier()); - workers.add(w); + all.add(supplyAsync(w::call, service)); // testng387: increment the param index in the bag. parametersIndex += 1; } - ObjectBag objectBag = ObjectBag.getInstance(context.getSuite()); - boolean sharedThreadPool = context.getSuite().getXmlSuite().isShareThreadPoolForDataProviders(); - - @SuppressWarnings("unchecked") - PoolService> ps = - sharedThreadPool - ? (PoolService>) - objectBag.createIfRequired( - PoolService.class, - () -> new PoolService<>(suite.getDataProviderThreadCount(), false)) - : new PoolService<>(suite.getDataProviderThreadCount()); - List> r = ps.submitTasksAndWait(workers); - for (List l2 : r) { - result.addAll(l2); + CompletableFuture combined = allOf(all.toArray(new CompletableFuture[0])); + List squashed = + all.stream() + .map(CompletableFuture::join) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + List result = combined.thenApply(ignored -> squashed).join(); + if (!reUse) { + service.shutdown(); } return result; } + + private static ExecutorService getOrCreate(boolean reUse, int count, ObjectBag objectBag) { + if (reUse) { + return (ExecutorService) + objectBag.createIfRequired( + ExecutorService.class, () -> Executors.newFixedThreadPool(count, threadFactory())); + } + return Executors.newFixedThreadPool(count, threadFactory()); + } + + private static ThreadFactory threadFactory() { + AtomicInteger threadNumber = new AtomicInteger(0); + return r -> + new Thread(r, ThreadUtil.THREAD_NAME + "-PoolService-" + threadNumber.getAndIncrement()); + } }