From 2e2ce1032b55ed666b7913dab6e286ef13f3f7f8 Mon Sep 17 00:00:00 2001 From: Vishesh Jain Date: Mon, 27 Apr 2020 18:30:45 +0530 Subject: [PATCH 1/4] Patch pool in BaseOperator --- airflow/models/baseoperator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 2e431ca21dcf9..95f8821f7bd21 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -316,7 +316,7 @@ def __init__( priority_weight: int = 1, weight_rule: str = WeightRule.DOWNSTREAM, queue: str = conf.get('celery', 'default_queue'), - pool: str = Pool.DEFAULT_POOL_NAME, + pool: Optional[str] = None, pool_slots: int = 1, sla: Optional[timedelta] = None, execution_timeout: Optional[timedelta] = None, @@ -385,7 +385,7 @@ def __init__( self.retries = retries self.queue = queue - self.pool = pool + self.pool = Pool.DEFAULT_POOL_NAME if Pool is None else pool self.pool_slots = pool_slots if self.pool_slots < 1: raise AirflowException("pool slots for %s in dag %s cannot be less than 1" From 1f3db8ad2a4b6fc14bd657fb918b54a21c803f04 Mon Sep 17 00:00:00 2001 From: Vishesh Jain Date: Mon, 27 Apr 2020 19:17:48 +0530 Subject: [PATCH 2/4] updating UPDATING.md for the fix --- UPDATING.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/UPDATING.md b/UPDATING.md index d231f7f1d36bd..c6749e003627e 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -62,6 +62,17 @@ https://developers.google.com/style/inclusive-documentation --> +### Ability to patch Pool.DEFAULT_POOL_NAME in BaseOperator +It was not possible to patch pool in BaseOperator as the signature sets the default value of pool +as Pool.DEFAULT_POOL_NAME. +While using subdagoperator in unittest(without initializing the sqlite db), it was throwing the +following error: +``` +sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: slot_pool. +``` +Fix for this, https://github.com/apache/airflow/pull/8587 + + ### Added mypy plugin to preserve types of decorated functions Mypy currently doesn't support precise type information for decorated From 4c2936464ca67210c667085880f3dbdc963fb717 Mon Sep 17 00:00:00 2001 From: Vishesh Jain Date: Mon, 27 Apr 2020 19:30:18 +0530 Subject: [PATCH 3/4] typo fix --- airflow/models/baseoperator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py index 95f8821f7bd21..a9576a1681127 100644 --- a/airflow/models/baseoperator.py +++ b/airflow/models/baseoperator.py @@ -385,7 +385,7 @@ def __init__( self.retries = retries self.queue = queue - self.pool = Pool.DEFAULT_POOL_NAME if Pool is None else pool + self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool self.pool_slots = pool_slots if self.pool_slots < 1: raise AirflowException("pool slots for %s in dag %s cannot be less than 1" From 6e38f8a6ffeceaf6573f7b5bfd69c9232779702c Mon Sep 17 00:00:00 2001 From: vshshjn7 Date: Thu, 7 May 2020 20:27:30 +0530 Subject: [PATCH 4/4] test fix --- tests/serialization/test_dag_serialization.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index cddb6bf9edf25..05023a52aa5b6 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -70,6 +70,7 @@ "bash_command": "echo {{ task.task_id }}", "_task_type": "BashOperator", "_task_module": "airflow.operators.bash", + "pool": "default_pool", }, { "task_id": "custom_task", @@ -84,6 +85,7 @@ "template_fields": ['bash_command'], "_task_type": "CustomOperator", "_task_module": "tests.test_utils.mock_operators", + "pool": "default_pool", }, ], "timezone": "UTC",