
AttributeError while using the function update_state #34604

Closed
fuatcakici opened this issue Sep 25, 2023 · 5 comments · Fixed by #36712
Labels
affected_version:2.7 Issues Reported for 2.7 area:core kind:bug This is a clearly a bug

Comments

@fuatcakici
Contributor

fuatcakici commented Sep 25, 2023

Apache Airflow version

2.7.1

What happened

After changing the states of some failed tasks in a DAG run to SUCCESS, we call the update_state function to update the state of that particular DAG run. At this point, however, we get an AttributeError from here in dagrun.py.

We believe this is probably caused by this change in this PR, where self.state was apparently expected to be in the format <DagRunState.SUCCESS: 'success'>, but it is simply the plain string 'success' instead.

Hence, self.state.value raises an AttributeError instead of returning the state, e.g. 'success'.
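Independent of Airflow internals, the failure mode is easy to demonstrate with a minimal str-based Enum standing in for DagRunState (a hypothetical re-creation for illustration, not the real class):

```python
from enum import Enum


# Stand-in for airflow.utils.state.DagRunState, which is a str-based Enum.
# (Hypothetical re-creation for illustration; not the real class.)
class DagRunState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"


enum_state = DagRunState.SUCCESS  # <DagRunState.SUCCESS: 'success'>
plain_state = "success"           # what self.state apparently holds here

assert enum_state.value == "success"      # Enum members expose .value
assert not hasattr(plain_state, "value")  # plain strings do not -> AttributeError
```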

What you think should happen instead

Replacing self.state.value with self.state in the line in question and the one below should solve the issue.
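As a sketch of a more defensive variant of the failing metric line (using a stand-in Enum, since the real fix would live in dagrun.py), coercing through the Enum accepts both representations:

```python
from enum import Enum


class DagRunState(str, Enum):  # stand-in for airflow.utils.state.DagRunState
    SUCCESS = "success"
    FAILED = "failed"


def metric_name(state, dag_id: str) -> str:
    # DagRunState(state) returns the member whether `state` is already a
    # member or a plain string, so reading .value afterwards is always safe.
    return f"dagrun.duration.{DagRunState(state).value}.{dag_id}"


assert metric_name("success", "test") == "dagrun.duration.success.test"
assert metric_name(DagRunState.FAILED, "test") == "dagrun.duration.failed.test"
```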

How to reproduce

Simply calling update_state on a finished DAG run should trigger the error (finished, because _emit_duration_stats_for_finished_state, which update_state calls, returns early if the state is RUNNING and never reaches the erroneous lines).

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@fuatcakici fuatcakici added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Sep 25, 2023
@boring-cyborg

boring-cyborg bot commented Sep 25, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.

@jscheffl
Contributor

jscheffl commented Sep 25, 2023

Before drilling down manually, would you be willing to supply an example DAG that reproduces the bug? It can be anonymized. To speed things up, could you also add the stack trace that was generated during your analysis?

If you have already started investigating, we would of course also be happy to receive a PR with a fix, if possible.

@fuatcakici
Contributor Author

fuatcakici commented Sep 27, 2023

Hello, thanks for picking this up! Upon further inspection, I realised that I had misidentified the cause; however, the issue exists nonetheless.

I have written the following simple DAG in order to reproduce the issue and have triggered it manually twice:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import DagRun
from airflow.utils.state import TaskInstanceState

import datetime as dt


with DAG(
    dag_id="test",
    start_date=dt.datetime(2023, 9, 26),
) as dag:
    def simple_function():
        return "simple function"

    def error_function():
        dag_runs = DagRun.find(dag_id="test")
        if len(dag_runs) == 1:
            # First run: fail on purpose so there is a failed task to flip later.
            raise Exception("Intentional error raised.")
        else:
            # Second run: mark the failed task of the first run as SUCCESS,
            # then ask that DAG run to recompute its state.
            previous_dag_run = dag_runs[0]
            previous_task_instance = previous_dag_run.get_task_instance("error_task")
            previous_task_instance.set_state(TaskInstanceState.SUCCESS)

            previous_dag_run.dag = dag
            previous_dag_run.update_state()

    tasks = {}

    simple_task = "simple_task"
    tasks[simple_task] = PythonOperator(
        task_id=simple_task,
        python_callable=simple_function,
    )

    error_task = "error_task"
    tasks[error_task] = PythonOperator(
        task_id=error_task,
        python_callable=error_function,
    )

    tasks[simple_task] >> tasks[error_task]

So as you can see, in the first DAG run, the simple_task succeeds and the error_task fails, as shown:

[Screenshot: first DAG run, simple_task succeeded, error_task failed]

After triggering it for the second time, I was expecting the error in question, but the DAG simply ran the expected way:

[Screenshot: second DAG run completed as expected]

After this, I inspected the issue further as I said before, and realised that the update_state function only fails when the DAG run state is SUCCESS. Hence, when I clear the error_task in the second DAG run, the task fails:

[Screenshot: error_task in the second DAG run failed after being cleared]

with the following stack trace:

[2023-09-27, 08:55:47 UTC] {{dagrun.py:653}} INFO - Marking run <DagRun test @ 2023-09-26 00:00:00+00:00: scheduled__2023-09-26T00:00:00+00:00, state:success, queued_at: 2023-09-27 08:41:05.459933+00:00. externally triggered: False> successful
[2023-09-27, 08:55:47 UTC] {{dagrun.py:704}} INFO - DagRun Finished: dag_id=test, execution_date=2023-09-26 00:00:00+00:00, run_id=scheduled__2023-09-26T00:00:00+00:00, run_start_date=2023-09-27 08:41:05.504188+00:00, run_end_date=2023-09-27 08:41:20.217921+00:00, run_duration=14.713733, state=success, external_trigger=False, run_type=scheduled, data_interval_start=2023-09-26 00:00:00+00:00, data_interval_end=2023-09-27 00:00:00+00:00, dag_hash=05d215ef6d71a0bacdc1dd5714497000
[2023-09-27, 08:47:07 UTC] {{taskinstance.py:1936}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/operators/python.py", line 209, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/data_flows/dags/test/dag.py", line 28, in error_function
    previous_dag_run.update_state()
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/utils/session.py", line 77, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/dagrun.py", line 724, in update_state
    self._emit_duration_stats_for_finished_state()
  File "/home/airflow/airflow_env/lib/python3.10/site-packages/airflow/models/dagrun.py", line 964, in _emit_duration_stats_for_finished_state
    Stats.timing(f"dagrun.duration.{self.state.value}.{self.dag_id}", **timer_params)
AttributeError: 'str' object has no attribute 'value'

At this point, I wondered if this happened because the DAG run state was already right, i.e., it would be changed from SUCCESS to SUCCESS since all the task instances in it were also successful. However, the update_state function also fails when the DAG run state is SUCCESS but one of the task instances within it is FAILED. You can see this behaviour by manually setting the simple_task in the first DAG run to FAILED and clearing the error_task in the second DAG run:

[Screenshot: first DAG run SUCCESS with simple_task manually set to FAILED, error_task in the second DAG run failed after being cleared]

The stack trace is the same.

Hence, replacing self.state.value with self.state will not solve the issue, as it will likely create another issue for DAG runs in the FAILED state. My intuition is that somewhere in the code, self.state for successful DAG runs is returned as the plain string 'success', while for failed DAG runs it is returned as <DagRunState.FAILED: 'failed'>, but I cannot figure out where.
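One reason such a mixed representation can go unnoticed is that a str-based Enum compares equal to its plain-string value, so equality checks pass with either form and only a .value access exposes the difference. A minimal sketch with a stand-in Enum (hypothetical re-creation, not the real DagRunState):

```python
from enum import Enum


# Stand-in for airflow.utils.state.DagRunState (hypothetical re-creation).
class DagRunState(str, Enum):
    SUCCESS = "success"
    FAILED = "failed"


# Because the Enum subclasses str, equality cannot tell the two
# representations apart...
assert DagRunState.SUCCESS == "success"
assert "failed" == DagRunState.FAILED

# ...so checks like `if state == DagRunState.SUCCESS:` pass either way,
# and the mismatch only surfaces when `.value` is accessed:
assert DagRunState.SUCCESS.value == "success"
assert not hasattr("success", "value")  # plain str: AttributeError on .value
```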

Hope this helps, and let me know if there is anything else I can do!

@fuatcakici
Contributor Author

I have potentially found another error you may want to know about. After manually setting the simple_task in the first DAG run to FAILED, if I also manually set that whole DAG run to FAILED, clearing the error_task in the second DAG run changes the state of the previous DAG run from FAILED to SUCCESS, even though the simple_task remains in a FAILED state, and hence so should the DAG run. You can see it below:

[Screenshot: previous DAG run flipped from FAILED to SUCCESS while simple_task remains FAILED]

As expected, you can see that the error_task in the second DAG run succeeds, which is in line with my hypothesis above: the update_state function only works if the DAG run is in the FAILED state.

I just wanted to add this even though it is not entirely part of the issue at hand, since I believe it is still somewhat related and hope it could be helpful.

@eladkal eladkal added the affected_version:2.7 Issues Reported for 2.7 label Nov 9, 2023
@internetcoffeephone
Contributor

This issue has been occurring for us since our upgrade from Airflow 2.6.1 to 2.7.1.

After doing some digging, I suspect the cause is 83f5950 by @ephraimbuddy. It makes changes to DagRun.find(), which now possibly no longer loads the state as a DagRunState but as a plain string. Not 100% certain, though.
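Until a fix lands, a caller-side workaround could be to normalize whatever DagRun.find() returns before anything reads .value. A sketch with a stand-in Enum (the normalize_state helper is hypothetical, not an Airflow API):

```python
from enum import Enum


# Stand-in for airflow.utils.state.DagRunState (hypothetical re-creation).
class DagRunState(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


def normalize_state(state):
    """Return a DagRunState member whether given a member or a plain string.

    Hypothetical helper: a caller could apply it to run.state for each run
    returned by DagRun.find() before calling update_state().
    """
    return state if isinstance(state, DagRunState) else DagRunState(state)


assert normalize_state("failed") is DagRunState.FAILED
assert normalize_state(DagRunState.SUCCESS) is DagRunState.SUCCESS
```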
