Skip to content

Commit

Permalink
Made improvements to lineage query performance
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike committed Apr 5, 2023
1 parent da3863c commit b5d3b65
Showing 1 changed file with 39 additions and 31 deletions.
70 changes: 39 additions & 31 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,37 +38,45 @@ public interface LineageDao {
*/
@SqlQuery(
"""
WITH RECURSIVE
job_io AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
ARRAY_AGG(DISTINCT j.uuid) AS ids,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM jobs j
LEFT JOIN jobs_view s On s.uuid=j.symlink_target_uuid
LEFT JOIN job_versions v on v.uuid=COALESCE(s.current_version_uuid, j.current_version_uuid)
LEFT JOIN job_versions_io_mapping io ON io.job_version_uuid=v.uuid
GROUP BY COALESCE(j.symlink_target_uuid, j.uuid)
),
lineage(job_uuid, inputs, outputs) AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs_view j
INNER JOIN job_io io ON j.uuid=ANY(io.ids)
WHERE io.ids && ARRAY[<jobIds>]::uuid[]
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid
LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid;
WITH RECURSIVE
-- Find the current version of a job or its symlink target if the target has no
-- current_version_uuid. This ensures that we don't lose lineage for a job after it is
-- symlinked to another job but before that target job has run successfully.
job_current_version AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
FROM jobs j
LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
WHERE s.current_version_uuid IS NULL
),
job_io AS (
SELECT j.job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
GROUP BY j.job_uuid
),
lineage(job_uuid, inputs, outputs) AS (
SELECT v.job_uuid AS job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs j
INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid
LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid;
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

Expand Down

0 comments on commit b5d3b65

Please sign in to comment.