Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes executor running slots leak fix #36240

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@
raise
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
)
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
from airflow.utils.event_scheduler import EventScheduler
Expand Down Expand Up @@ -463,14 +466,23 @@ def sync(self) -> None:
def _change_state(
self,
key: TaskInstanceKey,
state: TaskInstanceState | None,
state: TaskInstanceState | str | None,
pod_name: str,
namespace: str,
session: Session = NEW_SESSION,
) -> None:
if TYPE_CHECKING:
assert self.kube_scheduler

if state == ADOPTED:
# When the task pod has been adopted by another executor,
# remove the task from the current executor's set of running tasks.
try:
self.running.remove(key)
except KeyError:
self.log.debug("TI key not in running: %s", key)
return

if state == TaskInstanceState.RUNNING:
self.event_buffer[key] = state, None
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

ADOPTED = "adopted"
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
from airflow.models.taskinstance import TaskInstanceKey
Expand All @@ -27,10 +28,10 @@
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]

# key, pod state, pod_name, namespace, resource_version
KubernetesResultsType = Tuple[TaskInstanceKey, Optional[TaskInstanceState], str, str, str]
KubernetesResultsType = Tuple[TaskInstanceKey, Optional[Union[TaskInstanceState, str]], str, str, str]

# pod_name, namespace, pod state, annotations, resource_version
KubernetesWatchType = Tuple[str, str, Optional[TaskInstanceState], Dict[str, str], str]
KubernetesWatchType = Tuple[str, str, Optional[Union[TaskInstanceState, str]], Dict[str, str], str]

ALL_NAMESPACES = "ALL_NAMESPACES"
POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

try:
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
ALL_NAMESPACES,
POD_EXECUTOR_DONE_KEY,
)
Expand Down Expand Up @@ -220,7 +221,13 @@ def process_status(
pod = event["object"]
annotations_string = annotations_for_logging_task_metadata(annotations)
"""Process status response."""
if status == "Pending":
if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
dirrao marked this conversation as resolved.
Show resolved Hide resolved
# This will happen only when the task pods are adopted by another executor.
# So, there is no change in the pod state.
# However, we still need to free the executor slot held by the current executor.
self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string)
self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version))
elif status == "Pending":
# deletion_timestamp is set by kube server when a graceful deletion is requested.
# Since the kube server has received a request to delete the pod, set the TI state to failed.
if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
KubernetesExecutor,
PodReconciliationError,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
AirflowKubernetesScheduler,
KubernetesJobWatcher,
Expand Down Expand Up @@ -644,6 +647,25 @@ def test_change_state_none(
finally:
executor.end()

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
@mock.patch(
    "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
)
def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
    """Verify how ``_change_state`` reacts to the ``ADOPTED`` state.

    After the call the task key must be gone from ``running``, nothing may
    have been written to ``event_buffer``, and ``delete_pod`` must not have
    been invoked.
    """
    k8s_executor = self.kubernetes_executor
    k8s_executor.start()
    try:
        ti_key = ("dag_id", "task_id", "run_id", "try_number2")
        # Seed the executor with exactly one running task.
        k8s_executor.running = {ti_key}

        k8s_executor._change_state(ti_key, ADOPTED, "pod_name", "default")

        # No event is reported for the task and its slot is released.
        assert not k8s_executor.event_buffer
        assert not k8s_executor.running
        # The pod itself must be left alone.
        mock_delete_pod.assert_not_called()
    finally:
        # Always shut the executor down, even when an assertion fails.
        k8s_executor.end()

@pytest.mark.db_test
@pytest.mark.parametrize(
"multi_namespace_mode_namespace_list, watchers_keys",
Expand Down Expand Up @@ -1431,12 +1453,31 @@ def test_process_status_succeeded_dedup_timestamp(self):
self._run()
self.watcher.watcher_queue.put.assert_not_called()

def test_process_status_succeeded_type_delete(self):
self.pod.status.phase = "Succeeded"
@pytest.mark.parametrize(
"ti_state",
[
TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED,
TaskInstanceState.RUNNING,
TaskInstanceState.QUEUED,
TaskInstanceState.UP_FOR_RETRY,
],
)
def test_process_status_pod_adopted(self, ti_state):
self.pod.status.phase = ti_state
self.events.append({"type": "DELETED", "object": self.pod})
self.pod.metadata.deletion_timestamp = None

self._run()
self.watcher.watcher_queue.put.assert_not_called()
self.watcher.watcher_queue.put.assert_called_once_with(
(
self.pod.metadata.name,
self.watcher.namespace,
ADOPTED,
self.core_annotations,
self.pod.metadata.resource_version,
)
)

def test_process_status_running_deleted(self):
self.pod.status.phase = "Running"
Expand Down