diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index a3958b6237977..7a1cd975e6931 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -240,6 +240,8 @@ class KubernetesPodOperator(BaseOperator): Deprecated - use `on_finish_action` instead. :param termination_message_policy: The termination message policy of the base container. Default value is "File" + :param active_deadline_seconds: The active_deadline_seconds which matches to active_deadline_seconds + in V1PodSpec. """ # This field can be overloaded at the instance level via base_container_name @@ -320,6 +322,7 @@ def __init__( on_finish_action: str = "delete_pod", is_delete_operator_pod: None | bool = None, termination_message_policy: str = "File", + active_deadline_seconds: int | None = None, **kwargs, ) -> None: # TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the @@ -417,6 +420,7 @@ def __init__( self.on_finish_action = OnFinishAction(on_finish_action) self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD self.termination_message_policy = termination_message_policy + self.active_deadline_seconds = active_deadline_seconds self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict @@ -860,6 +864,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: restart_policy="Never", priority_class_name=self.priority_class_name, volumes=self.volumes, + active_deadline_seconds=self.active_deadline_seconds, ), ) diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 3d7a2fd4ef525..a1182581f97d3 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -29,6 +29,7 @@ import pendulum import pytest +from kubernetes import client from kubernetes.client import V1EnvVar, V1PodSecurityContext, V1SecurityContext, models as k8s from kubernetes.client.api_client import ApiClient from kubernetes.client.rest import ApiException @@ -43,6 +44,7 @@ from airflow.utils.context import Context from airflow.utils.types import DagRunType from airflow.version import version as airflow_version +from kubernetes_tests.test_base import BaseK8STest HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook" POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager" @@ -1331,3 +1333,32 @@ def __getattr__(self, name): task.render_template_fields(context=context) assert "password" in caplog.text assert "secretpassword" not in caplog.text + + +class TestKubernetesPodOperator(BaseK8STest): + @pytest.mark.parametrize("active_deadline_seconds", [10, 20]) + def test_kubernetes_pod_operator_active_deadline_seconds(self, active_deadline_seconds): + k = KubernetesPodOperator( + task_id=f"test_task_{active_deadline_seconds}", + active_deadline_seconds=active_deadline_seconds, + image="busybox", + cmds=["sh", "-c", "echo 'hello world' && sleep 60"], + namespace="default", + on_finish_action="keep_pod", + ) + + context = create_context(k) + + with pytest.raises(AirflowException): + k.execute(context) + + pod = k.find_pod("default", context, exclude_checked=False) + + k8s_client = client.CoreV1Api() + + pod_status = k8s_client.read_namespaced_pod_status(name=pod.metadata.name, namespace="default") + phase = pod_status.status.phase + reason = pod_status.status.reason + + assert phase == "Failed" + assert reason == "DeadlineExceeded"