Skip to content

Commit

Permalink
Bugfix: Check next run exists before reading data interval (#19307)
Browse files Browse the repository at this point in the history
Fix #19304, and also an issue on scheduling a DAG's first-ever run introduced in #18897. We could fix it outside this function, but if `next_dagrun` is None, the next run's data interval is supposed to be None in the first place, so checking inside this function just makes sense.

closes #19343
closes #19304

(cherry picked from commit dc4dcaa)
  • Loading branch information
uranusjr authored and jedcunningham committed Nov 3, 2021
1 parent 44caa7e commit 06c1cea
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
13 changes: 8 additions & 5 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,12 +607,12 @@ def previous_schedule(self, dttm):
return None
return self.timetable._get_prev(timezone.coerce_datetime(dttm))

def get_next_data_interval(self, dag_model: "DagModel") -> DataInterval:
def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval]:
"""Get the data interval of the next scheduled run.
For compatibility, this method infers the data interval from the DAG's
schedule if the run does not have an explicit one set, which is possible for
runs created prior to AIP-39.
schedule if the run does not have an explicit one set, which is possible
for runs created prior to AIP-39.
This function is private to Airflow core and should not be depended as a
part of the Python API.
Expand All @@ -621,11 +621,14 @@ def get_next_data_interval(self, dag_model: "DagModel") -> DataInterval:
"""
if self.dag_id != dag_model.dag_id:
raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {dag_model.dag_id}")
if dag_model.next_dagrun is None: # Next run not scheduled.
return None
data_interval = dag_model.next_dagrun_data_interval
if data_interval is not None:
return data_interval
# Compatibility: runs scheduled before AIP-39 implementation don't have an
# explicit data interval. Try to infer from the logical date.
# Compatibility: A run was scheduled without an explicit data interval.
# This means the run was scheduled before AIP-39 implementation. Try to
# infer from the logical date.
return self.infer_automated_data_interval(dag_model.next_dagrun)

def get_run_data_interval(self, run: DagRun) -> DataInterval:
Expand Down
37 changes: 37 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2234,3 +2234,40 @@ def next_dagrun_info(self, last_automated_data_interval, restriction):
),
]
assert caplog.records[0].exc_info is not None, "should contain exception context"


@pytest.mark.parametrize(
"logical_date, data_interval_start, data_interval_end, expected_data_interval",
[
pytest.param(None, None, None, None, id="no-next-run"),
pytest.param(
DEFAULT_DATE,
DEFAULT_DATE,
DEFAULT_DATE + timedelta(days=2),
DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=2)),
id="modern",
),
pytest.param(
DEFAULT_DATE,
None,
None,
DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)),
id="legacy",
),
],
)
def test_get_next_data_interval(
logical_date,
data_interval_start,
data_interval_end,
expected_data_interval,
):
dag = DAG(dag_id="test_get_next_data_interval", schedule_interval="@daily")
dag_model = DagModel(
dag_id="test_get_next_data_interval",
next_dagrun=logical_date,
next_dagrun_data_interval_start=data_interval_start,
next_dagrun_data_interval_end=data_interval_end,
)

assert dag.get_next_data_interval(dag_model) == expected_data_interval

0 comments on commit 06c1cea

Please sign in to comment.