add detailed information to logging when a dag or a task finishes. #19097

Merged · 12 commits · Oct 22, 2021

Conversation

@steveyz-astro (Contributor) commented Oct 20, 2021:

Adds two new log messages for when a task run or a DAG run reaches a terminal state (SUCCESS/FAILED).

Examples:

scheduler | [2021-10-19 21:27:55,681] {scheduler_job.py:529} INFO - Task Finished: dag_id=example_bash_operator, task_id=runme_0, run_id=scheduled__2021-10-19T00:00:00+00:00, run_start_date=2021-10-19 21:27:34.142493, run_end_date=2021-10-19 21:27:35.508459, run_duration=1.365966, state=success, executor_state=success, try_number=1, max_tries=0, job_id=87, pool=default_pool, queue=default, priority_weight=3, operator=BashOperator
scheduler | [2021-10-19 22:11:22,730] {dagrun.py:535} INFO - DagRun Finished: dag_id=example_branch_dop_operator_v3, execution_date=2021-10-19 21:52:00+00:00, run_id=scheduled__2021-10-19T21:52:00+00:00, run_start_time=2021-10-19 21:55:28.363682, run_end_time=2021-10-19 22:11:22.730577, run_duration=954.366894, state=failed, external_trigger=False, run_type=scheduled, data_interval_start=2021-10-19 21:52:00+00:00, data_interval_end=2021-10-19 21:53:00+00:00, dag_hash=3f534eba2aff8e72d5ad1db5aff1e64b
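
For reference, a minimal sketch of how the DagRun message could be emitted with lazy %s formatting (one of the commits below notes that only %s is used in the new logging statements so that None values are handled gracefully). The free-standing helper and its name are illustrative; the field names follow the example output above rather than the exact code merged here:

```python
import logging
from typing import Optional

log = logging.getLogger(__name__)


def log_dagrun_finished(dag_run, run_duration: Optional[float]) -> None:
    """Emit a 'DagRun Finished' line like the example above.

    Lazy %s formatting renders missing values as 'None' instead of raising
    while the message is being built.
    """
    log.info(
        "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
        "run_start_time=%s, run_end_time=%s, run_duration=%s, state=%s, "
        "external_trigger=%s, run_type=%s, data_interval_start=%s, "
        "data_interval_end=%s, dag_hash=%s",
        dag_run.dag_id,
        dag_run.execution_date,
        dag_run.run_id,
        dag_run.start_date,
        dag_run.end_date,
        run_duration,
        dag_run.state,
        dag_run.external_trigger,
        dag_run.run_type,
        dag_run.data_interval_start,
        dag_run.data_interval_end,
        dag_run.dag_hash,
    )
```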

boring-cyborg (bot) added the area:Scheduler including HA (high availability) scheduler label Oct 20, 2021
boring-cyborg (bot) commented Oct 20, 2021:

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

Review thread on airflow/jobs/scheduler_job.py (outdated; resolved)
@uranusjr (Member):
Also, please check linter errors locally; this will not pass CI.

@@ -526,6 +526,28 @@ def update_state(
        else:
            self.set_state(State.RUNNING)

        if self.get_state() == State.FAILED or self.get_state() == State.SUCCESS:
Member:
You can use self.state here, but I kind of feel this logging should maybe be done in set_state, or wherever the state is actually set, instead? Not sure.

Contributor (author):
I did consider doing this in set_state(), but decided against it for a few reasons:

  • The existing logging messages are at this layer
  • While I believe it would be logically equivalent right now, it's possible that future code changes could end up tweaking the values of the fields being logged after the set_state() call, and that would be more obvious if the logging were at this layer rather than in the setter.
  • Generally I prefer not having logging in the lowest-level getter/setter methods, because they rarely have the full context for why the value is being set. In this case the log message isn't using anything specific to the update_state() context, but we could easily augment it in the future to provide more context, e.g. failed due to deadlock vs. task failure.

I can certainly change to use self._state directly here though.
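
(As a sketch only: with the switch to self._state mentioned here and in a later commit, the check from the diff above would read roughly as follows, with the detailed message emitted inside the branch; the shortened log line is illustrative, not the merged code.)

```python
# Sketch only: terminal-state check using the private attribute directly
# instead of calling get_state() twice; the detailed "DagRun Finished"
# message from the PR description would be emitted inside this branch.
if self._state in (State.FAILED, State.SUCCESS):
    self.log.info(
        "DagRun Finished: dag_id=%s, run_id=%s, state=%s",
        self.dag_id,
        self.run_id,
        self._state,
    )
```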

Member:
Thanks, the reasoning makes sense. In that case, would it be possible to move the TaskInstance logging to a similar abstraction level, instead of doing it in SchedulerJob? Or does the TI lack such an abstraction? I see there's a TaskInstance._log_state() function, which is called (for SUCCESS and FAILURE states) in _run_raw_task, but this new log seems to be emitted in a different place.

Contributor (author):
Yeah, I think having it in TaskInstance._log_state() would certainly make sense from an abstraction point of view. However, looking at the code, _log_state() doesn't seem to get called if the task fails.

Member:
It is indirectly called in handle_failure; not sure if it covers all cases, can you check?

@steveyz-astro (Contributor, author) commented Oct 22, 2021:
It doesn't appear to cover all of the cases, e.g.:

except AirflowException as e:
    if not test_mode:
        self.refresh_from_db(lock_for_update=True, session=session)
    # for case when task is marked as success/failed externally
    # or dagrun timed out and task is marked as skipped
    # current behavior doesn't hit the callbacks
    if self.state in State.finished:
        return

Member:
Hmm, yeah. Would it be a good idea to just add a _log_state here? (and other similar paths in _run_raw_task)
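
(For illustration only, the suggestion above would amount to roughly the following change to the snippet quoted earlier; it was not adopted in this PR, and it assumes _log_state() can be called without arguments, as it is on the success path of _run_raw_task.)

```python
# Hypothetical sketch of the suggestion above (not what was merged): also log
# the terminal state on this early-return path before giving up.
except AirflowException as e:
    if not test_mode:
        self.refresh_from_db(lock_for_update=True, session=session)
    # for case when task is marked as success/failed externally
    # or dagrun timed out and task is marked as skipped
    # current behavior doesn't hit the callbacks
    if self.state in State.finished:
        self._log_state()  # suggested addition; logs the terminal state
        return
```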

Contributor (author):
I'd prefer not to make major changes to the code flow just for the additional logging; it doesn't seem worth the risk of potentially breaking existing behavior. There seem to be a lot of edge conditions around error/skip handling in this code.

@dimberman self-requested a review October 22, 2021 14:50
@dimberman dismissed uranusjr's stale review October 22, 2021 15:35:

I think this looks sufficient for getting a quick fix in, and we can move things around if needed.

@dimberman merged commit 324c31c into apache:main Oct 22, 2021
boring-cyborg (bot) commented Oct 22, 2021:

Awesome work, congrats on your first merged pull request!

@jedcunningham added this to the Airflow 2.2.1 milestone Oct 22, 2021
jedcunningham pushed a commit that referenced this pull request Oct 22, 2021
…19097)

* add detailed information to logging when a dag or a task finishes.

* make logging of start_date/end_date ISO format to be consist

* fix pre-commit

* use only %s in new logging statements to gracefully handle when certain variables are None

* fix precommit

* use self._state instead of get_state().  add computation for dag run duration based on start_date and end_date

* make linter happy with format

* fix typo

* put back missing reference to _state

Co-authored-by: Daniel Imberman <daniel.imberman@gmail.com>
(cherry picked from commit 324c31c)
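
One of the commits above adds a DAG run duration computed from start_date and end_date; a minimal sketch of that calculation, assuming either timestamp may be missing (illustrative, not the merged code):

```python
from datetime import datetime, timezone
from typing import Optional


def run_duration_seconds(
    start_date: Optional[datetime], end_date: Optional[datetime]
) -> Optional[float]:
    """Return the run duration in seconds, or None if either timestamp is missing."""
    if start_date is None or end_date is None:
        return None
    return (end_date - start_date).total_seconds()


# Roughly reproduces the run_duration in the DagRun example near the top of the PR.
start = datetime(2021, 10, 19, 21, 55, 28, 363682, tzinfo=timezone.utc)
end = datetime(2021, 10, 19, 22, 11, 22, 730577, tzinfo=timezone.utc)
print(run_duration_seconds(start, end))  # ~954.37 seconds
```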
@steveyz-astro deleted the log-dag-task-details branch October 23, 2021 20:55
jedcunningham pushed a commit that referenced this pull request Oct 24, 2021
sharon2719 pushed a commit to sharon2719/airflow that referenced this pull request Oct 27, 2021
@jedcunningham added the type:bug-fix Changelog: Bug Fixes label Apr 7, 2022
Labels: area:Scheduler including HA (high availability) scheduler, type:bug-fix Changelog: Bug Fixes
4 participants