diff --git a/CHANGELOG.md b/CHANGELOG.md index b95576c982..9de17542b6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,12 @@ * Web: handle lineage graph cycles on the client [`#2506`](https://github.com/MarquezProject/marquez/pull/2506) [@jlukenoff](https://github.com/jlukenoff) *Fixes a bug where we blow the stack on the client-side if the user selects a node that is part of a cycle in the graph.* +### Added + +* Ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + *Adds the ability to distinguish static metadata events on the backend, as introduced by the [proposal](https://github.com/OpenLineage/OpenLineage/blob/main/proposals/1837/static_lineage.md).* + + ## [0.34.0](https://github.com/MarquezProject/marquez/compare/0.33.0...0.34.0) - 2023-05-18 ### Fixed diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index c938291a5e..677044fea8 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -38,6 +38,7 @@ import marquez.api.models.SortDirection; import marquez.db.OpenLineageDao; import marquez.service.ServiceFactory; +import marquez.service.models.BaseEvent; import marquez.service.models.LineageEvent; import marquez.service.models.NodeId; @@ -61,20 +62,26 @@ public OpenLineageResource( @Consumes(APPLICATION_JSON) @Produces(APPLICATION_JSON) @Path("/lineage") - public void create( - @Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse) + public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncResponse asyncResponse) throws JsonProcessingException, SQLException { - openLineageService - .createAsync(event) - .whenComplete( - (result, err) -> { - if (err != null) { - log.error("Unexpected error while processing request", err); - 
asyncResponse.resume(Response.status(determineStatusCode(err)).build()); - } else { - asyncResponse.resume(Response.status(201).build()); - } - }); + if (event instanceof LineageEvent) { + openLineageService + .createAsync((LineageEvent) event) + .whenComplete( + (result, err) -> { + if (err != null) { + log.error("Unexpected error while processing request", err); + asyncResponse.resume(Response.status(determineStatusCode(err)).build()); + } else { + asyncResponse.resume(Response.status(201).build()); + } + }); + } else { + log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName()); + + // return serialized event + asyncResponse.resume(Response.status(200).entity(event).build()); + } } private int determineStatusCode(Throwable e) { diff --git a/api/src/main/java/marquez/service/models/BaseEvent.java b/api/src/main/java/marquez/service/models/BaseEvent.java new file mode 100644 index 0000000000..a2ba6a8856 --- /dev/null +++ b/api/src/main/java/marquez/service/models/BaseEvent.java @@ -0,0 +1,20 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.As; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; + +@JsonTypeIdResolver(EventTypeResolver.class) +@JsonTypeInfo( + use = Id.CUSTOM, + include = As.EXISTING_PROPERTY, + property = "schemaURL", + defaultImpl = LineageEvent.class, + visible = true) +public class BaseEvent extends BaseJsonModel {} diff --git a/api/src/main/java/marquez/service/models/DatasetEvent.java b/api/src/main/java/marquez/service/models/DatasetEvent.java new file mode 100644 index 0000000000..13c438ca21 --- /dev/null +++ b/api/src/main/java/marquez/service/models/DatasetEvent.java @@ -0,0 +1,31 @@ +/* + * Copyright 2018-2023 contributors to 
the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import java.net.URI; +import java.time.ZonedDateTime; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Setter +@Getter +@Valid +@ToString +public class DatasetEvent extends BaseEvent { + @NotNull private ZonedDateTime eventTime; + @Valid private LineageEvent.Dataset dataset; + @Valid @NotNull private String producer; + @Valid @NotNull private URI schemaURL; +} diff --git a/api/src/main/java/marquez/service/models/EventTypeResolver.java b/api/src/main/java/marquez/service/models/EventTypeResolver.java new file mode 100644 index 0000000000..ec8ce3cd48 --- /dev/null +++ b/api/src/main/java/marquez/service/models/EventTypeResolver.java @@ -0,0 +1,89 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import static marquez.service.models.EventTypeResolver.EventSchemaURL.LINEAGE_EVENT; + +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; +import java.io.IOException; +import java.util.Arrays; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class EventTypeResolver extends TypeIdResolverBase { + + @AllArgsConstructor + public enum EventSchemaURL { + LINEAGE_EVENT( + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent", + LineageEvent.class), + DATASET_EVENT( + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/DatasetEvent", + DatasetEvent.class), + 
JOB_EVENT( + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent", JobEvent.class); + + @Getter private String schemaURL; + + public String getName() { + int lastSlash = schemaURL.lastIndexOf('/'); + return schemaURL.substring(lastSlash, schemaURL.length()); + } + + @Getter private Class subType; + } + + private JavaType superType; + + @Override + public void init(JavaType baseType) { + superType = baseType; + } + + @Override + public String idFromValue(Object value) { + return null; + } + + @Override + public String idFromValueAndType(Object value, Class suggestedType) { + return null; + } + + @Override + public JavaType typeFromId(DatabindContext context, String id) throws IOException { + if (id == null) { + return context.constructSpecializedType(superType, LINEAGE_EVENT.subType); + } + + int lastSlash = id.lastIndexOf('/'); + + if (lastSlash < 0) { + return context.constructSpecializedType(superType, LINEAGE_EVENT.subType); + } + + String type = id.substring(lastSlash, id.length()); + + Class subType = + Arrays.stream(EventSchemaURL.values()) + .filter(s -> s.getName().equals(type)) + .findAny() + .map(EventSchemaURL::getSubType) + .orElse(LINEAGE_EVENT.subType); + + return context.constructSpecializedType(superType, subType); + } + + @Override + public Id getMechanism() { + return null; + } +} diff --git a/api/src/main/java/marquez/service/models/JobEvent.java b/api/src/main/java/marquez/service/models/JobEvent.java new file mode 100644 index 0000000000..96f9b3ac0b --- /dev/null +++ b/api/src/main/java/marquez/service/models/JobEvent.java @@ -0,0 +1,34 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.List; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; 
+import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Setter +@Getter +@Valid +@ToString +public class JobEvent extends BaseEvent { + @NotNull private ZonedDateTime eventTime; + @Valid @NotNull private LineageEvent.Job job; + @Valid private List inputs; + @Valid private List outputs; + @Valid @NotNull private String producer; + @Valid @NotNull private URI schemaURL; +} diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 9478a7bdbd..6117e1fc3d 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -38,7 +38,7 @@ @Getter @Valid @ToString -public class LineageEvent extends BaseJsonModel { +public class LineageEvent extends BaseEvent { private String eventType; @@ -48,6 +48,7 @@ public class LineageEvent extends BaseJsonModel { @Valid private List inputs; @Valid private List outputs; @Valid @NotNull private String producer; + @Valid private URI schemaURL; @AllArgsConstructor @NoArgsConstructor diff --git a/api/src/test/java/marquez/DatasetIntegrationTest.java b/api/src/test/java/marquez/DatasetIntegrationTest.java index 86e0af5591..0499c73ef2 100644 --- a/api/src/test/java/marquez/DatasetIntegrationTest.java +++ b/api/src/test/java/marquez/DatasetIntegrationTest.java @@ -365,14 +365,17 @@ public void testApp_doesNotShowDeletedDataset() throws IOException { String namespace = "namespace"; String name = "table"; LineageEvent event = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job("namespace", "job_name", null), - List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + 
.eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job("namespace", "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); final CompletableFuture resp = sendEvent(event); assertThat(resp.join()).isEqualTo(201); @@ -388,14 +391,17 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep String namespace = "namespace"; String name = "anotherTable"; LineageEvent event = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job("namespace", "job_name", null), - List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job("namespace", "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); CompletableFuture resp = sendEvent(event); assertThat(resp.join()).isEqualTo(201); @@ -414,14 +420,15 @@ public void testApp_showsDeletedDatasetAfterReceivingNewVersion() throws IOExcep @Test public void testApp_getDatasetContainsColumnLineage() { LineageEvent event = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job("namespace", "job_name", null), - List.of(getDatasetA()), - List.of(getDatasetB()), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + 
.eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job("namespace", "job_name", null)) + .inputs(List.of(getDatasetA())) + .outputs(List.of(getDatasetB())) + .producer("the_producer") + .build(); CompletableFuture resp = this.sendLineage(Utils.toJson(event)) @@ -457,14 +464,17 @@ public void testApp_doesNotShowDeletedDatasetAfterDeleteNamespace() throws IOExc String namespace = "namespace"; String name = "table"; LineageEvent event = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job("namespace", "job_name", null), - List.of(new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job("namespace", "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset(namespace, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); final CompletableFuture resp = sendEvent(event); assertThat(resp.join()).isEqualTo(201); @@ -481,27 +491,32 @@ public void testApp_doesNotShowDeletedDatasetAfterUndeleteNamespace() throws IOE String name = "table"; LineageEvent firstEvent = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job(namespaceName, "job_name", null), - List.of( - new LineageEvent.Dataset(namespaceName, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new 
LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job(namespaceName, "job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset( + namespaceName, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); LineageEvent secondEvent = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job(namespaceName, "second_job_name", null), - List.of( - new LineageEvent.Dataset( - namespaceName, name + "2", LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job(namespaceName, "second_job_name", null)) + .inputs( + List.of( + new LineageEvent.Dataset( + namespaceName, name + "2", LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); CompletableFuture resp = sendEvent(firstEvent); assertThat(resp.join()).isEqualTo(201); @@ -529,15 +544,18 @@ public void testApp_doesNotShowDeletedDatasetAfterUndeleteNamespace() throws IOE assertThat(jobs).hasSize(0); LineageEvent eventThatWillUndeleteNamespace = - new LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new LineageEvent.Run(UUID.randomUUID().toString(), null), - new LineageEvent.Job(namespaceName, "job_name", null), - List.of( - new LineageEvent.Dataset(namespaceName, name, LineageTestUtils.newDatasetFacet())), - Collections.emptyList(), - "the_producer"); + LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new LineageEvent.Job(namespaceName, "job_name", null)) + .inputs( + List.of( + new 
LineageEvent.Dataset( + namespaceName, name, LineageTestUtils.newDatasetFacet()))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); resp = sendEvent(eventThatWillUndeleteNamespace); assertThat(resp.join()).isEqualTo(201); diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 7861f342ee..4729a64971 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -43,6 +43,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import marquez.api.JdbiUtils; import marquez.client.MarquezClient; import marquez.client.models.Dataset; @@ -53,6 +55,8 @@ import marquez.client.models.Run; import marquez.common.Utils; import marquez.db.LineageTestUtils; +import marquez.service.models.DatasetEvent; +import marquez.service.models.JobEvent; import org.assertj.core.api.InstanceOfAssertFactories; import org.jdbi.v3.core.Jdbi; import org.jetbrains.annotations.NotNull; @@ -65,6 +69,7 @@ import org.slf4j.LoggerFactory; @org.junit.jupiter.api.Tag("IntegrationTests") +@Slf4j public class OpenLineageIntegrationTest extends BaseIntegrationTest { public static String EVENT_REQUIRED = "open_lineage/event_required_only.json"; @@ -74,11 +79,18 @@ public class OpenLineageIntegrationTest extends BaseIntegrationTest { public static String EVENT_LARGE = "open_lineage/event_large.json"; public static String NULL_NOMINAL_END_TIME = "open_lineage/null_nominal_end_time.json"; public static String EVENT_NAMESPACE_NAMING = "open_lineage/event_namespace_naming.json"; + public static String EVENT_DATASET_EVENT = "open_lineage/event_dataset_event.json"; + public static String EVENT_JOB_EVENT = "open_lineage/event_job_event.json"; + public static String EVENT_WITHOUT_SCHEMA_URL = 
"open_lineage/event_without_schema_url.json"; + + public static String RUN_EVENT_SCHEMA_URL = + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent"; public static List data() { return Arrays.asList( EVENT_FULL, EVENT_SIMPLE, + EVENT_WITHOUT_SCHEMA_URL, EVENT_REQUIRED, EVENT_UNICODE, // FIXME: A very large event fails the test. @@ -99,15 +111,18 @@ public void testSendOpenLineageBadArgument() throws IOException { String badNamespace = "sqlserver://myhost:3342;user=auser;password=\uD83D\uDE02\uD83D\uDE02\uD83D\uDE02;database=TheDatabase"; marquez.service.models.LineageEvent event = - new marquez.service.models.LineageEvent( - "COMPLETE", - Instant.now().atZone(ZoneId.systemDefault()), - new marquez.service.models.LineageEvent.Run(UUID.randomUUID().toString(), null), - new marquez.service.models.LineageEvent.Job("namespace", "job_name", null), - List.of( - new marquez.service.models.LineageEvent.Dataset(badNamespace, "the_table", null)), - Collections.emptyList(), - "the_producer"); + marquez.service.models.LineageEvent.builder() + .eventType("COMPLETE") + .eventTime(Instant.now().atZone(ZoneId.systemDefault())) + .run(new marquez.service.models.LineageEvent.Run(UUID.randomUUID().toString(), null)) + .job(new marquez.service.models.LineageEvent.Job("namespace", "job_name", null)) + .inputs( + List.of( + new marquez.service.models.LineageEvent.Dataset( + badNamespace, "the_table", null))) + .outputs(Collections.emptyList()) + .producer("the_producer") + .build(); final CompletableFuture resp = sendEvent(event); assertThat(resp.join()).isEqualTo(400); @@ -911,6 +926,7 @@ public void testOpenLineageJobHierarchySparkAndAirflow() } @Test + @SneakyThrows public void testSendEventAndGetItBack() { marquez.service.models.LineageEvent.Run run = new marquez.service.models.LineageEvent.Run( @@ -939,6 +955,7 @@ public void testSendEventAndGetItBack() { .eventTime(time) .inputs(Collections.emptyList()) .outputs(Collections.singletonList(dataset)) + 
.schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) .build(); final CompletableFuture resp = sendEvent(lineageEvent); @@ -954,6 +971,7 @@ public void testSendEventAndGetItBack() { } @Test + @SneakyThrows public void testFindEventIsSortedByTime() { marquez.service.models.LineageEvent.Run run = new marquez.service.models.LineageEvent.Run( @@ -978,16 +996,21 @@ public void testFindEventIsSortedByTime() { .run(run) .job(job) .inputs(Collections.emptyList()) - .outputs(Collections.singletonList(dataset)); + .outputs(Collections.singletonList(dataset)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)); marquez.service.models.LineageEvent firstEvent = - builder.eventTime(time).eventType("START").build(); + builder.eventTime(time).eventType("START").schemaURL(new URI(RUN_EVENT_SCHEMA_URL)).build(); CompletableFuture resp = sendEvent(firstEvent); assertThat(resp.join()).isEqualTo(201); marquez.service.models.LineageEvent secondEvent = - builder.eventTime(time.plusSeconds(10)).eventType("COMPLETE").build(); + builder + .eventTime(time.plusSeconds(10)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) + .eventType("COMPLETE") + .build(); resp = sendEvent(secondEvent); assertThat(resp.join()).isEqualTo(201); @@ -1003,6 +1026,7 @@ public void testFindEventIsSortedByTime() { } @Test + @SneakyThrows public void testFindEventIsSortedByTimeAsc() { marquez.service.models.LineageEvent.Run run = new marquez.service.models.LineageEvent.Run( @@ -1027,16 +1051,21 @@ public void testFindEventIsSortedByTimeAsc() { .run(run) .job(job) .inputs(Collections.emptyList()) - .outputs(Collections.singletonList(dataset)); + .outputs(Collections.singletonList(dataset)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)); marquez.service.models.LineageEvent firstEvent = - builder.eventTime(time).eventType("START").build(); + builder.eventTime(time).eventType("START").schemaURL(new URI(RUN_EVENT_SCHEMA_URL)).build(); CompletableFuture resp = sendEvent(firstEvent); assertThat(resp.join()).isEqualTo(201); 
marquez.service.models.LineageEvent secondEvent = - builder.eventTime(time.plusSeconds(10)).eventType("COMPLETE").build(); + builder + .eventTime(time.plusSeconds(10)) + .eventType("COMPLETE") + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) + .build(); resp = sendEvent(secondEvent); assertThat(resp.join()).isEqualTo(201); @@ -1052,6 +1081,7 @@ public void testFindEventIsSortedByTimeAsc() { } @Test + @SneakyThrows public void testFindEventBeforeAfterTime() { marquez.service.models.LineageEvent.Run run = new marquez.service.models.LineageEvent.Run( @@ -1078,7 +1108,8 @@ public void testFindEventBeforeAfterTime() { .run(run) .job(job) .inputs(Collections.emptyList()) - .outputs(Collections.singletonList(dataset)); + .outputs(Collections.singletonList(dataset)) + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)); marquez.service.models.LineageEvent firstEvent = builder.eventTime(after.minus(1, ChronoUnit.YEARS)).eventType("START").build(); @@ -1087,13 +1118,21 @@ public void testFindEventBeforeAfterTime() { assertThat(resp.join()).isEqualTo(201); marquez.service.models.LineageEvent secondEvent = - builder.eventTime(after.plusSeconds(10)).eventType("COMPLETE").build(); + builder + .eventTime(after.plusSeconds(10)) + .eventType("COMPLETE") + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) + .build(); resp = sendEvent(secondEvent); assertThat(resp.join()).isEqualTo(201); marquez.service.models.LineageEvent thirdEvent = - builder.eventTime(before.plusSeconds(10)).eventType("COMPLETE").build(); + builder + .eventTime(before.plusSeconds(10)) + .eventType("COMPLETE") + .schemaURL(new URI(RUN_EVENT_SCHEMA_URL)) + .build(); List rawEvents = client.listLineageEvents(MarquezClient.SortDirection.ASC, before, after, 10); @@ -1336,6 +1375,72 @@ public void testSendOpenLineage(String pathToOpenLineageEvent) throws IOExceptio } } + @Test + public void testSendDatasetEventIsDecoded() throws IOException { + final String openLineageEventAsString = + 
Resources.toString(Resources.getResource(EVENT_DATASET_EVENT), Charset.defaultCharset()); + + // (2) Send OpenLineage event. + final CompletableFuture> resp = + this.sendLineage(openLineageEventAsString) + .thenApply(r -> Collections.singletonMap(r.statusCode(), r.body())) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + + // Ensure the event was received. + Map respMap = resp.join(); + + assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201 + + // (3) Convert the OpenLineage event to Json. + DatasetEvent datasetEvent = + marquez.client.Utils.fromJson(respMap.get(200), new TypeReference() {}); + assertThat(datasetEvent.getDataset().getName()).isEqualTo("my-dataset-name"); + assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields()).hasSize(1); + assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields().get(0).getName()) + .isEqualTo("col_a"); + assertThat(datasetEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001"); + } + + @Test + public void testSendJobEventIsDecoded() throws IOException { + final String openLineageEventAsString = + Resources.toString(Resources.getResource(EVENT_JOB_EVENT), Charset.defaultCharset()); + + // (2) Send OpenLineage event. + final CompletableFuture> resp = + this.sendLineage(openLineageEventAsString) + .thenApply(r -> Collections.singletonMap(r.statusCode(), r.body())) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + + // Ensure the event was received. + Map respMap = resp.join(); + + assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201 + + // (3) Convert the OpenLineage event to Json. 
+ JobEvent jobEvent = + marquez.client.Utils.fromJson(respMap.get(200), new TypeReference() {}); + assertThat(jobEvent.getJob().getNamespace()).isEqualTo("my-scheduler-namespace"); + assertThat(jobEvent.getJob().getName()).isEqualTo("myjob"); + + assertThat(jobEvent.getInputs().get(0).getNamespace()).isEqualTo("my-datasource-namespace"); + assertThat(jobEvent.getInputs().get(0).getName()).isEqualTo("instance.schema.input-1"); + + assertThat(jobEvent.getOutputs().get(0).getNamespace()).isEqualTo("my-datasource-namespace"); + assertThat(jobEvent.getOutputs().get(0).getName()).isEqualTo("instance.schema.output-1"); + assertThat(jobEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001"); + } + private void validateDatasetFacets(JsonNode json) { final String namespace = json.path("namespace").asText(); final String output = json.path("name").asText(); diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index 92fddcee64..3fbaaf7fa1 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -83,20 +83,23 @@ public static RunRow writeNewEvent( new RunLink(runId), new JobLink(NAMESPACE, parentJobName))); LineageEvent event = - new LineageEvent( - COMPLETE, - Instant.now().atZone(LOCAL_ZONE), - new Run( - runUuid.toString(), - new RunFacet( - nominalTimeRunFacet, - parentRun.orElse(null), - ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc")))), - new LineageEvent.Job( - NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP)), - Collections.emptyList(), - Collections.emptyList(), - PRODUCER_URL.toString()); + LineageEvent.builder() + .eventType(COMPLETE) + .eventTime(Instant.now().atZone(LOCAL_ZONE)) + .run( + new Run( + runUuid.toString(), + new RunFacet( + nominalTimeRunFacet, + parentRun.orElse(null), + ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc"))))) + .job( + 
new LineageEvent.Job( + NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP))) + .inputs(Collections.emptyList()) + .outputs(Collections.emptyList()) + .producer(PRODUCER_URL.toString()) + .build(); PGobject eventJson = new PGobject(); eventJson.setType("json"); eventJson.setValue(Utils.getMapper().writeValueAsString(event)); diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index 13db1bd7c2..84dc6d18f1 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -122,14 +122,17 @@ public static UpdateLineageRow createLineageRow( UUID runId = UUID.randomUUID(); LineageEvent event = - new LineageEvent( - status, - Instant.now().atZone(LOCAL_ZONE), - new Run(runId.toString(), new RunFacet(nominalTimeRunFacet, parentRunFacet, runFacets)), - new Job(NAMESPACE, jobName, jobFacet), - inputs, - outputs, - PRODUCER_URL.toString()); + LineageEvent.builder() + .eventType(status) + .eventTime(Instant.now().atZone(LOCAL_ZONE)) + .run( + new Run( + runId.toString(), new RunFacet(nominalTimeRunFacet, parentRunFacet, runFacets))) + .job(new Job(NAMESPACE, jobName, jobFacet)) + .inputs(inputs) + .outputs(outputs) + .producer(PRODUCER_URL.toString()) + .build(); // emulate an OpenLineage RunEvent event .getProperties() diff --git a/api/src/test/java/marquez/service/models/EventTypeResolverTest.java b/api/src/test/java/marquez/service/models/EventTypeResolverTest.java new file mode 100644 index 0000000000..c8983bc012 --- /dev/null +++ b/api/src/test/java/marquez/service/models/EventTypeResolverTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import 
com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class EventTypeResolverTest { + + EventTypeResolver resolver = new EventTypeResolver(); + ObjectMapper mapper = new ObjectMapper(); + DatabindContext databindContext = mock(DatabindContext.class); + JavaType superType = mapper.constructType(BaseEvent.class); + + @BeforeEach + public void setup() { + resolver.init(superType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForRunEvent() { + JavaType runEventType = mapper.constructType(LineageEvent.class); + when(databindContext.constructSpecializedType(superType, LineageEvent.class)) + .thenReturn(runEventType); + + assertThat( + resolver.typeFromId( + databindContext, + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent")) + .isEqualTo(runEventType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForDatasetEvent() { + JavaType datasetEventType = mapper.constructType(DatasetEvent.class); + when(databindContext.constructSpecializedType(superType, DatasetEvent.class)) + .thenReturn(datasetEventType); + + assertThat( + resolver.typeFromId( + databindContext, + "https://openlineage.io/spec/2-5-0/OpenLineage.json#/definitions/DatasetEvent")) + .isEqualTo(datasetEventType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForJobEvent() { + JavaType jobEventType = mapper.constructType(JobEvent.class); + when(databindContext.constructSpecializedType(superType, JobEvent.class)) + .thenReturn(jobEventType); + + assertThat( + resolver.typeFromId( + databindContext, + "https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent")) + .isEqualTo(jobEventType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForUnknownEvent() { + JavaType runEventType = mapper.constructType(LineageEvent.class); 
+ when(databindContext.constructSpecializedType(superType, LineageEvent.class)) + .thenReturn(runEventType); + + assertThat(resolver.typeFromId(databindContext, "unknown type")).isEqualTo(runEventType); + } + + @Test + @SneakyThrows + public void testTypeFromIdForNullId() { + JavaType runEventType = mapper.constructType(LineageEvent.class); + when(databindContext.constructSpecializedType(superType, LineageEvent.class)) + .thenReturn(runEventType); + + assertThat(resolver.typeFromId(databindContext, null)).isEqualTo(runEventType); + } +} diff --git a/api/src/test/java/marquez/service/models/LineageEventTest.java b/api/src/test/java/marquez/service/models/LineageEventTest.java index b6593f7e07..b06fe2946d 100644 --- a/api/src/test/java/marquez/service/models/LineageEventTest.java +++ b/api/src/test/java/marquez/service/models/LineageEventTest.java @@ -57,7 +57,8 @@ public void testIsomorphicOpenLineageEvents(String inputFile) throws IOException URL expectedResource = Resources.getResource(inputFile); ObjectMapper objectMapper = Utils.newObjectMapper(); RunEvent expectedEvent = objectMapper.readValue(expectedResource, RunEvent.class); - LineageEvent lineageEvent = objectMapper.readValue(expectedResource, LineageEvent.class); + LineageEvent lineageEvent = + (LineageEvent) objectMapper.readValue(expectedResource, BaseEvent.class); RunEvent converted = objectMapper.readValue(objectMapper.writeValueAsString(lineageEvent), RunEvent.class); assertThat(converted) @@ -75,7 +76,7 @@ public void testSerialization(String input) throws IOException { public void testSerialization(ObjectMapper mapper, String expectedFile) throws IOException { URL expectedResource = Resources.getResource(expectedFile); - LineageEvent deserialized = mapper.readValue(expectedResource, LineageEvent.class); + LineageEvent deserialized = (LineageEvent) mapper.readValue(expectedResource, BaseEvent.class); String serialized = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(deserialized); 
JsonNode expectedNode = mapper.readTree(expectedResource); diff --git a/api/src/test/resources/open_lineage/event_dataset_event.json b/api/src/test/resources/open_lineage/event_dataset_event.json new file mode 100644 index 0000000000..374c907371 --- /dev/null +++ b/api/src/test/resources/open_lineage/event_dataset_event.json @@ -0,0 +1,22 @@ +{ + "eventTime": "2020-12-28T19:52:00.001+10:00", + "dataset": { + "namespace": "my-dataset-namespace", + "name": "my-dataset-name", + "facets": { + "schema": { + "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#MyCustomJobFacet", + "fields": [ + { + "name": "col_a", + "type": "VARCHAR", + "description": "string" + } + ] + } + } + }, + "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + "schemaURL": "https://openlineage.io/spec/2-8-9/OpenLineage.json#/definitions/DatasetEvent" +} \ No newline at end of file diff --git a/api/src/test/resources/open_lineage/event_job_event.json b/api/src/test/resources/open_lineage/event_job_event.json new file mode 100644 index 0000000000..715a26fdfd --- /dev/null +++ b/api/src/test/resources/open_lineage/event_job_event.json @@ -0,0 +1,22 @@ +{ + "eventType": "COMPLETE", + "eventTime": "2020-12-28T19:52:00.001+10:00", + "job": { + "namespace": "my-scheduler-namespace", + "name": "myjob" + }, + "inputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.input-1" + } + ], + "outputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.output-1" + } + ], + "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + "schemaURL": "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/JobEvent" +} \ No newline at end of file diff --git a/api/src/test/resources/open_lineage/event_without_schema_url.json b/api/src/test/resources/open_lineage/event_without_schema_url.json new 
file mode 100644 index 0000000000..4618d4791a --- /dev/null +++ b/api/src/test/resources/open_lineage/event_without_schema_url.json @@ -0,0 +1,28 @@ +{ + "eventType": "COMPLETE", + "eventTime": "2020-12-28T19:52:00.001+10:00", + "run": { + "runId": "41fb5137-f0fd-4ee5-ba5c-56f8571d1bd7" + }, + "job": { + "namespace": "my-scheduler-namespace", + "name": "myjob" + }, + "inputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.input-1" + }, + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.input-2" + } + ], + "outputs": [ + { + "namespace": "my-datasource-namespace", + "name": "instance.schema.output-1" + } + ], + "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client" +} \ No newline at end of file diff --git a/clients/java/src/main/java/marquez/client/models/LineageEvent.java b/clients/java/src/main/java/marquez/client/models/LineageEvent.java index c9a6d20827..b9dd8dec57 100644 --- a/clients/java/src/main/java/marquez/client/models/LineageEvent.java +++ b/clients/java/src/main/java/marquez/client/models/LineageEvent.java @@ -20,4 +20,12 @@ public class LineageEvent { List inputs; List outputs; URI producer; + URI schemaURL; + + /** Returns the schema URL of this event; when {@code schemaURL} is null, returns the OpenLineage 2-0-0 RunEvent definition URL instead, so callers never receive null. */ public URI getSchemaURL() { + if (schemaURL == null) { + return URI.create("https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent"); + } + return schemaURL; + } } diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index a24c98315b..b5b3dbac3d 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -188,7 +188,8 @@ public class MarquezClientTest { Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), - URI.create("http://localhost:8080")); + URI.create("http://localhost:8080"), + 
URI.create("https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent")); // STREAM DATASET private static final DatasetId STREAM_ID = newDatasetIdWith(NAMESPACE_NAME); diff --git a/clients/java/src/test/java/marquez/client/models/LineageEventTest.java b/clients/java/src/test/java/marquez/client/models/LineageEventTest.java new file mode 100644 index 0000000000..986eaa0215 --- /dev/null +++ b/clients/java/src/test/java/marquez/client/models/LineageEventTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.net.URI; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +/** Tests for {@code LineageEvent.getSchemaURL}, covering an explicitly set schema URL and the null case. */ public class LineageEventTest { + + public static final URI VALID_SCHEMA_URL = + URI.create("https://openlineage.io/spec/2-5-0/OpenLineage.json#/definitions/RunEvent"); + + /** An explicitly provided schema URL must be returned unchanged. */ @Test + public void testGetSchemaURL() { + assertThat(createLineageEventWith(VALID_SCHEMA_URL).getSchemaURL()) + .isEqualTo(VALID_SCHEMA_URL); + } + + /** A null schema URL must be reported as the 2-0-0 RunEvent definition URL. */ @Test + public void testGetSchemaURLWhenNull() { + assertThat(createLineageEventWith(null).getSchemaURL()) + .isEqualTo( + URI.create("https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent")); + } + + /** Builds a "START" event with empty maps and lists, a fixed producer URI and the given {@code schemaURL} (may be null). */ private LineageEvent createLineageEventWith(URI schemaURL) { + return new LineageEvent( + "START", + ZonedDateTime.now(ZoneId.of("UTC")), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + URI.create("http://localhost:8080"), + schemaURL); + } +}