Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add active_deadline_seconds parameter to KubernetesPodOperator #33379

Merged
merged 8 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class KubernetesPodOperator(BaseOperator):
Deprecated - use `on_finish_action` instead.
:param termination_message_policy: The termination message policy of the base container.
Default value is "File"
:param active_deadline_seconds: The active_deadline_seconds which matches to active_deadline_seconds
in V1PodSpec.
"""

# This field can be overloaded at the instance level via base_container_name
Expand Down Expand Up @@ -320,6 +322,7 @@ def __init__(
on_finish_action: str = "delete_pod",
is_delete_operator_pod: None | bool = None,
termination_message_policy: str = "File",
active_deadline_seconds: int | None = None,
**kwargs,
) -> None:
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
Expand Down Expand Up @@ -417,6 +420,7 @@ def __init__(
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
self.termination_message_policy = termination_message_policy
self.active_deadline_seconds = active_deadline_seconds

self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict

Expand Down Expand Up @@ -860,6 +864,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
restart_policy="Never",
priority_class_name=self.priority_class_name,
volumes=self.volumes,
active_deadline_seconds=self.active_deadline_seconds,
),
)

Expand Down
31 changes: 31 additions & 0 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import pendulum
import pytest
from kubernetes import client
from kubernetes.client import V1EnvVar, V1PodSecurityContext, V1SecurityContext, models as k8s
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
Expand All @@ -43,6 +44,7 @@
from airflow.utils.context import Context
from airflow.utils.types import DagRunType
from airflow.version import version as airflow_version
from kubernetes_tests.test_base import BaseK8STest

HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"
POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
Expand Down Expand Up @@ -1331,3 +1333,32 @@ def __getattr__(self, name):
task.render_template_fields(context=context)
assert "password" in caplog.text
assert "secretpassword" not in caplog.text


class TestKubernetesPodOperator(BaseK8STest):
@pytest.mark.parametrize("active_deadline_seconds", [10, 20])
def test_kubernetes_pod_operator_active_deadline_seconds(self, active_deadline_seconds):
k = KubernetesPodOperator(
task_id=f"test_task_{active_deadline_seconds}",
active_deadline_seconds=active_deadline_seconds,
image="busybox",
cmds=["sh", "-c", "echo 'hello world' && sleep 60"],
namespace="default",
on_finish_action="keep_pod",
)

context = create_context(k)

with pytest.raises(AirflowException):
k.execute(context)

pod = k.find_pod("default", context, exclude_checked=False)

k8s_client = client.CoreV1Api()

pod_status = k8s_client.read_namespaced_pod_status(name=pod.metadata.name, namespace="default")
phase = pod_status.status.phase
reason = pod_status.status.reason

assert phase == "Failed"
assert reason == "DeadlineExceeded"