From b302008d04444ed37bd6c7bc8fbc5027cf6da8b4 Mon Sep 17 00:00:00 2001
From: Michael Collado
Date: Mon, 12 Dec 2022 12:57:06 -0800
Subject: [PATCH] Fix lineage for orphaned datasets

Signed-off-by: Michael Collado
---
 api/src/main/java/marquez/db/LineageDao.java  |  8 +++++
 .../java/marquez/service/LineageService.java  | 14 ++++++++
 .../marquez/service/LineageServiceTest.java   | 33 +++++++++++++++++++
 3 files changed, 55 insertions(+)

diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index a3b92904d0..0879370ee2 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -80,6 +80,14 @@ SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids,
       WHERE ds.uuid IN ()""")
   Set getDatasetData(@BindList Set dsUuids);
 
+  @SqlQuery(
+      """
+      SELECT ds.*, dv.fields, dv.lifecycle_state
+      FROM datasets_view ds
+      LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
+      WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName""")
+  DatasetData getDatasetData(String namespaceName, String datasetName);
+
   @SqlQuery(
       """
       SELECT j.uuid FROM jobs j
diff --git a/api/src/main/java/marquez/service/LineageService.java b/api/src/main/java/marquez/service/LineageService.java
index 70643b2fd0..684dde5d08 100644
--- a/api/src/main/java/marquez/service/LineageService.java
+++ b/api/src/main/java/marquez/service/LineageService.java
@@ -7,6 +7,7 @@
 
 import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Maps;
 import java.util.Collections;
 import java.util.HashMap;
@@ -81,6 +82,19 @@ public Lineage lineage(NodeId nodeId, int depth, boolean withRunFacets) {
     if (!datasetIds.isEmpty()) {
       datasets.addAll(this.getDatasetData(datasetIds));
     }
+    if (nodeId.isDatasetType()
+        && datasets.stream().noneMatch(n -> n.getId().equals(nodeId.asDatasetId()))) {
+      DatasetId datasetId = nodeId.asDatasetId();
+      log.warn(
+          "Found jobs {} which no longer share lineage with dataset {} - discarding",
+          jobData.stream().map(JobData::getId).toList(),
+          datasetId);
+      DatasetData datasetData =
+          getDatasetData(datasetId.getNamespace().getValue(), datasetId.getName().getValue());
+      return new Lineage(
+          ImmutableSortedSet.of(
+              Node.dataset().data(datasetData).id(NodeId.of(datasetData.getId())).build()));
+    }
     return toLineage(jobData, datasets);
   }
 
diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java
index 52fd9deef5..1a5ade90b1 100644
--- a/api/src/test/java/marquez/service/LineageServiceTest.java
+++ b/api/src/test/java/marquez/service/LineageServiceTest.java
@@ -17,6 +17,7 @@
 import marquez.api.JdbiUtils;
 import marquez.common.models.DatasetName;
 import marquez.common.models.DatasetVersionId;
+import marquez.common.models.JobId;
 import marquez.common.models.JobName;
 import marquez.common.models.NamespaceName;
 import marquez.db.DatasetDao;
@@ -393,6 +394,38 @@ public void testLineageWithWithCycle() {
         .matches(n -> n.isJobType() && n.asJobId().getName().getValue().equals("writeJob"));
   }
 
+  @Test
+  public void testLineageForOrphanedDataset() {
+    UpdateLineageRow writeJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "writeJob",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(),
+            Arrays.asList(dataset));
+
+    NodeId datasetNodeId =
+        NodeId.of(new NamespaceName(dataset.getNamespace()), new DatasetName(dataset.getName()));
+    Lineage lineage = lineageService.lineage(datasetNodeId, 2, false);
+    assertThat(lineage.getGraph())
+        .hasSize(2)
+        .extracting(Node::getId)
+        .containsExactlyInAnyOrder(
+            NodeId.of(new JobId(new NamespaceName(NAMESPACE), new JobName("writeJob"))),
+            datasetNodeId);
+
+    UpdateLineageRow updatedWriteJob =
+        LineageTestUtils.createLineageRow(
+            openLineageDao, "writeJob", "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList());
+
+    lineage = lineageService.lineage(datasetNodeId, 2, false);
+    assertThat(lineage.getGraph())
+        .hasSize(1)
+        .extracting(Node::getId)
+        .containsExactlyInAnyOrder(datasetNodeId);
+  }
+
   private boolean jobNameEquals(Node node, String writeJob) {
     return node.getId().asJobId().getName().getValue().equals(writeJob);
   }