Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

point-in-time endpoint for column-level lineage #2265

Merged
merged 2 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.28.0...HEAD)
* Column-lineage endpoints supports point-in-time requests [`#2265`](https://github.com/MarquezProject/marquez/pull/2265) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
*Enable requesting `column-lineage` endpoint by a dataset version, job version or dataset field of a specific dataset version.*

## [0.28.0](https://github.com/MarquezProject/marquez/compare/0.27.0...0.28.0) - 2022-11-21

Expand Down
8 changes: 5 additions & 3 deletions api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import javax.validation.constraints.NotNull;
import javax.ws.rs.DefaultValue;
Expand Down Expand Up @@ -44,7 +43,10 @@ public Response getLineage(
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth,
@QueryParam("withDownstream") @DefaultValue("false") boolean withDownstream)
throws ExecutionException, InterruptedException {
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream, Instant.now()))
.build();
if (nodeId.hasVersion() && withDownstream) {
return Response.status(400, "Node version cannot be specified when withDownstream is true")
.build();
}
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream)).build();
}
}
31 changes: 31 additions & 0 deletions api/src/main/java/marquez/common/models/DatasetFieldVersionId.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;

/** ID for {@code DatasetField} with a version of {@code Dataset}. */
@EqualsAndHashCode
@AllArgsConstructor
@ToString
public class DatasetFieldVersionId {

@Getter private final DatasetId datasetId;
@Getter private final FieldName fieldName;
@Getter private final UUID version;

public static DatasetFieldVersionId of(
String namespace, String datasetName, String field, UUID version) {
return new DatasetFieldVersionId(
new DatasetId(NamespaceName.of(namespace), DatasetName.of(datasetName)),
FieldName.of(field),
version);
}
}
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/common/models/JobVersionId.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ public class JobVersionId {
@NonNull NamespaceName namespace;
@NonNull JobName name;
@NonNull UUID version;

public static JobVersionId of(
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
final NamespaceName namespaceName, final JobName jobName, final UUID version) {
return new JobVersionId(namespaceName, jobName, version);
}
}
8 changes: 6 additions & 2 deletions api/src/main/java/marquez/db/ColumnLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(clr.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
clr.output_dataset_version_uuid as dataset_version_uuid,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
Expand All @@ -160,6 +161,7 @@ WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
clr.output_dataset_version_uuid,
clr.transformation_description,
clr.transformation_type,
clr.created_at,
Expand Down Expand Up @@ -191,7 +193,8 @@ dataset_fields_view AS (
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
ARRAY_AGG(ARRAY[input_fields.namespace_name, input_fields.dataset_name, input_fields.field_name]) AS inputFields,
ARRAY_AGG(DISTINCT ARRAY[input_fields.namespace_name, input_fields.dataset_name, CAST(c.input_dataset_version_uuid AS VARCHAR), input_fields.field_name]) AS inputFields,
c.output_dataset_version_uuid as dataset_version_uuid,
c.transformation_description,
c.transformation_type,
c.created_at,
Expand All @@ -204,6 +207,7 @@ dataset_fields_view AS (
output_fields.dataset_name,
output_fields.field_name,
output_fields.type,
c.output_dataset_version_uuid,
c.transformation_description,
c.transformation_type,
c.created_at,
Expand Down
40 changes: 38 additions & 2 deletions api/src/main/java/marquez/db/DatasetFieldDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import marquez.db.mappers.DatasetFieldMapper;
import marquez.db.mappers.DatasetFieldRowMapper;
import marquez.db.mappers.FieldDataMapper;
import marquez.db.mappers.PairUuidInstantMapper;
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.InputFieldData;
import marquez.db.models.TagRow;
import marquez.service.models.Dataset;
import marquez.service.models.DatasetVersion;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.BindBean;
import org.jdbi.v3.sqlobject.statement.SqlBatch;
Expand All @@ -32,6 +34,7 @@
@RegisterRowMapper(DatasetFieldRowMapper.class)
@RegisterRowMapper(DatasetFieldMapper.class)
@RegisterRowMapper(FieldDataMapper.class)
@RegisterRowMapper(PairUuidInstantMapper.class)
public interface DatasetFieldDao extends BaseDao {
@SqlQuery(
"""
Expand Down Expand Up @@ -98,13 +101,26 @@ default Dataset updateTags(

@SqlQuery(
"""
SELECT df.uuid
FROM dataset_fields df
SELECT df.uuid, max(dv.created_at)
FROM dataset_fields df
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
JOIN dataset_versions AS dv ON dv.uuid = fm.dataset_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
GROUP BY df.uuid
""")
List<UUID> findDatasetFieldsUuids(String namespaceName, String datasetName);

@SqlQuery(
"""
SELECT df.uuid, dv.created_at
FROM dataset_fields df
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
JOIN dataset_versions AS dv ON dv.uuid = :datasetVersion
WHERE fm.dataset_version_uuid = :datasetVersion
""")
List<Pair<UUID, Instant>> findDatasetVersionFieldsUuids(UUID datasetVersion);

@SqlQuery(
"""
WITH latest_run AS (
Expand All @@ -121,6 +137,15 @@ WITH latest_run AS (
""")
List<UUID> findFieldsUuidsByJob(String namespaceName, String jobName);

@SqlQuery(
"""
SELECT dataset_fields.uuid, r.created_at
FROM dataset_fields
JOIN dataset_versions ON dataset_versions.dataset_uuid = dataset_fields.dataset_uuid
JOIN runs_view r ON r.job_version_uuid = :jobVersion
""")
List<Pair<UUID, Instant>> findFieldsUuidsByJobVersion(UUID jobVersion);

@SqlQuery(
"""
SELECT df.uuid
Expand All @@ -131,6 +156,17 @@ WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symli
""")
Optional<UUID> findUuid(String namespaceName, String datasetName, String name);

@SqlQuery(
"""
SELECT df.uuid, dv.created_at
FROM dataset_fields df
JOIN datasets_view AS d ON d.uuid = df.dataset_uuid
JOIN dataset_versions AS dv ON dv.uuid = :datasetVersion
JOIN dataset_versions_field_mapping AS fm ON fm.dataset_field_uuid = df.uuid
WHERE fm.dataset_version_uuid = :datasetVersion AND df.name = :fieldName
""")
List<Pair<UUID, Instant>> findDatasetVersionFieldsUuids(String fieldName, UUID datasetVersion);

@SqlQuery(
"SELECT f.*, "
+ "ARRAY(SELECT t.name "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import static marquez.db.Columns.TRANSFORMATION_TYPE;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.uuidOrThrow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
Expand All @@ -35,6 +37,7 @@ public ColumnLineageNodeData map(ResultSet results, StatementContext ctx) throws
return new ColumnLineageNodeData(
stringOrThrow(results, Columns.NAMESPACE_NAME),
stringOrThrow(results, Columns.DATASET_NAME),
uuidOrThrow(results, Columns.DATASET_VERSION_UUID),
stringOrThrow(results, Columns.FIELD_NAME),
stringOrThrow(results, Columns.TYPE),
stringOrNull(results, TRANSFORMATION_DESCRIPTION),
Expand All @@ -54,7 +57,7 @@ public static ImmutableList<InputFieldNodeData> toInputFields(ResultSet results,
return ImmutableList.copyOf(
Arrays.asList(deserializedArray).stream()
.map(o -> (String[]) o)
.map(arr -> new InputFieldNodeData(arr[0], arr[1], arr[2]))
.map(arr -> new InputFieldNodeData(arr[0], arr[1], UUID.fromString(arr[2]), arr[3]))
.collect(Collectors.toList()));
}
}
28 changes: 28 additions & 0 deletions api/src/main/java/marquez/db/mappers/PairUuidInstantMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.timestampOrNull;
import static marquez.db.Columns.uuidOrThrow;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.UUID;
import lombok.NonNull;
import marquez.db.Columns;
import org.apache.commons.lang3.tuple.Pair;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public final class PairUuidInstantMapper implements RowMapper<Pair<UUID, Instant>> {
@Override
public Pair<UUID, Instant> map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
return Pair.of(
uuidOrThrow(results, Columns.ROW_UUID), timestampOrNull(results, Columns.CREATED_AT));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package marquez.db.models;

import java.util.List;
import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
Expand All @@ -15,6 +17,7 @@
public class ColumnLineageNodeData implements NodeData {
@NonNull String namespace;
@NonNull String dataset;
@Nullable UUID datasetVersion;
@NonNull String field;
@NonNull String fieldType;
String transformationDescription;
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/db/models/InputFieldNodeData.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

package marquez.db.models;

import java.util.UUID;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;

@Getter
@AllArgsConstructor
@ToString
public class InputFieldNodeData {
@NonNull String namespace;
@NonNull String dataset;
@Nullable UUID datasetVersion;
@NonNull String field;
}
Loading