diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 5daa7bb80538a..e93d5a55a0755 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -548,6 +548,13 @@ def __init__( # sort out DAG's scheduling behavior scheduling_args = [schedule_interval, timetable, schedule] + + has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args) + has_empty_start_date = not ("start_date" in self.default_args or self.start_date) + + if has_scheduling_args and has_empty_start_date: + raise ValueError("DAG is missing the start_date parameter") + if not at_most_one(*scheduling_args): raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.") if schedule_interval is not NOTSET: @@ -2618,10 +2625,8 @@ def add_task(self, task: Operator) -> None: from airflow.utils.task_group import TaskGroupContext - if not self.start_date and not task.start_date: - raise AirflowException("DAG is missing the start_date parameter") # if the task has no start date, assign it the same as the DAG - elif not task.start_date: + if not task.start_date: task.start_date = self.start_date # otherwise, the task will start on the later of its own start date and # the DAG's start date diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 898d2663903ce..b2e70b37a595e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1116,7 +1116,7 @@ def verify_integrity(self, *, session: Session = NEW_SESSION) -> None: def task_filter(task: Operator) -> bool: return task.task_id not in task_ids and ( self.is_backfill - or task.start_date <= self.execution_date + or (task.start_date is None or task.start_date <= self.execution_date) and (task.end_date is None or self.execution_date <= task.end_date) ) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index f4b872109f626..f7bf1ad6d0c91 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -765,7 +765,7 @@ def test_following_schedule_relativedelta(self): """ dag_id = "test_schedule_dag_relativedelta" delta = relativedelta(hours=+1) - dag = DAG(dag_id=dag_id, schedule=delta) + dag = DAG(dag_id=dag_id, schedule=delta, start_date=TEST_DATE) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) _next = dag.following_schedule(TEST_DATE) @@ -780,7 +780,7 @@ def test_following_schedule_relativedelta_with_deprecated_schedule_interval(self """ dag_id = "test_schedule_dag_relativedelta" delta = relativedelta(hours=+1) - dag = DAG(dag_id=dag_id, schedule_interval=delta) + dag = DAG(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) _next = dag.following_schedule(TEST_DATE) @@ -799,7 +799,7 @@ def test_following_schedule_relativedelta_with_depr_schedule_interval_decorated_ dag_id = "test_schedule_dag_relativedelta" delta = relativedelta(hours=+1) - @dag(dag_id=dag_id, schedule_interval=delta) + @dag(dag_id=dag_id, schedule_interval=delta, start_date=TEST_DATE) def mydag(): BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE) @@ -827,6 +827,20 @@ def test_following_schedule_datetime_timezone(self): when = dag.following_schedule(start) assert when.isoformat() == "2018-03-25T03:00:00+00:00" + def test_create_dagrun_when_schedule_is_none_and_empty_start_date(self): + # Check that we don't get an AttributeError 'start_date' for self.start_date when schedule is none + dag = DAG("dag_with_none_schedule_and_empty_start_date") + dag.add_task(BaseOperator(task_id="task_without_start_date")) + dagrun = dag.create_dagrun( + state=State.RUNNING, run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE + ) + assert dagrun is not None + + def test_fail_dag_when_schedule_is_non_none_and_empty_start_date(self): + # Check that we get a ValueError 'start_date' for self.start_date when schedule is non-none + with pytest.raises(ValueError, match="DAG is missing the start_date parameter"): + DAG(dag_id="dag_with_non_none_schedule_and_empty_start_date", schedule="@hourly") + def test_following_schedule_datetime_timezone_utc0530(self): # Check that we don't get an AttributeError 'name' for self.timezone class UTC0530(datetime.tzinfo): @@ -942,8 +956,8 @@ def test_bulk_write_to_db_interval_save_runtime(self, interval): mock_active_runs_of_dags = mock.MagicMock(side_effect=DagRun.active_runs_of_dags) with mock.patch.object(DagRun, "active_runs_of_dags", mock_active_runs_of_dags): dags_null_timetable = [ - DAG("dag-interval-None", schedule_interval=None), - DAG("dag-interval-test", schedule_interval=interval), + DAG("dag-interval-None", schedule_interval=None, start_date=TEST_DATE), + DAG("dag-interval-test", schedule_interval=interval, start_date=TEST_DATE), ] DAG.bulk_write_to_db(dags_null_timetable, session=settings.Session()) if interval: @@ -1530,7 +1544,7 @@ def test_schedule_dag_once(self): it is called, and not scheduled the second. """ dag_id = "test_schedule_dag_once" - dag = DAG(dag_id=dag_id, schedule="@once") + dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE) assert isinstance(dag.timetable, OnceTimetable) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) @@ -1553,7 +1567,7 @@ def test_fractional_seconds(self): Tests if fractional seconds are stored in the database """ dag_id = "test_fractional_seconds" - dag = DAG(dag_id=dag_id, schedule="@once") + dag = DAG(dag_id=dag_id, schedule="@once", start_date=TEST_DATE) dag.add_task(BaseOperator(task_id="faketastic", owner="Also fake", start_date=TEST_DATE)) start_date = timezone.utcnow() @@ -1658,25 +1672,25 @@ def test_get_paused_dag_ids(self): def test_timetable_and_description_from_schedule_interval_arg( self, schedule_interval_arg, expected_timetable, interval_description ): - dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg) + dag = DAG("test_schedule_interval_arg", schedule=schedule_interval_arg, start_date=TEST_DATE) assert dag.timetable == expected_timetable assert dag.schedule_interval == schedule_interval_arg assert dag.timetable.description == interval_description def test_timetable_and_description_from_dataset(self): - dag = DAG("test_schedule_interval_arg", schedule=[Dataset(uri="hello")]) + dag = DAG("test_schedule_interval_arg", schedule=[Dataset(uri="hello")], start_date=TEST_DATE) assert dag.timetable == DatasetTriggeredTimetable() assert dag.schedule_interval == "Dataset" assert dag.timetable.description == "Triggered by datasets" def test_schedule_interval_still_works(self): - dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * *") + dag = DAG("test_schedule_interval_arg", schedule_interval="*/5 * * * *", start_date=TEST_DATE) assert dag.timetable == cron_timetable("*/5 * * * *") assert dag.schedule_interval == "*/5 * * * *" assert dag.timetable.description == "Every 5 minutes" def test_timetable_still_works(self): - dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 * * * *")) + dag = DAG("test_schedule_interval_arg", timetable=cron_timetable("*/6 * * * *"), start_date=TEST_DATE) assert dag.timetable == cron_timetable("*/6 * * * *") assert dag.schedule_interval == "*/6 * * * *" assert dag.timetable.description == "Every 6 minutes" @@ -1702,7 +1716,7 @@ def test_timetable_still_works(self): ], ) def test_description_from_timetable(self, timetable, expected_description): - dag = DAG("test_schedule_interval_description", timetable=timetable) + dag = DAG("test_schedule_interval_description", timetable=timetable, start_date=TEST_DATE) assert dag.timetable == timetable assert dag.timetable.description == expected_description @@ -2449,7 +2463,7 @@ def test_return_date_range_with_num_method(self): start_date = TEST_DATE delta = timedelta(days=1) - dag = DAG("dummy-dag", schedule=delta) + dag = DAG("dummy-dag", schedule=delta, start_date=start_date) dag_dates = dag.date_range(start_date=start_date, num=3) assert dag_dates == [ @@ -2502,10 +2516,10 @@ def test_dag_owner_links(self): ) def test_schedule_dag_param(self, kwargs): with pytest.raises(ValueError, match="At most one"): - with DAG(dag_id="hello", **kwargs): + with DAG(dag_id="hello", start_date=TEST_DATE, **kwargs): pass - def test_continuous_schedule_interval_limits_max_active_runs(self): + def test_continuous_schedule_interval_linmits_max_active_runs(self): dag = DAG("continuous", start_date=DEFAULT_DATE, schedule_interval="@continuous", max_active_runs=1) assert isinstance(dag.timetable, ContinuousTimetable) assert dag.max_active_runs == 1 @@ -3010,19 +3024,19 @@ def mydag(): @pytest.mark.parametrize("timetable", [NullTimetable(), OnceTimetable()]) def test_dag_timetable_match_schedule_interval(timetable): - dag = DAG("my-dag", timetable=timetable) + dag = DAG("my-dag", timetable=timetable, start_date=DEFAULT_DATE) assert dag._check_schedule_interval_matches_timetable() @pytest.mark.parametrize("schedule_interval", [None, "@once", "@daily", timedelta(days=1)]) def test_dag_schedule_interval_match_timetable(schedule_interval): - dag = DAG("my-dag", schedule=schedule_interval) + dag = DAG("my-dag", schedule=schedule_interval, start_date=DEFAULT_DATE) assert dag._check_schedule_interval_matches_timetable() @pytest.mark.parametrize("schedule_interval", [None, "@daily", timedelta(days=1)]) def test_dag_schedule_interval_change_after_init(schedule_interval): - dag = DAG("my-dag", timetable=OnceTimetable()) + dag = DAG("my-dag", timetable=OnceTimetable(), start_date=DEFAULT_DATE) dag.schedule_interval = schedule_interval assert not dag._check_schedule_interval_matches_timetable() @@ -3391,7 +3405,7 @@ def test_get_next_data_interval( data_interval_end, expected_data_interval, ): - dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily") + dag = DAG(dag_id="test_get_next_data_interval", schedule="@daily", start_date=DEFAULT_DATE) dag_model = DagModel( dag_id="test_get_next_data_interval", next_dagrun=logical_date, diff --git a/tests/providers/google/cloud/sensors/test_gcs.py b/tests/providers/google/cloud/sensors/test_gcs.py index 422cd8f71a39e..1d4bbcec876e0 100644 --- a/tests/providers/google/cloud/sensors/test_gcs.py +++ b/tests/providers/google/cloud/sensors/test_gcs.py @@ -227,7 +227,9 @@ def test_gcs_object_existence_async_sensor_execute_complete(self): class TestTsFunction: def test_should_support_datetime(self): context = { - "dag": DAG(dag_id=TEST_DAG_ID, schedule=timedelta(days=5)), + "dag": DAG( + dag_id=TEST_DAG_ID, schedule=timedelta(days=5), start_date=datetime(2019, 2, 14, 0, 0) + ), "execution_date": datetime(2019, 2, 14, 0, 0), } result = ts_function(context)