Column lineage: keep one anchor row per (output field, input field) pair.

The anchor member of the column_lineage_recursive CTE is wrapped in
parentheses and uses DISTINCT ON (output_dataset_field_uuid,
input_dataset_field_uuid) ordered by updated_at DESC, so only the most
recently updated column_lineage row of each output/input field pair is
selected at depth 0.

The added test testGetLineageWhenJobRunMultipleTimes creates a lineage row
for "job1" twice with the same input and output datasets and asserts that
getLineage returns a single result for the looked-up "col_c" field.

NOTE(review): the IN () list is empty on both the removed and the added
line; presumably a bind-list placeholder was lost when this patch was
extracted. Confirm against ColumnLineageDao.java before applying.
NOTE(review): leading whitespace inside the hunks was reconstructed; apply
with whitespace-insensitive matching or re-indent to match the tree.

diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java
index c472937cd1..87bf81311e 100644
--- a/api/src/main/java/marquez/db/ColumnLineageDao.java
+++ b/api/src/main/java/marquez/db/ColumnLineageDao.java
@@ -105,9 +105,12 @@ dataset_fields_view AS (
               INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
             ),
             column_lineage_recursive AS (
-              SELECT *, 0 as depth
-              FROM column_lineage
-              WHERE output_dataset_field_uuid IN () AND created_at <= :createdAtUntil
+              (
+                SELECT DISTINCT ON (output_dataset_field_uuid, input_dataset_field_uuid) *, 0 as depth
+                FROM column_lineage
+                WHERE output_dataset_field_uuid IN () AND created_at <= :createdAtUntil
+                ORDER BY output_dataset_field_uuid, input_dataset_field_uuid, updated_at DESC
+              )
               UNION
               SELECT
                 upstream_node.output_dataset_version_uuid,
diff --git a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java
index 6586c8a57c..1129ee29c4 100644
--- a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java
+++ b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java
@@ -565,4 +565,32 @@ void testGetLineagePointInTime() {
                 20, Collections.singletonList(field_col_b), columnLineageCreatedAt.plusSeconds(1)))
         .hasSize(1);
   }
+
+  @Test
+  void testGetLineageWhenJobRunMultipleTimes() {
+    Dataset dataset_A = getDatasetA();
+    Dataset dataset_B = getDatasetB();
+
+    LineageTestUtils.createLineageRow(
+        openLineageDao,
+        "job1",
+        "COMPLETE",
+        jobFacet,
+        Arrays.asList(dataset_A),
+        Arrays.asList(dataset_B));
+    UpdateLineageRow lineageRow =
+        LineageTestUtils.createLineageRow(
+            openLineageDao,
+            "job1",
+            "COMPLETE",
+            jobFacet,
+            Arrays.asList(dataset_A),
+            Arrays.asList(dataset_B));
+
+    UpdateLineageRow.DatasetRecord datasetRecord_b = lineageRow.getOutputs().get().get(0);
+    UUID field_col_b = fieldDao.findUuid(datasetRecord_b.getDatasetRow().getUuid(), "col_c").get();
+
+    assertThat(dao.getLineage(20, Collections.singletonList(field_col_b), Instant.now()))
+        .hasSize(1);
+  }
 }