Skip to content

Commit

Permalink
fix tsets
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Mar 24, 2022
1 parent 8b964aa commit e2a6417
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 13 deletions.
15 changes: 7 additions & 8 deletions astronomer/providers/cncf/kubernetes/triggers/wait_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@ async def wait_for_pod_start(self, v1_api: CoreV1Api) -> Any:
await asyncio.sleep(self.poll_interval)
raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")

async def wait_for_container_completion(self, v1_api: CoreV1Api) -> Optional["TriggerEvent"]:
async def wait_for_container_completion(self, v1_api: CoreV1Api) -> "TriggerEvent":
"""
Waits until container ``self.container_name`` is no longer in running state.
If trigger is configured with a logging period, then will emit an event to
resume the task for the purpose of fetching more logs.
"""
time_begin = timezone.utcnow()
time_get_more_logs = None
if self.logging_interval:
if self.logging_interval is not None:
time_get_more_logs = time_begin + timedelta(seconds=self.logging_interval)
while True:
pod = await v1_api.read_namespaced_pod(self.pod_name, self.pod_namespace)
if not container_is_running(pod=pod, container_name=self.container_name):
break
return TriggerEvent({"status": "done"})
if time_get_more_logs and timezone.utcnow() > time_get_more_logs:
return TriggerEvent({"status": "running", "last_log_time": self.last_log_time})
await asyncio.sleep(self.poll_interval)
Expand All @@ -119,12 +119,11 @@ async def run(self) -> AsyncIterator["TriggerEvent"]: # type: ignore[override]
async with await hook.get_api_client_async() as api_client:
v1_api = CoreV1Api(api_client)
state = await self.wait_for_pod_start(v1_api)
if state not in PodPhase.terminal_states:
if state in PodPhase.terminal_states:
event = TriggerEvent({"status": "done"})
else:
event = await self.wait_for_container_completion(v1_api)
if event:
yield event
return
yield TriggerEvent({"status": "done"})
yield event
except Exception as e:
description = self._format_exception_description(e)
yield TriggerEvent(
Expand Down
64 changes: 59 additions & 5 deletions tests/cncf/kubernetes/triggers/test_wait_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import pytest
from airflow.triggers.base import TriggerEvent
from pendulum import DateTime
from pytest import param

from astronomer.providers.cncf.kubernetes.triggers.wait_container import (
WaitContainerTrigger,
Expand Down Expand Up @@ -39,7 +41,9 @@ def test_serialize():
assert actual_kwargs == expected_kwargs


def get_read_pod_mock(phases_to_emit=None):
def get_read_pod_mock_phases(phases_to_emit=None):
"""emit pods with given phases sequentially"""

async def mock_read_namespaced_pod(*args, **kwargs):
event_mock = MagicMock()
event_mock.status.phase = phases_to_emit.pop(0)
Expand All @@ -48,8 +52,26 @@ async def mock_read_namespaced_pod(*args, **kwargs):
return mock_read_namespaced_pod


def get_read_pod_mock_containers(statuses_to_emit=None):
"""
Emit pods with given phases sequentially.
`statuses_to_emit` should be a list of bools indicating running or not.
"""

async def mock_read_namespaced_pod(*args, **kwargs):
container_mock = MagicMock()
container_mock.state.running = statuses_to_emit.pop(0)
event_mock = MagicMock()
event_mock.status.container_statuses = [container_mock]
return event_mock

return mock_read_namespaced_pod


@pytest.mark.asyncio
@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock(["Pending", "Pending", "Pending", "Pending"]))
@mock.patch(
READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_phases(["Pending", "Pending", "Pending", "Pending"])
)
@mock.patch("kubernetes_asyncio.config.load_kube_config")
async def test_pending_timeout(load_kube_config):
"""Verify that PodLaunchTimeoutException is yielded when timeout reached"""
Expand Down Expand Up @@ -101,7 +123,7 @@ async def test_other_exception(load_kube_config, read_mock):


@pytest.mark.asyncio
@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock(["Pending", "Succeeded"]))
@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_phases(["Pending", "Succeeded"]))
@mock.patch(f"{TRIGGER_CLASS}.wait_for_container_completion")
@mock.patch("kubernetes_asyncio.config.load_kube_config")
async def test_pending_succeeded(load_kube_config, wait_completion):
Expand All @@ -122,7 +144,7 @@ async def test_pending_succeeded(load_kube_config, wait_completion):


@pytest.mark.asyncio
@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock(["Pending", "Running"]))
@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_phases(["Pending", "Running"]))
@mock.patch(f"{TRIGGER_CLASS}.wait_for_container_completion")
@mock.patch("kubernetes_asyncio.config.load_kube_config")
async def test_pending_running(load_kube_config, wait_completion):
Expand All @@ -145,7 +167,7 @@ async def test_pending_running(load_kube_config, wait_completion):


@pytest.mark.asyncio
@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock(["Failed"]))
@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_phases(["Failed"]))
@mock.patch(f"{TRIGGER_CLASS}.wait_for_container_completion")
@mock.patch("kubernetes_asyncio.config.load_kube_config")
async def test_failed(load_kube_config, wait_completion):
Expand All @@ -164,3 +186,35 @@ async def test_failed(load_kube_config, wait_completion):

assert await trigger.run().__anext__() == TriggerEvent({"status": "done"})
wait_completion.assert_not_awaited()


@pytest.mark.asyncio
@pytest.mark.parametrize(
"logging_interval, exp_event",
[
param(0, {"status": "running", "last_log_time": DateTime(2022, 1, 1)}, id="short_interval"),
param(None, {"status": "done"}, id="no_interval"),
],
)
@mock.patch(READ_NAMESPACED_POD_PATH, new=get_read_pod_mock_containers([1, 1, None, None]))
@mock.patch("kubernetes_asyncio.config.load_kube_config")
async def test_running_log_inteval(load_kube_config, logging_interval, exp_event):
"""
If log interval given, should emit event with running status and last log time.
Otherwise, should should make it to second loop and emit "done" event.
For this test we emit container statuses "running running not".
The first "running" status gets us out of wait_for_pod_start.
The second "running" will fire a "running" event when logging interval is non-None. When logging
interval is None, the second "running" status will just result in continuation of the loop. And
when in the next loop we get a non-running status, the trigger fires a "done" event.
"""
trigger = WaitContainerTrigger(
pod_name=mock.ANY,
pod_namespace=mock.ANY,
container_name=mock.ANY,
pending_phase_timeout=5,
poll_interval=1,
logging_interval=logging_interval,
last_log_time=DateTime(2022, 1, 1),
)
assert await trigger.run().__anext__() == TriggerEvent(exp_event)

0 comments on commit e2a6417

Please sign in to comment.