

Deferred TI's next_method is not cleared when Clearing a Successful Task #19120

Closed
2 tasks done
ReadytoRocc opened this issue Oct 20, 2021 · 2 comments · Fixed by #19183
Labels: area:core, kind:bug

@ReadytoRocc
Contributor

Apache Airflow version

2.2.0 (latest released)

Operating System

Container-Optimized OS with Containerd (cos_containerd) - GKE

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

What happened

I cleared a successful task instance that uses a deferrable operator and noticed that it immediately went back into the success state. The operator is designed to run a trigger that waits 1 minute from approximately the task's start time. Upon investigation, I did not see the following line in the second run's task log: {taskinstance.py:1332} INFO - Pausing task as DEFERRED.

What you expected to happen

I would expect the task to be deferred again for approximately 1 minute and then succeed.

How to reproduce

Run the following DAG. Once the task instance succeeds, clear it and observe that the task fails. This confirms that ti.next_method was not cleared: after the clear, the task resumed directly in next with the previously stored kwargs, so execute did not rerun and execute_try_number was never updated to the new try number.

```python
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.triggers.testing import SuccessTrigger


class RetryOperator(BaseOperator):
    def execute(self, context):
        ti = context["ti"]
        next_method = ti.next_method
        try_number = ti.try_number

        self.log.info(
            f"In `execute`: try_number: {try_number}, next_method {next_method}."
        )

        # Hand off to SuccessTrigger (which fires right away), recording
        # which try number actually ran `execute`.
        self.defer(
            trigger=SuccessTrigger(),
            method_name="next",
            kwargs={"execute_try_number": try_number},
        )

    def next(self, context, execute_try_number, event=None):
        ti = context["ti"]
        next_method = ti.next_method
        try_number = ti.try_number

        self.log.info(
            f"In `next`: try_number: {try_number}, next_method {next_method}, execute_try_number: {execute_try_number}."
        )

        # A cleared (new) try should have gone through `execute` first, so the
        # try number recorded there must match the current one.
        if execute_try_number != try_number:
            raise AirflowException("`execute` wasn't run during clear!")

        return None  # Success!


with DAG(
    "triggerer_clear", schedule_interval=None, start_date=datetime(2021, 10, 20)
) as dag:
    RetryOperator(task_id="clear")
```

Anything else

I believe this is caused by ti.next_method (and ti.next_kwargs) not being cleared after a task has completed. A similar issue was raised in #18146.
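One way to express the expected behaviour is for clearing to reset the deferral resume point along with the state, so the next try starts in execute again. This is a minimal hypothetical sketch on a toy object (ToyTaskInstance and toy_clear are illustrative names); it is not the change made in #19183 and not Airflow's own clearing code.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class ToyTaskInstance:
    """Hypothetical stand-in for a finished, previously deferred task instance."""
    try_number: int = 1
    state: Optional[str] = "success"
    next_method: Optional[str] = None
    next_kwargs: Optional[Dict[str, Any]] = None


def toy_clear(ti: ToyTaskInstance) -> None:
    """Toy clear: reset state, start a new try, and drop the resume point."""
    ti.state = None
    ti.try_number += 1
    # Without these two resets, the new try would resume in the stale method.
    ti.next_method = None
    ti.next_kwargs = None


ti = ToyTaskInstance(next_method="next", next_kwargs={"execute_try_number": 1})
toy_clear(ti)
print(ti)
```

With next_method reset to None, a dispatcher that resumes only when next_method is set would route the cleared try back through execute, which would defer again as expected.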

Are you willing to submit PR?

  • Yes, I am willing to submit a PR!

Code of Conduct

@ReadytoRocc added the area:core and kind:bug labels on Oct 20, 2021
@boring-cyborg

boring-cyborg bot commented Oct 20, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@ephraimbuddy
Contributor

@ReadytoRocc assigned it to you. Let me know if you need help.
