Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not let EventsTimetable schedule past events if catchup=False #36134

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions airflow/timetables/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pendulum

from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
from airflow.utils import timezone

if TYPE_CHECKING:
from pendulum import DateTime
Expand Down Expand Up @@ -58,10 +59,13 @@ def __init__(
self.event_dates.sort()
self.restrict_to_events = restrict_to_events
if description is None:
self.description = (
f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}"
)
self._summary = f"{len(self.event_dates)} Events"
if self.event_dates:
self.description = (
f"{len(self.event_dates)} events between {self.event_dates[0]} and {self.event_dates[-1]}"
)
else:
self.description = "No events"
self._summary = f"{len(self.event_dates)} events"
else:
self._summary = description
self.description = description
Expand All @@ -79,22 +83,31 @@ def next_dagrun_info(
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is None:
next_event = self.event_dates[0]
earliest = restriction.earliest
if not restriction.catchup:
current_time = timezone.utcnow()
if earliest is None or current_time > earliest:
earliest = pendulum.instance(current_time)

for next_event in self.event_dates:
if earliest and next_event < earliest:
continue
if last_automated_data_interval and next_event <= last_automated_data_interval.end:
continue
break
else:
future_dates = itertools.dropwhile(
lambda when: when <= last_automated_data_interval.end, # type: ignore
self.event_dates,
)
next_event = next(future_dates, None) # type: ignore
if next_event is None:
return None
# We need to return None if self.event_dates is empty or,
# if not empty, when no suitable event can be found.
return None

if restriction.latest is not None and next_event > restriction.latest:
return None

return DagRunInfo.exact(next_event)

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
# If Timetable not restricted to events, run for the time specified
if not self.restrict_to_events:
if not self.restrict_to_events or not self.event_dates:
return DataInterval.exact(run_after)

# If restricted to events, run for the most recent past event
Expand Down
64 changes: 61 additions & 3 deletions tests/timetables/test_events_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import pendulum
import pytest
import time_machine

from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.events import EventsTimetable
from airflow.utils.timezone import utc

START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
BEFORE_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc) # Precedes all events
START_DATE = pendulum.DateTime(2021, 9, 7, tzinfo=utc)

EVENT_DATES = [
pendulum.DateTime(2021, 9, 6, tzinfo=utc),
Expand Down Expand Up @@ -93,16 +95,23 @@ def test_manual_with_restricted_before(restricted_timetable: Timetable, restrict
Test that when using strict event dates, manual runs before the first event have the first event's date
as the start interval
"""
manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE)
manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=BEFORE_DATE)
expected_data_interval = DataInterval.exact(EVENT_DATES[0])
assert expected_data_interval == manual_run_data_interval


@pytest.mark.parametrize(
"last_automated_data_interval, expected_next_info",
[
pytest.param(None, DagRunInfo.interval(START_DATE, START_DATE)),
pytest.param(
DataInterval(EVENT_DATES_SORTED[0], EVENT_DATES_SORTED[0]),
DagRunInfo.interval(START_DATE, START_DATE),
),
]
+ [
pytest.param(DataInterval(day1, day1), DagRunInfo.interval(day2, day2))
for day1, day2 in zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:])
for day1, day2 in zip(EVENT_DATES_SORTED[1:], EVENT_DATES_SORTED[2:])
]
+ [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)],
)
Expand All @@ -118,3 +127,52 @@ def test_subsequent_weekday_schedule(
restriction=restriction,
)
assert next_info == expected_next_info


@pytest.mark.parametrize(
"current_date",
[
pytest.param(pendulum.DateTime(2021, 9, 1, tzinfo=utc), id="when-current-date-is-before-first-event"),
pytest.param(pendulum.DateTime(2021, 9, 8, tzinfo=utc), id="when-current-date-is-in-the-middle"),
pytest.param(pendulum.DateTime(2021, 12, 9, tzinfo=utc), id="when-current-date-is-after-last-event"),
],
)
@pytest.mark.parametrize(
"last_automated_data_interval",
[
pytest.param(None, id="first-run"),
pytest.param(DataInterval(start=BEFORE_DATE, end=BEFORE_DATE), id="subsequent-run"),
],
)
def test_no_catchup_first_starts(
last_automated_data_interval: DataInterval | None,
current_date,
unrestricted_timetable: Timetable,
) -> None: