From e2a6417cfc2ac074865fe3cc06ab32c7fa8e5948 Mon Sep 17 00:00:00 2001
From: Daniel Standish <15932138+dstandish@users.noreply.github.com>
Date: Wed, 23 Mar 2022 22:42:32 -0700
Subject: [PATCH] fix tests

---
 .../kubernetes/triggers/wait_container.py     | 15 ++---
 .../triggers/test_wait_container.py           | 64 +++++++++++++++++--
 2 files changed, 66 insertions(+), 13 deletions(-)

diff --git a/astronomer/providers/cncf/kubernetes/triggers/wait_container.py b/astronomer/providers/cncf/kubernetes/triggers/wait_container.py
index 6de7ac5d4..de253575d 100644
--- a/astronomer/providers/cncf/kubernetes/triggers/wait_container.py
+++ b/astronomer/providers/cncf/kubernetes/triggers/wait_container.py
@@ -94,7 +94,7 @@ async def wait_for_pod_start(self, v1_api: CoreV1Api) -> Any:
             await asyncio.sleep(self.poll_interval)
         raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")
 
-    async def wait_for_container_completion(self, v1_api: CoreV1Api) -> Optional["TriggerEvent"]:
+    async def wait_for_container_completion(self, v1_api: CoreV1Api) -> "TriggerEvent":
         """
         Waits until container ``self.container_name`` is no longer in running state.
         If trigger is configured with a logging period, then will emit an event to
@@ -102,12 +102,12 @@ async def wait_for_container_completion(self, v1_api: CoreV1Api) -> Optional["Tr
         """
         time_begin = timezone.utcnow()
         time_get_more_logs = None
-        if self.logging_interval:
+        if self.logging_interval is not None:
             time_get_more_logs = time_begin + timedelta(seconds=self.logging_interval)
         while True:
             pod = await v1_api.read_namespaced_pod(self.pod_name, self.pod_namespace)
             if not container_is_running(pod=pod, container_name=self.container_name):
-                break
+                return TriggerEvent({"status": "done"})
             if time_get_more_logs and timezone.utcnow() > time_get_more_logs:
                 return TriggerEvent({"status": "running", "last_log_time": self.last_log_time})
             await asyncio.sleep(self.poll_interval)
@@ -119,12 +119,11 @@ async def run(self) -> AsyncIterator["TriggerEvent"]:  # type: ignore[override]
             async with await hook.get_api_client_async() as api_client:
                 v1_api = CoreV1Api(api_client)
                 state = await self.wait_for_pod_start(v1_api)
-                if state not in PodPhase.terminal_states:
+                if state in PodPhase.terminal_states:
+                    event = TriggerEvent({"status": "done"})
+                else:
                     event = await self.wait_for_container_completion(v1_api)
-                    if event:
-                        yield event
-                        return
-                yield TriggerEvent({"status": "done"})
+                yield event
         except Exception as e:
             description = self._format_exception_description(e)
             yield TriggerEvent(
diff --git a/tests/cncf/kubernetes/triggers/test_wait_container.py b/tests/cncf/kubernetes/triggers/test_wait_container.py
index f3f3c24ae..97efd62bd 100644
--- a/tests/cncf/kubernetes/triggers/test_wait_container.py
+++ b/tests/cncf/kubernetes/triggers/test_wait_container.py
@@ -4,6 +4,8 @@
 
 import pytest
 from airflow.triggers.base import TriggerEvent
+from pendulum import DateTime
+from pytest import param
 
 from astronomer.providers.cncf.kubernetes.triggers.wait_container import (
     WaitContainerTrigger,
@@ -39,7 +41,9 @@ def test_serialize():
     assert actual_kwargs == expected_kwargs
 
 
-def get_read_pod_mock(phases_to_emit=None):
+def get_read_pod_mock_phases(phases_to_emit=None):
+    """Emit pods with given phases sequentially."""
+
     async def mock_read_namespaced_pod(*args, **kwargs):
         event_mock = MagicMock()
         event_mock.status.phase = phases_to_emit.pop(0)
@@ -48,8 +52,26 @@ async def mock_read_namespaced_pod(*args, **kwargs):
     return mock_read_namespaced_pod
 
 
+def get_read_pod_mock_containers(statuses_to_emit=None):
+    """
+    Emit pods with given container running statuses sequentially.
+    `statuses_to_emit` should be a list of bools indicating whether the container is running.
+    """
+
+    async def mock_read_namespaced_pod(*args, **kwargs):
+        container_mock = MagicMock()
+        container_mock.state.running = statuses_to_emit.pop(0)
+        event_mock = MagicMock()
+        event_mock.status.container_statuses = [container_mock]
+        return event_mock
+
+    return mock_read_namespaced_pod
+
+
 @pytest.mark.asyncio
-@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock(["Pending", "Pending", "Pending", "Pending"]))
+@mock.patch(
+    READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_phases(["Pending", "Pending", "Pending", "Pending"])
+)
 @mock.patch("kubernetes_asyncio.config.load_kube_config")
 async def test_pending_timeout(load_kube_config):
     """Verify that PodLaunchTimeoutException is yielded when timeout reached"""
@@ -101,7 +123,7 @@ async def test_other_exception(load_kube_config, read_mock):
 
 
 @pytest.mark.asyncio
-@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock(["Pending", "Succeeded"]))
+@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_phases(["Pending", "Succeeded"]))
 @mock.patch(f"{TRIGGER_CLASS}.wait_for_container_completion")
 @mock.patch("kubernetes_asyncio.config.load_kube_config")
 async def test_pending_succeeded(load_kube_config, wait_completion):
@@ -122,7 +144,7 @@ async def test_pending_succeeded(load_kube_config, wait_completion):
 
 
 @pytest.mark.asyncio
-@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock(["Pending", "Running"]))
+@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_phases(["Pending", "Running"]))
 @mock.patch(f"{TRIGGER_CLASS}.wait_for_container_completion")
 @mock.patch("kubernetes_asyncio.config.load_kube_config")
 async def test_pending_running(load_kube_config, wait_completion):
@@ -145,7 +167,7 @@ async def test_pending_running(load_kube_config, wait_completion):
 
 
 @pytest.mark.asyncio
-@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock(["Failed"]))
+@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_phases(["Failed"]))
 @mock.patch(f"{TRIGGER_CLASS}.wait_for_container_completion")
 @mock.patch("kubernetes_asyncio.config.load_kube_config")
 async def test_failed(load_kube_config, wait_completion):
@@ -164,3 +186,35 @@ async def test_failed(load_kube_config, wait_completion):
     assert await trigger.run().__anext__() == TriggerEvent({"status": "done"})
 
     wait_completion.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    "logging_interval, exp_event",
+    [
+        param(0, {"status": "running", "last_log_time": DateTime(2022, 1, 1)}, id="short_interval"),
+        param(None, {"status": "done"}, id="no_interval"),
+    ],
+)
+@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_containers([1, 1, None, None]))
+@mock.patch("kubernetes_asyncio.config.load_kube_config")
+async def test_running_log_interval(load_kube_config, logging_interval, exp_event):
+    """
+    If a logging interval is given, the trigger should emit an event with "running" status and the
+    last log time. Otherwise, it should make it to the second loop and emit a "done" event.
+    For this test we emit container statuses "running, running, not running".
+    The first "running" status gets us out of wait_for_pod_start.
+    The second "running" fires a "running" event when the logging interval is non-None. When the
+    logging interval is None, the second "running" status just continues the loop, and when the
+    next iteration sees a non-running status, the trigger fires a "done" event.
+ """ + trigger = WaitContainerTrigger( + pod_name=mock.ANY, + pod_namespace=mock.ANY, + container_name=mock.ANY, + pending_phase_timeout=5, + poll_interval=1, + logging_interval=logging_interval, + last_log_time=DateTime(2022, 1, 1), + ) + assert await trigger.run().__anext__() == TriggerEvent(exp_event)