From 38747ee281c452bfe6e36e90e954f50328f10f63 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 14 Oct 2021 11:13:26 +0100 Subject: [PATCH 1/3] Row lock TI query in SchedulerJob._process_executor_events Using multiple schedulers causes Deadlock in _process_executor_events. This PR fixes it. --- airflow/jobs/scheduler_job.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 2ea0bb5bf8652..ab238d967e408 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -510,7 +510,15 @@ def _process_executor_events(self, session: Session = None) -> int: # Check state of finished tasks filter_for_tis = TI.filter_for_tis(tis_with_right_state) - tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all() + query = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')) + # row lock this entire set of tasks to make sure the scheduler doesn't fail when we have + # multi-schedulers + tis: List[TI] = with_row_locks( + query, + of=TI, + session=session, + **skip_locked(session=session), + ).all() for ti in tis: try_number = ti_primary_key_to_try_number_map[ti.key.primary] buffer_key = ti.key.with_try_number(try_number) From e0d34dd4cd07b0f3dc10d8bdf56ac8c47c336480 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 14 Oct 2021 13:01:51 +0100 Subject: [PATCH 2/3] Update airflow/jobs/scheduler_job.py Co-authored-by: Tzu-ping Chung --- airflow/jobs/scheduler_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index ab238d967e408..20bd4fb0e9b7c 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -513,12 +513,12 @@ def _process_executor_events(self, session: Session = None) -> int: query = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')) # row lock this entire set of tasks to make sure the scheduler doesn't fail when we have # multi-schedulers - tis: List[TI] = with_row_locks( + tis: Iterator[TI] = with_row_locks( query, of=TI, session=session, **skip_locked(session=session), - ).all() + ) for ti in tis: try_number = ti_primary_key_to_try_number_map[ti.key.primary] buffer_key = ti.key.with_try_number(try_number) From 320f267b17c9f6d89af6d07ac41d20b2f18dd769 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 14 Oct 2021 13:03:46 +0100 Subject: [PATCH 3/3] fixup! Update airflow/jobs/scheduler_job.py --- airflow/jobs/scheduler_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 20bd4fb0e9b7c..1e74e99b2b87b 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -27,7 +27,7 @@ import warnings from collections import defaultdict from datetime import timedelta -from typing import Collection, DefaultDict, Dict, List, Optional, Tuple +from typing import Collection, DefaultDict, Dict, Iterator, List, Optional, Tuple from sqlalchemy import and_, func, not_, or_, tuple_ from sqlalchemy.exc import OperationalError @@ -511,7 +511,7 @@ def _process_executor_events(self, session: Session = None) -> int: # Check state of finished tasks filter_for_tis = TI.filter_for_tis(tis_with_right_state) query = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')) - # row lock this entire set of tasks to make sure the scheduler doesn't fail when we have + # row lock this entire set of taskinstances to make sure the scheduler doesn't fail when we have # multi-schedulers tis: Iterator[TI] = with_row_locks( query,