Skip to content

Commit

Permalink
Merge branch 'main' into update/datasets-sql
Browse files Browse the repository at this point in the history
  • Loading branch information
wslulciuc authored Jul 17, 2023
2 parents 760c84b + e99ebc9 commit 15aef67
Show file tree
Hide file tree
Showing 37 changed files with 708 additions and 166 deletions.
2 changes: 1 addition & 1 deletion .circleci/api-load-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
set -e

# Build version of Marquez
readonly MARQUEZ_VERSION=0.37.0-SNAPSHOT
readonly MARQUEZ_VERSION=0.38.0-SNAPSHOT
# Fully qualified path to marquez.jar
readonly MARQUEZ_JAR="api/build/libs/marquez-api-${MARQUEZ_VERSION}.jar"

Expand Down
2 changes: 1 addition & 1 deletion .circleci/db-migration.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# Version of PostgreSQL
readonly POSTGRES_VERSION="12.1"
# Version of Marquez
readonly MARQUEZ_VERSION=0.36.0
readonly MARQUEZ_VERSION=0.37.0
# Build version of Marquez
readonly MARQUEZ_BUILD_VERSION="$(git log --pretty=format:'%h' -n 1)" # SHA1

Expand Down
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
API_PORT=5000
API_ADMIN_PORT=5001
WEB_PORT=3000
TAG=0.36.0
TAG=0.37.0
20 changes: 17 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
# Changelog

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.36.0...HEAD)
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.37.0...HEAD)

## [0.37.0](https://github.com/MarquezProject/marquez/compare/0.36.0...0.37.0) - 2023-07-17
### Added
* API: add ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Introduces an `EventTypeResolver` for using the `schemaURL` field to decode `POST` requests to `/lineage` with `LineageEvent`s, `DatasetEvent`s or `JobEvent`s, as the first step in implementing static lineage support.*

