Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix queued dag runs changes catchup=False behaviour #19130

Merged
merged 1 commit into from
Oct 22, 2021

Conversation

robinedwards
Copy link
Contributor

@robinedwards robinedwards commented Oct 21, 2021

closes: #18487
When a dag has catchup=False we want to select the max value from the earliest possible execution_date and the end of the last execution period.

Thus if a dag is behind schedule it will now select the latest execution date as opposed to just picking the next execution date from the previous dag run

@ephraimbuddy
Copy link
Contributor

ephraimbuddy commented Oct 21, 2021

cc: @uranusjr

We need some test to avoid regression

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Oct 21, 2021
@github-actions
Copy link

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

airflow/timetables/interval.py Outdated Show resolved Hide resolved
@uranusjr uranusjr added this to the Airflow 2.2.1 milestone Oct 21, 2021
@eladkal eladkal changed the title Fixes #18487 Fix queued dag runs changes catchup=False behaviour Oct 21, 2021
When a dag has catchup=False we want to select the max value from the
earliest date and the end of the last execution period.

Thus if a dag is behind schedule it will select the latest execution
date.
@robinedwards robinedwards force-pushed the fix/catchup-false-ignored branch from 6065747 to cda6290 Compare October 21, 2021 13:39
@ephraimbuddy ephraimbuddy reopened this Oct 21, 2021
@ephraimbuddy
Copy link
Contributor

Closed/reopened to retrigger tests

@ephraimbuddy ephraimbuddy requested a review from uranusjr October 21, 2021 16:12
@ashb
Copy link
Member

ashb commented Oct 21, 2021

@ephraimbuddy is this different to the case you were working on/have a PR for?

@ephraimbuddy
Copy link
Contributor

@ephraimbuddy is this different to the case you were working on/have a PR for?

My patch did not work for a case where the dag was paused and the file was edited to set the catchup to False

Comment on lines +84 to +91
# There's a previous run.
if earliest is not None:
# Catchup is False or DAG has new start date in the future.
# Make sure we get the latest start date
start = max(last_automated_data_interval.end, earliest)
else:
# Create a data interval starting from when the end of the previous interval.
start = last_automated_data_interval.end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you move the first comment to the end of the else:? Like this

else:  # There's a previous run.

Because right now it isn't very clear whether this comment is describing the else, or the if earliest is not None: block below.

Otherwise this looks good to me. Could you add a test in tests/timetables for this? There's already a file in there, and you should be able to set up cases roughly following its structure to test for the catchup, start_date, and last_automated_data_interval cases (so 8 in total) with pytest.mark.parametrize.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course will look into this tomorrow

@robinedwards
Copy link
Contributor Author

My patch did not work for a case where the dag was paused and the file was edited to set the catchup to False

Just to clarify the paused / editing step in my example was just to get the DAG into a state where it is behind schedule. In main and 2.2.0 any catchup=False dag that's behind schedule will execute all dagruns between its current execution date and the latest run.

@ephraimbuddy
Copy link
Contributor

My patch did not work for a case where the dag was paused and the file was edited to set the catchup to False

Just to clarify the paused / editing step in my example was just to get the DAG into a state where it is behind schedule. In main and 2.2.0 any catchup=False dag that's behind schedule will execute all dagruns between its current execution date and the latest run.

You are right. I have just learned what catchup really means. My PR only made sure no runs are created in advance. Thanks for your contribution.
Please add this test to the test_scheduler_job.py:

   def test_catchup_works_correctly(self, dag_maker):
        """Test that catchup works correctly"""
        with dag_maker(
            dag_id='test_catchup_schedule_dag',
            schedule_interval=timedelta(days=1),
            start_date=DEFAULT_DATE,
            catchup=True,
            max_active_runs=1,
        ) as dag:
            DummyOperator(
                task_id='dummy',
            )
        
        session = settings.Session()
        self.scheduler_job = SchedulerJob(subdir=os.devnull)
        self.scheduler_job.executor = MockExecutor()
        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

        self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
        self.scheduler_job._start_queued_dagruns(session)
        # first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00
        drs = DagRun.find(execution_date=DEFAULT_DATE)
        ti = drs[0].get_task_instance(task_id='dummy')
        ti.state = State.SUCCESS
        session.merge(ti)
        session.commit()
        self.scheduler_job._schedule_dag_run(drs[0], session)
        session.flush()
        # Run the second time so _update_dag_next_dagrun will run
        self.scheduler_job._schedule_dag_run(drs[0], session)
        session.flush()
        dag.catchup = False
        dag.sync_to_db()
        assert dag.catchup is False
        dm = DagModel.get_dagmodel(dag.dag_id)
        self.scheduler_job._create_dag_runs([dm], session)
        #exclude the first run
        dr = session.query(DagRun).filter(DagRun.execution_date!=DEFAULT_DATE).one()
        # Check catchup worked correctly by ensuring execution_date is quite new
        # Our dag is a daily dag
        assert dr.execution_date > (timezone.utcnow()-timedelta(days=2))

@uranusjr
Copy link
Member

Since we're going to have 2.2.1 very soon, I'm going to merge this and work on tests separately to make sure we make the deadline.

@uranusjr uranusjr merged commit 829f90a into apache:main Oct 22, 2021
@robinedwards
Copy link
Contributor Author

Thank you @uranusjr and @ephraimbuddy much appreciated

jedcunningham pushed a commit that referenced this pull request Oct 22, 2021
jedcunningham pushed a commit that referenced this pull request Oct 24, 2021
jedcunningham pushed a commit to astronomer/airflow that referenced this pull request Oct 26, 2021
sharon2719 pushed a commit to sharon2719/airflow that referenced this pull request Oct 27, 2021
jedcunningham pushed a commit to astronomer/airflow that referenced this pull request Oct 27, 2021
@jedcunningham jedcunningham added the type:bug-fix Changelog: Bug Fixes label Apr 7, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
full tests needed We need to run full set of tests for this PR to merge type:bug-fix Changelog: Bug Fixes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2.1.3/4 queued dag runs changes catchup=False behaviour
5 participants