From 51a40550a0b802bf0daebb8b18b3130652691411 Mon Sep 17 00:00:00 2001 From: David Goss Date: Wed, 23 Aug 2023 19:38:01 +0100 Subject: [PATCH] Add Java client method for dataset/job lineage Signed-off-by: David Goss --- .../java/marquez/service/models/NodeData.java | 11 +- .../test/resources/column_lineage/node.json | 1 - api/src/test/resources/lineage/node.json | 1 - .../java/marquez/client/MarquezClient.java | 9 ++ .../java/marquez/client/MarquezPathV1.java | 4 + .../main/java/marquez/client/MarquezUrl.java | 8 ++ .../client/models/DatasetNodeData.java | 34 ++++++ .../marquez/client/models/JobNodeData.java | 34 ++++++ .../java/marquez/client/models/NodeData.java | 11 +- .../marquez/client/MarquezClientTest.java | 46 +++++++- .../java/marquez/client/MarquezUrlTest.java | 48 ++++++++ .../java/marquez/client/models/NodeTest.java | 108 ++++++++++++++++++ 12 files changed, 299 insertions(+), 16 deletions(-) create mode 100644 clients/java/src/main/java/marquez/client/models/DatasetNodeData.java create mode 100644 clients/java/src/main/java/marquez/client/models/JobNodeData.java create mode 100644 clients/java/src/test/java/marquez/client/models/NodeTest.java diff --git a/api/src/main/java/marquez/service/models/NodeData.java b/api/src/main/java/marquez/service/models/NodeData.java index 912bf492f0..c3cdcbdf13 100644 --- a/api/src/main/java/marquez/service/models/NodeData.java +++ b/api/src/main/java/marquez/service/models/NodeData.java @@ -9,13 +9,10 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import marquez.db.models.ColumnLineageNodeData; -@JsonTypeInfo( - use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.EXTERNAL_PROPERTY, - property = "type") +@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION) @JsonSubTypes({ - @JsonSubTypes.Type(value = DatasetData.class, name = "DATASET"), - @JsonSubTypes.Type(value = JobData.class, name = "JOB"), - @JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD") + @JsonSubTypes.Type(DatasetData.class), + @JsonSubTypes.Type(JobData.class), + @JsonSubTypes.Type(ColumnLineageNodeData.class) }) public interface NodeData {} diff --git a/api/src/test/resources/column_lineage/node.json b/api/src/test/resources/column_lineage/node.json index 0ad713d4ec..91765b2671 100644 --- a/api/src/test/resources/column_lineage/node.json +++ b/api/src/test/resources/column_lineage/node.json @@ -2,7 +2,6 @@ "id": "datasetField:namespace:commonDataset:columnA", "type": "DATASET_FIELD", "data": { - "type": "DATASET_FIELD", "namespace": "namespace", "dataset": "otherDataset", "field": "columnA", diff --git a/api/src/test/resources/lineage/node.json b/api/src/test/resources/lineage/node.json index 7499272951..03e3669a9d 100644 --- a/api/src/test/resources/lineage/node.json +++ b/api/src/test/resources/lineage/node.json @@ -2,7 +2,6 @@ "id": "dataset:namespace:commonDataset", "type": "DATASET", "data": { - "type": "DATASET", "id": { "namespace": "namespace", "name": "commonDataset" diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index f8a21b2bb5..3d55eac28f 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -116,6 +116,15 @@ public enum SortDirection { @Getter public final String value; } + public Lineage getLineage(NodeId nodeId) { + return getLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH); + } + + public Lineage getLineage(NodeId nodeId, int depth) { + final String bodyAsJson = http.get(url.toLineageUrl(nodeId, depth)); + return Lineage.fromJson(bodyAsJson); + } + public Lineage getColumnLineage(NodeId nodeId) { return getColumnLineage(nodeId, DEFAULT_LINEAGE_GRAPH_DEPTH, false); } diff --git a/clients/java/src/main/java/marquez/client/MarquezPathV1.java b/clients/java/src/main/java/marquez/client/MarquezPathV1.java index 7736b045f7..2ceba79f9e 100644 --- a/clients/java/src/main/java/marquez/client/MarquezPathV1.java +++ b/clients/java/src/main/java/marquez/client/MarquezPathV1.java @@ -178,6 +178,10 @@ static String searchPath() { return path("/search"); } + static String lineagePath() { + return path("/lineage/"); + } + static String columnLineagePath() { return path("/column-lineage/"); } diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java index d761199e90..cc460c4a4f 100644 --- a/clients/java/src/main/java/marquez/client/MarquezUrl.java +++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java @@ -16,6 +16,7 @@ import static marquez.client.MarquezPathV1.fieldTagPath; import static marquez.client.MarquezPathV1.jobPath; import static marquez.client.MarquezPathV1.jobVersionPath; +import static marquez.client.MarquezPathV1.lineagePath; import static marquez.client.MarquezPathV1.listDatasetVersionsPath; import static marquez.client.MarquezPathV1.listDatasetsPath; import static marquez.client.MarquezPathV1.listJobVersionsPath; @@ -208,6 +209,13 @@ URL toSearchUrl( return from(searchPath(), queryParams.build()); } + URL toLineageUrl(NodeId nodeId, int depth) { + final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); + queryParams.put("nodeId", nodeId.getValue()); + queryParams.put("depth", String.valueOf(depth)); + return from(lineagePath(), queryParams.build()); + } + URL toColumnLineageUrl(NodeId nodeId, int depth, boolean withDownstream) { final ImmutableMap.Builder queryParams = new ImmutableMap.Builder(); queryParams.put("nodeId", nodeId.getValue()); diff --git a/clients/java/src/main/java/marquez/client/models/DatasetNodeData.java b/clients/java/src/main/java/marquez/client/models/DatasetNodeData.java new file mode 100644 index 0000000000..a8b00c71dd --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/DatasetNodeData.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import java.time.Instant; +import java.util.List; +import java.util.Set; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; + +@Getter +@AllArgsConstructor +@EqualsAndHashCode +public class DatasetNodeData implements NodeData { + @NonNull private final DatasetId id; + @NonNull private final DatasetType type; + @NonNull private final String name; + @NonNull private final String physicalName; + @NonNull private final Instant createdAt; + @NonNull private final Instant updatedAt; + @NonNull private final String namespace; + @NonNull private final String sourceName; + @NonNull private final List fields; + @NonNull private final Set tags; + @Nullable private final Instant lastModifiedAt; + @Nullable private final String description; + @Nullable private final String lastLifecycleState; +} diff --git a/clients/java/src/main/java/marquez/client/models/JobNodeData.java b/clients/java/src/main/java/marquez/client/models/JobNodeData.java new file mode 100644 index 0000000000..480124cc8c --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/JobNodeData.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import java.net.URL; +import java.time.Instant; +import java.util.Set; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; + +@Getter +@AllArgsConstructor +@EqualsAndHashCode +public class JobNodeData implements NodeData { + @NonNull private final JobId id; + @NonNull private final JobType type; + @NonNull private final String name; + @NonNull private final String simpleName; + @Nullable private final String parentJobName; + @NonNull private final Instant createdAt; + @NonNull private final Instant updatedAt; + @NonNull private final String namespace; + @NonNull private final Set inputs; + @NonNull private final Set outputs; + @Nullable private final URL location; + @Nullable private final String description; + @Nullable private final Run latestRun; +} diff --git a/clients/java/src/main/java/marquez/client/models/NodeData.java b/clients/java/src/main/java/marquez/client/models/NodeData.java index e919d8c2f3..78a45aea7b 100644 --- a/clients/java/src/main/java/marquez/client/models/NodeData.java +++ b/clients/java/src/main/java/marquez/client/models/NodeData.java @@ -8,9 +8,10 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -@JsonTypeInfo( - use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.EXTERNAL_PROPERTY, - property = "type") -@JsonSubTypes({@JsonSubTypes.Type(value = ColumnLineageNodeData.class, name = "DATASET_FIELD")}) +@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION) +@JsonSubTypes({ + @JsonSubTypes.Type(DatasetNodeData.class), + @JsonSubTypes.Type(JobNodeData.class), + @JsonSubTypes.Type(ColumnLineageNodeData.class), +}) public interface NodeData {} diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index b5b3dbac3d..5395337bfa 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -74,6 +74,8 @@ import marquez.client.models.Dataset; import marquez.client.models.DatasetFieldId; import marquez.client.models.DatasetId; +import marquez.client.models.DatasetNodeData; +import marquez.client.models.DatasetType; import marquez.client.models.DatasetVersion; import marquez.client.models.DbTable; import marquez.client.models.DbTableMeta; @@ -410,6 +412,9 @@ public class MarquezClientTest { STREAM_DESCRIPTION, CREATED_BY_RUN, DB_FACETS); + + private static final DatasetId DATASET_ID = new DatasetId(NAMESPACE_NAME, DB_TABLE_NAME); + private static final DatasetFieldId DATASET_FIELD_ID = new DatasetFieldId(NAMESPACE_NAME, DB_TABLE_NAME, FIELD_NAME); @@ -417,6 +422,29 @@ public class MarquezClientTest { new DatasetFieldId(NAMESPACE_NAME, DB_TABLE_NAME, FIELD_NAME); private static final Node LINEAGE_NODE = + new Node( + NodeId.of(DATASET_ID), + NodeType.DATASET, + new DatasetNodeData( + DATASET_ID, + DatasetType.DB_TABLE, + DB_TABLE_NAME, + DB_TABLE_PHYSICAL_NAME, + CREATED_AT, + UPDATED_AT, + NAMESPACE_NAME, + DB_TABLE_SOURCE_NAME, + FIELDS, + TAGS, + null, + DB_TABLE_DESCRIPTION, + null), + ImmutableSet.of( + Edge.of(NodeId.of(DATASET_ID), NodeId.of(new DatasetId("namespace", "inDataset")))), + ImmutableSet.of( + Edge.of(NodeId.of(new DatasetId("namespace", "outDataset")), NodeId.of(DATASET_ID)))); + + private static final Node COLUMN_LINEAGE_NODE = new Node( NodeId.of(DATASET_FIELD_ID), NodeType.DATASET_FIELD, @@ -1000,9 +1028,23 @@ public void testCreateTag() throws Exception { } @Test - public void testGetColumnLineage() throws Exception { + public void testGetLineage() throws Exception { MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(LINEAGE_NODE)); String lineageJson = lineage.toJson(); + when(http.get(buildUrlFor("/lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20"))) + .thenReturn(lineageJson); + + Node retrievedNode = + client.getLineage(NodeId.of(new DatasetId("namespace", "dataset"))).getGraph().stream() + .findAny() + .get(); + assertThat(retrievedNode).isEqualTo(LINEAGE_NODE); + } + + @Test + public void testGetColumnLineage() throws Exception { + MarquezClient.Lineage lineage = new MarquezClient.Lineage(ImmutableSet.of(COLUMN_LINEAGE_NODE)); + String lineageJson = lineage.toJson(); when(http.get( buildUrlFor( "/column-lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20&withDownstream=false"))) @@ -1015,7 +1057,7 @@ public void testGetColumnLineage() throws Exception { .stream() .findAny() .get(); - assertThat(retrievedNode).isEqualTo(LINEAGE_NODE); + assertThat(retrievedNode).isEqualTo(COLUMN_LINEAGE_NODE); } private URL buildUrlFor(String pathTemplate) throws Exception { diff --git a/clients/java/src/test/java/marquez/client/MarquezUrlTest.java b/clients/java/src/test/java/marquez/client/MarquezUrlTest.java index 55c24711bf..f0c8ef47fc 100644 --- a/clients/java/src/test/java/marquez/client/MarquezUrlTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezUrlTest.java @@ -43,6 +43,54 @@ void testEncodedMarquezUrl() { "http://marquez:5000/namespace/s3:%2F%2Fbucket/job/jname", url.toString()); } + @Test + void testToLineageUrl() { + Assertions.assertEquals( + "http://marquez:5000/api/v1/lineage?nodeId=dataset%3Anamespace%3Adataset&depth=20", + marquezUrl.toLineageUrl(NodeId.of(new DatasetId("namespace", "dataset")), 20).toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield&depth=20", + marquezUrl + .toLineageUrl(NodeId.of(new DatasetFieldId("namespace", "dataset", "field")), 20) + .toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/lineage?nodeId=job%3Anamespace%3Ajob&depth=20", + marquezUrl.toLineageUrl(NodeId.of(new JobId("namespace", "job")), 20).toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/lineage?nodeId=dataset%3Anamespace%3Adataset%23" + + version + + "&depth=20", + marquezUrl + .toLineageUrl( + NodeId.of(new DatasetVersionId("namespace", "dataset", UUID.fromString(version))), + 20) + .toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/lineage?nodeId=datasetField%3Anamespace%3Adataset%3Afield%23" + + version + + "&depth=20", + marquezUrl + .toLineageUrl( + NodeId.of( + new DatasetFieldVersionId( + "namespace", "dataset", "field", UUID.fromString(version))), + 20) + .toString()); + + Assertions.assertEquals( + "http://marquez:5000/api/v1/lineage?nodeId=job%3Anamespace%3Ajob%23" + + version + + "&depth=20", + marquezUrl + .toLineageUrl( + NodeId.of(new JobVersionId("namespace", "job", UUID.fromString(version))), 20) + .toString()); + } + @Test void testToColumnLineageUrl() { Assertions.assertEquals( diff --git a/clients/java/src/test/java/marquez/client/models/NodeTest.java b/clients/java/src/test/java/marquez/client/models/NodeTest.java new file mode 100644 index 0000000000..d756f635e3 --- /dev/null +++ b/clients/java/src/test/java/marquez/client/models/NodeTest.java @@ -0,0 +1,108 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.fasterxml.jackson.databind.ObjectMapper; +import marquez.client.Utils; +import org.junit.jupiter.api.Test; + +@org.junit.jupiter.api.Tag("UnitTests") +public class NodeTest { + private static final ObjectMapper MAPPER = Utils.newObjectMapper(); + + @Test + void testResolvesCorrectNodeDataTypeFromJsonDatasetField() throws Exception { + final String input = + """ + { + "id": "datasetField:snowflake://matillion.eu-central-1:FROSTY_BORG.ETLD.Do11yJiraIssues:Assignee", + "type": "DATASET_FIELD", + "data": { + "namespace": "snowflake://matillion.eu-central-1", + "dataset": "FROSTY_BORG.ETLD.Do11yJiraIssues", + "datasetVersion": "9caaa5b3-d101-4368-9bd1-b99669736a78", + "field": "Assignee", + "fieldType": "UNKNOWN", + "inputFields": [] + }, + "inEdges": [], + "outEdges": [] + } + """; + + final Node node = MAPPER.readValue(input, Node.class); + + assertTrue(node.getData() instanceof ColumnLineageNodeData); + } + + @Test + void testResolvesCorrectNodeDataTypeFromJsonDataset() throws Exception { + final String input = + """ + { + "id": "dataset:namespace:table", + "type": "DATASET", + "data": { + "id": { + "namespace": "namespace", + "name": "table" + }, + "type": "DB_TABLE", + "name": "table", + "physicalName": "table", + "createdAt": "2023-08-09T11:17:30.091688Z", + "updatedAt": "2023-08-09T11:17:30.091688Z", + "namespace": "namespace", + "sourceName": "default", + "fields": [], + "tags": [] + }, + "inEdges": [], + "outEdges": [] + } + """; + + final Node node = MAPPER.readValue(input, Node.class); + + assertTrue(node.getData() instanceof DatasetNodeData); + assertEquals(DatasetType.DB_TABLE, ((DatasetNodeData) node.getData()).getType()); + } + + @Test + void testResolvesCorrectNodeDataTypeFromJsonJob() throws Exception { + final String input = + """ + { + "id": "job:namespace:job", + "type": "JOB", + "data": { + "id": { + "namespace": "namespace", + "name": "job" + }, + "type": "BATCH", + "name": "job", + "simpleName": "job", + "createdAt": "2023-08-09T11:17:30.091688Z", + "updatedAt": "2023-08-09T11:17:30.091688Z", + "namespace": "namespace", + "inputs": [], + "outputs": [] + }, + "inEdges": [], + "outEdges": [] + } + """; + + final Node node = MAPPER.readValue(input, Node.class); + + assertTrue(node.getData() instanceof JobNodeData); + assertEquals(JobType.BATCH, ((JobNodeData) node.getData()).getType()); + } +}