Fix Scheduler restarting due to too many completed pods in cluster (#40183)

* Fix Scheduler restarting due to too many completed pods in cluster

Currently, when a pod completes and is not deleted due to the user's configuration,
the watcher keeps listing these pods and checking their status. We should instead stop
watching a pod once it succeeds. To do that, pods are created with the executor done
label set to False, and the label is changed to True when the pod completes. The watcher
then watches only those pods whose executor done label is not True.

closes: #22612
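
For context on the mechanism described above: a Kubernetes label selector of the form key!=value matches pods that carry the key with a different value and also pods that do not carry the key at all, so newly created worker pods are still watched while pods already marked done are filtered out server-side. A minimal sketch of that filtering with the official kubernetes Python client; the namespace and the scheduler job id value below are illustrative, not taken from this change:

from kubernetes import client, config, watch

config.load_kube_config()  # or config.load_incluster_config() when running inside the cluster
v1 = client.CoreV1Api()

# Same shape of selector the watcher now builds: pods owned by this scheduler
# job whose executor-done label is absent or set to anything other than "True".
label_selector = "airflow-worker=123,airflow_executor_done!=True"

for event in watch.Watch().stream(
    v1.list_namespaced_pod,
    namespace="airflow",  # illustrative namespace
    label_selector=label_selector,
):
    pod = event["object"]
    print(event["type"], pod.metadata.name, pod.status.phase)

Once a pod is labelled airflow_executor_done=True it no longer matches the selector, so the API server stops delivering its events to the watcher.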

* Update airflow/providers/cncf/kubernetes/pod_generator.py

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Add back removed section

* Don't add the pod key label from the get-go

* Update airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

---------

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
ephraimbuddy and jedcunningham authored Jun 13, 2024
1 parent 2272ea2 commit 67798b2
Showing 2 changed files with 20 additions and 19 deletions.

airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -139,7 +139,9 @@ def _run(
     ) -> str | None:
         self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version)
 
-        kwargs: dict[str, Any] = {"label_selector": f"airflow-worker={scheduler_job_id}"}
+        kwargs: dict[str, Any] = {
+            "label_selector": f"airflow-worker={scheduler_job_id},{POD_EXECUTOR_DONE_KEY}!=True",
+        }
         if resource_version:
             kwargs["resource_version"] = resource_version
         if kube_config.kube_client_request_args:
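
The query kwargs assembled above are passed to the watcher's _pod_events helper, which the updated test further down mocks and asserts on. A rough sketch of such a helper, assuming it simply wraps the Kubernetes watch API; the real implementation may differ, for example in how namespaces are handled:

from typing import Any, Iterator

from kubernetes import client, watch


def pod_events(kube_client: client.CoreV1Api, query_kwargs: dict[str, Any]) -> Iterator[dict[str, Any]]:
    # query_kwargs carries label_selector, resource_version, timeout_seconds and
    # _request_timeout exactly as assembled in _run; the selector keeps pods
    # already labelled airflow_executor_done=True out of the stream.
    return watch.Watch().stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
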
@@ -271,14 +273,9 @@ def process_status(
         elif status == "Succeeded":
             # We get multiple events once the pod hits a terminal state, and we only want to
             # send it along to the scheduler once.
-            # If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has
-            # a deletion timestamp, we've already seen the initial Succeeded event and sent it
-            # along to the scheduler.
-            if (
-                event["type"] == "DELETED"
-                or POD_EXECUTOR_DONE_KEY in pod.metadata.labels
-                or pod.metadata.deletion_timestamp
-            ):
+            # If our event type is DELETED, or the pod has a deletion timestamp, we've already
+            # seen the initial Succeeded event and sent it along to the scheduler.
+            if event["type"] == "DELETED" or pod.metadata.deletion_timestamp:
                 self.log.info(
                     "Skipping event for Succeeded pod %s - event for this pod already sent to executor",
                     pod_name,
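
The other half of the mechanism, per the commit message, is that the executor done label is flipped to "True" when the pod completes, so the pod immediately drops out of the filtered watch above instead of being re-listed indefinitely. An illustrative patch using the Kubernetes API directly, not the exact Airflow code path; the pod name and namespace are placeholders:

from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# Setting the label to "True" makes the pod stop matching the watcher's
# selector airflow_executor_done!=True, so no further events are delivered.
v1.patch_namespaced_pod(
    name="example-worker-pod",  # placeholder pod name
    namespace="airflow",  # placeholder namespace
    body={"metadata": {"labels": {"airflow_executor_done": "True"}}},
)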

Kubernetes executor tests (second changed file)
@@ -39,7 +39,6 @@
 )
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
     ADOPTED,
-    POD_EXECUTOR_DONE_KEY,
 )
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
     AirflowKubernetesScheduler,
@@ -1539,15 +1538,28 @@ def setup_method(self):
     def _run(self):
         with mock.patch(
             "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.watch"
-        ) as mock_watch:
+        ) as mock_watch, mock.patch.object(
+            KubernetesJobWatcher,
+            "_pod_events",
+        ) as mock_pod_events:
             mock_watch.Watch.return_value.stream.return_value = self.events
+            mock_pod_events.return_value = self.events
             latest_resource_version = self.watcher._run(
                 self.kube_client,
                 self.watcher.resource_version,
                 self.watcher.scheduler_job_id,
                 self.watcher.kube_config,
             )
             assert self.pod.metadata.resource_version == latest_resource_version
+            mock_pod_events.assert_called_once_with(
+                kube_client=self.kube_client,
+                query_kwargs={
+                    "label_selector": "airflow-worker=123,airflow_executor_done!=True",
+                    "resource_version": "0",
+                    "_request_timeout": 30,
+                    "timeout_seconds": 3600,
+                },
+            )
 
     def assert_watcher_queue_called_once_with_state(self, state):
         self.watcher.watcher_queue.put.assert_called_once_with(
@@ -1763,14 +1775,6 @@ def test_process_status_succeeded(self):
         # We don't know the TI state, so we send in None
         self.assert_watcher_queue_called_once_with_state(None)
 
-    def test_process_status_succeeded_dedup_label(self):
-        self.pod.status.phase = "Succeeded"
-        self.pod.metadata.labels[POD_EXECUTOR_DONE_KEY] = "True"
-        self.events.append({"type": "MODIFIED", "object": self.pod})
-
-        self._run()
-        self.watcher.watcher_queue.put.assert_not_called()
-
     def test_process_status_succeeded_dedup_timestamp(self):
         self.pod.status.phase = "Succeeded"
         self.pod.metadata.deletion_timestamp = timezone.utcnow()
