diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 0c32e44391ceb..ae88dab5cf3e4 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -532,6 +532,31 @@ def _process_executor_events(self, session: Session = None) -> int:
                 self.log.info("Setting external_id for %s to %s", ti, info)
                 continue
 
+            msg = (
+                "TaskInstance Finished: dag_id=%s, task_id=%s, run_id=%s, "
+                "run_start_date=%s, run_end_date=%s, "
+                "run_duration=%s, state=%s, executor_state=%s, try_number=%s, max_tries=%s, job_id=%s, "
+                "pool=%s, queue=%s, priority_weight=%d, operator=%s"
+            )
+            self.log.info(
+                msg,
+                ti.dag_id,
+                ti.task_id,
+                ti.run_id,
+                ti.start_date,
+                ti.end_date,
+                ti.duration,
+                ti.state,
+                state,
+                try_number,
+                ti.max_tries,
+                ti.job_id,
+                ti.pool,
+                ti.queue,
+                ti.priority_weight,
+                ti.operator,
+            )
+
             if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
                 Stats.incr('scheduler.tasks.killed_externally')
                 msg = (
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 1d5326581295c..e8eb6853e53cd 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -542,6 +542,31 @@ def update_state(
         else:
             self.set_state(State.RUNNING)
 
+        if self._state == State.FAILED or self._state == State.SUCCESS:
+            msg = (
+                "DagRun Finished: dag_id=%s, execution_date=%s, run_id=%s, "
+                "run_start_date=%s, run_end_date=%s, run_duration=%s, "
+                "state=%s, external_trigger=%s, run_type=%s, "
+                "data_interval_start=%s, data_interval_end=%s, dag_hash=%s"
+            )
+            self.log.info(
+                msg,
+                self.dag_id,
+                self.execution_date,
+                self.run_id,
+                self.start_date,
+                self.end_date,
+                (self.end_date - self.start_date).total_seconds()
+                if self.start_date and self.end_date
+                else None,
+                self._state,
+                self.external_trigger,
+                self.run_type,
+                self.data_interval_start,
+                self.data_interval_end,
+                self.dag_hash,
+            )
+
         self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
         self._emit_duration_stats_for_finished_state()