From 06c1cea45bb0f94189b2f3488884d1b566b08fb2 Mon Sep 17 00:00:00 2001
From: Tzu-ping Chung
Date: Tue, 2 Nov 2021 03:53:37 +0800
Subject: [PATCH] Bugfix: Check next run exists before reading data interval (#19307)

Fix #19304, and also an issue on scheduling a DAG's first-ever run
introduced in #18897.

We could fix it outside this function, but if `next_dagrun` is None, the
next run's data interval is supposed to be None in the first place, so
checking inside this function just makes sense.

closes https://github.com/apache/airflow/issues/19343
closes https://github.com/apache/airflow/issues/19304

(cherry picked from commit dc4dcaa9ccbec6a1b1ce84d5ee42322ce1fbb081)
---
 airflow/models/dag.py    | 13 ++++++++-----
 tests/models/test_dag.py | 37 +++++++++++++++++++++++++++++++++++++
 2 files changed, 45 insertions(+), 5 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 55be0ec282a7f..d8fa1d063d1b7 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -607,12 +607,12 @@ def previous_schedule(self, dttm):
             return None
         return self.timetable._get_prev(timezone.coerce_datetime(dttm))
 
-    def get_next_data_interval(self, dag_model: "DagModel") -> DataInterval:
+    def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval]:
         """Get the data interval of the next scheduled run.
 
         For compatibility, this method infers the data interval from the DAG's
-        schedule if the run does not have an explicit one set, which is possible for
-        runs created prior to AIP-39.
+        schedule if the run does not have an explicit one set, which is possible
+        for runs created prior to AIP-39.
 
         This function is private to Airflow core and should not be depended as a
         part of the Python API.
@@ -621,11 +621,14 @@ def get_next_data_interval(self, dag_model: "DagModel") -> DataInterval:
         """
         if self.dag_id != dag_model.dag_id:
             raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {dag_model.dag_id}")
+        if dag_model.next_dagrun is None:  # Next run not scheduled.
+            return None
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
-        # Compatibility: runs scheduled before AIP-39 implementation don't have an
-        # explicit data interval. Try to infer from the logical date.
+        # Compatibility: A run was scheduled without an explicit data interval.
+        # This means the run was scheduled before AIP-39 implementation. Try to
+        # infer from the logical date.
         return self.infer_automated_data_interval(dag_model.next_dagrun)
 
     def get_run_data_interval(self, run: DagRun) -> DataInterval:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 5bea2b7d48ccd..efdff89e64445 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2234,3 +2234,40 @@ def next_dagrun_info(self, last_automated_data_interval, restriction):
             ),
         ]
         assert caplog.records[0].exc_info is not None, "should contain exception context"
+
+
+@pytest.mark.parametrize(
+    "logical_date, data_interval_start, data_interval_end, expected_data_interval",
+    [
+        pytest.param(None, None, None, None, id="no-next-run"),
+        pytest.param(
+            DEFAULT_DATE,
+            DEFAULT_DATE,
+            DEFAULT_DATE + timedelta(days=2),
+            DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=2)),
+            id="modern",
+        ),
+        pytest.param(
+            DEFAULT_DATE,
+            None,
+            None,
+            DataInterval(DEFAULT_DATE, DEFAULT_DATE + timedelta(days=1)),
+            id="legacy",
+        ),
+    ],
+)
+def test_get_next_data_interval(
+    logical_date,
+    data_interval_start,
+    data_interval_end,
+    expected_data_interval,
+):
+    dag = DAG(dag_id="test_get_next_data_interval", schedule_interval="@daily")
+    dag_model = DagModel(
+        dag_id="test_get_next_data_interval",
+        next_dagrun=logical_date,
+        next_dagrun_data_interval_start=data_interval_start,
+        next_dagrun_data_interval_end=data_interval_end,
+    )
+
+    assert dag.get_next_data_interval(dag_model) == expected_data_interval