From b23be06f8dea50ac64fb22c10b0df752b9d375a5 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Mon, 3 Oct 2022 11:46:16 +0200 Subject: [PATCH] downstream column lineage Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 1 + .../marquez/api/ColumnLineageResource.java | 6 +- .../java/marquez/db/ColumnLineageDao.java | 24 ++++---- .../marquez/service/ColumnLineageService.java | 5 +- .../api/ColumnLineageResourceTest.java | 4 +- .../java/marquez/db/ColumnLineageDaoTest.java | 28 ++++++---- .../service/ColumnLineageServiceTest.java | 56 ++++++++++++++++++- 7 files changed, 95 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42147a4449..7fbfb68b2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * Store column lineage facets in separate table [`#2096`](https://github.com/MarquezProject/marquez/pull/2096) [@mzareba382](https://github.com/mzareba382) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Lineage graph endpoint for column lineage [`#2124`](https://github.com/MarquezProject/marquez/pull/2124) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Enrich returned dataset resource with column lineage information [`#2113`](https://github.com/MarquezProject/marquez/pull/2113) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) +* Downstream column lineage [`#2159`](https://github.com/MarquezProject/marquez/pull/2159) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) ### Fixed * Add support for `parentRun` facet as reported by older Airflow OpenLineage versions [@collado-mike](https://github.com/collado-mike) diff --git a/api/src/main/java/marquez/api/ColumnLineageResource.java b/api/src/main/java/marquez/api/ColumnLineageResource.java index 154e2e7764..efebc06a86 100644 --- a/api/src/main/java/marquez/api/ColumnLineageResource.java +++ b/api/src/main/java/marquez/api/ColumnLineageResource.java @@ -41,8 +41,10 @@ public ColumnLineageResource(@NonNull final ServiceFactory serviceFactory) { @Produces(APPLICATION_JSON) public Response getLineage( @QueryParam("nodeId") @NotNull NodeId nodeId, - @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) + @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth, + @QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream) throws ExecutionException, InterruptedException { - return Response.ok(columnLineageService.lineage(nodeId, depth, Instant.now())).build(); + return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now())) + .build(); } } diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java index 9217559600..b0be61947d 100644 --- a/api/src/main/java/marquez/db/ColumnLineageDao.java +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -113,17 +113,20 @@ WHERE output_dataset_field_uuid IN () AND created_at <= :crea ) UNION SELECT - upstream_node.output_dataset_version_uuid, - upstream_node.output_dataset_field_uuid, - upstream_node.input_dataset_version_uuid, - upstream_node.input_dataset_field_uuid, - upstream_node.transformation_description, - upstream_node.transformation_type, - upstream_node.created_at, - upstream_node.updated_at, + adjacent_node.output_dataset_version_uuid, + adjacent_node.output_dataset_field_uuid, + adjacent_node.input_dataset_version_uuid, + adjacent_node.input_dataset_field_uuid, + adjacent_node.transformation_description, + adjacent_node.transformation_type, + adjacent_node.created_at, + adjacent_node.updated_at, node.depth + 1 as depth - FROM column_lineage upstream_node, column_lineage_recursive node - WHERE node.input_dataset_field_uuid = upstream_node.output_dataset_field_uuid + FROM column_lineage adjacent_node, column_lineage_recursive node + WHERE ( + (node.input_dataset_field_uuid = adjacent_node.output_dataset_field_uuid) --upstream lineage + OR (:withDownstream AND adjacent_node.input_dataset_field_uuid = node.output_dataset_field_uuid) --optional downstream lineage + ) AND node.depth < :depth ) SELECT @@ -152,6 +155,7 @@ WHERE output_dataset_field_uuid IN () AND created_at <= :crea Set getLineage( int depth, @BindList(onEmpty = NULL_STRING) List datasetFieldUuids, + boolean withDownstream, Instant createdAtUntil); @SqlQuery( diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java index 65acb2c139..b29e9ee1e5 100644 --- a/api/src/main/java/marquez/service/ColumnLineageService.java +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -40,13 +40,12 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa this.datasetFieldDao = datasetFieldDao; } - public Lineage lineage(NodeId nodeId, int depth, Instant createdAtUntil) { + public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream, Instant createdAtUntil) { List columnNodeUuids = getColumnNodeUuids(nodeId); if (columnNodeUuids.isEmpty()) { throw new NodeIdNotFoundException("Could not find node"); } - - return toLineage(getLineage(depth, columnNodeUuids, createdAtUntil)); + return toLineage(getLineage(depth, columnNodeUuids, withDownstream, createdAtUntil)); } private Lineage toLineage(Set lineageNodeData) { diff --git a/api/src/test/java/marquez/api/ColumnLineageResourceTest.java b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java index dd5c2ab03e..9ebf944e40 100644 --- a/api/src/test/java/marquez/api/ColumnLineageResourceTest.java +++ b/api/src/test/java/marquez/api/ColumnLineageResourceTest.java @@ -7,7 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -40,7 +40,7 @@ public class ColumnLineageResourceTest { ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"), new TypeReference<>() {}); LINEAGE = new Lineage(ImmutableSortedSet.of(testNode)); - when(lineageService.lineage(any(NodeId.class), anyInt(), any(Instant.class))) + when(lineageService.lineage(any(NodeId.class), eq(20), eq(false), any(Instant.class))) .thenReturn(LINEAGE); ServiceFactory serviceFactory = diff --git a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java index 1129ee29c4..cbbd92bb90 100644 --- a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java +++ b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java @@ -257,7 +257,7 @@ void testGetLineage() { UpdateLineageRow.DatasetRecord datasetRecord_c = lineageRow.getOutputs().get().get(0); UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get(); Set lineage = - dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now()); + dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now()); assertEquals(2, lineage.size()); @@ -326,7 +326,8 @@ void testGetLineageWhenNoLineageForColumn() { UUID field_col_a = fieldDao.findUuid(datasetRecord_a.getDatasetRow().getUuid(), "col_a").get(); // assert lineage is empty - assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())).isEmpty(); + assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), false, Instant.now())) + .isEmpty(); } /** @@ -392,11 +393,12 @@ void testGetLineageWithLimitedDepth() { UUID field_col_e = fieldDao.findUuid(datasetRecord_d.getDatasetRow().getUuid(), "col_e").get(); // make sure dataset are constructed properly - assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), Instant.now())) + assertThat(dao.getLineage(20, Collections.singletonList(field_col_e), false, Instant.now())) .hasSize(3); // verify graph size is 2 when max depth is 1 - assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), Instant.now())).hasSize(2); + assertThat(dao.getLineage(1, Collections.singletonList(field_col_e), false, Instant.now())) + .hasSize(2); } @Test @@ -462,9 +464,9 @@ void testGetLineageWhenCycleExists() { UUID field_col_d = fieldDao.findUuid(datasetRecord_c.getDatasetRow().getUuid(), "col_d").get(); // column lineages for col_a and col_e should be of size 3 - assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), Instant.now())) + assertThat(dao.getLineage(20, Collections.singletonList(field_col_a), false, Instant.now())) .hasSize(3); - assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), Instant.now())) + assertThat(dao.getLineage(20, Collections.singletonList(field_col_d), false, Instant.now())) .hasSize(3); } @@ -524,7 +526,7 @@ void testGetLineageWhenTwoJobsWriteToSameDataset() { // assert input fields for col_d contain col_a and col_c List inputFields = - dao.getLineage(20, Collections.singletonList(field_col_c), Instant.now()).stream() + dao.getLineage(20, Collections.singletonList(field_col_c), false, Instant.now()).stream() .filter(node -> node.getDataset().equals("dataset_b")) .flatMap(node -> node.getInputFields().stream()) .map(input -> input.getField()) @@ -558,11 +560,17 @@ void testGetLineagePointInTime() { // assert lineage is empty before and present after assertThat( dao.getLineage( - 20, Collections.singletonList(field_col_b), columnLineageCreatedAt.minusSeconds(1))) + 20, + Collections.singletonList(field_col_b), + false, + columnLineageCreatedAt.minusSeconds(1))) .isEmpty(); assertThat( dao.getLineage( - 20, Collections.singletonList(field_col_b), columnLineageCreatedAt.plusSeconds(1))) + 20, + Collections.singletonList(field_col_b), + false, + columnLineageCreatedAt.plusSeconds(1))) .hasSize(1); } @@ -590,7 +598,7 @@ void testGetLineageWhenJobRunMultipleTimes() { UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0); UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get(); - assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), Instant.now())) + assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), false, Instant.now())) .hasSize(1); } } diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index 003d63363c..7ec8ac67c0 100644 --- a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -90,7 +90,10 @@ public void testLineageByDatasetFieldId() { Lineage lineage = lineageService.lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now()); + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), + 20, + false, + Instant.now()); assertThat(lineage.getGraph()).hasSize(3); @@ -156,12 +159,16 @@ public void testLineageByDatasetId() { Lineage lineageByField = lineageService.lineage( - NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), 20, Instant.now()); + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), + 20, + false, + Instant.now()); Lineage lineageByDataset = lineageService.lineage( NodeId.of(new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_b"))), 20, + false, Instant.now()); // lineage of dataset and column should be equal @@ -195,6 +202,7 @@ public void testLineageWhenLineageEmpty() { lineageService.lineage( NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_d")), 20, + false, Instant.now())); assertThrows( @@ -204,6 +212,7 @@ public void testLineageWhenLineageEmpty() { NodeId.of( new DatasetId(NamespaceName.of("namespace"), DatasetName.of("dataset_d"))), 20, + false, Instant.now())); assertThat( @@ -211,6 +220,7 @@ public void testLineageWhenLineageEmpty() { .lineage( NodeId.of(DatasetFieldId.of("namespace", "dataset_a", "col_a")), 20, + false, Instant.now()) .getGraph()) .hasSize(0); @@ -268,6 +278,48 @@ public void testEnrichDatasets() { .contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c")); } + @Test + public void testGetLineageWithDownstream() { + LineageEvent.Dataset dataset_A = getDatasetA(); + LineageEvent.Dataset dataset_B = getDatasetB(); + LineageEvent.Dataset dataset_C = getDatasetC(); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job1", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_A), + Arrays.asList(dataset_B)); + + LineageTestUtils.createLineageRow( + openLineageDao, + "job2", + "COMPLETE", + jobFacet, + Arrays.asList(dataset_B), + Arrays.asList(dataset_C)); + + Lineage lineage = + lineageService.lineage( + NodeId.of(DatasetFieldId.of("namespace", "dataset_b", "col_c")), + 20, + true, + Instant.now()); + + // assert that get lineage of dataset_B should co also return dataset_A and dataset_C + assertThat( + lineage.getGraph().stream() + .filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_a")) + .findAny()) + .isPresent(); + assertThat( + lineage.getGraph().stream() + .filter(c -> c.getId().asDatasetFieldId().getFieldName().getValue().equals("col_d")) + .findAny()) + .isPresent(); + } + @Test public void testEnrichDatasetsHasNoDuplicates() { LineageEvent.Dataset dataset_A = getDatasetA();