Skip to content

Commit

Permalink
add detailed information to logging when a dag or a task finishes. (#…
Browse files Browse the repository at this point in the history
…19097)

* add detailed information to logging when a dag or a task finishes.

* make logging of start_date/end_date ISO format to be consist

* fix pre-commit

* use only %s in new logging statements to gracefully handle when certain variables are None

* fix precommit

* use self._state instead of get_state().  add computation for dag run duration based on start_date and end_date

* make linter happy with format

* fix typo

* put back missing reference to _state

Co-authored-by: Daniel Imberman <daniel.imberman@gmail.com>
(cherry picked from commit 324c31c)
  • Loading branch information
steveyz-astro authored and jedcunningham committed Oct 22, 2021
1 parent 5b1c76a commit 28f56a0
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
25 changes: 25 additions & 0 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,31 @@ def _process_executor_events(self, session: Session = None) -> int:
self.log.info("Setting external_id for %s to %s", ti, info)
continue

msg = (
"TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, "
"run_start_date=%s, run_end_date=%s, "
"run_duration=%s, state=%s, executor_state=%s, try_number=%s, max_tries=%s, job_id=%s, "
"pool=%s, queue=%s, priority_weight=%d, operator=%s"
)
self.log.info(
msg,
ti.dag_id,
ti.task_id,
ti.run_id,
ti.start_date,
ti.end_date,
ti.duration,
ti.state,
state,
try_number,
ti.max_tries,
ti.job_id,
ti.pool,
ti.queue,
ti.priority_weight,
ti.operator,
)

if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
Stats.incr('scheduler.tasks.killed_externally')
msg = (
Expand Down
25 changes: 25 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,31 @@ def update_state(
else:
self.set_state(State.RUNNING)

if self._state == State.FAILED or self._state == State.SUCCESS:
msg = (
"DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
"run_start_date=%s, run_end_date=%s, run_duration=%s, "
"state=%s, external_trigger=%s, run_type=%s, "
"data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
)
self.log.info(
msg,
self.dag_id,
self.execution_date,
self.run_id,
self.start_date,
self.end_date,
(self.end_date - self.start_date).total_seconds()
if self.start_date and self.end_date
else None,
self._state,
self.external_trigger,
self.run_type,
self.data_interval_start,
self.data_interval_end,
self.dag_hash,
)

self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
self._emit_duration_stats_for_finished_state()

Expand Down

0 comments on commit 28f56a0

Please sign in to comment.