Skip to content

Commit

Permalink
Add active_deadline_seconds parameter to KubernetesPodOperator (#…
Browse files Browse the repository at this point in the history
…33379)

* Inserting active_deadline_seconds in KPO

* Fixing tests

* Fix active_deadline_seconds test

* Fixing tests

* parametrize task_id to create a pod per task

---------

Co-authored-by: Hussein Awala <hussein@awala.fr>
  • Loading branch information
amoghrajesh and hussein-awala authored Aug 18, 2023
1 parent 46aa429 commit e991f60
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
5 changes: 5 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ class KubernetesPodOperator(BaseOperator):
Deprecated - use `on_finish_action` instead.
:param termination_message_policy: The termination message policy of the base container.
Default value is "File"
:param active_deadline_seconds: The active_deadline_seconds which matches to active_deadline_seconds
in V1PodSpec.
"""

# This field can be overloaded at the instance level via base_container_name
Expand Down Expand Up @@ -320,6 +322,7 @@ def __init__(
on_finish_action: str = "delete_pod",
is_delete_operator_pod: None | bool = None,
termination_message_policy: str = "File",
active_deadline_seconds: int | None = None,
**kwargs,
) -> None:
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
Expand Down Expand Up @@ -417,6 +420,7 @@ def __init__(
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
self.termination_message_policy = termination_message_policy
self.active_deadline_seconds = active_deadline_seconds

self._config_dict: dict | None = None # TODO: remove it when removing convert_config_file_to_dict

Expand Down Expand Up @@ -860,6 +864,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
restart_policy="Never",
priority_class_name=self.priority_class_name,
volumes=self.volumes,
active_deadline_seconds=self.active_deadline_seconds,
),
)

Expand Down
31 changes: 31 additions & 0 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import pendulum
import pytest
from kubernetes import client
from kubernetes.client import V1EnvVar, V1PodSecurityContext, V1SecurityContext, models as k8s
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException
Expand All @@ -43,6 +44,7 @@
from airflow.utils.context import Context
from airflow.utils.types import DagRunType
from airflow.version import version as airflow_version
from kubernetes_tests.test_base import BaseK8STest

HOOK_CLASS = "airflow.providers.cncf.kubernetes.operators.pod.KubernetesHook"
POD_MANAGER_CLASS = "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager"
Expand Down Expand Up @@ -1331,3 +1333,32 @@ def __getattr__(self, name):
task.render_template_fields(context=context)
assert "password" in caplog.text
assert "secretpassword" not in caplog.text


class TestKubernetesPodOperator(BaseK8STest):
@pytest.mark.parametrize("active_deadline_seconds", [10, 20])
def test_kubernetes_pod_operator_active_deadline_seconds(self, active_deadline_seconds):
k = KubernetesPodOperator(
task_id=f"test_task_{active_deadline_seconds}",
active_deadline_seconds=active_deadline_seconds,
image="busybox",
cmds=["sh", "-c", "echo 'hello world' && sleep 60"],
namespace="default",
on_finish_action="keep_pod",
)

context = create_context(k)

with pytest.raises(AirflowException):
k.execute(context)

pod = k.find_pod("default", context, exclude_checked=False)

k8s_client = client.CoreV1Api()

pod_status = k8s_client.read_namespaced_pod_status(name=pod.metadata.name, namespace="default")
phase = pod_status.status.phase
reason = pod_status.status.reason

assert phase == "Failed"
assert reason == "DeadlineExceeded"

0 comments on commit e991f60

Please sign in to comment.