Skip to content

Commit

Permalink
executorQueue.size() + executor.getActiveCount() is not an atomic operation
Browse files Browse the repository at this point in the history
  • Loading branch information
frsyuki committed Mar 3, 2017
1 parent 14c21f5 commit d3950b4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ public class MultiThreadAgent
private final OperatorManager runner;
private final ErrorReporter errorReporter;

private final Object newTaskLock = new Object();
private final Object taskCountLock = new Object();
private final BlockingQueue<Runnable> executorQueue;
private final ThreadPoolExecutor executor;
private volatile int activeTaskCount = 0;

private volatile boolean stop = false;

Expand Down Expand Up @@ -73,15 +74,15 @@ public void shutdown(Optional<Duration> maximumCompletionWait)
{
stop = true;
taskServer.interruptLocalWait();
int maximumPossibleActiveTaskCount;
synchronized (newTaskLock) {
// synchronize newTaskLock not to reject task execution after acquiring them from taskServer
int activeTaskCountSnapshot;
synchronized (taskCountLock) {
// synchronize on taskCountLock so that tasks already acquired from taskServer are not rejected by the executor
executor.shutdown();
maximumPossibleActiveTaskCount = executorQueue.size() + executor.getActiveCount();
newTaskLock.notifyAll();
activeTaskCountSnapshot = activeTaskCount;
taskCountLock.notifyAll();
}
if (maximumPossibleActiveTaskCount > 0) {
logger.info("Waiting for completion of {} running tasks...", maximumPossibleActiveTaskCount);
if (activeTaskCountSnapshot > 0) {
logger.info("Waiting for completion of {} running tasks...", activeTaskCountSnapshot);
}
if (maximumCompletionWait.isPresent()) {
long seconds = maximumCompletionWait.get().getSeconds();
Expand All @@ -101,13 +102,11 @@ public void run()
{
while (!stop) {
try {
synchronized (newTaskLock) {
synchronized (taskCountLock) {
if (executor.isShutdown()) {
break;
}
int maximumPossibleActiveTaskCount = executorQueue.size() + executor.getActiveCount();
int guaranteedAvaialbleThreads = executor.getMaximumPoolSize() - maximumPossibleActiveTaskCount;
int maxAcquire = Math.min(guaranteedAvaialbleThreads, 10);
int maxAcquire = Math.min(executor.getMaximumPoolSize() - activeTaskCount, 10);
if (maxAcquire > 0) {
List<TaskRequest> reqs = taskServer.lockSharedAgentTasks(maxAcquire, agentId, config.getLockRetentionTime(), 1000);
for (TaskRequest req : reqs) {
Expand All @@ -119,12 +118,18 @@ public void run()
logger.error("Uncaught exception. Task queue will detect this failure and this task will be retried later.", t);
errorReporter.reportUncaughtError(t);
}
finally {
synchronized (taskCountLock) {
activeTaskCount--;
}
}
});
activeTaskCount++;
}
}
else {
// no executor thread is available. sleep for a while until a task execution finishes
newTaskLock.wait(500);
taskCountLock.wait(500);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions digdag-tests/src/test/java/acceptance/AgentOverAcquireIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public void testOverAcquire()
"-o", projectDir.toString(),
"--config", config.toString(),
"--project", projectDir.toString(),
"-X", "agent.heartbeat-interval=10",
"-X", "agent.heartbeat-interval=5",
"-X", "agent.lock-retention-time=20",
"-X", "agent.max-task-threads=5",
"-p", "outdir=" + outdir,
"over_acquire.dig");
assertThat(runStatus.errUtf8(), runStatus.code(), is(0));

for (int i = 0; i < 20; i++) {
String one = new String(Files.readAllBytes(outdir.resolve(Integer.toString(i))), UTF_8);
String one = new String(Files.readAllBytes(outdir.resolve(Integer.toString(i))), UTF_8).trim();
assertThat(one, is("1"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
loop>: 20
_parallel: true
_do:
sh>: sleep 30 && /bin/echo -n 1 >> ${outdir}/${i}
sh>: sleep 30 && echo 1 >> ${outdir}/${i}

0 comments on commit d3950b4

Please sign in to comment.