Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a public interface for custom weight_rule implementation #35210

Merged
merged 16 commits into from
Nov 28, 2023

Conversation

hussein-awala
Copy link
Member

@hussein-awala hussein-awala commented Oct 26, 2023

This PR is a proposition to add a new public interface for weight rules (weight strategy) to allow the users to create custom classes to calculate the task priority weight.

Slack thread

Copy link
Contributor

@jscheffl jscheffl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the modelling and the code - I am just not sure whether the pickling of DAGs after parsing is sufficient such that a custom weight rule gets to Scheduler. Have you tested with separated DAG parser and Scheduler w/o DAG code available?

If this works then I beleieve with some documentation we can make a full review. LIKE it!

@hussein-awala
Copy link
Member Author

I am just not sure whether the pickling of DAGs after parsing is sufficient such that a custom weight rule gets to Scheduler. Have you tested with separated DAG parser and Scheduler w/o DAG code available?

In the last commit, I removed the attribute _weight_strategy, so there will not be any pickling problems.

If this works then I beleieve with some documentation we can make a full review.

I'm still trying to improve it by providing a task instance instead of a task to the new class in order to support dynamic priority weight.

@hussein-awala
Copy link
Member Author

I had to make some changes to provide a task instance instead of a task to get_weight method.

I tested it in Breeze (CeleryExecutor + Postgres) with this dag:

from datetime import datetime

from airflow.decorators import dag, task



@dag(
    dag_id="test_weight_rule",
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    tags=["test"]
)
def test_weight_rule():
    @task(weight_rule="absolute", priority_weight=2)
    def task1():
        print("task1")

    @task(weight_rule="downstream")
    def task2():
        print("task2")

    @task(weight_rule="airflow.custom_priority_strategy.CustomPriorityStrategy")
    def task3():
        print("task3")

    @task
    def task4():
        print("task4")

    [task1(), task2(), task3()] >> task4()


test_weight_rule()

And for the class CustomPriorityStrategy , I used:

from airflow.models.taskinstance import TaskInstance
from airflow.task.priority_strategy import PriorityWeightStrategy


class CustomPriorityStrategy(PriorityWeightStrategy):
    def get_weight(self, ti: TaskInstance):
        return max(3 - ti._try_number + 1, 1)

I created a Dag run, and then I cleared the task a few times:

  • first try (when I triggered the dag run) -> priority_weight=3
  • on the first clear (try 1) -> priority_weight=2
  • after the second clear (try 2+) -> priority_weight=1

I will test in more complex examples, multiple concurrent dags, and in different situations (failure then retry, back from the deferred state, backfill, etc.); if all looks good, I'll add the documentation with a few examples and mark it as ready for review.

cc: @eladkal

@eladkal
Copy link
Contributor

eladkal commented Nov 12, 2023

This looks really good!
I will find time this week to test it as well

@hussein-awala hussein-awala added this to the Airflow 2.8.0 milestone Nov 26, 2023
Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good work @hussein-awala

@hussein-awala
Copy link
Member Author

I merge to test it and get some feebacks before 2.8.0 RC.

@hussein-awala hussein-awala merged commit 3385113 into apache:main Nov 28, 2023
47 checks passed
@potiuk
Copy link
Member

potiuk commented Nov 28, 2023

