fix column lineage when multiple jobs write to same dataset
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
pawel-big-lebowski committed Dec 7, 2022
1 parent c8a38a1 commit a7ecf04
Showing 16 changed files with 327 additions and 100 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,8 @@
* Allow null column type in column-lineage [`#2272`](https://github.com/MarquezProject/marquez/pull/2272) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Include error message for JSON processing exception [`#2271`](https://github.com/MarquezProject/marquez/pull/2271) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*In case of JSON processing exceptions Marquez API should return exception message to a client.*
* Fix column lineage when multiple jobs write to same dataset [`#2289`](https://github.com/MarquezProject/marquez/pull/2289) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*The fix deprecates the way fields `transformationDescription` and `transformationType` are returned. The deprecated way of returning those fields will be removed in 0.30.0.*

## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21

43 changes: 20 additions & 23 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
@@ -146,12 +146,15 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
clr.output_dataset_version_uuid as dataset_version_uuid,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
ARRAY_AGG(DISTINCT ARRAY[
input_fields.namespace_name,
input_fields.dataset_name,
CAST(clr.input_dataset_version_uuid AS VARCHAR),
input_fields.field_name,
clr.transformation_description,
clr.transformation_type
]) AS inputFields,
clr.output_dataset_version_uuid as dataset_version_uuid
FROM column_lineage_recursive clr
INNER JOIN dataset_fields_view output_fields ON clr.output_dataset_field_uuid = output_fields.uuid -- hidden datasets will be filtered
LEFT JOIN dataset_fields_view input_fields ON clr.input_dataset_field_uuid = input_fields.uuid
@@ -161,11 +164,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.output_dataset_version_uuid,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
clr.updated_at
clr.output_dataset_version_uuid
""")
Set<ColumnLineageNodeData> getLineage(
int depth,
@@ -193,25 +192,23 @@ dataset_fields_view AS (
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
c.output_dataset_version_uuid as dataset_version_uuid,
c.transformation_description,
c.transformation_type,
c.created_at,
c.updated_at
ARRAY_AGG(DISTINCT ARRAY[
input_fields.namespace_name,
input_fields.dataset_name,
CAST(c.input_dataset_version_uuid AS VARCHAR),
input_fields.field_name,
c.transformation_description,
c.transformation_type
]) AS inputFields,
null as dataset_version_uuid
FROM selected_column_lineage c
INNER JOIN dataset_fields_view output_fields ON c.output_dataset_field_uuid = output_fields.uuid
LEFT JOIN dataset_fields_view input_fields ON c.input_dataset_field_uuid = input_fields.uuid
GROUP BY
output_fields.namespace_name,
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
c.output_dataset_version_uuid,
c.transformation_description,
c.transformation_type,
c.created_at,
c.updated_at
output_fields.type
""")
/**
* Each dataset is identified by a pair of strings (namespace and name). A query returns column
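With `transformation_description` and `transformation_type` moved out of the GROUP BY and into the aggregated element, an output column written by several jobs now collapses into a single row, and each entry of `inputFields` carries the metadata of its own edge. A minimal sketch of one aggregated element, in the order the mapper below consumes it (the values are invented):

```java
// Layout of a single ARRAY_AGG element produced by the queries above;
// sample values are illustrative only.
String[] inputFieldRow = {
    "food_delivery",                          // [0] input namespace
    "public.categories",                      // [1] input dataset
    "8d4f6c2a-1b3e-4f5a-9c7d-2e6b8a0f4c1d",   // [2] input dataset version UUID, cast to VARCHAR
    "category_id",                            // [3] input field name
    "identical",                              // [4] transformation description
    "IDENTITY"                                // [5] transformation type
};
```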
api/src/main/java/marquez/db/mappers/ColumnLineageNodeDataMapper.java
@@ -5,11 +5,9 @@

package marquez.db.mappers;

import static marquez.db.Columns.TRANSFORMATION_DESCRIPTION;
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.uuidOrThrow;
import static marquez.db.Columns.uuidOrNull;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@@ -37,11 +35,9 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws
return new ColumnLineageNodeData(
stringOrThrow(results, Columns.NAMESPACE_NAME),
stringOrThrow(results, Columns.DATASET_NAME),
uuidOrThrow(results, Columns.DATASET_VERSION_UUID),
uuidOrNull(results, Columns.DATASET_VERSION_UUID),
stringOrThrow(results, Columns.FIELD_NAME),
stringOrNull(results, Columns.TYPE),
stringOrNull(results, TRANSFORMATION_DESCRIPTION),
stringOrNull(results, TRANSFORMATION_TYPE),
toInputFields(results, "inputFields"));
}

@@ -57,7 +53,10 @@ public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results,
return ImmutableList.copyOf(
Arrays.asList(deserializedArray).stream()
.map(o -> (String[]) o)
.map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3]))
.map(
arr ->
new InputFieldNodeData(
arr[0], arr[1], UUID.fromString(arr[2]), arr[3], arr[4], arr[5]))
.collect(Collectors.toList()));
}
}
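Assuming the six-string layout sketched after the DAO above, the new positions 4 and 5 surface on `InputFieldNodeData` as in this hedged fragment (invented values, not read from a real `ResultSet`):

```java
import java.util.UUID;
import marquez.db.models.InputFieldNodeData;

// Mirrors the lambda in toInputFields: the last two entries become the
// per-edge transformation description and type.
String[] arr = {
    "food_delivery", "public.categories",
    "8d4f6c2a-1b3e-4f5a-9c7d-2e6b8a0f4c1d", "category_id",
    "identical", "IDENTITY"
};
InputFieldNodeData input =
    new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3], arr[4], arr[5]);
```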
49 changes: 45 additions & 4 deletions api/src/main/java/marquez/db/models/ColumnLineageNodeData.java
@@ -5,22 +5,63 @@

package marquez.db.models;

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import marquez.service.models.ColumnLineageInputField;

@Getter
@AllArgsConstructor
public class ColumnLineageNodeData implements NodeData {
@NonNull String namespace;
@NonNull String dataset;
@Nullable UUID datasetVersion;
@NonNull String field;
@Nullable String fieldType;
String transformationDescription;
String transformationType;
@Nullable String transformationDescription;
@Nullable String transformationType;
@NonNull List<InputFieldNodeData> inputFields;

public ColumnLineageNodeData(
String namespace,
String dataset,
UUID datasetVersion,
String field,
String fieldType,
ImmutableList<InputFieldNodeData> inputFields) {
this.namespace = namespace;
this.dataset = dataset;
this.datasetVersion = datasetVersion;
this.field = field;
this.fieldType = fieldType;
this.inputFields = inputFields;
}

/**
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
*/
public String getTransformationDescription() {
return Optional.ofNullable(inputFields).map(List::stream).stream()
.flatMap(Function.identity())
.findAny()
.map(d -> d.getTransformationDescription())
.orElse(null);
}

/**
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
*/
public String getTransformationType() {
return Optional.ofNullable(inputFields).map(List::stream).stream()
.flatMap(Function.identity())
.findAny()
.map(d -> d.getTransformationType())
.orElse(null);
}
}
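The deprecated accessors above return the metadata of an arbitrary input field (`findAny`), so when several jobs write the same column the result is effectively unspecified; callers should read the values per input field instead. A minimal sketch of that behavior, using the constructors from this diff with invented values:

```java
import com.google.common.collect.ImmutableList;
import marquez.db.models.ColumnLineageNodeData;
import marquez.db.models.InputFieldNodeData;

// One output column with edges from two different writing jobs.
ColumnLineageNodeData node =
    new ColumnLineageNodeData(
        "food_delivery", "public.orders", null, "order_total", "DECIMAL",
        ImmutableList.of(
            new InputFieldNodeData(
                "food_delivery", "public.order_items", null, "price",
                "sum of item prices", "AGGREGATION"),
            new InputFieldNodeData(
                "food_delivery", "public.discounts", null, "amount",
                "subtracted from total", "TRANSFORMATION")));

// Per-edge access (preferred):
String first = node.getInputFields().get(0).getTransformationType();  // "AGGREGATION"

// Deprecated until 0.30.0: returns whichever edge findAny() picks.
String legacy = node.getTransformationType();
```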
2 changes: 2 additions & 0 deletions api/src/main/java/marquez/db/models/InputFieldNodeData.java
@@ -20,4 +20,6 @@ public class InputFieldNodeData {
@NonNull String dataset;
@Nullable UUID datasetVersion;
@NonNull String field;
String transformationDescription;
String transformationType;
}
8 changes: 5 additions & 3 deletions api/src/main/java/marquez/service/ColumnLineageService.java
@@ -226,14 +226,16 @@ public void enrichWithColumnLineage(List<Dataset> datasets) {
.add(
ColumnLineage.builder()
.name(nodeData.getField())
.transformationDescription(nodeData.getTransformationDescription())
.transformationType(nodeData.getTransformationType())
.inputFields(
nodeData.getInputFields().stream()
.map(
f ->
new ColumnLineageInputField(
f.getNamespace(), f.getDataset(), f.getField()))
f.getNamespace(),
f.getDataset(),
f.getField(),
f.getTransformationDescription(),
f.getTransformationType()))
.collect(Collectors.toList()))
.build());
});
32 changes: 30 additions & 2 deletions api/src/main/java/marquez/service/models/ColumnLineage.java
@@ -6,6 +6,9 @@
package marquez.service.models;

import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
@@ -19,6 +22,31 @@
public class ColumnLineage {
@NotNull private String name;
@NotNull private List<ColumnLineageInputField> inputFields;
@NotNull private String transformationDescription;
@NotNull private String transformationType;

@Nullable private String transformationDescription;
@Nullable private String transformationType;

/**
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
*/
public String getTransformationDescription() {
return Optional.ofNullable(inputFields).map(List::stream).stream()
.flatMap(Function.identity())
.findAny()
.map(d -> d.getTransformationDescription())
.orElse(null);
}

/**
* @deprecated Moved into {@link ColumnLineageInputField} to support multiple jobs writing to a
* single dataset. This method is scheduled to be removed in release {@code 0.30.0}.
*/
public String getTransformationType() {
return Optional.ofNullable(inputFields).map(List::stream).stream()
.flatMap(Function.identity())
.findAny()
.map(d -> d.getTransformationType())
.orElse(null);
}
}
api/src/main/java/marquez/service/models/ColumnLineageInputField.java
@@ -19,4 +19,6 @@ public class ColumnLineageInputField {
@NotNull private String namespace;
@NotNull private String dataset;
@NotNull private String field;
@NotNull private String transformationDescription;
@NotNull private String transformationType;
}
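Putting the API model changes together, a short usage sketch of where clients should now read the transformation metadata, built the same way `ColumnLineageService` builds the response above (column names and values are invented; the Lombok-generated getters on these models are assumed):

```java
import java.util.List;
import marquez.service.models.ColumnLineage;
import marquez.service.models.ColumnLineageInputField;

ColumnLineage lineage =
    ColumnLineage.builder()
        .name("order_total")
        .inputFields(
            List.of(
                new ColumnLineageInputField(
                    "food_delivery", "public.order_items", "price",
                    "sum of item prices", "AGGREGATION")))
        .build();

// New location: one description/type per input field, i.e. per writing job.
String perEdge = lineage.getInputFields().get(0).getTransformationDescription();

// Deprecated, scheduled for removal in 0.30.0: delegates to an arbitrary input field.
String legacy = lineage.getTransformationDescription();
```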