Skip to content

Commit

Permalink
Use atomic counter to calculate guaranteed number of idling threads
Browse files Browse the repository at this point in the history
MultiThreadAgent.run needs to know how many tasks can start immediately.
But it doesn't have to be exact number. It is OK to acquire smaller
number as long as it can start the tasks immediately.
  • Loading branch information
frsyuki committed Mar 13, 2017
1 parent 981546e commit 6926ec0
Showing 1 changed file with 30 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package io.digdag.core.agent;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.digdag.core.ErrorReporter;
import io.digdag.spi.TaskRequest;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;
import java.time.Duration;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.digdag.spi.TaskRequest;
import io.digdag.core.ErrorReporter;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -27,10 +28,10 @@ public class MultiThreadAgent
private final OperatorManager runner;
private final ErrorReporter errorReporter;

private final Object taskCountLock = new Object();
private final Object addActiveTaskLock = new Object();
private final BlockingQueue<Runnable> executorQueue;
private final ThreadPoolExecutor executor;
private volatile int activeTaskCount = 0;
private final AtomicInteger activeTaskCount = new AtomicInteger(0);

private volatile boolean stop = false;

Expand Down Expand Up @@ -74,15 +75,15 @@ public void shutdown(Optional<Duration> maximumCompletionWait)
{
stop = true;
taskServer.interruptLocalWait();
int activeTaskCountSnapshot;
synchronized (taskCountLock) {
// synchronize taskCountLock not to reject task execution after acquiring them from taskServer
executor.shutdown();
activeTaskCountSnapshot = activeTaskCount;
taskCountLock.notifyAll();
int maximumActiveTasks;
synchronized (addActiveTaskLock) {
// synchronize addActiveTaskLock not to reject task execution after acquiring them from taskServer
executor.shutdown(); // Since here, no one can increase activeTaskCount.
maximumActiveTasks = activeTaskCount.get(); /// Now get the maximum count.
addActiveTaskLock.notifyAll();
}
if (activeTaskCountSnapshot > 0) {
logger.info("Waiting for completion of {} running tasks...", activeTaskCountSnapshot);
if (maximumActiveTasks > 0) {
logger.info("Waiting for completion of {} running tasks...", maximumActiveTasks);
}
if (maximumCompletionWait.isPresent()) {
long seconds = maximumCompletionWait.get().getSeconds();
Expand All @@ -102,11 +103,16 @@ public void run()
{
while (!stop) {
try {
synchronized (taskCountLock) {
synchronized (addActiveTaskLock) {
if (executor.isShutdown()) {
break;
}
int maxAcquire = Math.min(executor.getMaximumPoolSize() - activeTaskCount, 10);
// Because addActiveTaskLock is locked, no one increases activeTaskCount in this synchronized block. Now get the maximum count.
int maximumActiveTasks = activeTaskCount.get();
// Because the maximum count doesn't increase, here can know that at least N number of threads are idling.
int guaranteedAvaialbleThreads = executor.getMaximumPoolSize() - maximumActiveTasks;
// Acquire at most guaranteedAvaialbleThreads or 10. This guarantees that all tasks start immediately.
int maxAcquire = Math.min(guaranteedAvaialbleThreads, 10);
if (maxAcquire > 0) {
List<TaskRequest> reqs = taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000);
for (TaskRequest req : reqs) {
Expand All @@ -119,17 +125,15 @@ public void run()
errorReporter.reportUncaughtError(t);
}
finally {
synchronized (taskCountLock) {
activeTaskCount--;
}
activeTaskCount.decrementAndGet();
}
});
activeTaskCount++;
activeTaskCount.incrementAndGet();
}
}
else {
// no executor thread is available. sleep for a while until a task execution finishes
taskCountLock.wait(500);
addActiveTaskLock.wait(500);
}
}
}
Expand Down

0 comments on commit 6926ec0

Please sign in to comment.