Skip to content

Commit

Permalink
point-in-timea for column-level lineage
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Nov 28, 2022
1 parent 0c78cd8 commit 7ba6cc5
Show file tree
Hide file tree
Showing 17 changed files with 465 additions and 357 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.28.0...HEAD)
* Column-lineage endpoints supports point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Enable requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.*

## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21

Expand Down
8 changes: 5 additions & 3 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import javax.validation.constraints.NotNull;
import javax.ws.rs.DefaultValue;
Expand Down Expand Up @@ -44,7 +43,10 @@ public Response getLineage(
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
.build();
if (nodeId.hasVersion() && withDownstream) {
return Response.status(400, "Node version cannot be specified when withDownstream is true")
.build();
}
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream)).build();
}
}
31 changes: 31 additions & 0 deletions api/src/main/java/marquez/common/models/DatasetFieldVersionId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** ID for {@code DatasetField} with a version of {@code Dataset}. */
@EqualsAndHashCode
@AllArgsConstructor
@ToString
public class DatasetFieldVersionId {

@Getter private final DatasetId datasetId;
@Getter private final FieldName fieldName;
@Getter private final UUID version;

public static DatasetFieldVersionId of(
String namespace, String datasetName, String field, UUID version) {
return new DatasetFieldVersionId(
new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)),
FieldName.of(field),
version);
}
}
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/common/models/JobVersionId.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public class JobVersionId {
@NonNull NamespaceName namespace;
@NonNull JobName name;
@NonNull UUID version;

public static JobVersionId of(
final NamespaceName namespaceName, final JobName jobName, final UUID version) {
return new JobVersionId(namespaceName, jobName, version);
}
}
8 changes: 6 additions & 2 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
clr.output_dataset_version_uuid as dataset_version_uuid,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
Expand All @@ -160,6 +161,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.output_dataset_version_uuid,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
Expand Down Expand Up @@ -191,7 +193,8 @@ dataset_fields_view AS (
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
c.output_dataset_version_uuid as dataset_version_uuid,
c.transformation_description,
c.transformation_type,
c.created_at,
Expand All @@ -204,6 +207,7 @@ dataset_fields_view AS (
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
c.output_dataset_version_uuid,
c.transformation_description,
c.transformation_type,
c.created_at,
Expand Down
40 changes: 38 additions & 2 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import marquez.db.mappers.DatasetFieldMapper;
import marquez.db.mappers.DatasetFieldRowMapper;
import marquez.db.mappers.FieldDataMapper;
import marquez.db.mappers.PairUuidInstantMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.InputFieldData;
import marquez.db.models.TagRow;
import marquez.service.models.Dataset;
import marquez.service.models.DatasetVersion;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBean;
import org.jdbi.v3.sqlobject.statement.SqlBatch;
Expand All @@ -32,6 +34,7 @@
@RegisterRowMapper(DatasetFieldRowMapper.class)
@RegisterRowMapper(DatasetFieldMapper.class)
@RegisterRowMapper(FieldDataMapper.class)
@RegisterRowMapper(PairUuidInstantMapper.class)
public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"""
Expand Down Expand Up @@ -98,13 +101,26 @@ default Dataset updateTags(

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
SELECT df.uuid, max(dv.created_at)
FROM dataset_fields df
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
JOIN dataset_versions AS dv ON dv.uuid = fm.dataset_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
GROUP BY df.uuid
""")
List<UUID> findDatasetFieldsUuids(String namespaceName, String datasetName);

@SqlQuery(
"""
SELECT df.uuid, dv.created_at
FROM dataset_fields df
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
JOIN dataset_versions AS dv ON dv.uuid = :datasetVersion
WHERE fm.dataset_version_uuid = :datasetVersion
""")
List<Pair<UUID, Instant>> findDatasetVersionFieldsUuids(UUID datasetVersion);

@SqlQuery(
"""
WITH latest_run AS (
Expand All @@ -121,6 +137,15 @@ WITH latest_run AS (
""")
List<UUID> findFieldsUuidsByJob(String namespaceName, String jobName);

@SqlQuery(
"""
SELECT dataset_fields.uuid, r.created_at
FROM dataset_fields
JOIN dataset_versions ON dataset_versions.dataset_uuid = dataset_fields.dataset_uuid
JOIN runs_view r ON r.job_version_uuid = :jobVersion
""")
List<Pair<UUID, Instant>> findFieldsUuidsByJobVersion(UUID jobVersion);

@SqlQuery(
"""
SELECT df.uuid
Expand All @@ -131,6 +156,17 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli
""")
Optional<UUID> findUuid(String namespaceName, String datasetName, String name);

@SqlQuery(
"""
SELECT df.uuid, dv.created_at
FROM dataset_fields df
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
JOIN dataset_versions AS dv ON dv.uuid = :datasetVersion
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
WHERE fm.dataset_version_uuid = :datasetVersion AND df.name = :fieldName
""")
List<Pair<UUID, Instant>> findDatasetVersionFieldsUuids(String fieldName, UUID datasetVersion);

@SqlQuery(
"SELECT f.*, "
+ "ARRAY(SELECT t.name "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
Expand All @@ -35,6 +37,7 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws
return new ColumnLineageNodeData(
stringOrThrow(results, Columns.NAMESPACE_NAME),
stringOrThrow(results, Columns.DATASET_NAME),
uuidOrThrow(results, Columns.DATASET_VERSION_UUID),
stringOrThrow(results, Columns.FIELD_NAME),
stringOrThrow(results, Columns.TYPE),
stringOrNull(results, TRANSFORMATION_DESCRIPTION),
Expand All @@ -54,7 +57,7 @@ public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results,
return ImmutableList.copyOf(
Arrays.asList(deserializedArray).stream()
.map(o -> (String[]) o)
.map(arr -> new InputFieldNodeData(arr[0], arr[1], arr[2]))
.map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3]))
.collect(Collectors.toList()));
}
}
28 changes: 28 additions & 0 deletions api/src/main/java/marquez/db/mappers/PairUuidInstantMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.timestampOrNull;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.UUID;
import lombok.NonNull;
import marquez.db.Columns;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public final class PairUuidInstantMapper implements RowMapper<Pair<UUID, Instant>> {
@Override
public Pair<UUID, Instant> map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return Pair.of(
uuidOrThrow(results, Columns.ROW_UUID), timestampOrNull(results, Columns.CREATED_AT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package marquez.db.models;

import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -15,6 +17,7 @@
public class ColumnLineageNodeData implements NodeData {
@NonNull String namespace;
@NonNull String dataset;
@Nullable UUID datasetVersion;
@NonNull String field;
@NonNull String fieldType;
String transformationDescription;
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/db/models/InputFieldNodeData.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

package marquez.db.models;

import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

@Getter
@AllArgsConstructor
@ToString
public class InputFieldNodeData {
@NonNull String namespace;
@NonNull String dataset;
@Nullable UUID datasetVersion;
@NonNull String field;
}
Loading

0 comments on commit 7ba6cc5

Please sign in to comment.