Skip to content

Commit

Permalink
[AIRFLOW-2921][AIRFLOW-2922] Fix bugs in CeleryExecutor (apache#3773)
Browse files Browse the repository at this point in the history
Bug-1:
if a task state becomes either SUCCESS or FAILURE or REVOKED,
it will be removed from self.tasks() and self.last_state().
However, because line 108 is not indented properly,
this task will be added back to self.last_state() again.

Bug-2:
When the state is updated, it's referring to the latest
state `task.state` rather than variable `state`.
This may result in dead-lock if the state changed from
`STARTED` to `SUCCESS` after the if-elif-else block
started.

Test case is updated for fix to bug-1.
  • Loading branch information
XD-DENG authored and ashb committed Sep 3, 2018
1 parent 26e0d44 commit 948b09d
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
4 changes: 2 additions & 2 deletions airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ def sync(self):
del self.tasks[key]
del self.last_state[key]
else:
self.log.info("Unexpected state: %s", task.state)
self.last_state[key] = task.state
self.log.info("Unexpected state: %s", state)
self.last_state[key] = state
except Exception as e:
self.log.error("Error syncing the celery executor, ignoring it:")
self.log.exception(e)
Expand Down
6 changes: 5 additions & 1 deletion tests/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from airflow.utils.state import State

# leave this it is used by the test worker
import celery.contrib.testing.tasks
import celery.contrib.testing.tasks # noqa: F401


class CeleryExecutorTest(unittest.TestCase):
def test_celery_integration(self):
Expand Down Expand Up @@ -53,6 +54,9 @@ def test_celery_integration(self):
self.assertNotIn('success', executor.tasks)
self.assertNotIn('fail', executor.tasks)

self.assertNotIn('success', executor.last_state)
self.assertNotIn('fail', executor.last_state)


if __name__ == '__main__':
unittest.main()

0 comments on commit 948b09d

Please sign in to comment.