From 17fb0f7f6f308607beea6265c1ff74ac3166ab48 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Tue, 6 Dec 2022 09:48:04 +0100 Subject: [PATCH] fix column lineage when multiple jobs write to same dataset Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 2 + .../java/marquez/db/ColumnLineageDao.java | 43 ++++--- .../mappers/ColumnLineageNodeDataMapper.java | 13 +- .../db/models/ColumnLineageNodeData.java | 49 +++++++- .../marquez/db/models/InputFieldNodeData.java | 2 + .../marquez/service/ColumnLineageService.java | 8 +- .../marquez/service/models/ColumnLineage.java | 32 ++++- .../models/ColumnLineageInputField.java | 2 + .../java/marquez/db/ColumnLineageDaoTest.java | 112 ++++++++++++------ .../db/models/ColumnLineageNodeDataTest.java | 55 +++++++++ .../service/ColumnLineageServiceTest.java | 22 ++-- .../service/models/ColumnLineageTest.java | 49 ++++++++ .../marquez/client/models/ColumnLineage.java | 4 +- .../models/ColumnLineageInputField.java | 2 + .../client/models/ColumnLineageNodeData.java | 4 +- .../marquez/client/MarquezClientTest.java | 28 +++-- 16 files changed, 327 insertions(+), 100 deletions(-) create mode 100644 api/src/test/java/marquez/db/models/ColumnLineageNodeDataTest.java create mode 100644 api/src/test/java/marquez/service/models/ColumnLineageTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index f0da110030..c23deeca8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ * Allow null column type in column-lineage [`#2272`](https://github.com/MarquezProject/marquez/pull/2272) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) * Include error message for JSON processing exception [`#2271`](https://github.com/MarquezProject/marquez/pull/2271) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) *In case of JSON processing exceptions Marquez API should return exception message to a client.* +* Fix column lineage when multiple jobs write to same dataset [`#2289`](https://github.com/MarquezProject/marquez/pull/2289) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + *The fix deprecates the way fields `transformationDescription` and `transformationType` are returned. The depracated way of returning those fields will be removed in 0.30.0.* ## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21 diff --git a/api/src/main/java/marquez/db/ColumnLineageDao.java b/api/src/main/java/marquez/db/ColumnLineageDao.java index ac39a8e92a..038de9e75d 100644 --- a/api/src/main/java/marquez/db/ColumnLineageDao.java +++ b/api/src/main/java/marquez/db/ColumnLineageDao.java @@ -146,12 +146,15 @@ WHERE output_dataset_field_uuid IN () output_fields.dataset_name, output_fields.field_name, output_fields.type, - ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields, - clr.output_dataset_version_uuid as dataset_version_uuid, - clr.transformation_description, - clr.transformation_type, - clr.created_at, - clr.updated_at + ARRAY_AGG(DISTINCT ARRAY[ + input_fields.namespace_name, + input_fields.dataset_name, + CAST(clr.input_dataset_version_uuid AS VARCHAR), + input_fields.field_name, + clr.transformation_description, + clr.transformation_type + ]) AS inputFields, + clr.output_dataset_version_uuid as dataset_version_uuid FROM column_lineage_recursive clr INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid @@ -161,11 +164,7 @@ WHERE output_dataset_field_uuid IN () output_fields.dataset_name, output_fields.field_name, output_fields.type, - clr.output_dataset_version_uuid, - clr.transformation_description, - clr.transformation_type, - clr.created_at, - clr.updated_at + clr.output_dataset_version_uuid """) Set getLineage( int depth, @@ -193,12 +192,15 @@ dataset_fields_view AS ( output_fields.dataset_name, output_fields.field_name, output_fields.type, - ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields, - c.output_dataset_version_uuid as dataset_version_uuid, - c.transformation_description, - c.transformation_type, - c.created_at, - c.updated_at + ARRAY_AGG(DISTINCT ARRAY[ + input_fields.namespace_name, + input_fields.dataset_name, + CAST(c.input_dataset_version_uuid AS VARCHAR), + input_fields.field_name, + c.transformation_description, + c.transformation_type + ]) AS inputFields, + null as dataset_version_uuid FROM selected_column_lineage c INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid @@ -206,12 +208,7 @@ dataset_fields_view AS ( output_fields.namespace_name, output_fields.dataset_name, output_fields.field_name, - output_fields.type, - c.output_dataset_version_uuid, - c.transformation_description, - c.transformation_type, - c.created_at, - c.updated_at + output_fields.type """) /** * Each dataset is identified by a pair of strings (namespace and name). A query returns column diff --git a/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java b/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java index 0c7ef15b32..04787ff4e7 100644 --- a/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java +++ b/api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java @@ -5,11 +5,9 @@ package marquez.db.mappers; -import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION; -import static marquez.db.Columns.TRANSFORMATION_TYPE; import static marquez.db.Columns.stringOrNull; import static marquez.db.Columns.stringOrThrow; -import static marquez.db.Columns.uuidOrThrow; +import static marquez.db.Columns.uuidOrNull; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; @@ -37,11 +35,9 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws return new ColumnLineageNodeData( stringOrThrow(results, Columns.NAMESPACE_NAME), stringOrThrow(results, Columns.DATASET_NAME), - uuidOrThrow(results, Columns.DATASET_VERSION_UUID), + uuidOrNull(results, Columns.DATASET_VERSION_UUID), stringOrThrow(results, Columns.FIELD_NAME), stringOrNull(results, Columns.TYPE), - stringOrNull(results, TRANSFORMATION_DESCRIPTION), - stringOrNull(results, TRANSFORMATION_TYPE), toInputFields(results, "inputFields")); } @@ -57,7 +53,10 @@ public static ImmutableList toInputFields(ResultSet results, return ImmutableList.copyOf( Arrays.asList(deserializedArray).stream() .map(o -> (String[]) o) - .map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3])) + .map( + arr -> + new InputFieldNodeData( + arr[0], arr[1], UUID.fromString(arr[2]), arr[3], arr[4], arr[5])) .collect(Collectors.toList())); } } diff --git a/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java b/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java index 88431bb48f..1ac679ec56 100644 --- a/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java +++ b/api/src/main/java/marquez/db/models/ColumnLineageNodeData.java @@ -5,22 +5,63 @@ package marquez.db.models; +import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Optional; import java.util.UUID; +import java.util.function.Function; import javax.annotation.Nullable; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NonNull; +import marquez.service.models.ColumnLineageInputField; @Getter -@AllArgsConstructor public class ColumnLineageNodeData implements NodeData { @NonNull String namespace; @NonNull String dataset; @Nullable UUID datasetVersion; @NonNull String field; @Nullable String fieldType; - String transformationDescription; - String transformationType; + @Nullable String transformationDescription; + @Nullable String transformationType; @NonNull List inputFields; + + public ColumnLineageNodeData( + String namespace, + String dataset, + UUID datasetVersion, + String field, + String fieldType, + ImmutableList inputFields) { + this.namespace = namespace; + this.dataset = dataset; + this.datasetVersion = datasetVersion; + this.field = field; + this.fieldType = fieldType; + this.inputFields = inputFields; + } + + /** + * @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a + * single dataset. This method is scheduled to be removed in release {@code 0.30.0}. + */ + public String getTransformationDescription() { + return Optional.ofNullable(inputFields).map(List::stream).stream() + .flatMap(Function.identity()) + .findAny() + .map(d -> d.getTransformationDescription()) + .orElse(null); + } + + /** + * @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a + * single dataset. This method is scheduled to be removed in release {@code 0.30.0}. + */ + public String getTransformationType() { + return Optional.ofNullable(inputFields).map(List::stream).stream() + .flatMap(Function.identity()) + .findAny() + .map(d -> d.getTransformationType()) + .orElse(null); + } } diff --git a/api/src/main/java/marquez/db/models/InputFieldNodeData.java b/api/src/main/java/marquez/db/models/InputFieldNodeData.java index 493c1d0414..635e91966a 100644 --- a/api/src/main/java/marquez/db/models/InputFieldNodeData.java +++ b/api/src/main/java/marquez/db/models/InputFieldNodeData.java @@ -20,4 +20,6 @@ public class InputFieldNodeData { @NonNull String dataset; @Nullable UUID datasetVersion; @NonNull String field; + String transformationDescription; + String transformationType; } diff --git a/api/src/main/java/marquez/service/ColumnLineageService.java b/api/src/main/java/marquez/service/ColumnLineageService.java index 180db08c9a..a2a0ecfec3 100644 --- a/api/src/main/java/marquez/service/ColumnLineageService.java +++ b/api/src/main/java/marquez/service/ColumnLineageService.java @@ -226,14 +226,16 @@ public void enrichWithColumnLineage(List datasets) { .add( ColumnLineage.builder() .name(nodeData.getField()) - .transformationDescription(nodeData.getTransformationDescription()) - .transformationType(nodeData.getTransformationType()) .inputFields( nodeData.getInputFields().stream() .map( f -> new ColumnLineageInputField( - f.getNamespace(), f.getDataset(), f.getField())) + f.getNamespace(), + f.getDataset(), + f.getField(), + f.getTransformationDescription(), + f.getTransformationType())) .collect(Collectors.toList())) .build()); }); diff --git a/api/src/main/java/marquez/service/models/ColumnLineage.java b/api/src/main/java/marquez/service/models/ColumnLineage.java index f5a8854495..cd8574a3d8 100644 --- a/api/src/main/java/marquez/service/models/ColumnLineage.java +++ b/api/src/main/java/marquez/service/models/ColumnLineage.java @@ -6,6 +6,9 @@ package marquez.service.models; import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import lombok.Builder; import lombok.EqualsAndHashCode; @@ -19,6 +22,31 @@ public class ColumnLineage { @NotNull private String name; @NotNull private List inputFields; - @NotNull private String transformationDescription; - @NotNull private String transformationType; + + @Nullable private String transformationDescription; + @Nullable private String transformationType; + + /** + * @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a + * single dataset. This method is scheduled to be removed in release {@code 0.30.0}. + */ + public String getTransformationDescription() { + return Optional.ofNullable(inputFields).map(List::stream).stream() + .flatMap(Function.identity()) + .findAny() + .map(d -> d.getTransformationDescription()) + .orElse(null); + } + + /** + * @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a + * single dataset. This method is scheduled to be removed in release {@code 0.30.0}. + */ + public String getTransformationType() { + return Optional.ofNullable(inputFields).map(List::stream).stream() + .flatMap(Function.identity()) + .findAny() + .map(d -> d.getTransformationType()) + .orElse(null); + } } diff --git a/api/src/main/java/marquez/service/models/ColumnLineageInputField.java b/api/src/main/java/marquez/service/models/ColumnLineageInputField.java index 34cac80f20..3d8ccaadf5 100644 --- a/api/src/main/java/marquez/service/models/ColumnLineageInputField.java +++ b/api/src/main/java/marquez/service/models/ColumnLineageInputField.java @@ -19,4 +19,6 @@ public class ColumnLineageInputField { @NotNull private String namespace; @NotNull private String dataset; @NotNull private String field; + @NotNull private String transformationDescription; + @NotNull private String transformationType; } diff --git a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java index e0571f3562..968e26c4f5 100644 --- a/api/src/test/java/marquez/db/ColumnLineageDaoTest.java +++ b/api/src/test/java/marquez/db/ColumnLineageDaoTest.java @@ -28,6 +28,7 @@ import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetRow; import marquez.db.models.DatasetVersionRow; +import marquez.db.models.InputFieldNodeData; import marquez.db.models.NamespaceRow; import marquez.db.models.SourceRow; import marquez.db.models.UpdateLineageRow; @@ -250,8 +251,8 @@ void testGetLineage() { assertEquals("namespace", dataset_c.getInputFields().get(0).getNamespace()); assertEquals("dataset_b", dataset_c.getInputFields().get(0).getDataset()); assertEquals("col_c", dataset_c.getInputFields().get(0).getField()); - assertEquals("type2", dataset_c.getTransformationType()); - assertEquals("description2", dataset_c.getTransformationDescription()); + assertEquals("type2", dataset_c.getInputFields().get(0).getTransformationType()); + assertEquals("description2", dataset_c.getInputFields().get(0).getTransformationDescription()); // test dataset_b assertThat(dataset_b.getInputFields()).hasSize(2); @@ -273,8 +274,8 @@ void testGetLineage() { assertEquals("namespace", dataset_b.getInputFields().get(0).getNamespace()); assertEquals("dataset_a", dataset_b.getInputFields().get(0).getDataset()); - assertEquals("type1", dataset_b.getTransformationType()); - assertEquals("description1", dataset_b.getTransformationDescription()); + assertEquals("type1", dataset_b.getInputFields().get(0).getTransformationType()); + assertEquals("description1", dataset_b.getInputFields().get(0).getTransformationDescription()); } @Test @@ -389,35 +390,37 @@ void testGetLineageWhenCycleExists() { */ @Test void testGetLineageWhenTwoJobsWriteToSameDataset() { - Dataset dataset_B_another_job = - new Dataset( - "namespace", - "dataset_b", - LineageEvent.DatasetFacets.builder() - .schema( - new LineageEvent.SchemaDatasetFacet( - PRODUCER_URL, - SCHEMA_URL, - Arrays.asList(new LineageEvent.SchemaField("col_c", "STRING", "")))) - .columnLineage( - new LineageEvent.ColumnLineageDatasetFacet( - PRODUCER_URL, - SCHEMA_URL, - new LineageEvent.ColumnLineageDatasetFacetFields( - Collections.singletonMap( - "col_c", - new LineageEvent.ColumnLineageOutputColumn( - Arrays.asList( - new LineageEvent.ColumnLineageInputField( - "namespace", "dataset_c", "col_d")), - "description1", - "type1"))))) - .build()); - - createLineage(openLineageDao, dataset_A, dataset_B); - UpdateLineageRow lineageRow = createLineage(openLineageDao, dataset_C, dataset_B_another_job); - - // assert input fields for col_d contain col_a and col_c + List fields = + getDatasetB() + .getFacets() + .getColumnLineage() + .getFields() + .getAdditionalFacets() + .get("col_c") + .getInputFields(); + + Dataset datasetWithColAAsInputField = getDatasetB(); + datasetWithColAAsInputField + .getFacets() + .getColumnLineage() + .getFields() + .getAdditionalFacets() + .get("col_c") + .setInputFields(Collections.singletonList(fields.get(0))); + createLineage(openLineageDao, getDatasetA(), datasetWithColAAsInputField); + + Dataset datasetWithColBAsInputField = getDatasetB(); + datasetWithColBAsInputField + .getFacets() + .getColumnLineage() + .getFields() + .getAdditionalFacets() + .get("col_c") + .setInputFields(Collections.singletonList(fields.get(1))); + UpdateLineageRow lineageRow = + createLineage(openLineageDao, getDatasetA(), datasetWithColBAsInputField); + + // assert input fields for col_c contain col_a and col_b List inputFields = getColumnLineage(lineageRow, "col_c").stream() .filter(node -> node.getDataset().equals("dataset_b")) @@ -425,7 +428,7 @@ void testGetLineageWhenTwoJobsWriteToSameDataset() { .map(input -> input.getField()) .collect(Collectors.toList()); - assertThat(inputFields).hasSize(3).contains("col_a", "col_b", "col_d"); + assertThat(inputFields).hasSize(2).contains("col_a", "col_b"); } @Test @@ -483,6 +486,47 @@ void testGetLineageWhenDataTypeIsEmpty() { getColumnLineage(lineageRow, "col_c"); } + @Test + void testGetLineageRowsForDatasetsWhenMultipleJobsWriteToADataset() { + List fields = + getDatasetB() + .getFacets() + .getColumnLineage() + .getFields() + .getAdditionalFacets() + .get("col_c") + .getInputFields(); + + Dataset datasetWithColAAsInputField = getDatasetB(); + datasetWithColAAsInputField + .getFacets() + .getColumnLineage() + .getFields() + .getAdditionalFacets() + .get("col_c") + .setInputFields(Collections.singletonList(fields.get(0))); + createLineage(openLineageDao, getDatasetA(), datasetWithColAAsInputField); + + Dataset datasetWithColBAsInputField = getDatasetB(); + datasetWithColBAsInputField + .getFacets() + .getColumnLineage() + .getFields() + .getAdditionalFacets() + .get("col_c") + .setInputFields(Collections.singletonList(fields.get(1))); + createLineage(openLineageDao, getDatasetA(), datasetWithColBAsInputField); + + List inputFields = + dao + .getLineageRowsForDatasets(Collections.singletonList(Pair.of("namespace", "dataset_b"))) + .stream() + .findAny() + .get() + .getInputFields(); + assertThat(inputFields).hasSize(2); // should contain col_a and col_b + } + private Set getColumnLineage(UpdateLineageRow lineageRow, String field) { UpdateLineageRow.DatasetRecord datasetRecord = lineageRow.getOutputs().get().get(0); UUID field_UUID = fieldDao.findUuid(datasetRecord.getDatasetRow().getUuid(), field).get(); diff --git a/api/src/test/java/marquez/db/models/ColumnLineageNodeDataTest.java b/api/src/test/java/marquez/db/models/ColumnLineageNodeDataTest.java new file mode 100644 index 0000000000..985b6eeadc --- /dev/null +++ b/api/src/test/java/marquez/db/models/ColumnLineageNodeDataTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.models; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import java.util.UUID; +import org.junit.jupiter.api.Test; + +public class ColumnLineageNodeDataTest { + + @Test + public void testGetters() { + ColumnLineageNodeData node = + new ColumnLineageNodeData( + "namespace", + "dataset", + UUID.randomUUID(), + "field", + "varchar", + ImmutableList.of( + new InputFieldNodeData( + "namespace", + "dataset", + UUID.randomUUID(), + "other-field", + "transformation description", + "transformation type"))); + + assertThat(node.getTransformationDescription()).isEqualTo("transformation description"); + assertThat(node.getTransformationType()).isEqualTo("transformation type"); + } + + @Test + public void testGettersWhenEmptyInputFields() { + ColumnLineageNodeData node = + new ColumnLineageNodeData( + "namespace", "dataset", UUID.randomUUID(), "field", "varchar", ImmutableList.of()); + assertThat(node.getTransformationDescription()).isNull(); + assertThat(node.getTransformationType()).isNull(); + } + + @Test + public void testGettersWhenInputFieldsAreNull() { + ColumnLineageNodeData node = + new ColumnLineageNodeData( + "namespace", "dataset", UUID.randomUUID(), "field", "varchar", null); + assertThat(node.getTransformationDescription()).isNull(); + assertThat(node.getTransformationType()).isNull(); + } +} diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index a95ecd483a..dd48b8fafa 100644 --- a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -93,9 +93,8 @@ public void testLineageByDatasetFieldId() { Node col_c = getNode(lineage, "dataset_b", "col_c").get(); List inputFields = ((ColumnLineageNodeData) col_c.getData()).getInputFields(); - assertEquals( - "description1", ((ColumnLineageNodeData) col_c.getData()).getTransformationDescription()); - assertEquals("type1", ((ColumnLineageNodeData) col_c.getData()).getTransformationType()); + assertEquals("description1", inputFields.get(0).getTransformationDescription()); + assertEquals("type1", inputFields.get(0).getTransformationType()); assertEquals("STRING", ((ColumnLineageNodeData) col_c.getData()).getFieldType()); assertThat(inputFields).hasSize(2); assertEquals("dataset_a", inputFields.get(0).getDataset()); @@ -195,28 +194,27 @@ public void testEnrichDatasets() { assertThat(dataset_b.getColumnLineage()).hasSize(1); assertThat(dataset_b.getColumnLineage().get(0).getName()).isEqualTo("col_c"); - assertThat(dataset_b.getColumnLineage().get(0).getTransformationType()).isEqualTo("type1"); - assertThat(dataset_b.getColumnLineage().get(0).getTransformationDescription()) - .isEqualTo("description1"); List inputFields_b = dataset_b.getColumnLineage().get(0).getInputFields(); assertThat(inputFields_b) .hasSize(2) - .contains(new ColumnLineageInputField("namespace", "dataset_a", "col_a")) - .contains(new ColumnLineageInputField("namespace", "dataset_a", "col_b")); + .contains( + new ColumnLineageInputField("namespace", "dataset_a", "col_a", "description1", "type1")) + .contains( + new ColumnLineageInputField( + "namespace", "dataset_a", "col_b", "description1", "type1")); assertThat(dataset_c.getColumnLineage()).hasSize(1); assertThat(dataset_c.getColumnLineage().get(0).getName()).isEqualTo("col_d"); - assertThat(dataset_c.getColumnLineage().get(0).getTransformationType()).isEqualTo("type2"); - assertThat(dataset_c.getColumnLineage().get(0).getTransformationDescription()) - .isEqualTo("description2"); List inputFields_c = dataset_c.getColumnLineage().get(0).getInputFields(); assertThat(inputFields_c) .hasSize(1) - .contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c")); + .contains( + new ColumnLineageInputField( + "namespace", "dataset_b", "col_c", "description2", "type2")); } @Test diff --git a/api/src/test/java/marquez/service/models/ColumnLineageTest.java b/api/src/test/java/marquez/service/models/ColumnLineageTest.java new file mode 100644 index 0000000000..1a6f7cbe8e --- /dev/null +++ b/api/src/test/java/marquez/service/models/ColumnLineageTest.java @@ -0,0 +1,49 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; + +public class ColumnLineageTest { + + @Test + public void testGetters() { + ColumnLineage columnLineage = + ColumnLineage.builder() + .name("name") + .inputFields( + ImmutableList.of( + new ColumnLineageInputField( + "namespace", + "dataset", + "other-field", + "transformation description", + "transformation type"))) + .build(); + + assertThat(columnLineage.getTransformationDescription()) + .isEqualTo("transformation description"); + assertThat(columnLineage.getTransformationType()).isEqualTo("transformation type"); + } + + @Test + public void testGettersWhenEmptyInputFields() { + ColumnLineage columnLineage = + ColumnLineage.builder().name("name").inputFields(ImmutableList.of()).build(); + assertThat(columnLineage.getTransformationDescription()).isNull(); + assertThat(columnLineage.getTransformationType()).isNull(); + } + + @Test + public void testGettersWhenInputFieldsAreNull() { + ColumnLineage columnLineage = ColumnLineage.builder().name("name").inputFields(null).build(); + assertThat(columnLineage.getTransformationDescription()).isNull(); + assertThat(columnLineage.getTransformationType()).isNull(); + } +} diff --git a/clients/java/src/main/java/marquez/client/models/ColumnLineage.java b/clients/java/src/main/java/marquez/client/models/ColumnLineage.java index 0668555ea8..ccf6d4f471 100644 --- a/clients/java/src/main/java/marquez/client/models/ColumnLineage.java +++ b/clients/java/src/main/java/marquez/client/models/ColumnLineage.java @@ -18,7 +18,5 @@ @Getter public class ColumnLineage { @NonNull private String name; - @NonNull private List inputFields; - @NonNull private String transformationDescription; - @NonNull private String transformationType; + @NonNull private List inputFields; } diff --git a/clients/java/src/main/java/marquez/client/models/ColumnLineageInputField.java b/clients/java/src/main/java/marquez/client/models/ColumnLineageInputField.java index bd5ad6a130..1e6ae5d2ba 100644 --- a/clients/java/src/main/java/marquez/client/models/ColumnLineageInputField.java +++ b/clients/java/src/main/java/marquez/client/models/ColumnLineageInputField.java @@ -19,4 +19,6 @@ public class ColumnLineageInputField { @NonNull private String namespace; @NonNull private String dataset; @NonNull private String field; + @NonNull String transformationDescription; + @NonNull String transformationType; } diff --git a/clients/java/src/main/java/marquez/client/models/ColumnLineageNodeData.java b/clients/java/src/main/java/marquez/client/models/ColumnLineageNodeData.java index 71a8855a85..26fb1fc7a4 100644 --- a/clients/java/src/main/java/marquez/client/models/ColumnLineageNodeData.java +++ b/clients/java/src/main/java/marquez/client/models/ColumnLineageNodeData.java @@ -21,9 +21,7 @@ public class ColumnLineageNodeData implements NodeData { @NonNull String dataset; @NonNull String field; @NonNull String fieldType; - @NonNull String transformationDescription; - @NonNull String transformationType; - @NonNull List inputFields; + @NonNull List inputFields; public static ColumnLineageNodeData fromJson(@NonNull final String json) { return Utils.fromJson(json, new TypeReference() {}); diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index dce0192e87..a9ba5c54f5 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -68,6 +68,7 @@ import marquez.client.MarquezClient.Runs; import marquez.client.MarquezClient.Sources; import marquez.client.MarquezClient.Tags; +import marquez.client.models.ColumnLineageInputField; import marquez.client.models.ColumnLineageNodeData; import marquez.client.models.Dataset; import marquez.client.models.DatasetFieldId; @@ -963,10 +964,13 @@ public void testGetColumnLineage() throws Exception { DB_TABLE_NAME, FIELD_NAME, "String", - "transformationDescription", - "transformationType", Collections.singletonList( - new DatasetFieldId("namespace", "inDataset", "some-col1"))), + new ColumnLineageInputField( + "namespace", + "inDataset", + "some-col1", + "transformationDescription", + "transformationType"))), ImmutableSet.of( Edge.of( NodeId.of(DATASET_FIELD_ID), @@ -1000,10 +1004,13 @@ public void testGetColumnLineageByField() throws Exception { DB_TABLE_NAME, FIELD_NAME, "String", - "transformationDescription", - "transformationType", Collections.singletonList( - new DatasetFieldId("namespace", "inDataset", "some-col1"))), + new ColumnLineageInputField( + "namespace", + "inDataset", + "some-col1", + "transformationDescription", + "transformationType"))), ImmutableSet.of( Edge.of( NodeId.of(DATASET_FIELD_ID), @@ -1037,10 +1044,13 @@ public void testGetColumnLineageByJob() throws Exception { DB_TABLE_NAME, FIELD_NAME, "String", - "transformationDescription", - "transformationType", Collections.singletonList( - new DatasetFieldId("namespace", "inDataset", "some-col1"))), + new ColumnLineageInputField( + "namespace", + "inDataset", + "some-col1", + "transformationDescription", + "transformationType"))), ImmutableSet.of( Edge.of( NodeId.of(DATASET_FIELD_ID),