Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Duplicate task execution when running long tasks in parallel #487

Closed
akirakw opened this issue Feb 14, 2017 · 2 comments
Closed

Duplicate task execution when running long tasks in parallel #487

akirakw opened this issue Feb 14, 2017 · 2 comments

Comments

@akirakw
Copy link
Contributor

akirakw commented Feb 14, 2017

Summary

I often get duplicate task execution problem when running many tasks in parallel with following conditions.

  • specify --max-task-threads to digdag server.
  • run tasks that larger than the value of --max-task-threads in parallel.
  • each tasks run for over 5 minutes.

Steps to reproduce

  1. start digdag server with --max-task-threads option.
digdag server --max-task-threads 10
  1. push the following workflow and script.

test-duplicate.dig

timezone: Asia/Tokyo

+main:
  _parallel: true

  loop>: 20
  _do:

    +run:
      sh>: scripts/sleep-6min.sh ${(i+1)}

scripts/sleep-6min.sh

#!/bin/bash

sleep 360

exit 0
  1. start workflow test-duplicate. The example of this execution result is:
2017-02-13 15:39:43 +0900 [INFO] (0062@+test-duplicate+main^sub+loop-2+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 3
2017-02-13 15:39:43 +0900 [INFO] (0064@+test-duplicate+main^sub+loop-4+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 5
2017-02-13 15:39:43 +0900 [INFO] (0068@+test-duplicate+main^sub+loop-8+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 9
2017-02-13 15:39:43 +0900 [INFO] (0060@+test-duplicate+main^sub+loop-0+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 1
2017-02-13 15:39:43 +0900 [INFO] (0061@+test-duplicate+main^sub+loop-1+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 2
2017-02-13 15:39:43 +0900 [INFO] (0067@+test-duplicate+main^sub+loop-7+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 8
2017-02-13 15:39:43 +0900 [INFO] (0063@+test-duplicate+main^sub+loop-3+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 4
2017-02-13 15:39:43 +0900 [INFO] (0066@+test-duplicate+main^sub+loop-6+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 7
2017-02-13 15:39:43 +0900 [INFO] (0065@+test-duplicate+main^sub+loop-5+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 6
2017-02-13 15:39:43 +0900 [INFO] (0056@+test-duplicate+main^sub+loop-9+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 10

2017-02-13 15:44:45 +0900 [WARN] (lock-expire-0) io.digdag.core.database.DatabaseTaskQueueServer: 1 task locks are expired. Tasks will be retried.

2017-02-13 15:45:44 +0900 [INFO] (0062@+test-duplicate+main^sub+loop-10+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 11
2017-02-13 15:45:44 +0900 [INFO] (0067@+test-duplicate+main^sub+loop-14+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 15
2017-02-13 15:45:44 +0900 [INFO] (0064@+test-duplicate+main^sub+loop-10+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 11
2017-02-13 15:45:44 +0900 [INFO] (0068@+test-duplicate+main^sub+loop-11+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 12
2017-02-13 15:45:44 +0900 [INFO] (0063@+test-duplicate+main^sub+loop-15+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 16
2017-02-13 15:45:44 +0900 [INFO] (0060@+test-duplicate+main^sub+loop-12+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 13
2017-02-13 15:45:44 +0900 [INFO] (0065@+test-duplicate+main^sub+loop-17+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 18
2017-02-13 15:45:44 +0900 [INFO] (0061@+test-duplicate+main^sub+loop-13+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 14
2017-02-13 15:45:44 +0900 [INFO] (0066@+test-duplicate+main^sub+loop-16+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 17
2017-02-13 15:45:44 +0900 [INFO] (0056@+test-duplicate+main^sub+loop-18+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 19

2017-02-13 15:50:45 +0900 [WARN] (lock-expire-0) io.digdag.core.database.DatabaseTaskQueueServer: 1 task locks are expired. Tasks will be retried.

2017-02-13 15:51:45 +0900 [INFO] (0062@+test-duplicate+main^sub+loop-19+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 20
2017-02-13 15:51:45 +0900 [INFO] (0065@+test-duplicate+main^sub+loop-19+run) io.digdag.core.agent.OperatorManager: sh>: scripts/sleep-6min.sh 20

In this case, task +loop-10+run and loop-19+run were executed twice.

Details

I think the cause of this problem seems to be how to start the task execution thread on MultiThreadAgent#run.

https://github.com/treasure-data/digdag/blob/v0.9.4/digdag-core/src/main/java/io/digdag/core/agent/MultiThreadAgent.java#L89

ThreadPoolExecutor#getActiveCount returns approximate active number of threads.
For example, if a certain thread is started and getActiveCount called immediately after that, the thread count that was just started thread may not be reflected.

To testing this problem, I add some info logs to MultiThreadAgent#run as follows.

@Override
public void run()
{
    while (!stop) {
        try {
            synchronized (newTaskLock) {
                if (executor.isShutdown()) {
                    break;
                }
                int max = Math.min(executor.getMaximumPoolSize() - executor.getActiveCount(), 10);
                logger.info("max={}", max); // Add for tesitng
                if (max > 0) {
                    List<TaskRequest> reqs = taskServer.lockSharedAgentTasks(max, agentId, config.getLockRetentionTime(), 1000);
                    logger.info("reqs.size={}", reqs.size()); // Add for tesitng
                    for (TaskRequest req : reqs) {
                        executor.submit(() -> {
                            try {
                                runner.run(req);
                            }
...

The execution result is:

2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: max=10
2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: reqs.size=1
2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: max=9
2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: reqs.size=3
2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: max=6
2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: reqs.size=6
2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: max=1 // previous thread count is not reflected correctly
2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: reqs.size=1 // will start task over max thread size
2017-02-13 15:39:42 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: max=0
2017-02-13 15:39:43 +0900 [INFO] (local-agent-0) io.digdag.core.agent.MultiThreadAgent: max=0
...

And at this time, queued_task_locks had 11 records of updated lock_expire_time.

digdag=# select * from queued_task_locks order by id;
 id  | site_id | queue_id | priority | retry_count | lock_expire_time |       lock_agent_id
-----+---------+----------+----------+-------------+------------------+----------------------------
 150 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 151 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 152 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 153 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 154 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 155 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 156 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 157 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 158 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 159 |       0 |          |        0 |           0 |       1486968325 | 38159@...
 160 |       0 |          |        0 |           0 |       1486968283 | 38159@...
 161 |       0 |          |        0 |           0 |                  |
 162 |       0 |          |        0 |           0 |                  |
 163 |       0 |          |        0 |           0 |                  |
 164 |       0 |          |        0 |           0 |                  |
 165 |       0 |          |        0 |           0 |                  |
 166 |       0 |          |        0 |           0 |                  |
 167 |       0 |          |        0 |           0 |                  |
 168 |       0 |          |        0 |           0 |                  |
 169 |       0 |          |        0 |           0 |                  |

In ths case, task with id 160 (task +loop-10+run) is not executed yet, waiting in the queue.
Since this task is inactive and cannot send a heartbeat, this task expires lock after 5 minutes (if another task does not finished during this time) and will be retried with following logs.

2017-02-13 15:44:45 +0900 [WARN] (lock-expire-0) io.digdag.core.database.DatabaseTaskQueueServer: 1 task locks are expired. Tasks will be retried.

And I have confirmed that updating retry_count and lock_expire_time on id 160 at this time.

digdag=# select * from queued_task_locks order by id;
 id  | site_id | queue_id | priority | retry_count | lock_expire_time |       lock_agent_id
-----+---------+----------+----------+-------------+------------------+----------------------------
 150 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 151 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 152 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 153 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 154 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 155 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 156 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 157 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 158 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 159 |       0 |          |        0 |           0 |       1486968625 | 38159@...
 160 |       0 |          |        0 |           1 |                  |
 161 |       0 |          |        0 |           0 |                  |
 162 |       0 |          |        0 |           0 |                  |
 163 |       0 |          |        0 |           0 |                  |
 164 |       0 |          |        0 |           0 |                  |
 165 |       0 |          |        0 |           0 |                  |
 166 |       0 |          |        0 |           0 |                  |
 167 |       0 |          |        0 |           0 |                  |
 168 |       0 |          |        0 |           0 |                  |
 169 |       0 |          |        0 |           0 |                  |

This may cause the duplicate task execution.
I also confirmed that the task with id 169 (task +loop-19+run) also behaved similarly.

id  | site_id | queue_id | priority | retry_count | lock_expire_time |       lock_agent_id        
-----+---------+----------+----------+-------------+------------------+----------------------------
169 |       0 |          |        0 |           1 |       1486969165 | 38159@...

System configuration

digdag version: 0.9.4

@frsyuki
Copy link
Member

frsyuki commented Feb 14, 2017

Very good catch 👍 Thank you for the detailed report.
We'll address this problem.

@frsyuki
Copy link
Member

frsyuki commented Feb 14, 2017

MultiThreadAgent is the only instance that can increase number of active threads. There're no other threads that may start another thread right after executor.getActiveCount() call. Thus the cause I guess is that getActiveCount() returns snapshot of an old value which may not reflect the previous executor.submit() call even if executor.getActiveCount() and executor.submit() are called from a single same thread.

So, solution would be something like this: instead of checking number of active threads, MultiThreadAgent needs to monitor number of active (non-finished) tasks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants