From 1c61d027d71403b49c8d50bf0942874bf7408c83 Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Wed, 10 Aug 2022 13:42:01 -0700 Subject: [PATCH] Update lineage query to only look at jobs with inputs or outputs Signed-off-by: Michael Collado --- api/src/main/java/marquez/db/LineageDao.java | 62 ++++++++++--------- .../test/java/marquez/db/LineageDaoTest.java | 32 ++++++++++ 2 files changed, 65 insertions(+), 29 deletions(-) diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index 625c861e5a..9a12cf2967 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -37,35 +37,39 @@ public interface LineageDao { * @return */ @SqlQuery( - // dataset_ids: all the input and output datasets of the current version of the specified jobs - "WITH RECURSIVE\n" - + " job_io AS (\n" - + " SELECT j.uuid AS job_uuid,\n" - + " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,\n" - + " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs\n" - + " FROM jobs_view j\n" - + " LEFT JOIN jobs_view s ON s.symlink_target_uuid=j.uuid\n" - + " LEFT JOIN job_versions v on COALESCE(j.current_version_uuid, s.current_version_uuid) = v.uuid\n" - + " LEFT JOIN job_versions_io_mapping io on v.uuid = io.job_version_uuid\n" - + " GROUP BY j.uuid\n" - + " ),\n" - + " lineage(job_uuid, inputs, outputs) AS (\n" - + " SELECT job_uuid, inputs, outputs, 0 AS depth\n" - + " FROM job_io\n" - + " WHERE job_uuid IN ()\n" - + " UNION\n" - + " SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1\n" - + " FROM job_io io,\n" - + " lineage l\n" - + " WHERE io.job_uuid != l.job_uuid AND\n" - + " array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)\n" - + " AND depth < :depth" - + " )\n" - + "SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n" - + "FROM lineage l2\n" - + "INNER JOIN jobs_view s ON s.uuid=l2.job_uuid\n" - + "INNER JOIN jobs_view j ON j.uuid=COALESCE(s.symlink_target_uuid, s.uuid)\n" - + "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid") + """ + WITH RECURSIVE + job_io AS ( + SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs + FROM job_versions_io_mapping io + INNER JOIN job_versions v ON io.job_version_uuid=v.uuid + INNER JOIN jobs_view j on j.current_version_uuid = v.uuid + LEFT JOIN jobs_view s ON s.uuid=j.symlink_target_uuid + WHERE s.current_version_uuid IS NULL + GROUP BY COALESCE(j.symlink_target_uuid, j.uuid) + ), + lineage(job_uuid, inputs, outputs) AS ( + SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, + COALESCE(inputs, Array[]::uuid[]) AS inputs, + COALESCE(outputs, Array[]::uuid[]) AS outputs, + 0 AS depth + FROM jobs_view j + LEFT JOIN job_io io ON io.job_uuid=j.uuid OR j.symlink_target_uuid=io.job_uuid + WHERE j.uuid IN () + UNION + SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 + FROM job_io io, + lineage l + WHERE io.job_uuid != l.job_uuid AND + array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) + AND depth < :depth) + SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context + FROM lineage l2 + INNER JOIN jobs_view j ON j.uuid=l2.job_uuid + LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid; + """) Set getLineage(@BindList Set jobIds, int depth); @SqlQuery( diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 2db046b7de..29eed88887 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -276,6 +276,38 @@ public void testGetLineageForSymlinkedJob() throws SQLException { .map(JobData::getUuid) .collect(Collectors.toSet()); assertThat(lineageForOriginalJob).isEqualTo(jobIds); + + UpdateLineageRow updatedTargetJob = + LineageTestUtils.createLineageRow( + openLineageDao, + symlinkTargetJobName, + "COMPLETE", + jobFacet, + Arrays.asList(), + Arrays.asList( + new Dataset( + NAMESPACE, + "a_new_dataset", + newDatasetFacet(new SchemaField("firstname", "string", "the first name"))))); + assertThat(updatedTargetJob.getJob().getUuid()).isEqualTo(targetJob.getUuid()); + + // get lineage for original job - the old datasets/jobs should no longer be present + assertThat( + lineageDao + .getLineage(new HashSet<>(Arrays.asList(writeJob.getJob().getUuid())), 2) + .stream() + .map(JobData::getUuid) + .collect(Collectors.toSet())) + .hasSize(1) + .containsExactlyInAnyOrder(targetJob.getUuid()); + + // fetching lineage for target job should yield the same results + assertThat( + lineageDao.getLineage(new HashSet<>(Arrays.asList(targetJob.getUuid())), 2).stream() + .map(JobData::getUuid) + .collect(Collectors.toSet())) + .hasSize(1) + .containsExactlyInAnyOrder(targetJob.getUuid()); } @Test