Skip to content

Commit

Permalink
feat: support include_deferred in pool sync for airflow 2.7.0 (#775)
Browse files Browse the repository at this point in the history
* fix: add 'include_deferred' column to sync_pools script

Signed-off-by: Einav Daniel <einavd@datricks.com>

* fix: include_deferred implementation

Signed-off-by: Mathew Wicks <thesuperzapper@users.noreply.github.com>

---------

Signed-off-by: Einav Daniel <einavd@datricks.com>
Signed-off-by: Mathew Wicks <thesuperzapper@users.noreply.github.com>
Co-authored-by: Mathew Wicks <thesuperzapper@users.noreply.github.com>
  • Loading branch information
EinavDanielDX and thesuperzapper authored Aug 29, 2023
1 parent 31dc335 commit 696e613
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 0 deletions.
3 changes: 3 additions & 0 deletions charts/airflow/docs/faq/dags/airflow-pools.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ airflow:
- name: "pool_1"
description: "example pool with 5 slots"
slots: 5

- name: "pool_2"
description: "example pool with 10 slots"
slots: 10
## if deferred tasks count towards the pool's slot limit; requires airflow 2.7.0+ (default: false)
#include_deferred: false

## if we create a Deployment to perpetually sync `airflow.pools`
poolsUpdate: true
Expand Down
17 changes: 17 additions & 0 deletions charts/airflow/templates/sync/_helpers/sync_pools.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,23 @@ class PoolWrapper(object):
name: str,
description: str,
slots: int,
include_deferred: bool,
policies: List[ScheduledPolicy],
enable_policies: bool,
):
self.name = name
self.description = description
self.slots = slots
self.include_deferred = include_deferred
self.policies = policies
self.enable_policies = enable_policies

def as_pool(self) -> Pool:
pool = Pool()
pool.pool = self.name
# NOTE: include_deferred is only available in Airflow 2.7.0+
if hasattr(Pool, "include_deferred"):
pool.include_deferred = self.include_deferred
if self._has_policies():
most_recent_policy = self._most_recent_policy()
pool.slots = most_recent_policy.slots
Expand Down Expand Up @@ -92,6 +97,15 @@ VAR__POOL_WRAPPERS = {
{{ required "the `slots` in each `airflow.pools[]` must be int-type!" nil }}
{{- end }}
slots={{ (required "the `slots` in each `airflow.pools[]` must be non-empty!" .slots) }},
{{- $include_deferred := dig "include_deferred" nil . }}
{{- if not (or (typeIs "bool" $include_deferred) (eq $include_deferred nil)) }}
{{ required "if specified, the `include_deferred` in each `airflow.pools[]` must be bool-type!" nil }}
{{- end }}
{{- if $include_deferred }}
include_deferred=True,
{{- else }}
include_deferred=False,
{{- end }}
policies=[
{{- range .policies }}
ScheduledPolicy(
Expand Down Expand Up @@ -126,6 +140,7 @@ def compare_pools(p1: Pool, p2: Pool) -> bool:
p1.pool == p1.pool
and p1.description == p2.description
and p1.slots == p2.slots
and getattr(p1, "include_deferred", False) == getattr(p2, "include_deferred", False)
)


Expand All @@ -152,6 +167,8 @@ def sync_pool(pool_wrapper: PoolWrapper) -> None:
logging.info(f"Pool=`{p_name}` exists but has changed, updating...")
p_old.description = p_new.description
p_old.slots = p_new.slots
if hasattr(Pool, "include_deferred"):
p_old.include_deferred = p_new.include_deferred
pool_updated = True

if pool_added:
Expand Down
2 changes: 2 additions & 0 deletions charts/airflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ airflow:
## - name: "pool_2"
## description: "example pool with 2 cron policies"
## slots: 0
## ## if deferred tasks count towards the pool's slot limit; requires airflow 2.7.0+ (default: false)
## include_deferred: false
## ## at each sync interval, the policy with the most recently past `recurrence` is applied
## policies:
## - name: "scale up at 7pm UTC"
Expand Down

0 comments on commit 696e613

Please sign in to comment.