From 18af94c024bb99994ad292f59f7332780ca66e77 Mon Sep 17 00:00:00 2001
From: Pankaj
Date: Mon, 12 Feb 2024 20:24:15 +0530
Subject: [PATCH 1/9] Restore KPO trigger event param name

---
 airflow/providers/cncf/kubernetes/triggers/pod.py    | 4 ++--
 tests/providers/cncf/kubernetes/triggers/test_pod.py | 8 ++++----
 .../google/cloud/triggers/test_kubernetes_engine.py  | 4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index e34a73f146fe2..22b2dfd050830 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -162,7 +162,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
             state = await self._wait_for_pod_start()
             if state in PodPhase.terminal_states:
                 event = TriggerEvent(
-                    {"status": "done", "namespace": self.pod_namespace, "pod_name": self.pod_name}
+                    {"status": "done", "namespace": self.pod_namespace, "name": self.pod_name}
                 )
             else:
                 event = await self._wait_for_container_completion()
@@ -216,7 +216,7 @@ async def _wait_for_container_completion(self) -> TriggerEvent:
             pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
             if not container_is_running(pod=pod, container_name=self.base_container_name):
                 return TriggerEvent(
-                    {"status": "done", "namespace": self.pod_namespace, "pod_name": self.pod_name}
+                    {"status": "done", "namespace": self.pod_namespace, "name": self.pod_name}
                 )
             if time_get_more_logs and timezone.utcnow() > time_get_more_logs:
                 return TriggerEvent({"status": "running", "last_log_time": self.last_log_time})
diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py
index d12100e4e35c7..2f4d116dd587f 100644
--- a/tests/providers/cncf/kubernetes/triggers/test_pod.py
+++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py
@@ -122,7 +122,7 @@ async def test_run_loop_return_success_event(self, mock_hook, mock_method, trigg
         expected_event = TriggerEvent(
             {
-                "pod_name": POD_NAME,
+                "name": POD_NAME,
                 "namespace": NAMESPACE,
                 "status": "done",
             }
         )
@@ -188,7 +188,7 @@ async def test_run_loop_return_failed_event(self, mock_hook, mock_method, trigge
         expected_event = TriggerEvent(
             {
-                "pod_name": POD_NAME,
+                "name": POD_NAME,
                 "namespace": NAMESPACE,
                 "status": "done",
             }
         )
@@ -236,7 +236,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
         "logging_interval, exp_event",
         [
             param(0, {"status": "running", "last_log_time": DateTime(2022, 1, 1)}, id="short_interval"),
-            param(None, {"status": "done", "namespace": mock.ANY, "pod_name": mock.ANY}, id="no_interval"),
+            param(None, {"status": "done", "namespace": mock.ANY, "name": mock.ANY}, id="no_interval"),
         ],
     )
     @mock.patch(
@@ -325,4 +325,4 @@ async def test_run_loop_return_timeout_event(
         generator = trigger.run()
         actual = await generator.asend(None)
-        assert actual == TriggerEvent({"status": "done", "namespace": NAMESPACE, "pod_name": POD_NAME})
+        assert actual == TriggerEvent({"status": "done", "namespace": NAMESPACE, "name": POD_NAME})
diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
index ca7b7ba3588fd..22149f29248d8 100644
--- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py
@@ -118,7 +118,7 @@ async def test_run_loop_return_success_event_should_execute_successfully(
         expected_event = TriggerEvent(
             {
-                "pod_name": POD_NAME,
+                "name": POD_NAME,
                 "namespace": NAMESPACE,
                 "status": "done",
             }
         )
@@ -144,7 +144,7 @@ async def test_run_loop_return_failed_event_should_execute_successfully(
         expected_event = TriggerEvent(
             {
-                "pod_name": POD_NAME,
+                "name": POD_NAME,
                 "namespace": NAMESPACE,
                 "status": "done",
             }

From 89fc14b6fe444ea189f576a460c0b5e9a6cabc5d Mon Sep 17 00:00:00 2001
From: Pankaj Koti
Date: Mon, 12 Feb 2024 22:20:23 +0530
Subject: [PATCH 2/9] sync with execute_complete

---
 .../cncf/kubernetes/operators/pod.py | 36 +++++++++----------
 1 file changed, 18 insertions(+), 18 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 73389f4038282..f68aa3359f87d 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -699,10 +699,10 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
         If ``logging_interval`` is not None, it could be that the pod is still running and we'll just
         grab the latest logs and defer back to the trigger again.
         """
-        remote_pod = None
+        pod = None
         try:
             self.pod_request_obj = self.build_pod_request_obj(context)
-            self.pod = self.find_pod(
+            pod = self.find_pod(
                 namespace=self.namespace or self.pod_request_obj.metadata.namespace,
                 context=context,
             )
@@ -710,7 +710,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
             # we try to find pod before possibly raising so that on_kill will have `pod` attr
             self.raise_for_trigger_status(event)
-            if not self.pod:
+            if not pod:
                 raise PodNotFoundException("Could not find pod after resuming from deferral")

             if self.get_logs:
@@ -718,7 +718,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
                 if last_log_time:
                     self.log.info("Resuming logs read from time %r", last_log_time)
                 pod_log_status = self.pod_manager.fetch_container_logs(
-                    pod=self.pod,
+                    pod=pod,
                     container_name=self.BASE_CONTAINER_NAME,
                     follow=self.logging_interval is None,
                     since_time=last_log_time,
@@ -728,22 +728,22 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
                 self.invoke_defer_method(pod_log_status.last_log_time)

             if self.do_xcom_push:
-                result = self.extract_xcom(pod=self.pod)
-                remote_pod = self.pod_manager.await_pod_completion(self.pod)
+                result = self.extract_xcom(pod=pod)
+                return result
+            pod = self.pod_manager.await_pod_completion(pod)
         except TaskDeferred:
             raise
-        except Exception:
-            self.cleanup(
-                pod=self.pod or self.pod_request_obj,
-                remote_pod=remote_pod,
-            )
-            raise
-        self.cleanup(
-            pod=self.pod or self.pod_request_obj,
-            remote_pod=remote_pod,
-        )
-        if self.do_xcom_push:
-            return result
+        finally:
+            istio_enabled = self.is_istio_enabled(pod)
+            # Skip await_pod_completion when the event is 'timeout' due to the pod can hang
+            # on the ErrImagePull or ContainerCreating step and it will never complete
+            if event["status"] != "timeout":
+                pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name)
+            if pod is not None:
+                self.post_complete_action(
+                    pod=pod,
+                    remote_pod=pod,
+                )

     def execute_complete(self, context: Context, event: dict, **kwargs):
         self.log.debug("Triggered with event: %s", event)
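[Illustrative note, not part of the patch series] Patch 1 restores the "name" key in the
trigger's event payload (a regression had renamed it to "pod_name"), and patch 2 makes
trigger_reentry mirror execute_complete's cleanup path. A minimal, self-contained sketch
of the payload shape the operator code above keys on (plain dicts standing in for the
real TriggerEvent payload):

    def handle_pod_event(event: dict) -> str:
        """Mirror the lookups the operator performs on the trigger payload."""
        pod_name = event["name"]       # restored key; was briefly "pod_name"
        namespace = event["namespace"]
        status = event["status"]       # still "done"/"running" at this point in the series
        return f"pod {namespace}/{pod_name} reported {status}"

    print(handle_pod_event({"status": "done", "namespace": "default", "name": "demo-pod"}))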
From 38c3740c1adb9deda0981ecb925624c0f027acd4 Mon Sep 17 00:00:00 2001
From: Pankaj
Date: Tue, 13 Feb 2024 01:25:01 +0530
Subject: [PATCH 3/9] change trigger emit state done to failed/success

---
 .../cncf/kubernetes/operators/pod.py          | 90 +++++++------------
 .../providers/cncf/kubernetes/triggers/pod.py | 29 ++++--
 2 files changed, 54 insertions(+), 65 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index f68aa3359f87d..a31256463826d 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -699,10 +699,10 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
         If ``logging_interval`` is not None, it could be that the pod is still running and we'll just
         grab the latest logs and defer back to the trigger again.
         """
-        pod = None
+        self.pod = None
         try:
             self.pod_request_obj = self.build_pod_request_obj(context)
-            pod = self.find_pod(
+            self.pod = self.find_pod(
                 namespace=self.namespace or self.pod_request_obj.metadata.namespace,
                 context=context,
             )
@@ -710,15 +710,20 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
             # we try to find pod before possibly raising so that on_kill will have `pod` attr
             self.raise_for_trigger_status(event)
-            if not pod:
+            if not self.pod:
                 raise PodNotFoundException("Could not find pod after resuming from deferral")

+            if self.callbacks:
+                self.callbacks.on_operator_resuming(
+                    pod=self.pod, event=event, client=self.client, mode=ExecutionMode.SYNC
+                )
+
             if self.get_logs:
                 last_log_time = event and event.get("last_log_time")
                 if last_log_time:
                     self.log.info("Resuming logs read from time %r", last_log_time)
                 pod_log_status = self.pod_manager.fetch_container_logs(
-                    pod=pod,
+                    pod=self.pod,
                     container_name=self.BASE_CONTAINER_NAME,
                     follow=self.logging_interval is None,
                     since_time=last_log_time,
@@ -728,67 +733,34 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
                 self.invoke_defer_method(pod_log_status.last_log_time)

             if self.do_xcom_push:
-                result = self.extract_xcom(pod=pod)
+                result = self.extract_xcom(pod=self.pod)
+            if event["status"] in ("error", "failed", "timeout"):
+                raise AirflowException(event)
             return result
-            pod = self.pod_manager.await_pod_completion(pod)
+            # self.pod = self.pod_manager.await_pod_completion(self.pod)
         except TaskDeferred:
             raise
         finally:
-            istio_enabled = self.is_istio_enabled(pod)
-            # Skip await_pod_completion when the event is 'timeout' due to the pod can hang
-            # on the ErrImagePull or ContainerCreating step and it will never complete
-            if event["status"] != "timeout":
-                pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name)
-            if pod is not None:
-                self.post_complete_action(
-                    pod=pod,
-                    remote_pod=pod,
-                )
+            self._clean(event)

-    def execute_complete(self, context: Context, event: dict, **kwargs):
-        self.log.debug("Triggered with event: %s", event)
-        pod = None
-        try:
-            pod = self.hook.get_pod(
-                event["name"],
-                event["namespace"],
-            )
-            if self.callbacks:
-                self.callbacks.on_operator_resuming(
-                    pod=pod, event=event, client=self.client, mode=ExecutionMode.SYNC
-                )
-            if event["status"] in ("error", "failed", "timeout"):
-                # fetch some logs when pod is failed
-                if self.get_logs:
-                    self.write_logs(pod)
-                if "stack_trace" in event:
-                    message = f"{event['message']}\n{event['stack_trace']}"
-                else:
-                    message = event["message"]
-                if self.do_xcom_push:
-                    # In the event of base container failure, we need to kill the xcom sidecar.
-                    # We disregard xcom output and do that here
-                    _ = self.extract_xcom(pod=pod)
-                raise AirflowException(message)
-            elif event["status"] == "success":
-                # fetch some logs when pod is executed successfully
-                if self.get_logs:
-                    self.write_logs(pod)
-
-                if self.do_xcom_push:
-                    xcom_sidecar_output = self.extract_xcom(pod=pod)
-                    return xcom_sidecar_output
-        finally:
-            istio_enabled = self.is_istio_enabled(pod)
-            # Skip await_pod_completion when the event is 'timeout' due to the pod can hang
-            # on the ErrImagePull or ContainerCreating step and it will never complete
-            if event["status"] != "timeout":
-                pod = self.pod_manager.await_pod_completion(pod, istio_enabled, self.base_container_name)
-            if pod is not None:
-                self.post_complete_action(
-                    pod=pod,
-                    remote_pod=pod,
-                )
+    def _clean(self, event: dict[str, Any]):
+        if event["status"] == "running":
+            return
+        istio_enabled = self.is_istio_enabled(self.pod)
+        # Skip await_pod_completion when the event is 'timeout' due to the pod can hang
+        # on the ErrImagePull or ContainerCreating step and it will never complete
+        if event["status"] != "timeout":
+            self.pod = self.pod_manager.await_pod_completion(
+                self.pod, istio_enabled, self.base_container_name
+            )
+        if self.pod is not None:
+            self.post_complete_action(
+                pod=self.pod,
+                remote_pod=self.pod,
+            )
+
+    def execute_complete(self, context: Context, event: dict, **kwargs):
+        self.trigger_reentry(context=context, event=event)

     def write_logs(self, pod: k8s.V1Pod):
         try:
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 22b2dfd050830..79ca1737c07cf 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -155,18 +155,32 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
             },
         )

+    def _get_terminal_event(self, state) -> TriggerEvent:
+        if state == PodPhase.SUCCEEDED:
+            status = "success"
+        else:
+            status = "failed"
+        return TriggerEvent({"status": status, "namespace": self.pod_namespace, "name": self.pod_name})
+
     async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         """Get current pod status and yield a TriggerEvent."""
         self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
         try:
             state = await self._wait_for_pod_start()
             if state in PodPhase.terminal_states:
-                event = TriggerEvent(
-                    {"status": "done", "namespace": self.pod_namespace, "name": self.pod_name}
-                )
+                event = self._get_terminal_event(state)
             else:
                 event = await self._wait_for_container_completion()
             yield event
+        except PodLaunchTimeoutException as e:
+            description = self._format_exception_description(e)
+            yield TriggerEvent(
+                {
+                    "status": "timeout",
+                    "error_type": e.__class__.__name__,
+                    "description": description,
+                }
+            )
         except Exception as e:
             description = self._format_exception_description(e)
             yield TriggerEvent(
                 {
                     "status": "error",
                     "error_type": e.__class__.__name__,
                     "description": description,
                 }
             )
@@ -215,9 +229,12 @@ async def _wait_for_container_completion(self) -> TriggerEvent:
         while True:
             pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
             if not container_is_running(pod=pod, container_name=self.base_container_name):
-                return TriggerEvent(
-                    {"status": "done", "namespace": self.pod_namespace, "name": self.pod_name}
-                )
+                container_state = self.define_container_state(pod)
+                if container_state == ContainerState.TERMINATED:
+                    state = "success"
+                else:
+                    state = "failed"
+                return TriggerEvent({"status": state, "namespace": self.pod_namespace, "name": self.pod_name})
             if time_get_more_logs and timezone.utcnow() > time_get_more_logs:
                 return TriggerEvent({"status": "running", "last_log_time": self.last_log_time})
             await asyncio.sleep(self.poll_interval)
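[Illustrative note, not part of the patch series] Patch 3 splits the old catch-all "done"
status into "success" and "failed", derived from the observed container state. A runnable
sketch of that mapping (the Enum here is a stand-in for the provider's ContainerState):

    from enum import Enum

    class ContainerState(Enum):
        TERMINATED = "terminated"
        FAILED = "failed"
        RUNNING = "running"

    def terminal_status(state: ContainerState) -> str:
        # TERMINATED means the base container exited; anything else at this point is a failure.
        return "success" if state is ContainerState.TERMINATED else "failed"

    assert terminal_status(ContainerState.TERMINATED) == "success"
    assert terminal_status(ContainerState.FAILED) == "failed"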
From 660a4179b1650814ee1976774581edd121ad9fdf Mon Sep 17 00:00:00 2001
From: Pankaj
Date: Tue, 13 Feb 2024 01:30:43 +0530
Subject: [PATCH 4/9] Add deprecation warning

---
 airflow/providers/cncf/kubernetes/operators/pod.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index a31256463826d..137ecff18a046 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -30,6 +30,7 @@
 from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence

 import kubernetes
+from deprecated import deprecated
 from kubernetes.client import CoreV1Api, V1Pod, models as k8s
 from kubernetes.stream import stream
 from urllib3.exceptions import HTTPError
@@ -759,6 +760,7 @@ def _clean(self, event: dict[str, Any]):
                 remote_pod=self.pod,
             )

+    @deprecated(reason="use `trigger_reentry` instead.", category=AirflowProviderDeprecationWarning)
     def execute_complete(self, context: Context, event: dict, **kwargs):
         self.trigger_reentry(context=context, event=event)

From 8f3a3fa4e86e141374bd4c6e0279be6723a157b6 Mon Sep 17 00:00:00 2001
From: Pankaj
Date: Wed, 14 Feb 2024 00:19:32 +0530
Subject: [PATCH 5/9] Address review feedback

---
 .../cncf/kubernetes/operators/pod.py          | 15 +++--
 .../providers/cncf/kubernetes/triggers/pod.py | 58 +++++++++++--------
 2 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 137ecff18a046..1148aab2b7838 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -18,6 +18,7 @@

 from __future__ import annotations

+import datetime
 import json
 import logging
 import re
@@ -80,7 +81,6 @@
 from airflow.settings import pod_mutation_hook
 from airflow.utils import yaml
 from airflow.utils.helpers import prune_dict, validate_key
-from airflow.utils.timezone import utcnow
 from airflow.version import version as airflow_version

 if TYPE_CHECKING:
@@ -657,7 +657,7 @@ def execute_async(self, context: Context):

     def invoke_defer_method(self, last_log_time: DateTime | None = None):
         """Redefine triggers which are being used in child classes."""
-        trigger_start_time = utcnow()
+        trigger_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
         self.defer(
             trigger=KubernetesPodTrigger(
                 pod_name=self.pod.metadata.name,  # type: ignore[union-attr]
@@ -714,7 +714,7 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
             if not self.pod:
                 raise PodNotFoundException("Could not find pod after resuming from deferral")

-            if self.callbacks:
+            if self.callbacks and event["status"] != "running":
                 self.callbacks.on_operator_resuming(
                     pod=self.pod, event=event, client=self.client, mode=ExecutionMode.SYNC
                 )
@@ -736,9 +736,12 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
             if self.do_xcom_push:
                 result = self.extract_xcom(pod=self.pod)
             if event["status"] in ("error", "failed", "timeout"):
-                raise AirflowException(event)
+                if "stack_trace" in event:
+                    message = f"{event['message']}\n{event['stack_trace']}"
+                else:
+                    message = event["message"]
+                raise AirflowException(message)
             return result
-            # self.pod = self.pod_manager.await_pod_completion(self.pod)
         except TaskDeferred:
             raise
         finally:
     def _clean(self, event: dict[str, Any]):
         if event["status"] == "running":
             return
+        if self.get_logs:
+            self.write_logs(self.pod)
         istio_enabled = self.is_istio_enabled(self.pod)
         # Skip await_pod_completion when the event is 'timeout' due to the pod can hang
         # on the ErrImagePull or ContainerCreating step and it will never complete
diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py
index 79ca1737c07cf..d86523883d379 100644
--- a/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -30,10 +30,8 @@
     OnFinishAction,
     PodLaunchTimeoutException,
     PodPhase,
-    container_is_running,
 )
 from airflow.triggers.base import BaseTrigger, TriggerEvent
-from airflow.utils import timezone

 if TYPE_CHECKING:
     from kubernetes_asyncio.client.models import V1Pod
@@ -167,29 +165,40 @@ async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
         self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace)
         try:
             state = await self._wait_for_pod_start()
-            if state in PodPhase.terminal_states:
-                event = self._get_terminal_event(state)
+            if state == ContainerState.TERMINATED:
+                event = TriggerEvent(
+                    {
+                        "status": "success",
+                        "namespace": self.pod_namespace,
+                        "name": self.pod_name,
+                        "message": "All containers inside pod have started successfully.",
+                    }
+                )
             else:
                 event = await self._wait_for_container_completion()
             yield event
+            return
         except PodLaunchTimeoutException as e:
-            description = self._format_exception_description(e)
+            message = self._format_exception_description(e)
             yield TriggerEvent(
                 {
+                    "name": self.pod_name,
+                    "namespace": self.pod_namespace,
                     "status": "timeout",
-                    "error_type": e.__class__.__name__,
-                    "description": description,
+                    "message": message,
                 }
             )
         except Exception as e:
-            description = self._format_exception_description(e)
             yield TriggerEvent(
                 {
+                    "name": self.pod_name,
+                    "namespace": self.pod_namespace,
                     "status": "error",
-                    "error_type": e.__class__.__name__,
-                    "description": description,
+                    "message": str(e),
+                    "stack_trace": traceback.format_exc(),
                 }
             )
+            return

     def _format_exception_description(self, exc: Exception) -> Any:
         if isinstance(exc, PodLaunchTimeoutException):
@@ -203,14 +212,13 @@ def _format_exception_description(self, exc: Exception) -> Any:
             description += f"\ntrigger traceback:\n{curr_traceback}"
         return description

-    async def _wait_for_pod_start(self) -> Any:
+    async def _wait_for_pod_start(self) -> ContainerState:
         """Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error."""
-        start_time = timezone.utcnow()
-        timeout_end = start_time + datetime.timedelta(seconds=self.startup_timeout)
-        while timeout_end > timezone.utcnow():
+        delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
+        while self.startup_timeout >= delta.total_seconds():
             pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
             if not pod.status.phase == "Pending":
-                return pod.status.phase
+                return self.define_container_state(pod)
             self.log.info("Still waiting for pod to start. The pod state is %s", pod.status.phase)
             await asyncio.sleep(self.poll_interval)
         raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")
@@ -222,20 +230,22 @@ async def _wait_for_container_completion(self) -> TriggerEvent:

         Waits until container is no longer in running state. If trigger is configured with a logging period,
         then will emit an event to resume the task for the purpose of fetching more logs.
         """
-        time_begin = timezone.utcnow()
+        time_begin = datetime.datetime.now(tz=datetime.timezone.utc)
         time_get_more_logs = None
         if self.logging_interval is not None:
             time_get_more_logs = time_begin + datetime.timedelta(seconds=self.logging_interval)
         while True:
             pod = await self.hook.get_pod(self.pod_name, self.pod_namespace)
-            if not container_is_running(pod=pod, container_name=self.base_container_name):
-                container_state = self.define_container_state(pod)
-                if container_state == ContainerState.TERMINATED:
-                    state = "success"
-                else:
-                    state = "failed"
-                return TriggerEvent({"status": state, "namespace": self.pod_namespace, "name": self.pod_name})
+            container_state = self.define_container_state(pod)
+            if container_state == ContainerState.TERMINATED:
+                return TriggerEvent(
+                    {"status": "success", "namespace": self.pod_namespace, "name": self.pod_name}
+                )
+            elif container_state == ContainerState.FAILED:
+                return TriggerEvent(
+                    {"status": "failed", "namespace": self.pod_namespace, "name": self.pod_name}
+                )
-            if time_get_more_logs and timezone.utcnow() > time_get_more_logs:
+            if time_get_more_logs and datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs:
                 return TriggerEvent({"status": "running", "last_log_time": self.last_log_time})
             await asyncio.sleep(self.poll_interval)
""" self.pod = None + self.log.info("dadadadad %s", event) try: - self.pod_request_obj = self.build_pod_request_obj(context) - self.pod = self.find_pod( - namespace=self.namespace or self.pod_request_obj.metadata.namespace, - context=context, - ) + pod_name = event["name"] + pod_namespace = event["namespace"] - # we try to find pod before possibly raising so that on_kill will have `pod` attr - self.raise_for_trigger_status(event) + self.pod = self.hook.get_pod(pod_name, pod_namespace) if not self.pod: raise PodNotFoundException("Could not find pod after resuming from deferral") @@ -719,29 +704,36 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: pod=self.pod, event=event, client=self.client, mode=ExecutionMode.SYNC ) - if self.get_logs: - last_log_time = event and event.get("last_log_time") - if last_log_time: + if event["status"] in ("error", "failed", "timeout"): + if self.do_xcom_push: + _ = self.extract_xcom(pod=self.pod) + + message = event.get("stack_trace", event["message"]) + raise AirflowException(message) + + elif event["status"] == "running": + if self.get_logs: + last_log_time = event.get("last_log_time") self.log.info("Resuming logs read from time %r", last_log_time) - pod_log_status = self.pod_manager.fetch_container_logs( - pod=self.pod, - container_name=self.BASE_CONTAINER_NAME, - follow=self.logging_interval is None, - since_time=last_log_time, - ) - if pod_log_status.running: - self.log.info("Container still running; deferring again.") - self.invoke_defer_method(pod_log_status.last_log_time) - if self.do_xcom_push: - result = self.extract_xcom(pod=self.pod) - if event["status"] in ("error", "failed", "timeout"): - if "stack_trace" in event: - message = f"{event['message']}\n{event['stack_trace']}" - else: - message = event["message"] - raise AirflowException(message) - return result + pod_log_status = self.pod_manager.fetch_container_logs( + pod=self.pod, + container_name=self.BASE_CONTAINER_NAME, + follow=self.logging_interval is None, + since_time=last_log_time, + ) + + if pod_log_status.running: + self.log.info("Container still running; deferring again.") + self.invoke_defer_method(pod_log_status.last_log_time) + else: + self.invoke_defer_method() + + elif event["status"] == "success": + if self.do_xcom_push: + xcom_sidecar_output = self.extract_xcom(pod=self.pod) + return xcom_sidecar_output + return except TaskDeferred: raise finally: diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index d86523883d379..f210cd6f7a03f 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -153,13 +153,6 @@ def serialize(self) -> tuple[str, dict[str, Any]]: }, ) - def _get_terminal_event(self, state) -> TriggerEvent: - if state == PodPhase.SUCCEEDED: - status = "success" - else: - status = "failed" - return TriggerEvent({"status": status, "namespace": self.pod_namespace, "name": self.pod_name}) - async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] """Get current pod status and yield a TriggerEvent.""" self.log.info("Checking pod %r in namespace %r.", self.pod_name, self.pod_namespace) @@ -246,7 +239,14 @@ async def _wait_for_container_completion(self) -> TriggerEvent: {"status": "failed", "namespace": self.pod_namespace, "name": self.pod_name} ) if time_get_more_logs and datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs: - return TriggerEvent({"status": "running", "last_log_time": 
self.last_log_time}) + return TriggerEvent( + { + "status": "running", + "last_log_time": self.last_log_time, + "namespace": self.pod_namespace, + "name": self.pod_name, + } + ) await asyncio.sleep(self.poll_interval) def _get_async_hook(self) -> AsyncKubernetesHook: diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index c27cd231465cb..faa21eb7d75fc 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -35,7 +35,6 @@ from airflow.providers.cncf.kubernetes.secret import Secret from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger from airflow.providers.cncf.kubernetes.utils.pod_manager import ( - PodLaunchTimeoutException, PodLoggingStatus, PodPhase, ) @@ -1973,41 +1972,39 @@ def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_m with pytest.raises(AirflowException, match=expect_match): k.cleanup(pod, pod) - @mock.patch( - "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.raise_for_trigger_status" - ) - @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod") + @mock.patch(f"{HOOK_CLASS}.get_pod") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs") def test_get_logs_running( self, fetch_container_logs, await_pod_completion, - find_pod, - raise_for_trigger_status, + get_pod, ): """When logs fetch exits with status running, raise task deferred""" pod = MagicMock() - find_pod.return_value = pod + get_pod.return_value = pod op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) await_pod_completion.return_value = None fetch_container_logs.return_value = PodLoggingStatus(True, None) with pytest.raises(TaskDeferred): - op.trigger_reentry(create_context(op), None) + op.trigger_reentry( + create_context(op), + event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "running"}, + ) fetch_container_logs.is_called_with(pod, "base") @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup") - @mock.patch( - "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.raise_for_trigger_status" - ) @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs") - def test_get_logs_not_running(self, fetch_container_logs, find_pod, raise_for_trigger_status, cleanup): + def test_get_logs_not_running(self, fetch_container_logs, find_pod, cleanup): pod = MagicMock() find_pod.return_value = pod op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) fetch_container_logs.return_value = PodLoggingStatus(False, None) - op.trigger_reentry(create_context(op), None) + op.trigger_reentry( + create_context(op), event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "success"} + ) fetch_container_logs.is_called_with(pod, "base") @mock.patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup") @@ -2016,14 +2013,15 @@ def test_trigger_error(self, find_pod, cleanup): """Assert that trigger_reentry raise exception in case of error""" find_pod.return_value = MagicMock() op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) - with 
pytest.raises(PodLaunchTimeoutException): + with pytest.raises(AirflowException): context = create_context(op) op.trigger_reentry( context, { - "status": "error", - "error_type": "PodLaunchTimeoutException", - "description": "any message", + "status": "timeout", + "message": "any message", + "name": TEST_NAME, + "namespace": TEST_NAMESPACE, }, ) diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py index 2f4d116dd587f..6080e9d8ad6f4 100644 --- a/tests/providers/cncf/kubernetes/triggers/test_pod.py +++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py @@ -122,9 +122,10 @@ async def test_run_loop_return_success_event(self, mock_hook, mock_method, trigg expected_event = TriggerEvent( { - "name": POD_NAME, - "namespace": NAMESPACE, - "status": "done", + "status": "success", + "namespace": "default", + "name": "test-pod-name", + "message": "All containers inside pod have started successfully.", } ) actual_event = await trigger.run().asend(None) @@ -132,16 +133,11 @@ async def test_run_loop_return_success_event(self, mock_hook, mock_method, trigg assert actual_event == expected_event @pytest.mark.asyncio - @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running") - @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod") - @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") + @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_run_loop_return_waiting_event( - self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog - ): + async def test_run_loop_return_waiting_event(self, mock_hook, mock_method, trigger, caplog): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.WAITING - mock_container_is_running.return_value = True caplog.set_level(logging.INFO) @@ -153,16 +149,11 @@ async def test_run_loop_return_waiting_event( assert f"Sleeping for {POLL_INTERVAL} seconds." 
@pytest.mark.asyncio - @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running") - @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod") - @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") + @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_run_loop_return_running_event( - self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog - ): + async def test_run_loop_return_running_event(self, mock_hook, mock_method, trigger, caplog): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.RUNNING - mock_container_is_running.return_value = True caplog.set_level(logging.INFO) @@ -186,13 +177,7 @@ async def test_run_loop_return_failed_event(self, mock_hook, mock_method, trigge ) mock_method.return_value = ContainerState.FAILED - expected_event = TriggerEvent( - { - "name": POD_NAME, - "namespace": NAMESPACE, - "status": "done", - } - ) + expected_event = TriggerEvent({"status": "failed", "namespace": "default", "name": "test-pod-name"}) actual_event = await trigger.run().asend(None) assert actual_event == expected_event @@ -210,8 +195,14 @@ async def test_logging_in_trigger_when_exception_should_execute_successfully( generator = trigger.run() actual = await generator.asend(None) - actual_stack_trace = actual.payload.pop("description") - assert actual_stack_trace.startswith("Trigger KubernetesPodTrigger failed with exception Exception") + actual_stack_trace = actual.payload.pop("stack_trace") + assert ( + TriggerEvent( + {"name": POD_NAME, "namespace": NAMESPACE, "status": "error", "message": "Test exception"} + ) + == actual + ) + assert actual_stack_trace.startswith("Traceback (most recent call last):") @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}.define_container_state") @@ -235,16 +226,24 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully( @pytest.mark.parametrize( "logging_interval, exp_event", [ - param(0, {"status": "running", "last_log_time": DateTime(2022, 1, 1)}, id="short_interval"), - param(None, {"status": "done", "namespace": mock.ANY, "name": mock.ANY}, id="no_interval"), + param( + 0, + { + "status": "running", + "last_log_time": DateTime(2022, 1, 1), + "name": POD_NAME, + "namespace": NAMESPACE, + }, + id="short_interval", + ), ], ) @mock.patch( "kubernetes_asyncio.client.CoreV1Api.read_namespaced_pod", new=get_read_pod_mock_containers([1, 1, None, None]), ) - @mock.patch("kubernetes_asyncio.config.load_kube_config") - async def test_running_log_interval(self, load_kube_config, logging_interval, exp_event): + @mock.patch(f"{TRIGGER_PATH}.define_container_state") + async def test_running_log_interval(self, mock_container_state, logging_interval, exp_event): """ If log interval given, should emit event with running status and last log time. Otherwise, should make it to second loop and emit "done" event. @@ -255,10 +254,10 @@ async def test_running_log_interval(self, load_kube_config, logging_interval, ex when in the next loop we get a non-running status, the trigger fires a "done" event. 
""" trigger = KubernetesPodTrigger( - pod_name=mock.ANY, - pod_namespace=mock.ANY, - trigger_start_time=mock.ANY, - base_container_name=mock.ANY, + pod_name=POD_NAME, + pod_namespace=NAMESPACE, + trigger_start_time=datetime.datetime.now(tz=datetime.timezone.utc), + base_container_name=BASE_CONTAINER_NAME, startup_timeout=5, poll_interval=1, logging_interval=logging_interval, @@ -306,12 +305,12 @@ def test_define_container_state_should_execute_successfully( @pytest.mark.asyncio @pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED]) - @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") + @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_timeout_event( self, mock_hook, mock_method, trigger, caplog, container_state ): - trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(seconds=5) + trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2) mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( status=mock.MagicMock( @@ -325,4 +324,14 @@ async def test_run_loop_return_timeout_event( generator = trigger.run() actual = await generator.asend(None) - assert actual == TriggerEvent({"status": "done", "namespace": NAMESPACE, "name": POD_NAME}) + assert ( + TriggerEvent( + { + "name": POD_NAME, + "namespace": NAMESPACE, + "status": "timeout", + "message": "Pod did not leave 'Pending' phase within specified timeout", + } + ) + == actual + ) diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py index 22149f29248d8..db0bc804cf338 100644 --- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py @@ -120,7 +120,8 @@ async def test_run_loop_return_success_event_should_execute_successfully( { "name": POD_NAME, "namespace": NAMESPACE, - "status": "done", + "status": "success", + "message": "All containers inside pod have started successfully.", } ) actual_event = await trigger.run().asend(None) @@ -146,7 +147,7 @@ async def test_run_loop_return_failed_event_should_execute_successfully( { "name": POD_NAME, "namespace": NAMESPACE, - "status": "done", + "status": "failed", } ) actual_event = await trigger.run().asend(None) @@ -154,18 +155,14 @@ async def test_run_loop_return_failed_event_should_execute_successfully( assert actual_event == expected_event @pytest.mark.asyncio - @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running") - @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod") - @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") + @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_run_loop_return_waiting_event_should_execute_successfully( - self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog + self, mock_hook, mock_method, trigger, caplog ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) - mock_method.return_value = ContainerState.RUNNING - mock_container_is_running.return_value = True + mock_method.return_value = ContainerState.WAITING - trigger.logging_interval = 10 caplog.set_level(logging.INFO) task = asyncio.create_task(trigger.run().__anext__()) @@ -176,15 +173,12 @@ async def test_run_loop_return_waiting_event_should_execute_successfully( assert f"Sleeping for {POLL_INTERVAL} 
seconds." @pytest.mark.asyncio - @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.container_is_running") - @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.get_pod") - @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") + @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_run_loop_return_running_event_should_execute_successfully( - self, mock_hook, mock_method, mock_get_pod, mock_container_is_running, trigger, caplog + self, mock_hook, mock_method, trigger, caplog ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) - mock_container_is_running.return_value = True mock_method.return_value = ContainerState.RUNNING caplog.set_level(logging.INFO) @@ -208,9 +202,14 @@ async def test_logging_in_trigger_when_exception_should_execute_successfully( generator = trigger.run() actual = await generator.asend(None) - - actual_stack_trace = actual.payload.pop("description") - assert actual_stack_trace.startswith("Trigger GKEStartPodTrigger failed with exception Exception") + actual_stack_trace = actual.payload.pop("stack_trace") + assert ( + TriggerEvent( + {"name": POD_NAME, "namespace": NAMESPACE, "status": "error", "message": "Test exception"} + ) + == actual + ) + assert actual_stack_trace.startswith("Traceback (most recent call last):") @pytest.mark.asyncio @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") From f9a804ac841ddf6ddd93f78222a46220a439e6aa Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 14 Feb 2024 17:07:12 +0530 Subject: [PATCH 7/9] Fix tests --- .../providers/cncf/kubernetes/triggers/test_pod.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py index 6080e9d8ad6f4..fd12a0c4d54f1 100644 --- a/tests/providers/cncf/kubernetes/triggers/test_pod.py +++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py @@ -238,12 +238,12 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully( ), ], ) - @mock.patch( - "kubernetes_asyncio.client.CoreV1Api.read_namespaced_pod", - new=get_read_pod_mock_containers([1, 1, None, None]), - ) @mock.patch(f"{TRIGGER_PATH}.define_container_state") - async def test_running_log_interval(self, mock_container_state, logging_interval, exp_event): + @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") + @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.AsyncKubernetesHook.get_pod") + async def test_running_log_interval( + self, mock_get_pod, mock_wait_for_pod_start, define_container_state, logging_interval, exp_event + ): """ If log interval given, should emit event with running status and last log time. Otherwise, should make it to second loop and emit "done" event. @@ -253,6 +253,7 @@ async def test_running_log_interval(self, mock_container_state, logging_interval interval is None, the second "running" status will just result in continuation of the loop. And when in the next loop we get a non-running status, the trigger fires a "done" event. 
""" + define_container_state.return_value = "running" trigger = KubernetesPodTrigger( pod_name=POD_NAME, pod_namespace=NAMESPACE, @@ -260,7 +261,7 @@ async def test_running_log_interval(self, mock_container_state, logging_interval base_container_name=BASE_CONTAINER_NAME, startup_timeout=5, poll_interval=1, - logging_interval=logging_interval, + logging_interval=1, last_log_time=DateTime(2022, 1, 1), ) assert await trigger.run().__anext__() == TriggerEvent(exp_event) From 83dfdcee7cdcee7d76400707e967749c54015b2b Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 14 Feb 2024 17:08:50 +0530 Subject: [PATCH 8/9] Remove test log stmpt --- airflow/providers/cncf/kubernetes/operators/pod.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 99208a44b561f..61442a6014ebc 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -689,7 +689,6 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: grab the latest logs and defer back to the trigger again. """ self.pod = None - self.log.info("dadadadad %s", event) try: pod_name = event["name"] pod_namespace = event["namespace"] From 3a4e921e5bd6b09d82f53845583e290171c4b3d4 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Wed, 14 Feb 2024 19:28:35 +0530 Subject: [PATCH 9/9] Fix tests --- .../providers/cncf/kubernetes/triggers/pod.py | 9 ++++++++ .../cncf/kubernetes/triggers/test_pod.py | 4 +++- .../cloud/triggers/test_kubernetes_engine.py | 22 +++++++++++-------- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/triggers/pod.py b/airflow/providers/cncf/kubernetes/triggers/pod.py index f210cd6f7a03f..c9b1e62226541 100644 --- a/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -167,6 +167,15 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] "message": "All containers inside pod have started successfully.", } ) + elif state == ContainerState.FAILED: + event = TriggerEvent( + { + "status": "failed", + "namespace": self.pod_namespace, + "name": self.pod_name, + "message": "pod failed", + } + ) else: event = await self._wait_for_container_completion() yield event diff --git a/tests/providers/cncf/kubernetes/triggers/test_pod.py b/tests/providers/cncf/kubernetes/triggers/test_pod.py index fd12a0c4d54f1..bed52811fc675 100644 --- a/tests/providers/cncf/kubernetes/triggers/test_pod.py +++ b/tests/providers/cncf/kubernetes/triggers/test_pod.py @@ -177,7 +177,9 @@ async def test_run_loop_return_failed_event(self, mock_hook, mock_method, trigge ) mock_method.return_value = ContainerState.FAILED - expected_event = TriggerEvent({"status": "failed", "namespace": "default", "name": "test-pod-name"}) + expected_event = TriggerEvent( + {"status": "failed", "namespace": "default", "name": "test-pod-name", "message": "pod failed"} + ) actual_event = await trigger.run().asend(None) assert actual_event == expected_event diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py index db0bc804cf338..c6a2d4e72fcbb 100644 --- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py @@ -108,13 +108,13 @@ def test_serialize_should_execute_successfully(self, trigger): } @pytest.mark.asyncio - 
@mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") + @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_run_loop_return_success_event_should_execute_successfully( - self, mock_hook, mock_method, trigger + self, mock_hook, mock_wait_pod, trigger ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) - mock_method.return_value = ContainerState.TERMINATED + mock_wait_pod.return_value = ContainerState.TERMINATED expected_event = TriggerEvent( { @@ -129,10 +129,10 @@ async def test_run_loop_return_success_event_should_execute_successfully( assert actual_event == expected_event @pytest.mark.asyncio - @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") + @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_run_loop_return_failed_event_should_execute_successfully( - self, mock_hook, mock_method, trigger + self, mock_hook, mock_wait_pod, trigger ): mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( @@ -141,13 +141,14 @@ async def test_run_loop_return_failed_event_should_execute_successfully( ) ) ) - mock_method.return_value = ContainerState.FAILED + mock_wait_pod.return_value = ContainerState.FAILED expected_event = TriggerEvent( { "name": POD_NAME, "namespace": NAMESPACE, "status": "failed", + "message": "pod failed", } ) actual_event = await trigger.run().asend(None) @@ -155,10 +156,11 @@ async def test_run_loop_return_failed_event_should_execute_successfully( assert actual_event == expected_event @pytest.mark.asyncio + @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_run_loop_return_waiting_event_should_execute_successfully( - self, mock_hook, mock_method, trigger, caplog + self, mock_hook, mock_method, mock_wait_pod, trigger, caplog ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.WAITING @@ -173,10 +175,11 @@ async def test_run_loop_return_waiting_event_should_execute_successfully( assert f"Sleeping for {POLL_INTERVAL} seconds." @pytest.mark.asyncio + @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_KUB_PATH}.define_container_state") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_run_loop_return_running_event_should_execute_successfully( - self, mock_hook, mock_method, trigger, caplog + self, mock_hook, mock_method, mock_wait_pod, trigger, caplog ): mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) mock_method.return_value = ContainerState.RUNNING @@ -191,9 +194,10 @@ async def test_run_loop_return_running_event_should_execute_successfully( assert f"Sleeping for {POLL_INTERVAL} seconds." @pytest.mark.asyncio + @mock.patch(f"{TRIGGER_KUB_PATH}._wait_for_pod_start") @mock.patch(f"{TRIGGER_GKE_PATH}.hook") async def test_logging_in_trigger_when_exception_should_execute_successfully( - self, mock_hook, trigger, caplog + self, mock_hook, mock_wait_pod, trigger, caplog ): """ Test that GKEStartPodTrigger fires the correct event in case of an error.