AttributeError: Can't pickle local object 'DaskExecutor.execute_async.<locals>.airflow_run' #11451

Closed
lafrinte opened this issue Oct 12, 2020 · 7 comments
Labels: area:Scheduler, kind:bug

Comments


lafrinte commented Oct 12, 2020

Apache Airflow version: 1.10.12

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

  • OS (e.g. from /etc/os-release): CentOS 7.4

What happened:

DaskExecutor raises a ValueError: Cell is empty error when scheduling a task, and no DAG can be scheduled. Debug output below:

-bash-4.2$ airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-07-16 18:48:30,885] {__init__.py:51} INFO - Using executor DaskExecutor
[2020-07-16 18:48:30,890] {scheduler_job.py:1346} INFO - Starting the scheduler
[2020-07-16 18:48:30,890] {scheduler_job.py:1354} INFO - Running execute loop for -1 seconds
[2020-07-16 18:48:30,890] {scheduler_job.py:1355} INFO - Processing each file at most -1 times
[2020-07-16 18:48:30,890] {scheduler_job.py:1358} INFO - Searching for files in /ansdep/opsapi/dags
[2020-07-16 18:48:30,890] {scheduler_job.py:1360} INFO - There are 0 files in /ansdep/opsapi/dags
[2020-07-16 18:48:30,898] {scheduler_job.py:1411} INFO - Resetting orphaned tasks for active dag runs
[2020-07-16 18:48:30,912] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 70385
[2020-07-16 18:48:30,916] {settings.py:54} INFO - Configured default timezone <Timezone [Asia/Shanghai]>
[2020-07-16 18:49:22,984] {scheduler_job.py:951} INFO - 1 tasks up for execution:
        <TaskInstance: bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16 10:49:00+00:00 [scheduled]>
[2020-07-16 18:49:22,990] {scheduler_job.py:982} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2020-07-16 18:49:22,990] {scheduler_job.py:1010} INFO - DAG bc081861-e948-4e59-92fa-fc14f4330eb2 has 0/512 running and queued tasks
[2020-07-16 18:49:22,994] {scheduler_job.py:1060} INFO - Setting the following tasks to queued state:
        <TaskInstance: bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16 10:49:00+00:00 [scheduled]>
[2020-07-16 18:49:23,000] {scheduler_job.py:1134} INFO - Setting the following 1 tasks to queued state:
        <TaskInstance: bc081861-e948-4e59-92fa-fc14f4330eb2.first_for_flush_state 2020-07-16 10:49:00+00:00 [queued]>
[2020-07-16 18:49:23,000] {scheduler_job.py:1170} INFO - Sending ('bc081861-e948-4e59-92fa-fc14f4330eb2', 'first_for_flush_state', datetime.datetime(2020, 7, 16, 10, 49, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 4 and queue default
[2020-07-16 18:49:23,000] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'bc081861-e948-4e59-92fa-fc14f4330eb2', 'first_for_flush_state', '2020-07-16T10:49:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/ansdep/opsapi/dags/bc081861-e948-4e59-92fa-fc14f4330eb2.py']
/ansdep/python3/lib/python3.7/site-packages/airflow/executors/dask_executor.py:63: UserWarning: DaskExecutor does not support queues. All tasks will be run in the same cluster
  'DaskExecutor does not support queues. '
distributed.protocol.pickle - INFO - Failed to serialize <function DaskExecutor.execute_async.<locals>.airflow_run at 0x7f748ce808c8>. Exception: Cell is empty
[2020-07-16 18:49:23,003] {scheduler_job.py:1384} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py", line 3323, in dumps_function
    result = cache_dumps[func]
  File "/ansdep/python3/lib/python3.7/site-packages/distributed/utils.py", line 1549, in __getitem__
    value = super().__getitem__(key)
  File "/ansdep/python3/lib/python3.7/collections/__init__.py", line 1025, in __getitem__
    raise KeyError(key)
KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at 0x7f748ce808c8>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/ansdep/python3/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 41, in dumps
    result = pickle.dumps(x, **dump_kwargs)
AttributeError: Can't pickle local object 'DaskExecutor.execute_async.<locals>.airflow_run'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1382, in _execute
    self._execute_helper()
  File "/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1443, in _execute_helper
    if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
  File "/ansdep/python3/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1505, in _validate_and_run_task_instances
    self.executor.heartbeat()
  File "/ansdep/python3/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
    self.trigger_tasks(open_slots)
  File "/ansdep/python3/lib/python3.7/site-packages/airflow/executors/base_executor.py", line 154, in trigger_tasks
    executor_config=simple_ti.executor_config)
  File "/ansdep/python3/lib/python3.7/site-packages/airflow/executors/dask_executor.py", line 70, in execute_async
    future = self.client.submit(airflow_run, pure=False)
  File "/ansdep/python3/lib/python3.7/site-packages/distributed/client.py", line 1579, in submit
    actors=actor,
  File "/ansdep/python3/lib/python3.7/site-packages/distributed/client.py", line 2598, in _graph_to_futures
    "tasks": valmap(dumps_task, dsk3),
  File "/ansdep/python3/lib/python3.7/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(iterkeys(d), map(func, itervalues(d))))
  File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py", line 3361, in dumps_task
    return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
  File "/ansdep/python3/lib/python3.7/site-packages/distributed/worker.py", line 3325, in dumps_function
    result = pickle.dumps(func)
  File "/ansdep/python3/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 52, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 101, in dumps
    cp.dump(obj)
  File "/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump
    return Pickler.dump(self, obj)
  File "/ansdep/python3/lib/python3.7/pickle.py", line 437, in dump
    self.save(obj)
  File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
    *self._dynamic_function_reduce(obj), obj=obj
  File "/ansdep/python3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 659, in _save_reduce_pickle5
    dictitems=dictitems, obj=obj
  File "/ansdep/python3/lib/python3.7/pickle.py", line 638, in save_reduce
    save(args)
  File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/ansdep/python3/lib/python3.7/pickle.py", line 786, in save_tuple
    save(element)
  File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/ansdep/python3/lib/python3.7/pickle.py", line 771, in save_tuple
    save(element)
  File "/ansdep/python3/lib/python3.7/pickle.py", line 504, in save
    f(self, obj) # Call unbound method with explicit self
  File "/ansdep/python3/lib/python3.7/site-packages/dill/_dill.py", line 1169, in save_cell
    f = obj.cell_contents
ValueError: Cell is empty
[2020-07-16 18:49:23,014] {helpers.py:325} INFO - Sending Signals.SIGTERM to GPID 70385
[2020-07-16 18:49:23,041] {helpers.py:291} INFO - Process psutil.Process(pid=70385, status='terminated') (70385) terminated with exit code 0
[2020-07-16 18:49:23,041] {helpers.py:291} INFO - Process psutil.Process(pid=70575, status='terminated') (70575) terminated with exit code None
[2020-07-16 18:49:23,042] {scheduler_job.py:1387} INFO - Exited execute loop

How to reproduce it:

Keep the executor option set to DaskExecutor, start the dask-scheduler, dask-worker, airflow webserver, and airflow scheduler services, then trigger any example DAG in the web UI.

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = DaskExecutor

Anything else we need to know:

It seems the exception is raised when pickle serializes the airflow_run function defined inside airflow.executors.dask_executor.DaskExecutor.execute_async:

    def execute_async(self, key, command, queue=None, executor_config=None):
        if queue is not None:
            warnings.warn(
                'DaskExecutor does not support queues. '
                'All tasks will be run in the same cluster'
            )

        if command[0:2] != ["airflow", "run"]:
            raise ValueError('The command must start with ["airflow", "run"].')

        def airflow_run():
            return subprocess.check_call(command, close_fds=True)

        future = self.client.submit(airflow_run, pure=False)  # <-- airflow_run is pickle-serialized and submitted to the dask scheduler
        self.futures[future] = key

Pickle does not seem to be able to dump a function defined inside another function, so I changed the code as below and the scheduler now works:

def airflow_run(command):
    return subprocess.check_call(command, close_fds=True)


class DaskExecutor(BaseExecutor):
...
    def execute_async(self, key, command, queue=None, executor_config=None):
        if queue is not None:
            warnings.warn(
                'DaskExecutor does not support queues. '
                'All tasks will be run in the same cluster'
            )

        if command[0:2] != ["airflow", "run"]:
            raise ValueError('The command must start with ["airflow", "run"].')

        future = self.client.submit(airflow_run, command, pure=False) 
        
        self.futures[future] = key
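
For completeness: the AttributeError in the title is the first exception in the chain above, where distributed tries plain pickle before falling back to cloudpickle (which then ends up in dill's save_cell and fails with ValueError: Cell is empty). The plain-pickle part is easy to reproduce outside Airflow; this is only an illustrative sketch with made-up helper names, not Airflow code:

import pickle
import subprocess


def make_closure(command):
    # same pattern as the original execute_async: a function defined inside a function
    def airflow_run():
        return subprocess.check_call(command, close_fds=True)
    return airflow_run


def airflow_run_module_level(command):
    # module-level functions are pickled by reference (qualified name), so this one is fine
    return subprocess.check_call(command, close_fds=True)


if __name__ == '__main__':
    try:
        pickle.dumps(make_closure(['echo', 'hello']))
    except (AttributeError, pickle.PicklingError) as exc:
        # on Python 3.7 this prints:
        # Can't pickle local object 'make_closure.<locals>.airflow_run'
        print(exc)
    pickle.dumps(airflow_run_module_level)  # succeeds
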
lafrinte added the kind:bug label on Oct 12, 2020

boring-cyborg bot commented Oct 12, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

lafrinte (Author) commented

> Thanks for opening your first issue here! Be sure to follow the issue template!

I have modified the description to follow the template. Looking forward to your reply.

eladkal added the area:Scheduler label on Nov 19, 2020
toughrogrammer (Contributor) commented

I got a similar error when I upgraded to 2.0.0:

ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/Users/loki/Library/Caches/pypoetry/virtualenvs/airflow-RLSyOqyF-py3.8/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
    self._run_scheduler_loop()
  File "/Users/loki/Library/Caches/pypoetry/virtualenvs/airflow-RLSyOqyF-py3.8/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1379, in _run_scheduler_loop
    self.executor.heartbeat()
  File "/Users/loki/Library/Caches/pypoetry/virtualenvs/airflow-RLSyOqyF-py3.8/lib/python3.8/site-packages/airflow/executors/base_executor.py", line 155, in heartbeat
    self.trigger_tasks(open_slots)
  File "/Users/loki/Library/Caches/pypoetry/virtualenvs/airflow-RLSyOqyF-py3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 260, in trigger_tasks
    self._process_tasks(task_tuples_to_send)
  File "/Users/loki/Library/Caches/pypoetry/virtualenvs/airflow-RLSyOqyF-py3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 269, in _process_tasks
    key_and_async_results = self._send_tasks_to_celery(task_tuples_to_send)
  File "/Users/loki/Library/Caches/pypoetry/virtualenvs/airflow-RLSyOqyF-py3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 327, in _send_tasks_to_celery
    with Pool(processes=num_processes, initializer=reset_signals) as send_pool:
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/pool.py", line 212, in __init__
    self._repopulate_pool()
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/pool.py", line 303, in _repopulate_pool
    return self._repopulate_pool_static(self._ctx, self.Process,
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/pool.py", line 326, in _repopulate_pool_static
    w.start()
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/Users/loki/.asdf/installs/python/3.8.5/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'CeleryExecutor._send_tasks_to_celery.<locals>.reset_signals'
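
(This Celery variant appears to fail for the same underlying reason: the traceback goes through popen_spawn_posix, i.e. the multiprocessing spawn start method, which is the default on macOS since Python 3.8 and has to pickle the Pool initializer; reset_signals is a local function, so plain pickle rejects it just like airflow_run above.)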


xawyong commented Jan 7, 2021

Using 2.0.0, after changing the code as @lafrinte proposed, I got the error below.

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2021-01-07 02:01:14,768] {scheduler_job.py:1241} INFO - Starting the scheduler
[2021-01-07 02:01:14,769] {scheduler_job.py:1246} INFO - Processing each file at most -1 times
/usr/local/lib/python3.7/site-packages/distributed/client.py:1128 VersionMismatchWarning: Mismatched versions found

+-------------+-----------+-----------------------+-----------------------+
| Package     | client    | scheduler             | workers               |
+-------------+-----------+-----------------------+-----------------------+
| distributed | 2020.12.0 | 2020.12.0+11.g4386b75 | 2020.12.0+11.g4386b75 |
+-------------+-----------+-----------------------+-----------------------+
[2021-01-07 02:01:14,838] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 18779
[2021-01-07 02:01:14,841] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
[2021-01-07 02:01:14,849] {settings.py:52} INFO - Configured default timezone Timezone('UTC')
[2021-01-07 02:02:04,469] {scheduler_job.py:938} INFO - 4 tasks up for execution:
	<TaskInstance: example_bash_operator.runme_0 2021-01-07 07:02:03.530249+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_1 2021-01-07 07:02:03.530249+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_2 2021-01-07 07:02:03.530249+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.also_run_this 2021-01-07 07:02:03.530249+00:00 [scheduled]>
[2021-01-07 02:02:04,470] {scheduler_job.py:972} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 4 task instances ready to be queued
[2021-01-07 02:02:04,470] {scheduler_job.py:999} INFO - DAG example_bash_operator has 0/16 running and queued tasks
[2021-01-07 02:02:04,471] {scheduler_job.py:999} INFO - DAG example_bash_operator has 1/16 running and queued tasks
[2021-01-07 02:02:04,471] {scheduler_job.py:999} INFO - DAG example_bash_operator has 2/16 running and queued tasks
[2021-01-07 02:02:04,471] {scheduler_job.py:999} INFO - DAG example_bash_operator has 3/16 running and queued tasks
[2021-01-07 02:02:04,471] {scheduler_job.py:1060} INFO - Setting the following tasks to queued state:
	<TaskInstance: example_bash_operator.runme_0 2021-01-07 07:02:03.530249+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_1 2021-01-07 07:02:03.530249+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.runme_2 2021-01-07 07:02:03.530249+00:00 [scheduled]>
	<TaskInstance: example_bash_operator.also_run_this 2021-01-07 07:02:03.530249+00:00 [scheduled]>
[2021-01-07 02:02:04,473] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_0', execution_date=datetime.datetime(2021, 1, 7, 7, 2, 3, 530249, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
[2021-01-07 02:02:04,473] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py']
[2021-01-07 02:02:04,473] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_1', execution_date=datetime.datetime(2021, 1, 7, 7, 2, 3, 530249, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
[2021-01-07 02:02:04,474] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py']
[2021-01-07 02:02:04,474] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='runme_2', execution_date=datetime.datetime(2021, 1, 7, 7, 2, 3, 530249, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 3 and queue default
[2021-01-07 02:02:04,474] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py']
[2021-01-07 02:02:04,474] {scheduler_job.py:1102} INFO - Sending TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', execution_date=datetime.datetime(2021, 1, 7, 7, 2, 3, 530249, tzinfo=Timezone('UTC')), try_number=1) to executor with priority 2 and queue default
[2021-01-07 02:02:04,474] {base_executor.py:79} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py']
[2021-01-07 02:02:06,343] {dask_executor.py:94} ERROR - Failed to execute task: CalledProcessError(120, ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_0', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'])
[2021-01-07 02:02:06,344] {dask_executor.py:94} ERROR - Failed to execute task: CalledProcessError(120, ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_2', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'])
[2021-01-07 02:02:06,345] {dask_executor.py:94} ERROR - Failed to execute task: CalledProcessError(120, ['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'])
[2021-01-07 02:02:06,345] {scheduler_job.py:1200} INFO - Executor reports execution of example_bash_operator.runme_0 execution_date=2021-01-07 07:02:03.530249+00:00 exited with status failed for try_number 1
[2021-01-07 02:02:06,345] {scheduler_job.py:1200} INFO - Executor reports execution of example_bash_operator.runme_2 execution_date=2021-01-07 07:02:03.530249+00:00 exited with status failed for try_number 1
[2021-01-07 02:02:06,345] {scheduler_job.py:1200} INFO - Executor reports execution of example_bash_operator.also_run_this execution_date=2021-01-07 07:02:03.530249+00:00 exited with status failed for try_number 1
[2021-01-07 02:02:06,350] {scheduler_job.py:1229} ERROR - Executor reports task instance <TaskInstance: example_bash_operator.runme_0 2021-01-07 07:02:03.530249+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-01-07 02:02:06,351] {scheduler_job.py:1229} ERROR - Executor reports task instance <TaskInstance: example_bash_operator.runme_2 2021-01-07 07:02:03.530249+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-01-07 02:02:06,352] {scheduler_job.py:1229} ERROR - Executor reports task instance <TaskInstance: example_bash_operator.also_run_this 2021-01-07 07:02:03.530249+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
[2021-01-07 02:02:07,410] {dask_executor.py:94} ERROR - Failed to execute task: CalledProcessError(120, ['airflow', 'tasks', 'run', 'example_bash_operator', 'runme_1', '2021-01-07T07:02:03.530249+00:00', '--local', '--pool', 'default_pool', '--subdir', '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'])
[2021-01-07 02:02:07,410] {scheduler_job.py:1200} INFO - Executor reports execution of example_bash_operator.runme_1 execution_date=2021-01-07 07:02:03.530249+00:00 exited with status failed for try_number 1
[2021-01-07 02:02:07,415] {scheduler_job.py:1229} ERROR - Executor reports task instance <TaskInstance: example_bash_operator.runme_1 2021-01-07 07:02:03.530249+00:00 [queued]> finished (failed) although the task says its queued. (Info: None) Was the task killed externally?
Process ForkProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 365, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 596, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/dag_processing.py", line 659, in _run_parsing_loop
    self._processors.pop(processor.file_path)
KeyError: '/usr/local/lib/python3.7/site-packages/airflow/example_dags/example_bash_operator.py'
[2021-01-07 02:02:08,445] {dagrun.py:429} ERROR - Marking run <DagRun example_bash_operator @ 2021-01-07 07:02:03.530249+00:00: manual__2021-01-07T07:02:03.530249+00:00, externally triggered: True> failed
[2021-01-07 02:02:08,470] {dag_processing.py:399} WARNING - DagFileProcessorManager (PID=18779) exited with exit code 1 - re-launching
[2021-01-07 02:02:08,475] {dag_processing.py:250} INFO - Launched DagFileProcessorManager with pid: 21398
[2021-01-07 02:02:08,483] {settings.py:52} INFO - Configured default timezone Timezone('UTC')

griffinqiu commented

Same issue


esc commented Mar 16, 2021

I had the same issue and this fixed it for me:

uqfoundation/dill#383 (comment)

# workaround from https://github.com/uqfoundation/dill/issues/383
import dill
dill.extend(False)
import cloudpickle
dill.extend(True)
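
(For context: the ValueError: Cell is empty in the original traceback is raised from dill's save_cell, which dill.extend() registers in the pickle dispatch table that cloudpickle copies when it is imported. Importing cloudpickle while the dill extensions are temporarily switched off, as the snippet does, appears to keep cloudpickle on its own cell handling and avoid that code path.)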

ms7463 (Contributor) commented Oct 9, 2021

You could also just pass check_call directly to submit, without needing to define the extra function. This works in a plugin I have defined.

future = self.client.submit(subprocess.check_call, command, pure=False, resources=resources, close_fds=True)
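
For what it's worth, this works because dask's Client.submit consumes the keyword arguments it recognizes itself (pure, resources) and forwards the rest to the submitted callable, so close_fds=True ends up being passed through to subprocess.check_call; resources here presumably comes from the surrounding plugin code rather than the stock executor shown above. A minimal, self-contained sketch of the same pattern against a throwaway local cluster (the cluster setup is illustrative, not Airflow code):

import subprocess
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    with LocalCluster(n_workers=1, threads_per_worker=1) as cluster, Client(cluster) as client:
        # pure=False is handled by submit itself; close_fds=True is forwarded to check_call
        future = client.submit(subprocess.check_call, ['echo', 'hello'], pure=False, close_fds=True)
        print(future.result())  # prints 0 on success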

apache locked and limited conversation to collaborators on May 29, 2022
eladkal converted this issue into discussion #24001 on May 29, 2022
