Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to decode static metadata events #2495

Merged
merged 1 commit into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
* Web: handle lineage graph cycles on the client [`#2506`](https://github.com/MarquezProject/marquez/pull/2506) [@jlukenoff](https://github.com/jlukenoff)
*Fixes a bug where we blow the stack on the client-side if the user selects a node that is part of a cycle in the graph.*

### Added

* Ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Adds the ability to distinguish on a bakend static metadata events introduced based on the [proposal](https://github.com/OpenLineage/OpenLineage/blob/main/proposals/1837/static_lineage.md).*


## [0.34.0](https://github.com/MarquezProject/marquez/compare/0.33.0...0.34.0) - 2023-05-18

### Fixed
Expand Down
33 changes: 20 additions & 13 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import marquez.api.models.SortDirection;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

Expand All @@ -61,20 +62,26 @@ public OpenLineageResource(
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage")
public void create(
@Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse)
public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncResponse asyncResponse)
throws JsonProcessingException, SQLException {
openLineageService
.createAsync(event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
if (event instanceof LineageEvent) {
openLineageService
.createAsync((LineageEvent) event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
} else {
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());

// return serialized event
asyncResponse.resume(Response.status(200).entity(event).build());
Copy link
Member

@wslulciuc wslulciuc Jun 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

200 status code is correct for the new OL events, and feel we should also return a 200 when accepting OL run events (as outlined by the OL spec). The semantics should be: "Return 200 OK to signify the OL event has been collected, and eventually will be processed." The 201 status code was never changed during the initial PoC phase of OL. More of a thought, and we'll want to have a follow up PR.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here was to distinguish between RunEvent that get saved into database (201 created) and other event types that do not affect application state. At the end, once Marquez will be capable of storing dataset and job events, it should return 201 for all the cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we save them in the lineage_events table to start with?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@julienledem we'll want to write a proposal on how to handle DatasetEvents and JobEvents (see #2544). For now, let's ensure the event can be accepted (but not stored).

}
}

private int determineStatusCode(Throwable e) {
Expand Down
20 changes: 20 additions & 0 deletions api/src/main/java/marquez/service/models/BaseEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;

@JsonTypeIdResolver(EventTypeResolver.class)
@JsonTypeInfo(
use = Id.CUSTOM,
include = As.EXISTING_PROPERTY,
property = "schemaURL",
defaultImpl = LineageEvent.class,
visible = true)
public class BaseEvent extends BaseJsonModel {}
31 changes: 31 additions & 0 deletions api/src/main/java/marquez/service/models/DatasetEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import java.net.URI;
import java.time.ZonedDateTime;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
@Valid
@ToString
public class DatasetEvent extends BaseEvent {
@NotNull private ZonedDateTime eventTime;
@Valid private LineageEvent.Dataset dataset;
@Valid @NotNull private String producer;
@Valid @NotNull private URI schemaURL;
}
89 changes: 89 additions & 0 deletions api/src/main/java/marquez/service/models/EventTypeResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import static marquez.service.models.EventTypeResolver.EventSchemaURL.LINEAGE_EVENT;

import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.DatabindContext;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
import java.io.IOException;
import java.util.Arrays;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EventTypeResolver extends TypeIdResolverBase {

@AllArgsConstructor
public enum EventSchemaURL {
LINEAGE_EVENT(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be called RUN_EVENT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, let's use RUN_EVENT.

"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent",
LineageEvent.class),
DATASET_EVENT(
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/DatasetEvent",
DatasetEvent.class),
JOB_EVENT(
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent", JobEvent.class);

@Getter private String schemaURL;

public String getName() {
int lastSlash = schemaURL.lastIndexOf('/');
return schemaURL.substring(lastSlash, schemaURL.length());
}

@Getter private Class<?> subType;
}

private JavaType superType;

@Override
public void init(JavaType baseType) {
superType = baseType;
}

@Override
public String idFromValue(Object value) {
return null;
}

@Override
public String idFromValueAndType(Object value, Class<?> suggestedType) {
return null;
}

@Override
public JavaType typeFromId(DatabindContext context, String id) throws IOException {
if (id == null) {
return context.constructSpecializedType(superType, LINEAGE_EVENT.subType);
}

int lastSlash = id.lastIndexOf('/');

if (lastSlash < 0) {
return context.constructSpecializedType(superType, LINEAGE_EVENT.subType);
}
Comment on lines +63 to +71
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add comment that we default to run event for backwards compatibility


String type = id.substring(lastSlash, id.length());

Class<?> subType =
Arrays.stream(EventSchemaURL.values())
.filter(s -> s.getName().equals(type))
.findAny()
.map(EventSchemaURL::getSubType)
.orElse(LINEAGE_EVENT.subType);

return context.constructSpecializedType(superType, subType);
}

@Override
public Id getMechanism() {
return null;
}
}
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/service/models/JobEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import java.net.URI;
import java.time.ZonedDateTime;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
@Valid
@ToString
public class JobEvent extends BaseEvent {
wslulciuc marked this conversation as resolved.
Show resolved Hide resolved
@NotNull private ZonedDateTime eventTime;
@Valid @NotNull private LineageEvent.Job job;
@Valid private List<LineageEvent.Dataset> inputs;
@Valid private List<LineageEvent.Dataset> outputs;
@Valid @NotNull private String producer;
@Valid @NotNull private URI schemaURL;
}
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Getter
@Valid
@ToString
public class LineageEvent extends BaseJsonModel {
public class LineageEvent extends BaseEvent {

private String eventType;

Expand All @@ -48,6 +48,7 @@ public class LineageEvent extends BaseJsonModel {
@Valid private List<Dataset> inputs;
@Valid private List<Dataset> outputs;
@Valid @NotNull private String producer;
@Valid private URI schemaURL;

@AllArgsConstructor
@NoArgsConstructor
Expand Down
Loading