From ba9a292f5d2555600415aba127509b73d0265b91 Mon Sep 17 00:00:00 2001 From: gopal Date: Fri, 15 Dec 2023 17:05:02 +0530 Subject: [PATCH 1/3] kubernetes executor running slots leak fix --- .../executors/kubernetes_executor.py | 9 ++++ .../executors/kubernetes_executor_utils.py | 10 ++++- airflow/utils/state.py | 3 ++ .../executors/test_kubernetes_executor.py | 44 +++++++++++++++++-- 4 files changed, 62 insertions(+), 4 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 6e32c0047330..9d931a385e22 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -471,6 +471,15 @@ def _change_state( if TYPE_CHECKING: assert self.kube_scheduler + if state == TaskInstanceState.ADOPTED: + # When the task pod is adopted by another scheduler, + # then remove the task from the current scheduler running queue. + try: + self.running.remove(key) + except KeyError: + self.log.debug("TI key not in running: %s", key) + return + if state == TaskInstanceState.RUNNING: self.event_buffer[key] = state, None return diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 074b65d198f6..c99196d8a86a 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -220,7 +220,15 @@ def process_status( pod = event["object"] annotations_string = annotations_for_logging_task_metadata(annotations) """Process status response.""" - if status == "Pending": + if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp: + # This will happen only when the task pods are adopted by another scheduler. + # So, there is no change in the pod state. 
+ # However, need to free the executor slot from the current scheduler. + self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string) + self.watcher_queue.put( + (pod_name, namespace, TaskInstanceState.ADOPTED, annotations, resource_version) + ) + elif status == "Pending": # deletion_timestamp is set by kube server when a graceful deletion is requested. # since kube server have received request to delete pod set TI state failed if event["type"] == "DELETED" and pod.metadata.deletion_timestamp: diff --git a/airflow/utils/state.py b/airflow/utils/state.py index 1061970c4673..5b107ef74785 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -46,6 +46,9 @@ class TaskInstanceState(str, Enum): REMOVED = "removed" # Task vanished from DAG before it ran SCHEDULED = "scheduled" # Task should run and will be handed to executor soon + # Set by executor + ADOPTED = "adopted" + # Set by the task instance itself QUEUED = "queued" # Executor has enqueued the task RUNNING = "running" # Task is executing diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py index a0b187087ad3..7603c6c88aa0 100644 --- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -644,6 +644,25 @@ def test_change_state_none( finally: executor.end() + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + @mock.patch( + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod" + ) + def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher): + executor = self.kubernetes_executor + executor.start() + try: + key = ("dag_id", 
"task_id", "run_id", "try_number2") + executor.running = {key} + executor._change_state(key, TaskInstanceState.ADOPTED, "pod_name", "default") + assert len(executor.event_buffer) == 0 + assert len(executor.running) == 0 + mock_delete_pod.assert_not_called() + finally: + executor.end() + @pytest.mark.db_test @pytest.mark.parametrize( "multi_namespace_mode_namespace_list, watchers_keys", @@ -1431,12 +1450,31 @@ def test_process_status_succeeded_dedup_timestamp(self): self._run() self.watcher.watcher_queue.put.assert_not_called() - def test_process_status_succeeded_type_delete(self): - self.pod.status.phase = "Succeeded" + @pytest.mark.parametrize( + "ti_state", + [ + TaskInstanceState.SUCCESS, + TaskInstanceState.FAILED, + TaskInstanceState.RUNNING, + TaskInstanceState.QUEUED, + TaskInstanceState.UP_FOR_RETRY, + ], + ) + def test_process_status_pod_adopted(self, ti_state): + self.pod.status.phase = ti_state self.events.append({"type": "DELETED", "object": self.pod}) + self.pod.metadata.deletion_timestamp = None self._run() - self.watcher.watcher_queue.put.assert_not_called() + self.watcher.watcher_queue.put.assert_called_once_with( + ( + self.pod.metadata.name, + self.watcher.namespace, + TaskInstanceState.ADOPTED, + self.core_annotations, + self.pod.metadata.resource_version, + ) + ) def test_process_status_running_deleted(self): self.pod.status.phase = "Running" From 10de3605a81f64e89fa2696f15d507441d7c3ad7 Mon Sep 17 00:00:00 2001 From: gopal Date: Sun, 17 Dec 2023 08:34:39 +0530 Subject: [PATCH 2/3] review comments addressed --- .../cncf/kubernetes/executors/kubernetes_executor.py | 9 ++++++--- .../kubernetes/executors/kubernetes_executor_types.py | 7 ++++--- .../kubernetes/executors/kubernetes_executor_utils.py | 5 ++--- airflow/utils/state.py | 3 --- .../kubernetes/executors/test_kubernetes_executor.py | 9 ++++++--- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py 
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 9d931a385e22..239f823da70c 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -75,7 +75,10 @@ raise from airflow.configuration import conf from airflow.executors.base_executor import BaseExecutor -from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY +from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( + ADOPTED, + POD_EXECUTOR_DONE_KEY, +) from airflow.providers.cncf.kubernetes.kube_config import KubeConfig from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key from airflow.utils.event_scheduler import EventScheduler @@ -463,7 +466,7 @@ def sync(self) -> None: def _change_state( self, key: TaskInstanceKey, - state: TaskInstanceState | None, + state: TaskInstanceState | str | None, pod_name: str, namespace: str, session: Session = NEW_SESSION, @@ -471,7 +474,7 @@ def _change_state( if TYPE_CHECKING: assert self.kube_scheduler - if state == TaskInstanceState.ADOPTED: + if state == ADOPTED: # When the task pod is adopted by another scheduler, # then remove the task from the current scheduler running queue. try: diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py index 80b8f1de7289..422913629802 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py @@ -16,8 +16,9 @@ # under the License. 
from __future__ import annotations -from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union +ADOPTED = "adopted" if TYPE_CHECKING: from airflow.executors.base_executor import CommandType from airflow.models.taskinstance import TaskInstanceKey @@ -27,10 +28,10 @@ KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]] # key, pod state, pod_name, namespace, resource_version - KubernetesResultsType = Tuple[TaskInstanceKey, Optional[TaskInstanceState], str, str, str] + KubernetesResultsType = Tuple[TaskInstanceKey, Optional[Union[TaskInstanceState, str]], str, str, str] # pod_name, namespace, pod state, annotations, resource_version - KubernetesWatchType = Tuple[str, str, Optional[TaskInstanceState], Dict[str, str], str] + KubernetesWatchType = Tuple[str, str, Optional[Union[TaskInstanceState, str]], Dict[str, str], str] ALL_NAMESPACES = "ALL_NAMESPACES" POD_EXECUTOR_DONE_KEY = "airflow_executor_done" diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index c99196d8a86a..06d4d1fbcef4 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -40,6 +40,7 @@ try: from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( + ADOPTED, ALL_NAMESPACES, POD_EXECUTOR_DONE_KEY, ) @@ -225,9 +226,7 @@ def process_status( # So, there is no change in the pod state. # However, need to free the executor slot from the current scheduler. 
self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string) - self.watcher_queue.put( - (pod_name, namespace, TaskInstanceState.ADOPTED, annotations, resource_version) - ) + self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version)) elif status == "Pending": # deletion_timestamp is set by kube server when a graceful deletion is requested. # since kube server have received request to delete pod set TI state failed diff --git a/airflow/utils/state.py b/airflow/utils/state.py index 5b107ef74785..1061970c4673 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -46,9 +46,6 @@ class TaskInstanceState(str, Enum): REMOVED = "removed" # Task vanished from DAG before it ran SCHEDULED = "scheduled" # Task should run and will be handed to executor soon - # Set by executor - ADOPTED = "adopted" - # Set by the task instance itself QUEUED = "queued" # Executor has enqueued the task RUNNING = "running" # Task is executing diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py index 7603c6c88aa0..15b17515c17a 100644 --- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -44,7 +44,10 @@ KubernetesExecutor, PodReconciliationError, ) - from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( + ADOPTED, + POD_EXECUTOR_DONE_KEY, + ) from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ( AirflowKubernetesScheduler, KubernetesJobWatcher, @@ -656,7 +659,7 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_ try: key = ("dag_id", "task_id", "run_id", "try_number2") executor.running = {key} - executor._change_state(key, 
TaskInstanceState.ADOPTED, "pod_name", "default") + executor._change_state(key, ADOPTED, "pod_name", "default") assert len(executor.event_buffer) == 0 assert len(executor.running) == 0 mock_delete_pod.assert_not_called() @@ -1470,7 +1473,7 @@ def test_process_status_pod_adopted(self, ti_state): ( self.pod.metadata.name, self.watcher.namespace, - TaskInstanceState.ADOPTED, + ADOPTED, self.core_annotations, self.pod.metadata.resource_version, ) From e5a4b219f8128ba7f6ea49caf90f2d6561185ab4 Mon Sep 17 00:00:00 2001 From: gopal Date: Sun, 17 Dec 2023 09:00:36 +0530 Subject: [PATCH 3/3] review comments addressed --- .../cncf/kubernetes/executors/kubernetes_executor.py | 4 ++-- .../cncf/kubernetes/executors/kubernetes_executor_utils.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 239f823da70c..a5d911f8ce0f 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -475,8 +475,8 @@ def _change_state( assert self.kube_scheduler if state == ADOPTED: - # When the task pod is adopted by another scheduler, - # then remove the task from the current scheduler running queue. + # When the task pod is adopted by another executor, + # then remove the task from the current executor running queue. 
try: self.running.remove(key) except KeyError: diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 06d4d1fbcef4..09b47aea194d 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -222,9 +222,9 @@ def process_status( annotations_string = annotations_for_logging_task_metadata(annotations) """Process status response.""" if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp: - # This will happen only when the task pods are adopted by another scheduler. + # This will happen only when the task pods are adopted by another executor. # So, there is no change in the pod state. - # However, need to free the executor slot from the current scheduler. + # However, we need to free the executor slot from the current executor. self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string) self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version)) elif status == "Pending":