Skip to content

Commit

Permalink
decode runless events
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Jul 17, 2023
1 parent 23de0de commit 24c3240
Show file tree
Hide file tree
Showing 19 changed files with 653 additions and 117 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
* Web: handle lineage graph cycles on the client [`#2506`](https://github.com/MarquezProject/marquez/pull/2506) [@jlukenoff](https://github.com/jlukenoff)
*Fixes a bug where we blow the stack on the client-side if the user selects a node that is part of a cycle in the graph.*

### Added

* Ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Adds the ability to distinguish on a bakend static metadata events introduced based on the [proposal](https://github.com/OpenLineage/OpenLineage/blob/main/proposals/1837/static_lineage.md).*


## [0.34.0](https://github.com/MarquezProject/marquez/compare/0.33.0...0.34.0) - 2023-05-18

### Fixed
Expand Down
33 changes: 20 additions & 13 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import marquez.api.models.SortDirection;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

Expand All @@ -61,20 +62,26 @@ public OpenLineageResource(
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage")
public void create(
@Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse)
public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncResponse asyncResponse)
throws JsonProcessingException, SQLException {
openLineageService
.createAsync(event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
if (event instanceof LineageEvent) {
openLineageService
.createAsync((LineageEvent) event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
} else {
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());

// return serialized event
asyncResponse.resume(Response.status(200).entity(event).build());
}
}

private int determineStatusCode(Throwable e) {
Expand Down
20 changes: 20 additions & 0 deletions api/src/main/java/marquez/service/models/BaseEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;

@JsonTypeIdResolver(EventTypeResolver.class)
@JsonTypeInfo(
use = Id.CUSTOM,
include = As.EXISTING_PROPERTY,
property = "schemaURL",
defaultImpl = LineageEvent.class,
visible = true)
public class BaseEvent extends BaseJsonModel {}
31 changes: 31 additions & 0 deletions api/src/main/java/marquez/service/models/DatasetEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import java.net.URI;
import java.time.ZonedDateTime;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
@Valid
@ToString
public class DatasetEvent extends BaseEvent {
@NotNull private ZonedDateTime eventTime;
@Valid private LineageEvent.Dataset dataset;
@Valid @NotNull private String producer;
@Valid @NotNull private URI schemaURL;
}
89 changes: 89 additions & 0 deletions api/src/main/java/marquez/service/models/EventTypeResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import static marquez.service.models.EventTypeResolver.EventSchemaURL.LINEAGE_EVENT;

import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.DatabindContext;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
import java.io.IOException;
import java.util.Arrays;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EventTypeResolver extends TypeIdResolverBase {

@AllArgsConstructor
public enum EventSchemaURL {
LINEAGE_EVENT(
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent",
LineageEvent.class),
DATASET_EVENT(
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/DatasetEvent",
DatasetEvent.class),
JOB_EVENT(
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent", JobEvent.class);

@Getter private String schemaURL;

public String getName() {
int lastSlash = schemaURL.lastIndexOf('/');
return schemaURL.substring(lastSlash, schemaURL.length());
}

@Getter private Class<?> subType;
}

private JavaType superType;

@Override
public void init(JavaType baseType) {
superType = baseType;
}

@Override
public String idFromValue(Object value) {
return null;
}

@Override
public String idFromValueAndType(Object value, Class<?> suggestedType) {
return null;
}

@Override
public JavaType typeFromId(DatabindContext context, String id) throws IOException {
if (id == null) {
return context.constructSpecializedType(superType, LINEAGE_EVENT.subType);
}

int lastSlash = id.lastIndexOf('/');

if (lastSlash < 0) {
return context.constructSpecializedType(superType, LINEAGE_EVENT.subType);
}

String type = id.substring(lastSlash, id.length());

Class<?> subType =
Arrays.stream(EventSchemaURL.values())
.filter(s -> s.getName().equals(type))
.findAny()
.map(EventSchemaURL::getSubType)
.orElse(LINEAGE_EVENT.subType);

return context.constructSpecializedType(superType, subType);
}

@Override
public Id getMechanism() {
return null;
}
}
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/service/models/JobEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import java.net.URI;
import java.time.ZonedDateTime;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
@Valid
@ToString
public class JobEvent extends BaseEvent {
@NotNull private ZonedDateTime eventTime;
@Valid @NotNull private LineageEvent.Job job;
@Valid private List<LineageEvent.Dataset> inputs;
@Valid private List<LineageEvent.Dataset> outputs;
@Valid @NotNull private String producer;
@Valid @NotNull private URI schemaURL;
}
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Getter
@Valid
@ToString
public class LineageEvent extends BaseJsonModel {
public class LineageEvent extends BaseEvent {

private String eventType;

Expand All @@ -48,6 +48,7 @@ public class LineageEvent extends BaseJsonModel {
@Valid private List<Dataset> inputs;
@Valid private List<Dataset> outputs;
@Valid @NotNull private String producer;
@Valid private URI schemaURL;

@AllArgsConstructor
@NoArgsConstructor
Expand Down
Loading

0 comments on commit 24c3240

Please sign in to comment.