Skip to content

Commit

Permalink
fix column lineage when multiple jobs write to same dataset
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Dec 6, 2022
1 parent 0995b0a commit 5e3ce84
Show file tree
Hide file tree
Showing 13 changed files with 114 additions and 68 deletions.
43 changes: 20 additions & 23 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,15 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
clr.output_dataset_version_uuid as dataset_version_uuid,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
ARRAY_AGG(DISTINCT ARRAY[
input_fields.namespace_name,
input_fields.dataset_name,
CAST(clr.input_dataset_version_uuid AS VARCHAR),
input_fields.field_name,
clr.transformation_description,
clr.transformation_type
]) AS inputFields,
clr.output_dataset_version_uuid as dataset_version_uuid
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
Expand All @@ -161,11 +164,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.output_dataset_version_uuid,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
clr.output_dataset_version_uuid
""")
Set<ColumnLineageNodeData> getLineage(
int depth,
Expand Down Expand Up @@ -193,25 +192,23 @@ dataset_fields_view AS (
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
c.output_dataset_version_uuid as dataset_version_uuid,
c.transformation_description,
c.transformation_type,
c.created_at,
c.updated_at
ARRAY_AGG(DISTINCT ARRAY[
input_fields.namespace_name,
input_fields.dataset_name,
CAST(c.input_dataset_version_uuid AS VARCHAR),
input_fields.field_name,
c.transformation_description,
c.transformation_type
]) AS inputFields,
null as dataset_version_uuid
FROM selected_column_lineage c
INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid
LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
c.output_dataset_version_uuid,
c.transformation_description,
c.transformation_type,
c.created_at,
c.updated_at
output_fields.type
""")
/**
* Each dataset is identified by a pair of strings (namespace and name). A query returns column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@

package marquez.db.mappers;

import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.uuidOrThrow;
import static marquez.db.Columns.uuidOrNull;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -37,11 +35,9 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws
return new ColumnLineageNodeData(
stringOrThrow(results, Columns.NAMESPACE_NAME),
stringOrThrow(results, Columns.DATASET_NAME),
uuidOrThrow(results, Columns.DATASET_VERSION_UUID),
uuidOrNull(results, Columns.DATASET_VERSION_UUID),
stringOrThrow(results, Columns.FIELD_NAME),
stringOrNull(results, Columns.TYPE),
stringOrNull(results, TRANSFORMATION_DESCRIPTION),
stringOrNull(results, TRANSFORMATION_TYPE),
toInputFields(results, "inputFields"));
}

Expand All @@ -57,7 +53,10 @@ public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results,
return ImmutableList.copyOf(
Arrays.asList(deserializedArray).stream()
.map(o -> (String[]) o)
.map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3]))
.map(
arr ->
new InputFieldNodeData(
arr[0], arr[1], UUID.fromString(arr[2]), arr[3], arr[4], arr[5]))
.collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,5 @@ public class ColumnLineageNodeData implements NodeData {
@Nullable UUID datasetVersion;
@NonNull String field;
@Nullable String fieldType;
String transformationDescription;
String transformationType;
@NonNull List<InputFieldNodeData> inputFields;
}
2 changes: 2 additions & 0 deletions api/src/main/java/marquez/db/models/InputFieldNodeData.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ public class InputFieldNodeData {
@NonNull String dataset;
@Nullable UUID datasetVersion;
@NonNull String field;
String transformationDescription;
String transformationType;
}
8 changes: 5 additions & 3 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,16 @@ public void enrichWithColumnLineage(List<Dataset> datasets) {
.add(
ColumnLineage.builder()
.name(nodeData.getField())
.transformationDescription(nodeData.getTransformationDescription())
.transformationType(nodeData.getTransformationType())
.inputFields(
nodeData.getInputFields().stream()
.map(
f ->
new ColumnLineageInputField(
f.getNamespace(), f.getDataset(), f.getField()))
f.getNamespace(),
f.getDataset(),
f.getField(),
f.getTransformationDescription(),
f.getTransformationType()))
.collect(Collectors.toList()))
.build());
});
Expand Down
2 changes: 0 additions & 2 deletions api/src/main/java/marquez/service/models/ColumnLineage.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,4 @@
public class ColumnLineage {
@NotNull private String name;
@NotNull private List<ColumnLineageInputField> inputFields;
@NotNull private String transformationDescription;
@NotNull private String transformationType;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ public class ColumnLineageInputField {
@NotNull private String namespace;
@NotNull private String dataset;
@NotNull private String field;
@NotNull private String transformationDescription;
@NotNull private String transformationType;
}
50 changes: 46 additions & 4 deletions api/src/test/java/marquez/db/ColumnLineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import marquez.db.models.ColumnLineageRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.InputFieldNodeData;
import marquez.db.models.NamespaceRow;
import marquez.db.models.SourceRow;
import marquez.db.models.UpdateLineageRow;
Expand Down Expand Up @@ -250,8 +251,8 @@ void testGetLineage() {
assertEquals("namespace", dataset_c.getInputFields().get(0).getNamespace());
assertEquals("dataset_b", dataset_c.getInputFields().get(0).getDataset());
assertEquals("col_c", dataset_c.getInputFields().get(0).getField());
assertEquals("type2", dataset_c.getTransformationType());
assertEquals("description2", dataset_c.getTransformationDescription());
assertEquals("type2", dataset_c.getInputFields().get(0).getTransformationType());
assertEquals("description2", dataset_c.getInputFields().get(0).getTransformationDescription());

// test dataset_b
assertThat(dataset_b.getInputFields()).hasSize(2);
Expand All @@ -273,8 +274,8 @@ void testGetLineage() {

assertEquals("namespace", dataset_b.getInputFields().get(0).getNamespace());
assertEquals("dataset_a", dataset_b.getInputFields().get(0).getDataset());
assertEquals("type1", dataset_b.getTransformationType());
assertEquals("description1", dataset_b.getTransformationDescription());
assertEquals("type1", dataset_b.getInputFields().get(0).getTransformationType());
assertEquals("description1", dataset_b.getInputFields().get(0).getTransformationDescription());
}

@Test
Expand Down Expand Up @@ -483,6 +484,47 @@ void testGetLineageWhenDataTypeIsEmpty() {
getColumnLineage(lineageRow, "col_c");
}

@Test
void testGetLineageRowsForDatasetsWhenMultipleJobsWriteToADataset() {
List<LineageEvent.ColumnLineageInputField> fields =
getDatasetB()
.getFacets()
.getColumnLineage()
.getFields()
.getAdditionalFacets()
.get("col_c")
.getInputFields();

Dataset datasetWithColAAsInputField = getDatasetB();
datasetWithColAAsInputField
.getFacets()
.getColumnLineage()
.getFields()
.getAdditionalFacets()
.get("col_c")
.setInputFields(Collections.singletonList(fields.get(0)));
createLineage(openLineageDao, getDatasetA(), datasetWithColAAsInputField);

Dataset datasetWithColBAsInputField = getDatasetB();
datasetWithColBAsInputField
.getFacets()
.getColumnLineage()
.getFields()
.getAdditionalFacets()
.get("col_c")
.setInputFields(Collections.singletonList(fields.get(1)));
createLineage(openLineageDao, getDatasetA(), datasetWithColBAsInputField);

List<InputFieldNodeData> inputFields =
dao
.getLineageRowsForDatasets(Collections.singletonList(Pair.of("namespace", "dataset_b")))
.stream()
.findAny()
.get()
.getInputFields();
assertThat(inputFields).hasSize(2); // should contain col_a and col_b
}

private Set<ColumnLineageNodeData> getColumnLineage(UpdateLineageRow lineageRow, String field) {
UpdateLineageRow.DatasetRecord datasetRecord = lineageRow.getOutputs().get().get(0);
UUID field_UUID = fieldDao.findUuid(datasetRecord.getDatasetRow().getUuid(), field).get();
Expand Down
22 changes: 10 additions & 12 deletions api/src/test/java/marquez/service/ColumnLineageServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,8 @@ public void testLineageByDatasetFieldId() {
Node col_c = getNode(lineage, "dataset_b", "col_c").get();
List<InputFieldNodeData> inputFields =
((ColumnLineageNodeData) col_c.getData()).getInputFields();
assertEquals(
"description1", ((ColumnLineageNodeData) col_c.getData()).getTransformationDescription());
assertEquals("type1", ((ColumnLineageNodeData) col_c.getData()).getTransformationType());
assertEquals("description1", inputFields.get(0).getTransformationDescription());
assertEquals("type1", inputFields.get(0).getTransformationType());
assertEquals("STRING", ((ColumnLineageNodeData) col_c.getData()).getFieldType());
assertThat(inputFields).hasSize(2);
assertEquals("dataset_a", inputFields.get(0).getDataset());
Expand Down Expand Up @@ -195,28 +194,27 @@ public void testEnrichDatasets() {

assertThat(dataset_b.getColumnLineage()).hasSize(1);
assertThat(dataset_b.getColumnLineage().get(0).getName()).isEqualTo("col_c");
assertThat(dataset_b.getColumnLineage().get(0).getTransformationType()).isEqualTo("type1");
assertThat(dataset_b.getColumnLineage().get(0).getTransformationDescription())
.isEqualTo("description1");

List<ColumnLineageInputField> inputFields_b =
dataset_b.getColumnLineage().get(0).getInputFields();
assertThat(inputFields_b)
.hasSize(2)
.contains(new ColumnLineageInputField("namespace", "dataset_a", "col_a"))
.contains(new ColumnLineageInputField("namespace", "dataset_a", "col_b"));
.contains(
new ColumnLineageInputField("namespace", "dataset_a", "col_a", "description1", "type1"))
.contains(
new ColumnLineageInputField(
"namespace", "dataset_a", "col_b", "description1", "type1"));

assertThat(dataset_c.getColumnLineage()).hasSize(1);
assertThat(dataset_c.getColumnLineage().get(0).getName()).isEqualTo("col_d");
assertThat(dataset_c.getColumnLineage().get(0).getTransformationType()).isEqualTo("type2");
assertThat(dataset_c.getColumnLineage().get(0).getTransformationDescription())
.isEqualTo("description2");

List<ColumnLineageInputField> inputFields_c =
dataset_c.getColumnLineage().get(0).getInputFields();
assertThat(inputFields_c)
.hasSize(1)
.contains(new ColumnLineageInputField("namespace", "dataset_b", "col_c"));
.contains(
new ColumnLineageInputField(
"namespace", "dataset_b", "col_c", "description2", "type2"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,5 @@
@Getter
public class ColumnLineage {
@NonNull private String name;
@NonNull private List<DatasetFieldId> inputFields;
@NonNull private String transformationDescription;
@NonNull private String transformationType;
@NonNull private List<ColumnLineageInputField> inputFields;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ public class ColumnLineageInputField {
@NonNull private String namespace;
@NonNull private String dataset;
@NonNull private String field;
@NonNull String transformationDescription;
@NonNull String transformationType;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ public class ColumnLineageNodeData implements NodeData {
@NonNull String dataset;
@NonNull String field;
@NonNull String fieldType;
@NonNull String transformationDescription;
@NonNull String transformationType;
@NonNull List<DatasetFieldId> inputFields;
@NonNull List<ColumnLineageInputField> inputFields;

public static ColumnLineageNodeData fromJson(@NonNull final String json) {
return Utils.fromJson(json, new TypeReference<ColumnLineageNodeData>() {});
Expand Down
28 changes: 19 additions & 9 deletions clients/java/src/test/java/marquez/client/MarquezClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import marquez.client.MarquezClient.Runs;
import marquez.client.MarquezClient.Sources;
import marquez.client.MarquezClient.Tags;
import marquez.client.models.ColumnLineageInputField;
import marquez.client.models.ColumnLineageNodeData;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetFieldId;
Expand Down Expand Up @@ -963,10 +964,13 @@ public void testGetColumnLineage() throws Exception {
DB_TABLE_NAME,
FIELD_NAME,
"String",
"transformationDescription",
"transformationType",
Collections.singletonList(
new DatasetFieldId("namespace", "inDataset", "some-col1"))),
new ColumnLineageInputField(
"namespace",
"inDataset",
"some-col1",
"transformationDescription",
"transformationType"))),
ImmutableSet.of(
Edge.of(
NodeId.of(DATASET_FIELD_ID),
Expand Down Expand Up @@ -1000,10 +1004,13 @@ public void testGetColumnLineageByField() throws Exception {
DB_TABLE_NAME,
FIELD_NAME,
"String",
"transformationDescription",
"transformationType",
Collections.singletonList(
new DatasetFieldId("namespace", "inDataset", "some-col1"))),
new ColumnLineageInputField(
"namespace",
"inDataset",
"some-col1",
"transformationDescription",
"transformationType"))),
ImmutableSet.of(
Edge.of(
NodeId.of(DATASET_FIELD_ID),
Expand Down Expand Up @@ -1037,10 +1044,13 @@ public void testGetColumnLineageByJob() throws Exception {
DB_TABLE_NAME,
FIELD_NAME,
"String",
"transformationDescription",
"transformationType",
Collections.singletonList(
new DatasetFieldId("namespace", "inDataset", "some-col1"))),
new ColumnLineageInputField(
"namespace",
"inDataset",
"some-col1",
"transformationDescription",
"transformationType"))),
ImmutableSet.of(
Edge.of(
NodeId.of(DATASET_FIELD_ID),
Expand Down

0 comments on commit 5e3ce84

Please sign in to comment.