From 28f56a007e106d1373c6c231a67b87c45b63697c Mon Sep 17 00:00:00 2001 From: Steve Zhang <86570314+steveyz-astro@users.noreply.github.com> Date: Fri, 22 Oct 2021 08:35:26 -0700 Subject: [PATCH] add detailed information to logging when a dag or a task finishes. (#19097) * add detailed information to logging when a dag or a task finishes. * make logging of start_date/end_date ISO format to be consist * fix pre-commit * use only %s in new logging statements to gracefully handle when certain variables are None * fix precommit * use self._state instead of get_state(). add computation for dag run duration based on start_date and end_date * make linter happy with format * fix typo * put back missing reference to _state Co-authored-by: Daniel Imberman (cherry picked from commit 324c31c2d7ad756ce3814f74f0b6654d02f19426) --- airflow/jobs/scheduler_job.py | 25 +++++++++++++++++++++++++ airflow/models/dagrun.py | 25 +++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index bf2410a67e323..87dc158397f5a 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -533,6 +533,31 @@ def _process_executor_events(self, session: Session = None) -> int: self.log.info("Setting external_id for %s to %s", ti, info) continue + msg = ( + "TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, " + "run_start_date=%s, run_end_date=%s, " + "run_duration=%s, state=%s, executor_state=%s, try_number=%s, max_tries=%s, job_id=%s, " + "pool=%s, queue=%s, priority_weight=%d, operator=%s" + ) + self.log.info( + msg, + ti.dag_id, + ti.task_id, + ti.run_id, + ti.start_date, + ti.end_date, + ti.duration, + ti.state, + state, + try_number, + ti.max_tries, + ti.job_id, + ti.pool, + ti.queue, + ti.priority_weight, + ti.operator, + ) + if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED: Stats.incr('scheduler.tasks.killed_externally') msg = ( diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 1d5326581295c..e8eb6853e53cd 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -542,6 +542,31 @@ def update_state( else: self.set_state(State.RUNNING) + if self._state == State.FAILED or self._state == State.SUCCESS: + msg = ( + "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, " + "run_start_date=%s, run_end_date=%s, run_duration=%s, " + "state=%s, external_trigger=%s, run_type=%s, " + "data_interval_start=%s, data_interval_end=%s, dag_hash=%s" + ) + self.log.info( + msg, + self.dag_id, + self.execution_date, + self.run_id, + self.start_date, + self.end_date, + (self.end_date - self.start_date).total_seconds() + if self.start_date and self.end_date + else None, + self._state, + self.external_trigger, + self.run_type, + self.data_interval_start, + self.data_interval_end, + self.dag_hash, + ) + self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks) self._emit_duration_stats_for_finished_state()