Skip to content

Commit

Permalink
Consolidate the call of change_state to fail or success in the core e…
Browse files Browse the repository at this point in the history
…xecutors (#35901)
  • Loading branch information
hussein-awala authored Nov 28, 2023
1 parent f6962a9 commit ce7f043
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 8 deletions.
6 changes: 3 additions & 3 deletions airflow/executors/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def sync(self) -> None:
elif self._terminated.is_set():
self.log.info("Executor is terminated! Stopping %s to %s", ti.key, TaskInstanceState.FAILED)
ti.set_state(TaskInstanceState.FAILED)
self.change_state(ti.key, TaskInstanceState.FAILED)
self.fail(ti.key)
else:
task_succeeded = self._run_task(ti)

Expand All @@ -84,11 +84,11 @@ def _run_task(self, ti: TaskInstance) -> bool:
try:
params = self.tasks_params.pop(ti.key, {})
ti.run(job_id=ti.job_id, **params)
self.change_state(key, TaskInstanceState.SUCCESS)
self.success(key)
return True
except Exception as e:
ti.set_state(TaskInstanceState.FAILED)
self.change_state(key, TaskInstanceState.FAILED)
self.fail(key)
self.log.exception("Failed to execute task: %s.", e)
return False

Expand Down
5 changes: 2 additions & 3 deletions airflow/executors/sequential_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from typing import TYPE_CHECKING, Any

from airflow.executors.base_executor import BaseExecutor
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
Expand Down Expand Up @@ -75,9 +74,9 @@ def sync(self) -> None:

try:
subprocess.check_call(command, close_fds=True)
self.change_state(key, TaskInstanceState.SUCCESS)
self.success(key)
except subprocess.CalledProcessError as e:
self.change_state(key, TaskInstanceState.FAILED)
self.fail(key)
self.log.error("Failed to execute task %s.", e)

self.commands_to_run = []
Expand Down
4 changes: 2 additions & 2 deletions tests/executors/test_debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def test_fail_fast(self, change_state_mock):
assert not executor.tasks_to_run
change_state_mock.assert_has_calls(
[
mock.call(ti1.key, State.FAILED),
mock.call(ti1.key, State.FAILED, None),
mock.call(ti2.key, State.UPSTREAM_FAILED),
]
)
Expand Down Expand Up @@ -145,6 +145,6 @@ def test_sync_after_terminate(self, change_state_mock):

change_state_mock.assert_has_calls(
[
mock.call(ti1.key, State.FAILED),
mock.call(ti1.key, State.FAILED, None),
]
)

0 comments on commit ce7f043

Please sign in to comment.