Skip to content

Commit

Permalink
Add Java client method for dataset/job lineage
Browse files Browse the repository at this point in the history
Signed-off-by: David Goss <david.goss@matillion.com>
  • Loading branch information
davidjgoss committed Sep 24, 2023
1 parent 6003af6 commit 51a4055
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 16 deletions.
11 changes: 4 additions & 7 deletions api/src/main/java/marquez/service/models/NodeData.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,10 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import marquez.db.models.ColumnLineageNodeData;

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXTERNAL_PROPERTY,
property = "type")
/**
 * Marker interface for the {@code data} payload carried by a lineage graph node.
 *
 * <p>Polymorphic (de)serialization uses {@code JsonTypeInfo.Id.DEDUCTION}: no type property is
 * written into the payload, and Jackson chooses among the registered subtypes by the fields that
 * are present in the JSON.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
@JsonSubTypes({
@JsonSubTypes.Type(value = DatasetData.class, name = "DATASET"),
@JsonSubTypes.Type(value = JobData.class, name = "JOB"),
@JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD")
@JsonSubTypes.Type(DatasetData.class),
@JsonSubTypes.Type(JobData.class),
@JsonSubTypes.Type(ColumnLineageNodeData.class)
})
public interface NodeData {}
1 change: 0 additions & 1 deletion api/src/test/resources/column_lineage/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"id": "datasetField:namespace:commonDataset:columnA",
"type": "DATASET_FIELD",
"data": {
"type": "DATASET_FIELD",
"namespace": "namespace",
"dataset": "otherDataset",
"field": "columnA",
Expand Down
1 change: 0 additions & 1 deletion api/src/test/resources/lineage/node.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"id": "dataset:namespace:commonDataset",
"type": "DATASET",
"data": {
"type": "DATASET",
"id": {
"namespace": "namespace",
"name": "commonDataset"
Expand Down
9 changes: 9 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ public enum SortDirection {
@Getter public final String value;
}

/**
 * Fetches the lineage graph for the given node using the default graph depth.
 *
 * @param nodeId identifier of the node whose lineage is requested
 * @return the lineage produced by {@link #getLineage(NodeId, int)} for the default depth
 */
public Lineage getLineage(NodeId nodeId) {
  final int defaultDepth = DEFAULT_LINEAGE_GRAPH_DEPTH;
  return getLineage(nodeId, defaultDepth);
}

/**
 * Fetches the lineage graph for the given node, limited to {@code depth}.
 *
 * @param nodeId identifier of the node whose lineage is requested
 * @param depth value sent as the {@code depth} query parameter
 * @return the lineage parsed from the JSON response body
 */
public Lineage getLineage(NodeId nodeId, int depth) {
  // Build the request URL, issue the GET, and hand the raw JSON straight to the parser.
  return Lineage.fromJson(http.get(url.toLineageUrl(nodeId, depth)));
}

/**
 * Fetches the column lineage graph for the given node using the default graph depth and
 * without downstream nodes.
 *
 * @param nodeId identifier of the node whose column lineage is requested
 * @return the lineage produced by the three-argument overload
 */
public Lineage getColumnLineage(NodeId nodeId) {
  final int defaultDepth = DEFAULT_LINEAGE_GRAPH_DEPTH;
  final boolean withDownstream = false;
  return getColumnLineage(nodeId, defaultDepth, withDownstream);
}
Expand Down
4 changes: 4 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezPathV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ static String searchPath() {
return path("/search");
}

/** Returns the path built by {@code path(...)} from the {@code "/lineage/"} template. */
static String lineagePath() {
  final String template = "/lineage/";
  return path(template);
}

