-
Notifications
You must be signed in to change notification settings - Fork 14.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP-65: Add versioning to the SerializedDagModel #42547
Conversation
1fc366c
to
1515206
Compare
13b76cf
to
5ebc047
Compare
airflow/models/serialized_dag.py
Outdated
return row | ||
|
||
return session.scalar(select(cls).where(cls.dag_id == dag_id)) | ||
return session.scalar(select(cls).where(cls.dag_id == dag_id).order_by(cls.id.desc()).limit(1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are doing this pattern a lot. Might be worth adding a classmethod to do it?
@@ -87,7 +101,10 @@ class SerializedDagModel(Base): | |||
dag_hash = Column(String(32), nullable=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see in #42690 that you're planning to propagate this everywhere. Since this is a straight up hash of the serdag, I'm starting to wonder if we'd be better off with a separate "dag_version" entity, so that:
DagModel -> DagVersion -> [SerializedDagModel, DagCode]
Today they are the same, but fundamentally they aren't.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see in #42690 that you're planning to propagate this everywhere. Since this is a straight up hash of the serdag, I'm starting to wonder if we'd be better off with a separate "dag_version" entity, so that:
DagModel -> DagVersion -> [SerializedDagModel, DagCode]
Today they are the same, but fundamentally they aren't.
The dag_hash is already in use everywhere, and it looks like there are no more places to add it except the TIHistory. There's a dagrun.dag_hash
which the scheduler uses to create dagrun and also verify integrity of the DAG and in the #42690, we just needed to record the hash that the task instance ran with. I think that would be all we will do in the PR aside from tests.
We already have all we need to determine the dag version a task instance ran with, and it looks like a separate DagVersion table will have more queries with joins, etc., as I understand it. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another proposal: Instead of a separate versioning table, track the serialized dag version used by dagrun and taskinstance by having a foreignkey between the models and serialized dag model. Currently, this is tracked with only dag_hash but now, dag_hash can be duplicated due to the versioning. This could also allow the scheduler to run with a specific version fed through the UI. Scheduler is currently running with the latest serdag, when it changes, the scheduler change to the updated serdag. We can make it stick to one version until the end of the run.
I will update #42690 with this proposal so you can see what I mean.
@@ -33,7 +33,7 @@ def example_bash_decorator(): | |||
def run_me(sleep_seconds: int, task_instance_key_str: str) -> str: | |||
return f"echo {task_instance_key_str} && sleep {sleep_seconds}" | |||
|
|||
run_me_loop = [run_me.override(task_id=f"runme_{i}")(sleep_seconds=i) for i in range(3)] | |||
run_me_loop = [run_me.override(task_id=f"runme_{i}")(sleep_seconds=i) for i in range(5)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you indend to bump this permanently, or was that a leftover experiment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops. Leftover ..
5ebc047
to
d769be2
Compare
This commit adds versioning to the serializedDagModel. Changes: Added new columns, id, and version_number to the SDM and made id the primary key. Updated the write_dag method of the SDM to add the SDs correctly. Updated the queries so the scheduler/webserver runs with the latest SDM The version_number was added to help us track the evolution of a DAG. Suppose a DAG with dag_hash AB is changed, and the dag_hash becomes CD. If the change is reverted, we will have a dag_hash of AB again. In this case, the version_number would still increment, letting us know that the DAG was changed three times. I feel it's a meaningful way to track the changes, independent of the id column, which is database internals.
d769be2
to
01a0c77
Compare
Closing in preference of #42913 |
Depends on #42517
This commit adds versioning to the serializedDagModel.
Changes:
Added new columns, id, and version_number to the SDM and made id the
primary key.
Updated the write_dag method of the SDM to add the SDs correctly.
Updated the queries so the scheduler/webserver runs with the latest SDM
The version_number was added to help us track the evolution of a DAG.
Suppose a DAG with dag_hash AB is changed, and the dag_hash becomes CD.
If the change is reverted, we will have a dag_hash of AB again. In this
case, the version_number would still increment, letting us know that the
DAG was changed three times. I feel it's a meaningful way to track the changes,
independent of the id column, which is database internals.