Separate and split run job method into prepare/execute/complete steps #30308
Conversation
Force-pushed 0f9a91c to 6f24503, then 6f24503 to c2e4ed0.
airflow/jobs/base_job.py (outdated):

    session.commit()
    self.state = State.RUNNING
    self.start_date = timezone.utcnow()
    self.state = State.RUNNING
Duplicate: `self.state = State.RUNNING` is assigned twice.
Force-pushed 796b866 to 4d61689.
OK, I updated it now, and I hope it better explains why the split is needed and how we are going to use it when we move to AIP-44. Note that strictly speaking it is not needed yet (as @uranusjr noticed) for the prepare/complete methods to be able to handle JobPydantic as input (they don't get it today); this is in preparation for the final step of refactoring in #30325 and the follow-up steps to add @internal_api calls and make it work for the AIP-44 case. Adding the JobPydantic option allowed me to see (see the comment in the current version) that I need to remove job_runner from job altogether, because currently I need to use "type: ignore[union-attr]" to get rid of a MyPy error. I applied keyword-only session and :meta private: for the internal methods (and prefixed them). Finally, I added a mermaid diagram showing the current Job lifecycle and how it will look when we implement AIP-44. This might help with reasoning about why we needed to separate prepare/complete and why they should also take JobPydantic as input.
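The prepare/execute/complete split with keyword-only session parameters might look roughly like this (a minimal sketch: `Session` and `Job` are stand-ins for the SQLAlchemy session and the BaseJob/JobPydantic objects, and the method bodies are illustrative assumptions rather than the actual PR code):

```python
from datetime import datetime, timezone


class Session:
    """Stand-in for a SQLAlchemy session (illustrative only)."""
    def merge(self, obj):
        return obj

    def commit(self):
        pass


class Job:
    """Stand-in for the BaseJob / JobPydantic state holder."""
    def __init__(self):
        self.state = None
        self.start_date = None
        self.end_date = None


def _prepare(job, *, session):
    """DB step: mark the job as running (would go over the Internal API in db-less mode)."""
    job.state = "running"
    job.start_date = datetime.now(timezone.utc)
    session.merge(job)
    session.commit()


def _execute(job, execute_callable):
    """Pure logic step: needs no database access, always runs locally."""
    return execute_callable()


def _complete(job, *, session):
    """DB step: record completion (would also go over the Internal API in db-less mode)."""
    job.end_date = datetime.now(timezone.utc)
    session.merge(job)
    session.commit()


def run_job(job, execute_callable, *, session):
    """Ties the three steps together; only prepare/complete touch the session."""
    _prepare(job, session=session)
    try:
        return _execute(job, execute_callable)
    finally:
        _complete(job, session=session)
```

Because only `_prepare` and `_complete` take a session, they are the only steps that must be proxied over the Internal API when the process runs db-less, which is the point of the split.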
Force-pushed 4d61689 to 1acb079, then 1acb079 to f6ce7e5.
As a follow-up after decoupling the job logic from the BaseJob ORM object (apache#30255), the `run` method of BaseJob should also be decoupled from it (allowing BaseJobPydantic to be passed) as well as split into three steps, in order to allow db-less mode. The "prepare" and "complete" steps of the `run` method modify the BaseJob ORM-mapped object, so they should be called over the internal API from LocalTask, DagFileProcessor and Triggerer running in db-less mode. The "execute" step, however, does not need the database and should be run locally. This is not yet the full AIP-44 conversion; it is a prerequisite for it, and the AIP-44 conversion will be done as a follow-up after this one. We also added a mermaid diagram showing the job lifecycle with and without the Internal API to make it easier to reason about. Closes: apache#30295
Force-pushed f6ce7e5 to 0e96ca8.
Thanks for iterating a few times here @potiuk.
    with create_session() as session:
        job.heartbeat(session=session)
Do we need to create a session explicitly? Can't we just not pass one (and maybe comment that that is intentional)?
This is an excellent question! And I have some likely-good answers after becoming intimately familiar with the Job code :). It was somewhat of a surprise for me to learn how things currently work, so I am happy to share my findings when asked (I wanted to do it later, but since you asked, here you go :):
This is how it was done originally (and I also added a note about it in the diagrams: it leads to multiple sessions being open at the same time). The problem, present even in the current code, is that at the place where heartbeat is executed there is no session available from the upper stack. I wanted to avoid any "logic" change in this series of refactors.
See the code here (this is before any of my refactorings), airflow/jobs/base_job.py, line 233 at 1a85446:

    with create_session() as session:
That was also quite a surprise to me, and it is actually one of the cool things about this refactoring: such things become plainly obvious as we decouple the database code from the actual logic.
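For context, simplified stand-ins for the two session helpers being contrasted here (the `create_session` context manager versus the `@provide_session` decorator) might look like this; the real Airflow versions manage actual SQLAlchemy sessions with commit/rollback/close, which these sketches only stub out:

```python
import contextlib
import functools


@contextlib.contextmanager
def create_session():
    """Simplified stand-in for airflow.utils.session.create_session."""
    session = object()  # a real implementation would open a SQLAlchemy session here
    try:
        yield session
        # a real implementation would session.commit() on success
    finally:
        pass  # ...and session.close() here


def provide_session(func):
    """Simplified stand-in for @provide_session: injects a fresh session
    only when the caller did not pass one explicitly."""
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is not None:
            return func(*args, session=session, **kwargs)
        with create_session() as new_session:
            return func(*args, session=new_session, **kwargs)
    return wrapper


@provide_session
def heartbeat(*, session):
    # returns whichever session ended up being used, for illustration
    return session
```

The key behavioural difference: `@provide_session` always produces a session by the time the wrapped function body runs, whereas `with create_session()` lets the caller defer session creation until it is actually needed.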
So as a result of this refactor I left two TODOs in the code (the heartbeat one might have been lost with the changes/rebases I've done):
- Maybe we should NOT keep the session open during the whole "execute_job"? We currently keep a session open, and possibly Connection, Variable and XCom retrieval could make use of it (but it likely does not, as the session is not passed down during execute; see airflow/jobs/base_job.py, line 254 at 1a85446):

    with create_session() as session:
I am not 100% sure whether the session can be picked up from the current thread (thread-local?) in our use of it, but I guess it must be passed down explicitly. In that case it seems we keep the session open while the task is running even though we are NOT USING it. This is quite a waste of a session: a TCP/IP connection is held open, and for Postgres there is a process running on the other end (pgbouncer somewhat mitigates it), yet we keep the session open while even an hour-long task runs and cannot actually use it!
But I deliberately wanted to keep the original behaviour, to make sure that my refactors are just that: refactors. I am painfully aware that mixing refactors and functionality changes is a very bad idea. However, once we cut the 2.6 branch I am planning to explore this and possibly optimise the use of sessions there. The decoupling I've done should be rather helpful in doing it "cleanly", I think. Maybe we will find that we can vastly optimise the session/DB usage here, and perhaps we won't need pgbouncer any more once we complete it? I certainly want to explore the consequences of changing the session allocation here. Things might look a bit different in SchedulerJob, so any changes there should be done carefully.
- Then, the heartbeat session is kind of connected to this. Heartbeat might simply have no other way to get a session, because the execute() method has no session to pass down. So we create a new session for heartbeat with `with create_session()` (airflow/jobs/base_job.py, line 233 at 1a85446):

    with create_session() as session:

I guess the idea for doing it without @provide_session was that the first part of the heartbeat check ("only_if_necessary") can be done without opening a session, so "with create_session" is there to only create a session when it is actually needed. In my AIP-44 variant of heartbeat (now deleted) I split the heartbeat method into two to achieve the same with @provide_session, and possibly we can do something similar in the future, but (YAGNI) maybe not, so I deleted it without looking back.
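One way such a two-method split could look (hypothetical names and a minimal stand-in for @provide_session; the real, now-deleted variant may have differed):

```python
import functools
import time


def provide_session(func):
    """Minimal stand-in: supplies a session only when none was passed in."""
    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is None:
            session = "new-session"  # a real implementation would open a SQLAlchemy session
        return func(*args, session=session, **kwargs)
    return wrapper


class HeartbeatingJob:
    """Illustrative job: the cheap check and the DB write live in separate methods."""

    heartrate = 5.0  # seconds between heartbeats

    def __init__(self):
        self.latest_heartbeat = None

    def heartbeat(self, only_if_necessary=False):
        """Cheap part: decide whether a heartbeat is due, without any session."""
        now = time.monotonic()
        if only_if_necessary and self.latest_heartbeat is not None:
            if now - self.latest_heartbeat < self.heartrate:
                return False  # skipped — no session was ever opened
        self._do_heartbeat()
        return True

    @provide_session
    def _do_heartbeat(self, *, session):
        """Expensive part: a session is created only when we actually heartbeat."""
        self.latest_heartbeat = time.monotonic()
```

The "only_if_necessary" early return fires before the decorated method is ever called, so the lazy-session behaviour of the original `with create_session()` version is preserved while each DB-touching method still gets a clean `@provide_session` signature.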
I will need to confirm these findings with others (@ashb? @uranusjr? maybe I missed some session-sharing mechanisms here), but I tried to carefully replicate in the refactor what was originally in the code.
I hope the answer is not too long :D
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
It's cool to bounce ideas off of you and get good advice. I don't mind reworking my own stuff to eventually get it much better :D (and I learn better ways to rebase and resolve conflicts; it's been a long time since I stopped fearing even complex conflict resolution and reworking big parts of a change, now that I have mastered the tooling and we have an extensive test harness to verify things at every stage). Fear of change is one of the worst reasons not to make a change.
This is the final step of decoupling the job runner from the ORM-based BaseJob. After this change we finally reach the state where BaseJob is just the state of the Job being run, while all the logic is kept in a separate "JobRunner" entity which just keeps a reference to the job. It also makes sure that the job in each runner is typed appropriately for each job type:

* SchedulerJobRunner and BackfillJobRunner can only use BaseJob
* DagProcessorJobRunner, TriggererJobRunner and especially LocalTaskJobRunner can hold either BaseJob or its Pydantic BaseJobPydantic representation, for AIP-44 usage

The highlights of this change:

* Job no longer has a job_runner reference
* Job is a mandatory parameter when creating each JobRunner
* the run_job method takes as parameters the job (i.e. where the state of the job is kept) and executor_callable, i.e. the method to run when the job gets executed
* the heartbeat callback is also passed a generic callable, in order to execute the post-heartbeat operation of each job type
* there is no more need to specify job_type when you create a BaseJob; the job gets its type simply by creating a runner with the job

This is the final stage of a refactoring that was split into reviewable stages: apache#30255 -> apache#30302 -> apache#30308 -> this PR.

Closes: apache#30325
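The final shape described above can be sketched as follows (a minimal illustration with stand-in classes; the real Airflow runners and run_job carry many more responsibilities, and the method bodies here are assumptions):

```python
class Job:
    """State-only holder: note there is no reference back to its runner."""
    def __init__(self):
        self.job_type = None
        self.state = None


class BaseJobRunner:
    job_type = "undefined"

    def __init__(self, job):
        # the job is a mandatory constructor argument, and attaching a runner
        # stamps the job's type — no separate job_type argument is needed
        job.job_type = self.job_type
        self.job = job

    def _execute(self):
        raise NotImplementedError


class TriggererJobRunner(BaseJobRunner):
    job_type = "TriggererJob"

    def _execute(self):
        return "triggering"


def run_job(job, execute_callable):
    """Tracks state on the job object while running the supplied callable."""
    job.state = "running"
    try:
        return execute_callable()
    finally:
        job.state = "success"
```

Usage would then pass the job and the runner's execute method separately, e.g. `run_job(job=runner.job, execute_callable=runner._execute)`, which is what lets the job object itself stay a plain (or Pydantic-serialised) state record.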
As a follow-up after decoupling the job logic from the BaseJob ORM object (#30255), the `run` method of BaseJob should also be decoupled from it (allowing BaseJobPydantic to be passed) as well as split into three steps, in order to allow db-less mode.

The "prepare" and "complete" steps of the `run` method modify the BaseJob ORM-mapped object, so they should be called over the internal API from LocalTask, DagFileProcessor and Triggerer running in db-less mode. The "execute" step, however, does not need the database and should be run locally.

This is not yet the full AIP-44 conversion; it is a prerequisite for it, and the AIP-44 conversion will be done as a follow-up after this one.

Closes: #30295