/** Returns the path built by {@code path(...)} from the {@code "/column-lineage/"} template. */
static String columnLineagePath() {
  final String template = "/column-lineage/";
  return path(template);
}
Expand Down
8 changes: 8 additions & 0 deletions clients/java/src/main/java/marquez/client/MarquezUrl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static marquez.client.MarquezPathV1.fieldTagPath;
import static marquez.client.MarquezPathV1.jobPath;
import static marquez.client.MarquezPathV1.jobVersionPath;
import static marquez.client.MarquezPathV1.lineagePath;
import static marquez.client.MarquezPathV1.listDatasetVersionsPath;
import static marquez.client.MarquezPathV1.listDatasetsPath;
import static marquez.client.MarquezPathV1.listJobVersionsPath;
Expand Down Expand Up @@ -208,6 +209,13 @@ URL toSearchUrl(
return from(searchPath(), queryParams.build());
}

/**
 * Builds the lineage URL for a node.
 *
 * @param nodeId node identifier; its {@code getValue()} becomes the {@code nodeId} query parameter
 * @param depth graph depth; sent as the {@code depth} query parameter in string form
 * @return the URL built by {@code from(...)} from the lineage path and the two query parameters
 */
URL toLineageUrl(NodeId nodeId, int depth) {
  // Insertion order is kept: "nodeId" first, then "depth".
  final ImmutableMap.Builder params =
      new ImmutableMap.Builder()
          .put("nodeId", nodeId.getValue())
          .put("depth", String.valueOf(depth));
  return from(lineagePath(), params.build());
}

URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) {
final ImmutableMap.Builder queryParams = new ImmutableMap.Builder();
queryParams.put("nodeId", nodeId.getValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import java.time.Instant;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;

/**
 * Payload of a dataset node in a lineage graph returned to the client.
 *
 * <p>All fields are {@code private final}; Lombok generates the getters, an all-arguments
 * constructor whose parameter order follows the field declaration order below, and
 * {@code equals}/{@code hashCode} over the fields. Fields marked {@code @NonNull} are
 * null-checked by the generated constructor; fields marked {@code @Nullable} may be absent.
 */
@Getter
@AllArgsConstructor
@EqualsAndHashCode
public class DatasetNodeData implements NodeData {
// Required fields.
@NonNull private final DatasetId id;
@NonNull private final DatasetType type;
@NonNull private final String name;
@NonNull private final String physicalName;
@NonNull private final Instant createdAt;
@NonNull private final Instant updatedAt;
@NonNull private final String namespace;
@NonNull private final String sourceName;
@NonNull private final List<Field> fields;
@NonNull private final Set<String> tags;
// Optional fields; may be null.
@Nullable private final Instant lastModifiedAt;
@Nullable private final String description;
@Nullable private final String lastLifecycleState;
}
34 changes: 34 additions & 0 deletions clients/java/src/main/java/marquez/client/models/JobNodeData.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.client.models;

import java.net.URL;
import java.time.Instant;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;

/**
 * Payload of a job node in a lineage graph returned to the client.
 *
 * <p>All fields are {@code private final}; Lombok generates the getters, an all-arguments
 * constructor whose parameter order follows the field declaration order below, and
 * {@code equals}/{@code hashCode} over the fields. Fields marked {@code @NonNull} are
 * null-checked by the generated constructor; fields marked {@code @Nullable} may be absent.
 */
@Getter
@AllArgsConstructor
@EqualsAndHashCode
public class JobNodeData implements NodeData {
@NonNull private final JobId id;
@NonNull private final JobType type;
@NonNull private final String name;
@NonNull private final String simpleName;
// Optional; declared between required fields, so it is the fifth constructor argument.
@Nullable private final String parentJobName;
@NonNull private final Instant createdAt;
@NonNull private final Instant updatedAt;
@NonNull private final String namespace;
@NonNull private final Set<DatasetId> inputs;
@NonNull private final Set<DatasetId> outputs;
// Optional fields; may be null.
@Nullable private final URL location;
@Nullable private final String description;
@Nullable private final Run latestRun;
}
11 changes: 6 additions & 5 deletions clients/java/src/main/java/marquez/client/models/NodeData.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXTERNAL_PROPERTY,
property = "type")
@JsonSubTypes({@JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD")})
/**
 * Marker interface for the {@code data} payload carried by a lineage graph node on the client.
 *
 * <p>Polymorphic (de)serialization uses {@code JsonTypeInfo.Id.DEDUCTION}: no type property is
 * written into the payload, and Jackson chooses among the registered subtypes by the fields that
 * are present in the JSON. The registered subtypes therefore need distinguishable field sets.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
@JsonSubTypes({
@JsonSubTypes.Type(DatasetNodeData.class),
@JsonSubTypes.Type(JobNodeData.class),
@JsonSubTypes.Type(ColumnLineageNodeData.class),
})
public interface NodeData {}
46 changes: 44 additions & 2 deletions clients/java/src/test/java/marquez/client/MarquezClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
import marquez.client.models.Dataset;
import marquez.client.models.DatasetFieldId;
import marquez.client.models.DatasetId;
import marquez.client.models.DatasetNodeData;
import marquez.client.models.DatasetType;
import marquez.client.models.DatasetVersion;
import marquez.client.models.DbTable;
import marquez.client.models.DbTableMeta;
Expand Down Expand Up @@ -410,13 +412,39 @@ public class MarquezClientTest {
STREAM_DESCRIPTION,
CREATED_BY_RUN,
DB_FACETS);

// Identifier of the DB table fixture, used to build the dataset lineage node below.
private static final DatasetId DATASET_ID = new DatasetId(NAMESPACE_NAME, DB_TABLE_NAME);

// Identifier of a single field of the DB table fixture.
private static final DatasetFieldId DATASET_FIELD_ID =
new DatasetFieldId(NAMESPACE_NAME, DB_TABLE_NAME, FIELD_NAME);

// NOTE(review): constructed exactly like DATASET_FIELD_ID (same type, same arguments, no
// version component) despite the "VERSION" name; confirm this is intended.
private static final DatasetFieldId DATASET_FIELD_VERSION_ID =
new DatasetFieldId(NAMESPACE_NAME, DB_TABLE_NAME, FIELD_NAME);

// Dataset node fixture: a DATASET-typed node for DATASET_ID carrying a DatasetNodeData payload
// plus two single-element edge sets. The DatasetNodeData arguments are positional and follow
// that class's field declaration order; the two nulls before/after DB_TABLE_DESCRIPTION fill
// its nullable lastModifiedAt and lastLifecycleState slots.
// NOTE(review): the first edge set passes this dataset as the first Edge.of argument and
// "inDataset" as the second, while the second set passes "outDataset" first and this dataset
// second; confirm the argument order matches the intended in/out edge direction.
private static final Node LINEAGE_NODE =
new Node(
NodeId.of(DATASET_ID),
NodeType.DATASET,
new DatasetNodeData(
DATASET_ID,
DatasetType.DB_TABLE,
DB_TABLE_NAME,
DB_TABLE_PHYSICAL_NAME,
CREATED_AT,
UPDATED_AT,
NAMESPACE_NAME,
DB_TABLE_SOURCE_NAME,
FIELDS,
TAGS,
null,
DB_TABLE_DESCRIPTION,
null),
ImmutableSet.of(
Edge.of(NodeId.of(DATASET_ID), NodeId.of(new DatasetId("namespace", "inDataset")))),
ImmutableSet.of(
Edge.of(NodeId.of(new DatasetId("namespace", "outDataset")), NodeId.of(DATASET_ID))));

private static final Node COLUMN_LINEAGE_NODE =
new Node(
NodeId.of(DATASET_FIELD_ID),
NodeType.DATASET_FIELD,
Expand Down Expand Up @@ -1000,9 +1028,23 @@ public void testCreateTag() throws Exception {
}

@Test
public void testGetColumnLineage() throws Exception {
public void testGetLineage() throws Exception {
  // Arrange: serialize the fixture graph and have the HTTP layer return it for the expected URL.
  final String lineageJson = new MarquezClient.Lineage(ImmutableSet.of(LINEAGE_NODE)).toJson();
  when(http.get(buildUrlFor("/lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20")))
      .thenReturn(lineageJson);

  // Act: request lineage with the default depth and take a node from the returned graph.
  final NodeId requestedNodeId = NodeId.of(new DatasetId("namespace", "dataset"));
  final MarquezClient.Lineage fetched = client.getLineage(requestedNodeId);
  final Node retrievedNode = fetched.getGraph().stream().findAny().get();

  // Assert: the node survives the JSON round trip unchanged.
  assertThat(retrievedNode).isEqualTo(LINEAGE_NODE);
}

@Test
public void testGetColumnLineage() throws Exception {
MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(COLUMN_LINEAGE_NODE));
String lineageJson = lineage.toJson();
when(http.get(
buildUrlFor(
"/column-lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20&withDownstream=false")))
Expand All @@ -1015,7 +1057,7 @@ public void testGetColumnLineage() throws Exception {
.stream()
.findAny()
.get();
assertThat(retrievedNode).isEqualTo(LINEAGE_NODE);
assertThat(retrievedNode).isEqualTo(COLUMN_LINEAGE_NODE);
}

private URL buildUrlFor(String pathTemplate) throws Exception {
Expand Down
48 changes: 48 additions & 0 deletions clients/java/src/test/java/marquez/client/MarquezUrlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,54 @@ void testEncodedMarquezUrl() {
"http://marquez:5000/namespace/s3:%2F%2Fbucket/job/jname", url.toString());
}

@Test
void testToLineageUrl() {
  final int depth = 20;
  final UUID versionUuid = UUID.fromString(version);

  // Unversioned node ids: dataset, dataset field, job.
  final NodeId datasetNode = NodeId.of(new DatasetId("namespace", "dataset"));
  Assertions.assertEquals(
      "http://marquez:5000/api/v1/lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20",
      marquezUrl.toLineageUrl(datasetNode, depth).toString());

  final NodeId datasetFieldNode = NodeId.of(new DatasetFieldId("namespace", "dataset", "field"));
  Assertions.assertEquals(
      "http://marquez:5000/api/v1/lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield&depth=20",
      marquezUrl.toLineageUrl(datasetFieldNode, depth).toString());

  final NodeId jobNode = NodeId.of(new JobId("namespace", "job"));
  Assertions.assertEquals(
      "http://marquez:5000/api/v1/lineage?nodeId=job%3Anamespace%3Ajob&depth=20",
      marquezUrl.toLineageUrl(jobNode, depth).toString());

  // Versioned node ids: the version follows an encoded '#' (%23) in the nodeId parameter.
  final NodeId datasetVersionNode =
      NodeId.of(new DatasetVersionId("namespace", "dataset", versionUuid));
  Assertions.assertEquals(
      "http://marquez:5000/api/v1/lineage?nodeId=dataset%3Anamespace%3Adataset%23"
          + version
          + "&depth=20",
      marquezUrl.toLineageUrl(datasetVersionNode, depth).toString());

  final NodeId datasetFieldVersionNode =
      NodeId.of(new DatasetFieldVersionId("namespace", "dataset", "field", versionUuid));
  Assertions.assertEquals(
      "http://marquez:5000/api/v1/lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield%23"
          + version
          + "&depth=20",
      marquezUrl.toLineageUrl(datasetFieldVersionNode, depth).toString());

  final NodeId jobVersionNode = NodeId.of(new JobVersionId("namespace", "job", versionUuid));
  Assertions.assertEquals(
      "http://marquez:5000/api/v1/lineage?nodeId=job%3Anamespace%3Ajob%23"
          + version
          + "&depth=20",
      marquezUrl.toLineageUrl(jobVersionNode, depth).toString());
}

@Test
void testToColumnLineageUrl() {
Assertions.assertEquals(
Expand Down
Loading

0 comments on commit 51a4055

Please sign in to comment.