Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Add a public interface for custom weight_rule implementation (#35210)" #36066

Merged
merged 2 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3738,8 +3738,6 @@ components:
readOnly: true
weight_rule:
$ref: "#/components/schemas/WeightRule"
priority_weight_strategy:
$ref: "#/components/schemas/PriorityWeightStrategy"
ui_color:
$ref: "#/components/schemas/Color"
ui_fgcolor:
Expand Down Expand Up @@ -4769,16 +4767,11 @@ components:
WeightRule:
description: Weight rule.
type: string
nullable: true
enum:
- downstream
- upstream
- absolute

PriorityWeightStrategy:
description: Priority weight strategy.
type: string

HealthStatus:
description: Health status
type: string
Expand Down
1 change: 0 additions & 1 deletion airflow/api_connexion/schemas/task_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class TaskSchema(Schema):
retry_exponential_backoff = fields.Boolean(dump_only=True)
priority_weight = fields.Number(dump_only=True)
weight_rule = WeightRuleField(dump_only=True)
priority_weight_strategy = fields.String(dump_only=True)
ui_color = ColorField(dump_only=True)
ui_fgcolor = ColorField(dump_only=True)
template_fields = fields.List(fields.String(), dump_only=True)
Expand Down
11 changes: 0 additions & 11 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,6 @@ core:
description: |
The weighting method used for the effective total priority weight of the task
version_added: 2.2.0
version_deprecated: 2.8.0
deprecation_reason: |
This option is deprecated and will be removed in Airflow 3.0.
Please use ``default_task_priority_weight_strategy`` instead.
type: string
example: ~
default: ~
default_task_priority_weight_strategy:
description: |
The strategy used for the effective total priority weight of the task
version_added: 2.8.0
type: string
example: ~
default: "downstream"
Expand Down
69 changes: 0 additions & 69 deletions airflow/example_dags/example_priority_weight_strategy.py

This file was deleted.

2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def queue_task_instance(
self.queue_command(
task_instance,
command_list_to_run,
priority=task_instance.priority_weight,
priority=task_instance.task.priority_weight_total,
queue=task_instance.task.queue,
)

Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/debug_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def queue_task_instance(
self.queue_command(
task_instance,
[str(task_instance)], # Just for better logging, it's not used anywhere
priority=task_instance.priority_weight,
priority=task_instance.task.priority_weight_total,
queue=task_instance.task.queue,
)
# Save params for TaskInstance._run_raw_task
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""Add processor_subdir to ImportError.

Revision ID: 10b52ebd31f7
Revises: 624ecf3b6a5e
Revises: bd5dfbe21f88
Create Date: 2023-11-29 16:54:48.101834

"""
Expand All @@ -30,7 +30,7 @@

# revision identifiers, used by Alembic.
revision = "10b52ebd31f7"
down_revision = "624ecf3b6a5e"
down_revision = "bd5dfbe21f88"
branch_labels = None
depends_on = None
airflow_version = "2.8.0"
Expand Down
20 changes: 3 additions & 17 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import datetime
import inspect
import warnings
from functools import cached_property
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Collection, Iterable, Iterator, Sequence

Expand Down Expand Up @@ -71,14 +70,8 @@
)
MAX_RETRY_DELAY: int = conf.getint("core", "max_task_retry_delay", fallback=24 * 60 * 60)

DEFAULT_WEIGHT_RULE: WeightRule | None = (
WeightRule(conf.get("core", "default_task_weight_rule", fallback=None))
if conf.get("core", "default_task_weight_rule", fallback=None)
else None
)

DEFAULT_PRIORITY_WEIGHT_STRATEGY: str = conf.get(
"core", "default_task_priority_weight_strategy", fallback=WeightRule.DOWNSTREAM
DEFAULT_WEIGHT_RULE: WeightRule = WeightRule(
conf.get("core", "default_task_weight_rule", fallback=WeightRule.DOWNSTREAM)
)
DEFAULT_TRIGGER_RULE: TriggerRule = TriggerRule.ALL_SUCCESS
DEFAULT_TASK_EXECUTION_TIMEOUT: datetime.timedelta | None = conf.gettimedelta(
Expand All @@ -105,8 +98,7 @@ class AbstractOperator(Templater, DAGNode):

operator_class: type[BaseOperator] | dict[str, Any]

weight_rule: str | None
priority_weight_strategy: str
weight_rule: str
priority_weight: int

# Defines the operator level extra links.
Expand Down Expand Up @@ -406,12 +398,6 @@ def priority_weight_total(self) -> int:
- WeightRule.DOWNSTREAM - adds priority weight of all downstream tasks
- WeightRule.UPSTREAM - adds priority weight of all upstream tasks
"""
warnings.warn(
"Accessing `priority_weight_total` from AbstractOperator instance is deprecated."
" Please use `priority_weight` from task instance instead.",
DeprecationWarning,
stacklevel=2,
)
if self.weight_rule == WeightRule.ABSOLUTE:
return self.priority_weight
elif self.weight_rule == WeightRule.DOWNSTREAM:
Expand Down
36 changes: 11 additions & 25 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
DEFAULT_OWNER,
DEFAULT_POOL_SLOTS,
DEFAULT_PRIORITY_WEIGHT,
DEFAULT_PRIORITY_WEIGHT_STRATEGY,
DEFAULT_QUEUE,
DEFAULT_RETRIES,
DEFAULT_RETRY_DELAY,
Expand All @@ -77,7 +76,6 @@
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.models.taskmixin import DependencyMixin
from airflow.serialization.enums import DagAttributeTypes
from airflow.task.priority_strategy import get_priority_weight_strategy
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkippedDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
Expand All @@ -92,6 +90,7 @@
from airflow.utils.setup_teardown import SetupTeardownContext
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import NOTSET
from airflow.utils.weight_rule import WeightRule
from airflow.utils.xcom import XCOM_RETURN_KEY

if TYPE_CHECKING:
Expand Down Expand Up @@ -208,7 +207,6 @@ def partial(**kwargs):
"retry_exponential_backoff": False,
"priority_weight": DEFAULT_PRIORITY_WEIGHT,
"weight_rule": DEFAULT_WEIGHT_RULE,
"priority_weight_strategy": DEFAULT_PRIORITY_WEIGHT_STRATEGY,
"inlets": [],
"outlets": [],
}
Expand Down Expand Up @@ -242,7 +240,6 @@ def partial(
retry_exponential_backoff: bool | ArgNotSet = NOTSET,
priority_weight: int | ArgNotSet = NOTSET,
weight_rule: str | ArgNotSet = NOTSET,
priority_weight_strategy: str | ArgNotSet = NOTSET,
sla: timedelta | None | ArgNotSet = NOTSET,
max_active_tis_per_dag: int | None | ArgNotSet = NOTSET,
max_active_tis_per_dagrun: int | None | ArgNotSet = NOTSET,
Expand Down Expand Up @@ -306,7 +303,6 @@ def partial(
"retry_exponential_backoff": retry_exponential_backoff,
"priority_weight": priority_weight,
"weight_rule": weight_rule,
"priority_weight_strategy": priority_weight_strategy,
"sla": sla,
"max_active_tis_per_dag": max_active_tis_per_dag,
"max_active_tis_per_dagrun": max_active_tis_per_dagrun,
Expand Down Expand Up @@ -548,9 +544,9 @@ class derived from this one results in the creation of a task object,
This allows the executor to trigger higher priority tasks before
others when things get backed up. Set priority_weight as a higher
number for more important tasks.
:param weight_rule: Deprecated field, please use ``priority_weight_strategy`` instead.
weighting method used for the effective total priority weight of the task. Options are:
``{ downstream | upstream | absolute }`` default is ``None``
:param weight_rule: weighting method used for the effective total
priority weight of the task. Options are:
``{ downstream | upstream | absolute }`` default is ``downstream``
When set to ``downstream`` the effective weight of the task is the
aggregate sum of all downstream descendants. As a result, upstream
tasks will have higher weight and will be scheduled more aggressively
Expand All @@ -570,11 +566,6 @@ class derived from this one results in the creation of a task object,
significantly speeding up the task creation process as for very large
DAGs. Options can be set as string or using the constants defined in
the static class ``airflow.utils.WeightRule``
:param priority_weight_strategy: weighting method used for the effective total priority weight
of the task. You can provide one of the following options:
``{ downstream | upstream | absolute }`` or the path to a custom
strategy class that extends ``airflow.task.priority_strategy.PriorityWeightStrategy``.
Default is ``downstream``.
:param queue: which queue to target when running this job. Not
all executors implement queue management, the CeleryExecutor
does support targeting specific queues.
Expand Down Expand Up @@ -763,8 +754,7 @@ def __init__(
params: collections.abc.MutableMapping | None = None,
default_args: dict | None = None,
priority_weight: int = DEFAULT_PRIORITY_WEIGHT,
weight_rule: str | None = DEFAULT_WEIGHT_RULE,
priority_weight_strategy: str = DEFAULT_PRIORITY_WEIGHT_STRATEGY,
weight_rule: str = DEFAULT_WEIGHT_RULE,
queue: str = DEFAULT_QUEUE,
pool: str | None = None,
pool_slots: int = DEFAULT_POOL_SLOTS,
Expand Down Expand Up @@ -911,17 +901,13 @@ def __init__(
f"received '{type(priority_weight)}'."
)
self.priority_weight = priority_weight
self.weight_rule = weight_rule
self.priority_weight_strategy = priority_weight_strategy
if weight_rule:
warnings.warn(
"weight_rule is deprecated. Please use `priority_weight_strategy` instead.",
DeprecationWarning,
stacklevel=2,
if not WeightRule.is_valid(weight_rule):
raise AirflowException(
f"The weight_rule must be one of "
f"{WeightRule.all_weight_rules},'{dag.dag_id if dag else ''}.{task_id}'; "
f"received '{weight_rule}'."
)
self.priority_weight_strategy = weight_rule
# validate the priority weight strategy
get_priority_weight_strategy(self.priority_weight_strategy)
self.weight_rule = weight_rule
self.resources = coerce_resources(resources)
if task_concurrency and not max_active_tis_per_dag:
# TODO: Remove in Airflow 3.0
Expand Down
Loading