### Fixed
* API: remove unnecessary DB updates [`#2531`](https://github.com/MarquezProject/marquez/pull/2531)[@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Prevent updates that are not needed and are deadlock prone.*
* API: remove unnecessary DB updates [`#2531`](https://github.com/MarquezProject/marquez/pull/2531) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Prevent updates that are not needed and are deadlock-prone.*
* Web: revert URL encoding when fetching lineage [`#2529`](https://github.com/MarquezProject/marquez/pull/2529) [@jlukenoff](https://github.com/jlukenoff)
*Reverts the node ID from being URL-encoded and allows the backend to return lineage details successfully even when a node ID contains special characters.*

## [0.36.0](https://github.com/MarquezProject/marquez/compare/0.35.0...0.36.0) - 2023-06-27
### Added
Expand All @@ -28,6 +36,12 @@
* Web: handle lineage graph cycles on the client [`#2506`](https://github.com/MarquezProject/marquez/pull/2506) [@jlukenoff](https://github.com/jlukenoff)
*Fixes a bug where we blow the stack on the client-side if the user selects a node that is part of a cycle in the graph.*

### Added

* Ability to decode static metadata events [`#2495`](https://github.com/MarquezProject/marquez/pull/2495) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
*Adds the ability to distinguish on a bakend static metadata events introduced based on the [proposal](https://github.com/OpenLineage/OpenLineage/blob/main/proposals/1837/static_lineage.md).*


## [0.34.0](https://github.com/MarquezProject/marquez/compare/0.33.0...0.34.0) - 2023-05-18

### Fixed
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ Versions of Marquez are compatible with OpenLineage unless noted otherwise. We e
| **Marquez** | **OpenLineage** | **Status** |
|--------------------------------------------------------------------------------------------------|---------------------------------------------------------------|---------------|
| [`UNRELEASED`](https://github.com/MarquezProject/marquez/blob/main/CHANGELOG.md#unreleased) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `CURRENT` |
| [`0.35.0`](https://github.com/MarquezProject/marquez/blob/0.35.0/CHANGELOG.md#0350---2023-06-13) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `RECOMMENDED` |
| [`0.34.0`](https://github.com/MarquezProject/marquez/blob/0.34.0/CHANGELOG.md#0340---2023-05-18) | [`1-0-5`](https://openlineage.io/spec/1-0-0/OpenLineage.json) | `MAINTENANCE` |
| [`0.37.0`](https://github.com/MarquezProject/marquez/blob/0.37.0/CHANGELOG.md#0370---2023-07-17) | [`1-0-5`](https://openlineage.io/spec/1-0-5/OpenLineage.json) | `RECOMMENDED` |
| [`0.36.0`](https://github.com/MarquezProject/marquez/blob/0.36.0/CHANGELOG.md#0360---2023-06-27) | [`1-0-5`](https://openlineage.io/spec/1-0-0/OpenLineage.json) | `MAINTENANCE` |

> **Note:** The [`openlineage-python`](https://pypi.org/project/openlineage-python) and [`openlineage-java`](https://central.sonatype.com/artifact/io.openlineage/openlineage-java) libraries will a higher version than the OpenLineage [specification](https://github.com/OpenLineage/OpenLineage/tree/main/spec) as they have different version requirements.
Expand Down Expand Up @@ -160,7 +160,7 @@ Marquez listens on port `8080` for all API calls and port `8081` for the admin i

* Website: https://marquezproject.ai
* Source: https://github.com/MarquezProject/marquez
* Chat: [https://marquezproject.slack.com](https://bit.ly/MqzSlack)
* Chat: [MarquezProject Slack](https://bit.ly/MqzSlackInvite)
* Twitter: [@MarquezProject](https://twitter.com/MarquezProject)

## Contributing
Expand Down
33 changes: 20 additions & 13 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import marquez.api.models.SortDirection;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

Expand All @@ -61,20 +62,26 @@ public OpenLineageResource(
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage")
public void create(
@Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse)
public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncResponse asyncResponse)
throws JsonProcessingException, SQLException {
openLineageService
.createAsync(event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
if (event instanceof LineageEvent) {
openLineageService
.createAsync((LineageEvent) event)
.whenComplete(
(result, err) -> {
if (err != null) {
log.error("Unexpected error while processing request", err);
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
} else {
asyncResponse.resume(Response.status(201).build());
}
});
} else {
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());

// return serialized event
asyncResponse.resume(Response.status(200).entity(event).build());
}
}

private int determineStatusCode(Throwable e) {
Expand Down
20 changes: 20 additions & 0 deletions api/src/main/java/marquez/service/models/BaseEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;

@JsonTypeIdResolver(EventTypeResolver.class)
@JsonTypeInfo(
use = Id.CUSTOM,
include = As.EXISTING_PROPERTY,
property = "schemaURL",
defaultImpl = LineageEvent.class,
visible = true)
public class BaseEvent extends BaseJsonModel {}
31 changes: 31 additions & 0 deletions api/src/main/java/marquez/service/models/DatasetEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import java.net.URI;
import java.time.ZonedDateTime;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
@Valid
@ToString
public class DatasetEvent extends BaseEvent {
@NotNull private ZonedDateTime eventTime;
@Valid private LineageEvent.Dataset dataset;
@Valid @NotNull private String producer;
@Valid @NotNull private URI schemaURL;
}
89 changes: 89 additions & 0 deletions api/src/main/java/marquez/service/models/EventTypeResolver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import static marquez.service.models.EventTypeResolver.EventSchemaURL.LINEAGE_EVENT;

import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import com.fasterxml.jackson.databind.DatabindContext;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
import java.io.IOException;
import java.util.Arrays;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EventTypeResolver extends TypeIdResolverBase {

@AllArgsConstructor
public enum EventSchemaURL {
LINEAGE_EVENT(
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/RunEvent",
LineageEvent.class),
DATASET_EVENT(
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/DatasetEvent",
DatasetEvent.class),
JOB_EVENT(
"https://openlineage.io/spec/2-0-0/OpenLineage.json#/definitions/JobEvent", JobEvent.class);

@Getter private String schemaURL;

public String getName() {
int lastSlash = schemaURL.lastIndexOf('/');
return schemaURL.substring(lastSlash, schemaURL.length());
}

@Getter private Class<?> subType;
}

private JavaType superType;

@Override
public void init(JavaType baseType) {
superType = baseType;
}

@Override
public String idFromValue(Object value) {
return null;
}

@Override
public String idFromValueAndType(Object value, Class<?> suggestedType) {
return null;
}

@Override
public JavaType typeFromId(DatabindContext context, String id) throws IOException {
if (id == null) {
return context.constructSpecializedType(superType, LINEAGE_EVENT.subType);
}

int lastSlash = id.lastIndexOf('/');

if (lastSlash < 0) {
return context.constructSpecializedType(superType, LINEAGE_EVENT.subType);
}

String type = id.substring(lastSlash, id.length());

Class<?> subType =
Arrays.stream(EventSchemaURL.values())
.filter(s -> s.getName().equals(type))
.findAny()
.map(EventSchemaURL::getSubType)
.orElse(LINEAGE_EVENT.subType);

return context.constructSpecializedType(superType, subType);
}

@Override
public Id getMechanism() {
return null;
}
}
34 changes: 34 additions & 0 deletions api/src/main/java/marquez/service/models/JobEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service.models;

import java.net.URI;
import java.time.ZonedDateTime;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Setter
@Getter
@Valid
@ToString
public class JobEvent extends BaseEvent {
@NotNull private ZonedDateTime eventTime;
@Valid @NotNull private LineageEvent.Job job;
@Valid private List<LineageEvent.Dataset> inputs;
@Valid private List<LineageEvent.Dataset> outputs;
@Valid @NotNull private String producer;
@Valid @NotNull private URI schemaURL;
}
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/service/models/LineageEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@Getter
@Valid
@ToString
public class LineageEvent extends BaseJsonModel {
public class LineageEvent extends BaseEvent {

private String eventType;

Expand All @@ -48,6 +48,7 @@ public class LineageEvent extends BaseJsonModel {
@Valid private List<Dataset> inputs;
@Valid private List<Dataset> outputs;
@Valid @NotNull private String producer;
@Valid private URI schemaURL;

@AllArgsConstructor
@NoArgsConstructor
Expand Down
Loading

0 comments on commit 15aef67

Please sign in to comment.