Runless events - refactor job_versions_io_mapping #2654
Codecov Report

Coverage diff, main vs. #2654:

| Metric | main | #2654 | +/- |
|---|---|---|---|
| Coverage | 84.08% | 84.15% | +0.06% |
| Complexity | 1379 | 1390 | +11 |
| Files | 248 | 249 | +1 |
| Lines | 6295 | 6322 | +27 |
| Branches | 286 | 286 | |
| Hits | 5293 | 5320 | +27 |
| Misses | 849 | 850 | +1 |
| Partials | 153 | 152 | -1 |
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
String lineageEventType,
Can we use `DATASET` or `JOB` as the `lineageEventType`? I think it would be helpful to know the event type.
Currently, this column holds run states, so its name is somewhat misleading. The name fits the static lineage scenario well. However, I don't think we should store both run state and event type in a single column. Renaming this column would require a significant amount of work, with changes all over the project, including the spec.
Assuming this column contains run state updates, keeping it `null` for `DatasetEvent` makes sense to me.
I agree it's misleading, but Marquez doesn't necessarily have to adhere exactly to OpenLineage concepts and can redefine them. We can set `lineage_event_type` to `null`, but I would prefer that we (eventually) remap `eventType` to `runState` (just not within this PR).
@wslulciuc I've added another commit to the PR, keeping in mind upcoming streaming job support and our offline discussion. Based on that, I think we should rename […]
That sounds fine to me. I'm curious to hear what Willy thinks.
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
FROM job_io_mapping io
WHERE io.is_job_version_current = TRUE
I like the idea of using a current version check. I'd be interested to see the query plan and how the query may be optimized by removing the join on the `jobs` table. Do we have any numbers on this?
The `getLineage` method is commonly used, as it is the entry point to Marquez for a user. The method is recursive, and the purpose of this refactor is to let each recursion step be computed within a single table, with no joins required.
After this change, a whole lineage graph can be computed based on the `job_versions_io_mapping` table. `jobs_view` is used only to enrich the returned graph nodes. Before this change, a join to `jobs_view` was needed in each recursion step to check whether a row in `job_versions_io_mapping` represents the current job version.
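To make the single-table recursion concrete, here is a minimal sketch of what such a query could look like in PostgreSQL. It is not the query from this PR: the CTE names, the overlap condition, and the `:jobUuid` bind parameter are assumptions for illustration. The table name follows the comment above and the column names follow the final migration in this PR (an earlier revision used `job_io_mapping` and `is_job_version_current`).

```sql
-- Sketch only: an assumed shape for a join-free lineage traversal.
WITH RECURSIVE
  -- Inputs and outputs per job, restricted to rows of the current job version.
  job_io AS (
    SELECT io.job_uuid,
           ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type = 'INPUT')  AS inputs,
           ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type = 'OUTPUT') AS outputs
    FROM job_versions_io_mapping io
    WHERE io.is_current_job_version = TRUE
    GROUP BY io.job_uuid
  ),
  lineage(job_uuid, inputs, outputs) AS (
    -- Starting point: the job the caller asked about (:jobUuid is a hypothetical parameter).
    SELECT job_uuid, inputs, outputs
    FROM job_io
    WHERE job_uuid = :jobUuid
    UNION
    -- Recursion step: any job that reads what a visited job writes, or writes what it reads.
    SELECT j.job_uuid, j.inputs, j.outputs
    FROM job_io j
    JOIN lineage l ON j.inputs && l.outputs OR j.outputs && l.inputs
  )
SELECT job_uuid, inputs, outputs
FROM lineage;
```

The recursive step only joins `job_io` to the rows already collected in `lineage`, so no lookup in `jobs_view` is needed to decide whether a mapping row is current. `UNION` (rather than `UNION ALL`) drops rows that were already visited, which is what lets the recursion terminate on cyclic graphs.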
@@ -0,0 +1,13 @@
ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN symlink_target_job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
Minor: `symlink_target_job_uuid` -> `job_symlink_target_uuid`, since it's defined in the `jobs` table as `jobs.symlink_target_uuid`.
INSERT INTO job_io_mapping (
job_version_uuid, dataset_uuid, io_type, job_uuid, symlink_target_job_uuid, is_job_version_current)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE)
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_job_version_current = TRUE
We set `is_job_version_current = TRUE` as a no-op, i.e. just to fulfill the `ON CONFLICT`? Should we just use `DO NOTHING` instead?
Yes, you're right, we can go with `DO NOTHING`. `markVersionIOMappingObsolete` marks rows whose job version differs from the given one as obsolete, so we don't need to handle the conflict scenario here.
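The two statements discussed here could fit together roughly as follows. This is a sketch under assumptions: the table and column names are copied from the snippet under review, while the `UPDATE` is only a guess at what `markVersionIOMappingObsolete` does, based on the description above.

```sql
-- Sketch only. The UPDATE is an assumed shape, not the method's actual SQL.

-- 1. Mark mappings that belong to other versions of this job as no longer current.
UPDATE job_io_mapping
SET is_job_version_current = FALSE
WHERE job_uuid = :jobUuid
  AND job_version_uuid <> :jobVersionUuid
  AND is_job_version_current = TRUE;

-- 2. Insert the mapping for the given version; an already existing row is left untouched.
INSERT INTO job_io_mapping (
    job_version_uuid, dataset_uuid, io_type, job_uuid, symlink_target_job_uuid, is_job_version_current)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE)
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO NOTHING;
```

One consequence of `DO NOTHING` that may be worth checking: if a row for the given version already exists with `is_job_version_current = FALSE`, the insert leaves it `FALSE`. Whether that case can occur depends on how and when the two statements are called.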
@@ -0,0 +1,11 @@
ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN job_symlink_target_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN is_current_job_version boolean DEFAULT FALSE;
Can we add a `made_current_at` column?
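For illustration, the requested column could be added in the same style as the other migration statements. The column type is an assumption; the thread does not specify it.

```sql
-- Sketch only: the TIMESTAMP type is assumed, not taken from the PR.
ALTER TABLE job_versions_io_mapping ADD COLUMN made_current_at TIMESTAMP;
```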
Looking forward to the lineage query perf improvements and follow-up analysis!
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Problem
This is currently a draft PR which is far from being merged. It is missing a few tests related to schema changes, which are marked with `todo` within the code. I've created this PR to have a better discussion on adding `job_id` to `job_versions_io_mapping`. This PR should be a follow-up of #2641. The assumption was that the extra column should help optimise the get-lineage query. I would first like to clarify how we are going to benefit from it.