Convert the LocalExecutor to run tasks using new Task SDK supervisor code #44427
Conversation
Force-pushed from c9a0845 to 17f1a5e
Minor comments/questions but directionally lgtm
I do not fully understand the code, but maybe I need a night of sleep. Just some comments, not blocking.
I was scratching my head regarding the general supervisor approach: do we have platform limitations from this? I assume not; all major operating systems have the socket mechanisms, including Windows, correct? Do we see limitations for running outside Linux?
Yes, Windows supports socketpair etc. (it's why I chose that API). This supervisor approach is what we do today with LocalTaskJob and the StandardRunner etc.; this is just a simplified re-implementation of it. To fully support Windows we will need to have something other than …
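The socketpair/fork split discussed above can be sketched as follows. This is a minimal illustration, not Airflow's actual supervisor code: `socket.socketpair()` is portable (available on Windows since Python 3.5), while `os.fork()` is POSIX-only and is exactly the piece a Windows port would have to replace.

```python
import os
import socket


def spawn_supervised() -> bytes:
    """Sketch of supervisor/child communication over a socketpair.

    The supervisor (parent) and the task process (child) each keep one
    end of the pair and close the other.
    """
    parent_sock, child_sock = socket.socketpair()

    pid = os.fork()  # POSIX-only; the non-portable part of this sketch
    if pid == 0:
        # Child: the task side. Close the supervisor's end, report in.
        parent_sock.close()
        child_sock.sendall(b"task started")
        child_sock.close()
        os._exit(0)

    # Parent: the supervisor side. Close the child's end, read its message.
    child_sock.close()
    msg = parent_sock.recv(1024)
    os.waitpid(pid, 0)
    parent_sock.close()
    return msg
```

On POSIX systems, `spawn_supervised()` returns the bytes the child wrote into its end of the pair.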
Force-pushed from 17f1a5e to c660a89
…code

This also lays the groundwork for a more general-purpose "workload" execution system, making a single interface for executors to run tasks and callbacks.

Also in this PR we set up the supervise function to send task logs to a file, and handle the task-log template rendering in the scheduler before queueing the workload.

Additionally, we don't pass the activity directly to `supervise()` but instead its properties/fields, to reduce the coupling between the SDK and the Executor. (More separation will appear in PRs over the next few weeks.)

The big change of note here is that rather than sending an airflow command line to execute (`["airflow", "tasks", "run", ...]`) and going back in via the CLI parser, we go directly to a special-purpose function. Much simpler.

It doesn't remove any of the old behaviour (CeleryExecutor still uses LocalTaskJob via the CLI parser, etc.), nor does anything currently send callback requests via this new workload mechanism.

The `airflow.executors.workloads` module currently needs to be shared between the Scheduler (or more specifically the Executor) and the "worker" side of things. In the future these will be separate Python dists and this module will need to live somewhere else.

Right now we check if `executor.queue_workload` is different from the BaseExecutor version (which just raises an error right now) to see which executors support this new version. That check will be removed as soon as all the in-tree executors have been migrated.
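The "is `queue_workload` overridden?" check described above might look roughly like this. The class and method bodies here are simplified stand-ins, not the real `BaseExecutor`; only the override-detection pattern is the point.

```python
class BaseExecutor:
    def queue_workload(self, workload) -> None:
        # Base implementation: this executor has not been migrated yet.
        raise NotImplementedError("This executor does not support workloads")


class MigratedExecutor(BaseExecutor):
    """Stand-in for an executor that has been migrated to workloads."""

    def __init__(self):
        self.queued = []

    def queue_workload(self, workload) -> None:
        self.queued.append(workload)


def supports_workloads(executor: BaseExecutor) -> bool:
    # An executor supports the new mechanism iff its class overrides
    # queue_workload, i.e. the bound function differs from the base one.
    return type(executor).queue_workload is not BaseExecutor.queue_workload
```

With this check, `supports_workloads(MigratedExecutor())` is true while `supports_workloads(BaseExecutor())` is false, matching the temporary gating the PR describes.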
Force-pushed from c660a89 to 5a7aca4
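The "render the log template in the scheduler before queueing" step can be illustrated with a simplified sketch. Airflow's real log filename template is a Jinja2 template with its own field names; the `str.format`-style template and helper below are illustrative stand-ins only.

```python
def render_log_filename(template: str, *, dag_id: str, task_id: str,
                        run_id: str, try_number: int) -> str:
    # Rendered once, scheduler-side, before the workload is queued;
    # the worker then only needs the resulting path, not the template.
    return template.format(dag_id=dag_id, task_id=task_id,
                           run_id=run_id, try_number=try_number)


path = render_log_filename(
    "dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log",
    dag_id="example",
    task_id="hello",
    run_id="manual__2024-01-01",
    try_number=1,
)
# path == "dag_id=example/run_id=manual__2024-01-01/task_id=hello/attempt=1.log"
```

Doing the rendering before queueing means the worker side needs no knowledge of the template configuration, which fits the SDK/executor decoupling the PR is driving toward.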
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards-incompatible changes, please leave a note in a newsfragment file named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in newsfragments.