Late to the game - but I think (maybe I am wrong) we have potential security issue here (but it's easy to fix). For quite some time wa have been very careful to not allow to execute the code that can be contributed by DAG authors, that can be executed in the context of scheduler.

I believe this change breaks this assumption. This priority_strategy_class = import_string(strategy_name) will be executed in schecduler, and the strategy can be defined in DAG - so essentially DAG author can supply a code that will be executed in scheduler during scheduling. The priority_weight_total property is used during scheduling ...

Luckily it's easy to mitigate - similarly as timetables for example - the custom weight rules need to be registered via plugin mechanism. Plugins are special - because they can be only registered via "plugins" folder (where DAG authors have no access to write to) or via package entrypoints (and package needs to be installed in scheduler's virtualenv.

So I think this needs to be done as follow-up

@potiuk
Copy link
Member

potiuk commented Nov 28, 2023

cc: @hussein-awala

@jscheffl
Copy link
Contributor

jscheffl commented Nov 28, 2023

I believe this change breaks this assumption. This priority_strategy_class = import_string(strategy_name) will be executed in schecduler, and the strategy can be defined in DAG - so essentially DAG author can supply a code that will be executed in scheduler during scheduling. The priority_weight_total property is used during scheduling ...

So I think this needs to be done as follow-up

Are you sure it can inject "any" code by DAGs?
In the example it might be a bit mis-leading, this is loaded by the DAG parser but the referenced class is from airflow.example_dags.example_priority_weight_strategy.DecreasingPriorityStrategy which is the same file but in the python path of installed airflow.
Any other user-provided DAG will fail in lookup in scheduler as the Scheduler after parsing just de-serializes the DAG but as the DAG file itself is not in the PYTHONPATH will not be able to dynamically load a reference. So like "with the plugin approach" the Python file needs to be deployed in a proper manner into the scheduler instance.

I rather fear that if a user follows the example and deploys a similar copy into the /dags folder it will just not work. Did somebody test this? (I had no time (yet). If yes anyway documentation needs to be updated any maybe it is just a docs rework and we are "safe by accident" :-D

UPDATE: Okay I was wrong. Just tested with an example DAG and if you use the module name <dagfile>.<class> then it is possible to inject code from the DAG directly.

@hussein-awala
Copy link
Member Author

import_string

I will take a look tomorrow at the history of this security issue and if it's the case for the new feature.

Are you sure it can inject "any" code by DAGs?

If the dags folder is added to PYTHONPATH, then yes, but I need to check if there is a protection for dag file processor, which processes these files and sometimes it's running in the scheduler service.

@potiuk
Copy link
Member

potiuk commented Nov 28, 2023

UPDATE: Okay I was wrong. Just tested with an example DAG and if you use the module name . then it is possible to inject code from the DAG directly.

yep. it is

If the dags folder is added to PYTHONPATH, then yes, but I need to check if there is a protection for dag file processor, which processes these files and sometimes it's running in the scheduler service.

Nope. There is no protection. Whatever is in PYTHONPATH can be used ... and .. airflow will AUTOMATICALLY add dags folder to PYTHONPATH too.

Some more context on that one.

Historically, when dag file processor was not standalone this was even more important.

This is also due to historical reasons. DAGFileProcessor in likely 9X* of airflow installations is not "standalone". It is a newly forked process, so it is not really the "same" context as scheduler - those are different proceses. But they share everything else (memory, filesystem and they use the same PYTHONPATH - there is only one process to set the original PYTHONPATH to (airflow scheduler). So that basically means that both airflow scheduler process as well as dag file processor subprocess have to have access to the same PYTHONPATH and DAG folder will be on the PYTHONPATH by definition. This is the reason why we cannot let scheduler doimport("arbitrary import provided by DAG author")

It's only after standalone dag procesor wher we can (and plan to announce it more prominently when ready) to actually isolate scheduler from DAG folder and user code. once we have it, it's theorethically possible to run airflow scheduler without scheduler even SEEING DAG folder. Simply speaking when standalone dag file processor is configured, airflow scheduler is essentailly DAG-less. This is far more secure setup (but almost no-one uses it yet), and it will a base for multi-tenancy separation. And then, the plugin option is a bit less important (but still quite important because DAG author could make airflow import ANY Python package. Which we do not want to do because it could later allow crossing boundaries between tenants.

@potiuk
Copy link
Member

potiuk commented Nov 28, 2023

BTW. I realized that this is something we should describe in our Security Model https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html. I will draft PR soon.

@potiuk
Copy link
Member

potiuk commented Dec 2, 2023

Hey @hussein-awala @jscheffl -> I've proposed #36022 (subject to feedback and discussion - describing the way I understand the current model of DAG Author capabilities - and explaing (I hope) why this change violates it.

Since we are close to 2.8.0 - do you think @hussein-awala (of course if we agree that it is, indeed, security vulnerability to leave it in) you will be able to get the "plugin" mechanism in, or should we revert it and implement it again "properly" in 2.9.0 ?

ephraimbuddy pushed a commit that referenced this pull request Dec 5, 2023
* Add a public interface for custom weight_rule implementation

* Remove _weight_strategy attribute

* Move priority weight calculation to TI to support advanced strategies

* Fix loading the var from mapped operators and simplify loading it from task

* Update default value and deprecated the other one

* Update task endpoint API spec

* fix tests

* Update docs and add dag example

* Fix serialization test

* revert change in spark provider

* Update unit tests

(cherry picked from commit 3385113)
hussein-awala added a commit to hussein-awala/airflow that referenced this pull request Dec 5, 2023
@ephraimbuddy ephraimbuddy added changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) and removed type:new-feature Changelog: New Features labels Dec 6, 2023
ephraimbuddy pushed a commit that referenced this pull request Dec 6, 2023
hussein-awala added a commit to hussein-awala/airflow that referenced this pull request Dec 6, 2023
hussein-awala added a commit to hussein-awala/airflow that referenced this pull request Dec 6, 2023
…35210)

* Add a public interface for custom weight_rule implementation

* Remove _weight_strategy attribute

* Move priority weight calculation to TI to support advanced strategies

* Fix loading the var from mapped operators and simplify loading it from task

* Update default value and deprecated the other one

* Update task endpoint API spec

* fix tests

* Update docs and add dag example

* Fix serialization test

* revert change in spark provider

* Update unit tests
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants