diff --git a/.circleci/api-load-test.sh b/.circleci/api-load-test.sh index 309dda6556..bb13d0d2d2 100755 --- a/.circleci/api-load-test.sh +++ b/.circleci/api-load-test.sh @@ -14,7 +14,7 @@ set -e # Build version of Marquez -readonly MARQUEZ_VERSION=0.37.0-SNAPSHOT +readonly MARQUEZ_VERSION=0.38.0-SNAPSHOT # Fully qualified path to marquez.jar readonly MARQUEZ_JAR="api/build/libs/marquez-api-${MARQUEZ_VERSION}.jar" diff --git a/.circleci/db-migration.sh b/.circleci/db-migration.sh index aa05146661..1ab2fcb868 100755 --- a/.circleci/db-migration.sh +++ b/.circleci/db-migration.sh @@ -13,7 +13,7 @@ # Version of PostgreSQL readonly POSTGRES_VERSION="12.1" # Version of Marquez -readonly MARQUEZ_VERSION=0.36.0 +readonly MARQUEZ_VERSION=0.37.0 # Build version of Marquez readonly MARQUEZ_BUILD_VERSION="$(git log --pretty=format:'%h' -n 1)" # SHA1 diff --git a/.env.example b/.env.example index 1c3f53ba3f..2aae51adfa 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,4 @@ API_PORT=5000 API_ADMIN_PORT=5001 WEB_PORT=3000 -TAG=0.36.0 +TAG=0.37.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index b95576c982..8d5d0228a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,17 @@ # Changelog -## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.36.0...HEAD) +## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.37.0...HEAD) + +## [0.37.0](https://github.com/MarquezProject/marquez/compare/0.36.0...0.37.0) - 2023-07-17 +### Added +* API: add ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Introduces an `EventTypeResolver` for using the `schemaURL` field to decode `POST` requests to `/lineage` with `LineageEvent`s, `DatasetEvent`s or `JobEvent`s, as the first step in implementing static lineage support.* + ### Fixed -* API: remove unnecessary DB updates 
[`#2531`](https://github.com/MarquezProject/marquez/pull/2531)[@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) - *Prevent updates that are not needed and are deadlock prone.* +* API: remove unnecessary DB updates [`#2531`](https://github.com/MarquezProject/marquez/pull/2531) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Prevent updates that are not needed and are deadlock-prone.* +* Web: revert URL encoding when fetching lineage [`#2529`](https://github.com/MarquezProject/marquez/pull/2529) [@jlukenoff](https://github.com/jlukenoff) + *Reverts the node ID from being URL-encoded and allows the backend to return lineage details successfully even when a node ID contains special characters.* ## [0.36.0](https://github.com/MarquezProject/marquez/compare/0.35.0...0.36.0) - 2023-06-27 ### Added @@ -28,6 +36,12 @@ * Web: handle lineage graph cycles on the client [`#2506`](https://github.com/MarquezProject/marquez/pull/2506) [@jlukenoff](https://github.com/jlukenoff) *Fixes a bug where we blow the stack on the client-side if the user selects a node that is part of a cycle in the graph.* +### Added + +* Ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Adds the ability to distinguish, on the backend, static metadata events introduced based on the [proposal](https://github.com/OpenLineage/OpenLineage/blob/main/proposals/1837/static_lineage.md).* + + ## [0.34.0](https://github.com/MarquezProject/marquez/compare/0.33.0...0.34.0) - 2023-05-18 ### Fixed diff --git a/README.md b/README.md index 8120c94e7b..2961483fbd 100644 --- a/README.md +++ b/README.md @@ -79,8 +79,8 @@ Versions of Marquez are compatible with OpenLineage unless noted otherwise. 
We e | **Marquez** | **OpenLineage** | **Status** | |--------------------------------------------------------------------------------------------------|---------------------------------------------------------------|---------------| | [`UNRELEASED`](https://github.com/MarquezProject/marquez/blob/main/CHANGELOG.md#unreleased) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `CURRENT` | -| [`0.35.0`](https://github.com/MarquezProject/marquez/blob/0.35.0/CHANGELOG.md#0350---2023-06-13) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `RECOMMENDED` | -| [`0.34.0`](https://github.com/MarquezProject/marquez/blob/0.34.0/CHANGELOG.md#0340---2023-05-18) | [`1-0-5`](https://openlineage.io/spec/1-0-0/OpenLineage.json) | `MAINTENANCE` | +| [`0.37.0`](https://github.com/MarquezProject/marquez/blob/0.37.0/CHANGELOG.md#0370---2023-07-17) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `RECOMMENDED` | +| [`0.36.0`](https://github.com/MarquezProject/marquez/blob/0.36.0/CHANGELOG.md#0360---2023-06-27) | [`1-0-5`](https://openlineage.io/spec/1-0-0/OpenLineage.json) | `MAINTENANCE` | > **Note:** The [`openlineage-python`](https://pypi.org/project/openlineage-python) and [`openlineage-java`](https://central.sonatype.com/artifact/io.openlineage/openlineage-java) libraries will a higher version than the OpenLineage [specification](https://github.com/OpenLineage/OpenLineage/tree/main/spec) as they have different version requirements. 
@@ -160,7 +160,7 @@ Marquez listens on port `8080` for all API calls and port `8081` for the admin i * Website: https://marquezproject.ai * Source: https://github.com/MarquezProject/marquez -* Chat: [https://marquezproject.slack.com](https://bit.ly/MqzSlack) +* Chat: [MarquezProject Slack](https://bit.ly/MqzSlackInvite) * Twitter: [@MarquezProject](https://twitter.com/MarquezProject) ## Contributing diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index c938291a5e..677044fea8 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -38,6 +38,7 @@ import marquez.api.models.SortDirection; import marquez.db.OpenLineageDao; import marquez.service.ServiceFactory; +import marquez.service.models.BaseEvent; import marquez.service.models.LineageEvent; import marquez.service.models.NodeId; @@ -61,20 +62,26 @@ public OpenLineageResource( @Consumes(APPLICATION_JSON) @Produces(APPLICATION_JSON) @Path("/lineage") - public void create( - @Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse) + public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncResponse asyncResponse) throws JsonProcessingException, SQLException { - openLineageService - .createAsync(event) - .whenComplete( - (result, err) -> { - if (err != null) { - log.error("Unexpected error while processing request", err); - asyncResponse.resume(Response.status(determineStatusCode(err)).build()); - } else { - asyncResponse.resume(Response.status(201).build()); - } - }); + if (event instanceof LineageEvent) { + openLineageService + .createAsync((LineageEvent) event) + .whenComplete( + (result, err) -> { + if (err != null) { + log.error("Unexpected error while processing request", err); + asyncResponse.resume(Response.status(determineStatusCode(err)).build()); + } else { + asyncResponse.resume(Response.status(201).build()); + } + 
}); + } else { + log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName()); + + // return serialized event + asyncResponse.resume(Response.status(200).entity(event).build()); + } } private int determineStatusCode(Throwable e) { diff --git a/api/src/main/java/marquez/service/models/BaseEvent.java b/api/src/main/java/marquez/service/models/BaseEvent.java new file mode 100644 index 0000000000..a2ba6a8856 --- /dev/null +++ b/api/src/main/java/marquez/service/models/BaseEvent.java @@ -0,0 +1,20 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.As; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; + +@JsonTypeIdResolver(EventTypeResolver.class) +@JsonTypeInfo( + use = Id.CUSTOM, + include = As.EXISTING_PROPERTY, + property = "schemaURL", + defaultImpl = LineageEvent.class, + visible = true) +public class BaseEvent extends BaseJsonModel {} diff --git a/api/src/main/java/marquez/service/models/DatasetEvent.java b/api/src/main/java/marquez/service/models/DatasetEvent.java new file mode 100644 index 0000000000..13c438ca21 --- /dev/null +++ b/api/src/main/java/marquez/service/models/DatasetEvent.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import java.net.URI; +import java.time.ZonedDateTime; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Setter +@Getter +@Valid +@ToString +public class 
DatasetEvent extends BaseEvent { + @NotNull private ZonedDateTime eventTime; + @Valid private LineageEvent.Dataset dataset; + @Valid @NotNull private String producer; + @Valid @NotNull private URI schemaURL; +} diff --git a/api/src/main/java/marquez/service/models/EventTypeResolver.java b/api/src/main/java/marquez/service/models/EventTypeResolver.java new file mode 100644 index 0000000000..ec8ce3cd48 --- /dev/null +++ b/api/src/main/java/marquez/service/models/EventTypeResolver.java @@ -0,0 +1,89 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import static marquez.service.models.EventTypeResolver.EventSchemaURL.LINEAGE_EVENT; + +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; +import java.io.IOException; +import java.util.Arrays; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EventTypeResolver extends TypeIdResolverBase { + + @AllArgsConstructor + public enum EventSchemaURL { + LINEAGE_EVENT( + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent", + LineageEvent.class), + DATASET_EVENT( + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/DatasetEvent", + DatasetEvent.class), + JOB_EVENT( + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent", JobEvent.class); + + @Getter private String schemaURL; + + public String getName() { + int lastSlash = schemaURL.lastIndexOf('/'); + return schemaURL.substring(lastSlash, schemaURL.length()); + } + + @Getter private Class subType; + } + + private JavaType superType; + + @Override + public void init(JavaType baseType) { + superType = baseType; + } + + @Override + public String idFromValue(Object value) { + return 
null; + } + + @Override + public String idFromValueAndType(Object value, Class suggestedType) { + return null; + } + + @Override + public JavaType typeFromId(DatabindContext context, String id) throws IOException { + if (id == null) { + return context.constructSpecializedType(superType, LINEAGE_EVENT.subType); + } + + int lastSlash = id.lastIndexOf('/'); + + if (lastSlash < 0) { + return context.constructSpecializedType(superType, LINEAGE_EVENT.subType); + } + + String type = id.substring(lastSlash, id.length()); + + Class subType = + Arrays.stream(EventSchemaURL.values()) + .filter(s -> s.getName().equals(type)) + .findAny() + .map(EventSchemaURL::getSubType) + .orElse(LINEAGE_EVENT.subType); + + return context.constructSpecializedType(superType, subType); + } + + @Override + public Id getMechanism() { + return null; + } +} diff --git a/api/src/main/java/marquez/service/models/JobEvent.java b/api/src/main/java/marquez/service/models/JobEvent.java new file mode 100644 index 0000000000..96f9b3ac0b --- /dev/null +++ b/api/src/main/java/marquez/service/models/JobEvent.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.List; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Setter +@Getter +@Valid +@ToString +public class JobEvent extends BaseEvent { + @NotNull private ZonedDateTime eventTime; + @Valid @NotNull private LineageEvent.Job job; + @Valid private List inputs; + @Valid private List outputs; + @Valid @NotNull private String producer; + @Valid @NotNull private URI schemaURL; +} diff --git 
a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 9478a7bdbd..6117e1fc3d 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -38,7 +38,7 @@ @Getter @Valid @ToString -public class LineageEvent extends BaseJsonModel { +public class LineageEvent extends BaseEvent { private String eventType; @@ -48,6 +48,7 @@ public class LineageEvent extends BaseJsonModel { @Valid private List inputs; @Valid private List outputs; @Valid @NotNull private String producer; + @Valid private URI schemaURL; @AllArgsConstructor @NoArgsConstructor diff --git a/api/src/test/java/marquez/DatasetIntegrationTest.java b/api/src/test/java/marquez/DatasetIntegrationTest.java index 86e0af5591..0499c73ef2 100644 --- a/api/src/test/java/marquez/DatasetIntegrationTest.java +++ b/api/src/test/java/marquez/DatasetIntegrationTest.java @@ -365,14 +365,17 @@ public void testApp_doesNotShowDeletedDataset() throws IOException { String namespace = "namespace"; String name = "table"; LineageEvent event = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job("namespace", "job_name", null), - List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job("namespace", "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); final CompletableFuture resp = sendEvent(event); assertThat(resp.join()).isEqualTo(201); @@ -388,14 +391,17 @@ 
public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep String namespace = "namespace"; String name = "anotherTable"; LineageEvent event = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job("namespace", "job_name", null), - List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job("namespace", "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); CompletableFuture resp = sendEvent(event); assertThat(resp.join()).isEqualTo(201); @@ -414,14 +420,15 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep @Test public void testApp_getDatasetContainsColumnLineage() { LineageEvent event = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job("namespace", "job_name", null), - List.of(getDatasetA()), - List.of(getDatasetB()), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job("namespace", "job_name", null)) + .inputs(List.of(getDatasetA())) + .outputs(List.of(getDatasetB())) + .producer("the_producer") + .build(); CompletableFuture resp = this.sendLineage(Utils.toJson(event)) @@ -457,14 +464,17 @@ public void testApp_doesNotShowDeletedDatasetAfterDeleteNamespace() throws IOExc String namespace = "namespace"; String 
name = "table"; LineageEvent event = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job("namespace", "job_name", null), - List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job("namespace", "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); final CompletableFuture resp = sendEvent(event); assertThat(resp.join()).isEqualTo(201); @@ -481,27 +491,32 @@ public void testApp_doesNotShowDeletedDatasetAfterUndeleteNamespace() throws IOE String name = "table"; LineageEvent firstEvent = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job(namespaceName, "job_name", null), - List.of( - new LineageEvent.Dataset(namespaceName, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job(namespaceName, "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset( + namespaceName, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); LineageEvent secondEvent = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job(namespaceName, 
"second_job_name", null), - List.of( - new LineageEvent.Dataset( - namespaceName, name + "2", LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job(namespaceName, "second_job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset( + namespaceName, name + "2", LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); CompletableFuture resp = sendEvent(firstEvent); assertThat(resp.join()).isEqualTo(201); @@ -529,15 +544,18 @@ public void testApp_doesNotShowDeletedDatasetAfterUndeleteNamespace() throws IOE assertThat(jobs).hasSize(0); LineageEvent eventThatWillUndeleteNamespace = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job(namespaceName, "job_name", null), - List.of( - new LineageEvent.Dataset(namespaceName, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job(namespaceName, "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset( + namespaceName, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); resp = sendEvent(eventThatWillUndeleteNamespace); assertThat(resp.join()).isEqualTo(201); diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 7861f342ee..4729a64971 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ 
b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -43,6 +43,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import marquez.api.JdbiUtils; import marquez.client.MarquezClient; import marquez.client.models.Dataset; @@ -53,6 +55,8 @@ import marquez.client.models.Run; import marquez.common.Utils; import marquez.db.LineageTestUtils; +import marquez.service.models.DatasetEvent; +import marquez.service.models.JobEvent; import org.assertj.core.api.InstanceOfAssertFactories; import org.jdbi.v3.core.Jdbi; import org.jetbrains.annotations.NotNull; @@ -65,6 +69,7 @@ import org.slf4j.LoggerFactory; @org.junit.jupiter.api.Tag("IntegrationTests") +@Slf4j public class OpenLineageIntegrationTest extends BaseIntegrationTest { public static String EVENT_REQUIRED = "open_lineage/event_required_only.json"; @@ -74,11 +79,18 @@ public class OpenLineageIntegrationTest extends BaseIntegrationTest { public static String EVENT_LARGE = "open_lineage/event_large.json"; public static String NULL_NOMINAL_END_TIME = "open_lineage/null_nominal_end_time.json"; public static String EVENT_NAMESPACE_NAMING = "open_lineage/event_namespace_naming.json"; + public static String EVENT_DATASET_EVENT = "open_lineage/event_dataset_event.json"; + public static String EVENT_JOB_EVENT = "open_lineage/event_job_event.json"; + public static String EVENT_WITHOUT_SCHEMA_URL = "open_lineage/event_without_schema_url.json"; + + public static String RUN_EVENT_SCHEMA_URL = + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent"; public static List data() { return Arrays.asList( EVENT_FULL, EVENT_SIMPLE, + EVENT_WITHOUT_SCHEMA_URL, EVENT_REQUIRED, EVENT_UNICODE, // FIXME: A very large event fails the test. 
@@ -99,15 +111,18 @@ public void testSendOpenLineageBadArgument() throws IOException { String badNamespace = "sqlserver://myhost:3342;user=auser;password=\uD83D\uDE02\uD83D\uDE02\uD83D\uDE02;database=TheDatabase"; marquez.service.models.LineageEvent event = - new marquez.service.models.LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new marquez.service.models.LineageEvent.Run(UUID.randomUUID().toString(), null), - new marquez.service.models.LineageEvent.Job("namespace", "job_name", null), - List.of( - new marquez.service.models.LineageEvent.Dataset(badNamespace, "the_table", null)), - Collections.emptyList(), - "the_producer"); + marquez.service.models.LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new marquez.service.models.LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new marquez.service.models.LineageEvent.Job("namespace", "job_name", null)) + .inputs( + List.of( + new marquez.service.models.LineageEvent.Dataset( + badNamespace, "the_table", null))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); final CompletableFuture resp = sendEvent(event); assertThat(resp.join()).isEqualTo(400); @@ -911,6 +926,7 @@ public void testOpenLineageJobHierarchySparkAndAirflow() } @Test + @SneakyThrows public void testSendEventAndGetItBack() { marquez.service.models.LineageEvent.Run run = new marquez.service.models.LineageEvent.Run( @@ -939,6 +955,7 @@ public void testSendEventAndGetItBack() { .eventTime(time) .inputs(Collections.emptyList()) .outputs(Collections.singletonList(dataset)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) .build(); final CompletableFuture resp = sendEvent(lineageEvent); @@ -954,6 +971,7 @@ public void testSendEventAndGetItBack() { } @Test + @SneakyThrows public void testFindEventIsSortedByTime() { marquez.service.models.LineageEvent.Run run = new marquez.service.models.LineageEvent.Run( @@ -978,16 +996,21 @@ public void 
testFindEventIsSortedByTime() { .run(run) .job(job) .inputs(Collections.emptyList()) - .outputs(Collections.singletonList(dataset)); + .outputs(Collections.singletonList(dataset)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)); marquez.service.models.LineageEvent firstEvent = - builder.eventTime(time).eventType("START").build(); + builder.eventTime(time).eventType("START").schemaURL(new URI(RUN_EVENT_SCHEMA_URL)).build(); CompletableFuture resp = sendEvent(firstEvent); assertThat(resp.join()).isEqualTo(201); marquez.service.models.LineageEvent secondEvent = - builder.eventTime(time.plusSeconds(10)).eventType("COMPLETE").build(); + builder + .eventTime(time.plusSeconds(10)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) + .eventType("COMPLETE") + .build(); resp = sendEvent(secondEvent); assertThat(resp.join()).isEqualTo(201); @@ -1003,6 +1026,7 @@ public void testFindEventIsSortedByTime() { } @Test + @SneakyThrows public void testFindEventIsSortedByTimeAsc() { marquez.service.models.LineageEvent.Run run = new marquez.service.models.LineageEvent.Run( @@ -1027,16 +1051,21 @@ public void testFindEventIsSortedByTimeAsc() { .run(run) .job(job) .inputs(Collections.emptyList()) - .outputs(Collections.singletonList(dataset)); + .outputs(Collections.singletonList(dataset)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)); marquez.service.models.LineageEvent firstEvent = - builder.eventTime(time).eventType("START").build(); + builder.eventTime(time).eventType("START").schemaURL(new URI(RUN_EVENT_SCHEMA_URL)).build(); CompletableFuture resp = sendEvent(firstEvent); assertThat(resp.join()).isEqualTo(201); marquez.service.models.LineageEvent secondEvent = - builder.eventTime(time.plusSeconds(10)).eventType("COMPLETE").build(); + builder + .eventTime(time.plusSeconds(10)) + .eventType("COMPLETE") + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) + .build(); resp = sendEvent(secondEvent); assertThat(resp.join()).isEqualTo(201); @@ -1052,6 +1081,7 @@ public void testFindEventIsSortedByTimeAsc() { } 
@Test + @SneakyThrows public void testFindEventBeforeAfterTime() { marquez.service.models.LineageEvent.Run run = new marquez.service.models.LineageEvent.Run( @@ -1078,7 +1108,8 @@ public void testFindEventBeforeAfterTime() { .run(run) .job(job) .inputs(Collections.emptyList()) - .outputs(Collections.singletonList(dataset)); + .outputs(Collections.singletonList(dataset)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)); marquez.service.models.LineageEvent firstEvent = builder.eventTime(after.minus(1, ChronoUnit.YEARS)).eventType("START").build(); @@ -1087,13 +1118,21 @@ public void testFindEventBeforeAfterTime() { assertThat(resp.join()).isEqualTo(201); marquez.service.models.LineageEvent secondEvent = - builder.eventTime(after.plusSeconds(10)).eventType("COMPLETE").build(); + builder + .eventTime(after.plusSeconds(10)) + .eventType("COMPLETE") + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) + .build(); resp = sendEvent(secondEvent); assertThat(resp.join()).isEqualTo(201); marquez.service.models.LineageEvent thirdEvent = - builder.eventTime(before.plusSeconds(10)).eventType("COMPLETE").build(); + builder + .eventTime(before.plusSeconds(10)) + .eventType("COMPLETE") + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) + .build(); List rawEvents = client.listLineageEvents(MarquezClient.SortDirection.ASC, before, after, 10); @@ -1336,6 +1375,72 @@ public void testSendOpenLineage(String pathToOpenLineageEvent) throws IOExceptio } } + @Test + public void testSendDatasetEventIsDecoded() throws IOException { + final String openLineageEventAsString = + Resources.toString(Resources.getResource(EVENT_DATASET_EVENT), Charset.defaultCharset()); + + // (2) Send OpenLineage event. + final CompletableFuture> resp = + this.sendLineage(openLineageEventAsString) + .thenApply(r -> Collections.singletonMap(r.statusCode(), r.body())) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + + // Ensure the event was received. 
+ Map respMap = resp.join(); + + assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201 + + // (3) Convert the OpenLineage event to Json. + DatasetEvent datasetEvent = + marquez.client.Utils.fromJson(respMap.get(200), new TypeReference() {}); + assertThat(datasetEvent.getDataset().getName()).isEqualTo("my-dataset-name"); + assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields()).hasSize(1); + assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields().get(0).getName()) + .isEqualTo("col_a"); + assertThat(datasetEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001"); + } + + @Test + public void testSendJobEventIsDecoded() throws IOException { + final String openLineageEventAsString = + Resources.toString(Resources.getResource(EVENT_JOB_EVENT), Charset.defaultCharset()); + + // (2) Send OpenLineage event. + final CompletableFuture> resp = + this.sendLineage(openLineageEventAsString) + .thenApply(r -> Collections.singletonMap(r.statusCode(), r.body())) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + + // Ensure the event was received. + Map respMap = resp.join(); + + assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201 + + // (3) Convert the OpenLineage event to Json. 
+ JobEvent jobEvent = + marquez.client.Utils.fromJson(respMap.get(200), new TypeReference() {}); + assertThat(jobEvent.getJob().getNamespace()).isEqualTo("my-scheduler-namespace"); + assertThat(jobEvent.getJob().getName()).isEqualTo("myjob"); + + assertThat(jobEvent.getInputs().get(0).getNamespace()).isEqualTo("my-datasource-namespace"); + assertThat(jobEvent.getInputs().get(0).getName()).isEqualTo("instance.schema.input-1"); + + assertThat(jobEvent.getOutputs().get(0).getNamespace()).isEqualTo("my-datasource-namespace"); + assertThat(jobEvent.getOutputs().get(0).getName()).isEqualTo("instance.schema.output-1"); + assertThat(jobEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001"); + } + private void validateDatasetFacets(JsonNode json) { final String namespace = json.path("namespace").asText(); final String output = json.path("name").asText(); diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index 92fddcee64..3fbaaf7fa1 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -83,20 +83,23 @@ public static RunRow writeNewEvent( new RunLink(runId), new JobLink(NAMESPACE, parentJobName))); LineageEvent event = - new LineageEvent( - COMPLETE, - Instant.now().atZone(LOCAL_ZONE), - new Run( - runUuid.toString(), - new RunFacet( - nominalTimeRunFacet, - parentRun.orElse(null), - ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc")))), - new LineageEvent.Job( - NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP)), - Collections.emptyList(), - Collections.emptyList(), - PRODUCER_URL.toString()); + LineageEvent.builder() + .eventType(COMPLETE) + .eventTime(Instant.now().atZone(LOCAL_ZONE)) + .run( + new Run( + runUuid.toString(), + new RunFacet( + nominalTimeRunFacet, + parentRun.orElse(null), + ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc"))))) + .job( + 
new LineageEvent.Job( + NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP))) + .inputs(Collections.emptyList()) + .outputs(Collections.emptyList()) + .producer(PRODUCER_URL.toString()) + .build(); PGobject eventJson = new PGobject(); eventJson.setType("json"); eventJson.setValue(Utils.getMapper().writeValueAsString(event)); diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index 13db1bd7c2..84dc6d18f1 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -122,14 +122,17 @@ public static UpdateLineageRow createLineageRow( UUID runId = UUID.randomUUID(); LineageEvent event = - new LineageEvent( - status, - Instant.now().atZone(LOCAL_ZONE), - new Run(runId.toString(), new RunFacet(nominalTimeRunFacet, parentRunFacet, runFacets)), - new Job(NAMESPACE, jobName, jobFacet), - inputs, - outputs, - PRODUCER_URL.toString()); + LineageEvent.builder() + .eventType(status) + .eventTime(Instant.now().atZone(LOCAL_ZONE)) + .run( + new Run( + runId.toString(), new RunFacet(nominalTimeRunFacet, parentRunFacet, runFacets))) + .job(new Job(NAMESPACE, jobName, jobFacet)) + .inputs(inputs) + .outputs(outputs) + .producer(PRODUCER_URL.toString()) + .build(); // emulate an OpenLineage RunEvent event .getProperties() diff --git a/api/src/test/java/marquez/service/models/EventTypeResolverTest.java b/api/src/test/java/marquez/service/models/EventTypeResolverTest.java new file mode 100644 index 0000000000..c8983bc012 --- /dev/null +++ b/api/src/test/java/marquez/service/models/EventTypeResolverTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import 
com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class EventTypeResolverTest { + + EventTypeResolver resolver = new EventTypeResolver(); + ObjectMapper mapper = new ObjectMapper(); + DatabindContext databindContext = mock(DatabindContext.class); + JavaType superType = mapper.constructType(BaseEvent.class); + + @BeforeEach + public void setup() { + resolver.init(superType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForRunEvent() { + JavaType runEventType = mapper.constructType(LineageEvent.class); + when(databindContext.constructSpecializedType(superType, LineageEvent.class)) + .thenReturn(runEventType); + + assertThat( + resolver.typeFromId( + databindContext, + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent")) + .isEqualTo(runEventType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForDatasetEvent() { + JavaType datasetEventType = mapper.constructType(DatasetEvent.class); + when(databindContext.constructSpecializedType(superType, DatasetEvent.class)) + .thenReturn(datasetEventType); + + assertThat( + resolver.typeFromId( + databindContext, + "https://openlineage.io/spec/2-5-0/OpenLineage.json#/definitions/DatasetEvent")) + .isEqualTo(datasetEventType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForJobEvent() { + JavaType jobEventType = mapper.constructType(JobEvent.class); + when(databindContext.constructSpecializedType(superType, JobEvent.class)) + .thenReturn(jobEventType); + + assertThat( + resolver.typeFromId( + databindContext, + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent")) + .isEqualTo(jobEventType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForUnknownEvent() { + JavaType runEventType = mapper.constructType(LineageEvent.class); 
+ when(databindContext.constructSpecializedType(superType, LineageEvent.class)) + .thenReturn(runEventType); + + assertThat(resolver.typeFromId(databindContext, "unknown type")).isEqualTo(runEventType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForNullId() { + JavaType runEventType = mapper.constructType(LineageEvent.class); + when(databindContext.constructSpecializedType(superType, LineageEvent.class)) + .thenReturn(runEventType); + + assertThat(resolver.typeFromId(databindContext, null)).isEqualTo(runEventType); + } +} diff --git a/api/src/test/java/marquez/service/models/LineageEventTest.java b/api/src/test/java/marquez/service/models/LineageEventTest.java index b6593f7e07..b06fe2946d 100644 --- a/api/src/test/java/marquez/service/models/LineageEventTest.java +++ b/api/src/test/java/marquez/service/models/LineageEventTest.java @@ -57,7 +57,8 @@ public void testIsomorphicOpenLineageEvents(String inputFile) throws IOException URL expectedResource = Resources.getResource(inputFile); ObjectMapper objectMapper = Utils.newObjectMapper(); RunEvent expectedEvent = objectMapper.readValue(expectedResource, RunEvent.class); - LineageEvent lineageEvent = objectMapper.readValue(expectedResource, LineageEvent.class); + LineageEvent lineageEvent = + (LineageEvent) objectMapper.readValue(expectedResource, BaseEvent.class); RunEvent converted = objectMapper.readValue(objectMapper.writeValueAsString(lineageEvent), RunEvent.class); assertThat(converted) @@ -75,7 +76,7 @@ public void testSerialization(String input) throws IOException { public void testSerialization(ObjectMapper mapper, String expectedFile) throws IOException { URL expectedResource = Resources.getResource(expectedFile); - LineageEvent deserialized = mapper.readValue(expectedResource, LineageEvent.class); + LineageEvent deserialized = (LineageEvent) mapper.readValue(expectedResource, BaseEvent.class); String serialized = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(deserialized); 
JsonNode expectedNode = mapper.readTree(expectedResource); diff --git a/api/src/test/resources/open_lineage/event_dataset_event.json b/api/src/test/resources/open_lineage/event_dataset_event.json new file mode 100644 index 0000000000..374c907371 --- /dev/null +++ b/api/src/test/resources/open_lineage/event_dataset_event.json @@ -0,0 +1,22 @@ +{ + "eventTime": "2020-12-28T19:52:00.001+10:00", + "dataset": { + "namespace": "my-dataset-namespace", + "name": "my-dataset-name", + "facets": { + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#MyCustomJobFacet", + "fields": [ + { + "name": "col_a", + "type": "VARCHAR", + "description": "string" + } + ] + } + } + }, + "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + "schemaURL": "https://openlineage.io/spec/2-8-9/OpenLineage.json#/definitions/DatasetEvent" +} \ No newline at end of file diff --git a/api/src/test/resources/open_lineage/event_job_event.json b/api/src/test/resources/open_lineage/event_job_event.json new file mode 100644 index 0000000000..715a26fdfd --- /dev/null +++ b/api/src/test/resources/open_lineage/event_job_event.json @@ -0,0 +1,22 @@ +{ + "eventType": "COMPLETE", + "eventTime": "2020-12-28T19:52:00.001+10:00", + "job": { + "namespace": "my-scheduler-namespace", + "name": "myjob" + }, + "inputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.input-1" + } + ], + "outputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.output-1" + } + ], + "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + "schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/JobEvent" +} \ No newline at end of file diff --git a/api/src/test/resources/open_lineage/event_without_schema_url.json b/api/src/test/resources/open_lineage/event_without_schema_url.json new 
file mode 100644 index 0000000000..4618d4791a --- /dev/null +++ b/api/src/test/resources/open_lineage/event_without_schema_url.json @@ -0,0 +1,28 @@ +{ + "eventType": "COMPLETE", + "eventTime": "2020-12-28T19:52:00.001+10:00", + "run": { + "runId": "41fb5137-f0fd-4ee5-ba5c-56f8571d1bd7" + }, + "job": { + "namespace": "my-scheduler-namespace", + "name": "myjob" + }, + "inputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.input-1" + }, + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.input-2" + } + ], + "outputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.output-1" + } + ], + "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" +} \ No newline at end of file diff --git a/chart/Chart.yaml b/chart/Chart.yaml index aa0ede32dd..9bb0cfd8bb 100644 --- a/chart/Chart.yaml +++ b/chart/Chart.yaml @@ -29,4 +29,4 @@ name: marquez sources: - https://github.com/MarquezProject/marquez - https://marquezproject.github.io/marquez/ -version: 0.36.0 +version: 0.37.0 diff --git a/chart/values.yaml b/chart/values.yaml index 2626dda703..bd456bd9c7 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -17,7 +17,7 @@ marquez: image: registry: docker.io repository: marquezproject/marquez - tag: 0.36.0 + tag: 0.37.0 pullPolicy: IfNotPresent ## Name of the existing secret containing credentials for the Marquez installation. ## When this is specified, it will take precedence over the values configured in the 'db' section. 
@@ -75,7 +75,7 @@ web: image: registry: docker.io repository: marquezproject/marquez-web - tag: 0.36.0 + tag: 0.37.0 pullPolicy: IfNotPresent ## Marquez website will run on this port ## diff --git a/clients/java/README.md b/clients/java/README.md index 626e93a19f..32b58dbf90 100644 --- a/clients/java/README.md +++ b/clients/java/README.md @@ -10,14 +10,14 @@ Maven: io.github.marquezproject marquez-java - 0.36.0 + 0.37.0 ``` or Gradle: ```groovy -implementation 'io.github.marquezproject:marquez-java:0.36.0 +implementation 'io.github.marquezproject:marquez-java:0.37.0 ``` ## Usage diff --git a/clients/java/src/main/java/marquez/client/models/LineageEvent.java b/clients/java/src/main/java/marquez/client/models/LineageEvent.java index c9a6d20827..b9dd8dec57 100644 --- a/clients/java/src/main/java/marquez/client/models/LineageEvent.java +++ b/clients/java/src/main/java/marquez/client/models/LineageEvent.java @@ -20,4 +20,12 @@ public class LineageEvent { List inputs; List outputs; URI producer; + URI schemaURL; + + public URI getSchemaURL() { + if (schemaURL == null) { + return URI.create("https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent"); + } + return schemaURL; + } } diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index a24c98315b..b5b3dbac3d 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -188,7 +188,8 @@ public class MarquezClientTest { Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), - URI.create("http://localhost:8080")); + URI.create("http://localhost:8080"), + URI.create("https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent")); // STREAM DATASET private static final DatasetId STREAM_ID = newDatasetIdWith(NAMESPACE_NAME); diff --git 
a/clients/java/src/test/java/marquez/client/models/LineageEventTest.java b/clients/java/src/test/java/marquez/client/models/LineageEventTest.java new file mode 100644 index 0000000000..986eaa0215 --- /dev/null +++ b/clients/java/src/test/java/marquez/client/models/LineageEventTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URI; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +public class LineageEventTest { + + public static final URI VALID_SCHEMA_URL = + URI.create("https://openlineage.io/spec/2-5-0/OpenLineage.json#/definitions/RunEvent"); + + @Test + public void testGetSchemaURL() { + assertThat(createlLineageEventWith(VALID_SCHEMA_URL).getSchemaURL()) + .isEqualTo(VALID_SCHEMA_URL); + } + + @Test + public void testGetSchemaURLWhenNull() { + assertThat(createlLineageEventWith(null).getSchemaURL()) + .isEqualTo( + URI.create("https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent")); + } + + private LineageEvent createlLineageEventWith(URI schemaURL) { + return new LineageEvent( + "START", + ZonedDateTime.now(ZoneId.of("UTC")), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + URI.create("http://localhost:8080"), + schemaURL); + } +} diff --git a/clients/python/marquez_client/__init__.py b/clients/python/marquez_client/__init__.py index 0a48cd9109..0a05dc187e 100644 --- a/clients/python/marquez_client/__init__.py +++ b/clients/python/marquez_client/__init__.py @@ -4,7 +4,7 @@ # -*- coding: utf-8 -*- __author__ = """Marquez Project""" -__version__ = "0.37.0" +__version__ = "0.38.0" from marquez_client.client import MarquezClient # noqa: F401 from marquez_client.clients import Clients # noqa: F401 diff --git 
a/clients/python/setup.cfg b/clients/python/setup.cfg index fb4ce473b2..57c29b6bb3 100644 --- a/clients/python/setup.cfg +++ b/clients/python/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.37.0 +current_version = 0.38.0 commit = False tag = False parse = (?P\d+)\.(?P\d+)\.(?P\d+)(?P.*) diff --git a/clients/python/setup.py b/clients/python/setup.py index f8e72810ca..ab38406c2c 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -24,7 +24,7 @@ setup( name="marquez-python", - version="0.37.0", + version="0.38.0", description="Marquez Python Client", long_description=readme, long_description_content_type="text/markdown", diff --git a/docker/up.sh b/docker/up.sh index 605979e530..e154d641d7 100755 --- a/docker/up.sh +++ b/docker/up.sh @@ -8,9 +8,9 @@ set -e # Version of Marquez -readonly VERSION=0.36.0 +readonly VERSION=0.37.0 # Build version of Marquez -readonly BUILD_VERSION=0.36.0 +readonly BUILD_VERSION=0.37.0 title() { echo -e "\033[1m${1}\033[0m" diff --git a/docs/openapi.html b/docs/openapi.html index 3d11e04ca2..77603291ee 100644 --- a/docs/openapi.html +++ b/docs/openapi.html @@ -2174,7 +2174,7 @@ 55.627 l 55.6165,55.627 -231.245496,231.24803 c -127.185,127.1864 -231.5279,231.248 -231.873,231.248 -0.3451,0 -104.688, -104.0616 -231.873,-231.248 z - " fill="currentColor">

Marquez (0.36.0)

Download OpenAPI specification: Download

License: Apache 2.0

Marquez is an open source metadata service for the collection, aggregation, and visualization of a data ecosystem's metadata.

+ " fill="currentColor">

Marquez (0.37.0)

Download OpenAPI specification: Download

License: Apache 2.0

Marquez is an open source metadata service for the collection, aggregation, and visualization of a data ecosystem's metadata.

Namespaces

Create a namespace

Creates a new namespace object. A namespace enables the contextual grouping of related jobs and datasets. Namespaces must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), dashes (-), colons (:), slashes (/), or dots (.). A namespace is case-insensitive with a maximum length of 1024 characters. Note that jobs and datasets are unique within a namespace, but not across namespaces.

path Parameters
namespace
required
string <= 1024 characters
Example: my-namespace

The name of the namespace.

Request Body schema: application/json
ownerName
required
string

The owner of the namespace.

@@ -2388,7 +2388,7 @@

Response samples

Content type
application/json
{
  "totalCount": 1,
  "results": [
    ]
}