From 6926ec03d2a849fdf423da7aeb697b6356bb81d1 Mon Sep 17 00:00:00 2001 From: Sadayuki Furuhashi Date: Mon, 13 Mar 2017 11:44:57 -0700 Subject: [PATCH] Use atomic counter to calculate guaranteed number of idling threads MultiThreadAgent.run needs to know how many tasks can start immediately. But it doesn't have to be exact number. It is OK to acquire smaller number as long as it can start the tasks immediately. --- .../digdag/core/agent/MultiThreadAgent.java | 56 ++++++++++--------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java b/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java index 8015f55767..3805b31e90 100644 --- a/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java +++ b/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java @@ -1,18 +1,19 @@ package io.digdag.core.agent; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.digdag.core.ErrorReporter; +import io.digdag.spi.TaskRequest; +import java.time.Duration; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.BlockingQueue; -import java.time.Duration; -import com.google.common.base.Optional; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import io.digdag.spi.TaskRequest; -import io.digdag.core.ErrorReporter; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,10 +28,10 @@ public class MultiThreadAgent private final OperatorManager runner; private final ErrorReporter errorReporter; - private final Object taskCountLock = new Object(); + private final Object addActiveTaskLock = new Object(); private final BlockingQueue executorQueue; private final ThreadPoolExecutor executor; - private volatile int activeTaskCount = 0; + private final AtomicInteger activeTaskCount = new AtomicInteger(0); private volatile boolean stop = false; @@ -74,15 +75,15 @@ public void shutdown(Optional maximumCompletionWait) { stop = true; taskServer.interruptLocalWait(); - int activeTaskCountSnapshot; - synchronized (taskCountLock) { - // synchronize taskCountLock not to reject task execution after acquiring them from taskServer - executor.shutdown(); - activeTaskCountSnapshot = activeTaskCount; - taskCountLock.notifyAll(); + int maximumActiveTasks; + synchronized (addActiveTaskLock) { + // synchronize addActiveTaskLock not to reject task execution after acquiring them from taskServer + executor.shutdown(); // Since here, no one can increase activeTaskCount. + maximumActiveTasks = activeTaskCount.get(); /// Now get the maximum count. + addActiveTaskLock.notifyAll(); } - if (activeTaskCountSnapshot > 0) { - logger.info("Waiting for completion of {} running tasks...", activeTaskCountSnapshot); + if (maximumActiveTasks > 0) { + logger.info("Waiting for completion of {} running tasks...", maximumActiveTasks); } if (maximumCompletionWait.isPresent()) { long seconds = maximumCompletionWait.get().getSeconds(); @@ -102,11 +103,16 @@ public void run() { while (!stop) { try { - synchronized (taskCountLock) { + synchronized (addActiveTaskLock) { if (executor.isShutdown()) { break; } - int maxAcquire = Math.min(executor.getMaximumPoolSize() - activeTaskCount, 10); + // Because addActiveTaskLock is locked, no one increases activeTaskCount in this synchronized block. Now get the maximum count. + int maximumActiveTasks = activeTaskCount.get(); + // Because the maximum count doesn't increase, here can know that at least N number of threads are idling. + int guaranteedAvaialbleThreads = executor.getMaximumPoolSize() - maximumActiveTasks; + // Acquire at most guaranteedAvaialbleThreads or 10. This guarantees that all tasks start immediately. + int maxAcquire = Math.min(guaranteedAvaialbleThreads, 10); if (maxAcquire > 0) { List reqs = taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000); for (TaskRequest req : reqs) { @@ -119,17 +125,15 @@ public void run() errorReporter.reportUncaughtError(t); } finally { - synchronized (taskCountLock) { - activeTaskCount--; - } + activeTaskCount.decrementAndGet(); } }); - activeTaskCount++; + activeTaskCount.incrementAndGet(); } } else { // no executor thread is available. sleep for a while until a task execution finishes - taskCountLock.wait(500); + addActiveTaskLock.wait(500); } } }