Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add raw OpenLineage get event API #2070

Merged
merged 3 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.25.0...HEAD)
* Add job/dataset soft delete API [`#2032`](https://github.com/MarquezProject/marquez/pull/2032)
* Add new API for retrieving OpenLineage events [`#2070`](https://github.com/MarquezProject/marquez/pull/2070) [@mobuchowski](https://github.com/mobuchowski)

## [0.25.0](https://github.com/MarquezProject/marquez/compare/0.24.0...0.25.0) - 2022-08-08

Expand Down
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies {
implementation "io.prometheus:simpleclient_hotspot:${prometheusVersion}"
implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}"
implementation "org.jdbi:jdbi3-core:${jdbi3Version}"
implementation "org.jdbi:jdbi3-jackson2:${jdbi3Version}"
implementation "org.jdbi:jdbi3-postgres:${jdbi3Version}"
implementation "org.jdbi:jdbi3-sqlobject:${jdbi3Version}"
implementation 'com.google.guava:guava:31.1-jre'
Expand Down
6 changes: 5 additions & 1 deletion api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.flywaydb.core.api.FlywayException;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.SqlLogger;
import org.jdbi.v3.jackson2.Jackson2Config;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

Expand Down Expand Up @@ -154,12 +156,14 @@ private MarquezContext buildMarquezContext(
factory
.build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin());
.installPlugin(new PostgresPlugin())
.installPlugin(new Jackson2Plugin());
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
if (isSentryEnabled(config)) {
sqlLogger = new TracingSQLLogger(sqlLogger);
}
jdbi.setSqlLogger(sqlLogger);
jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());

final MarquezContext context =
MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
Expand Down
5 changes: 1 addition & 4 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public final class MarquezContext {
@Getter private final OpenLineageDao openLineageDao;
@Getter private final LineageDao lineageDao;
@Getter private final SearchDao searchDao;

@Getter private final List<RunTransitionListener> runTransitionListeners;

@Getter private final NamespaceService namespaceService;
Expand All @@ -82,15 +81,13 @@ public final class MarquezContext {
@Getter private final RunService runService;
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;

@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
@Getter private final JobResource jobResource;
@Getter private final TagResource tagResource;
@Getter private final OpenLineageResource openLineageResource;
@Getter private final SearchResource searchResource;

@Getter private final ImmutableList<Object> resources;
@Getter private final JdbiExceptionExceptionMapper jdbiException;
@Getter private final GraphQLHttpServlet graphqlServlet;
Expand Down Expand Up @@ -150,7 +147,7 @@ private MarquezContext(
this.datasetResource = new DatasetResource(serviceFactory);
this.jobResource = new JobResource(serviceFactory, jobVersionDao);
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
this.searchResource = new SearchResource(searchDao);

this.resources =
Expand Down
49 changes: 44 additions & 5 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.dropwizard.jersey.jsr310.ZonedDateTimeParam;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
Expand All @@ -29,18 +33,25 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import marquez.api.models.SortDirection;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

@Slf4j
@Path("/api/v1/lineage")
@Path("/api/v1")
public class OpenLineageResource extends BaseResource {
private static final String DEFAULT_DEPTH = "20";

public OpenLineageResource(@NonNull final ServiceFactory serviceFactory) {
private final OpenLineageDao openLineageDao;

/**
 * Creates the resource.
 *
 * @param serviceFactory forwarded to the {@code BaseResource} constructor; must not be null
 * @param openLineageDao DAO stored on this resource for later use; must not be null
 */
public OpenLineageResource(
@NonNull final ServiceFactory serviceFactory, @NonNull final OpenLineageDao openLineageDao) {
super(serviceFactory);
this.openLineageDao = openLineageDao;
}

@Timed
Expand All @@ -49,6 +60,7 @@ public OpenLineageResource(@NonNull final ServiceFactory serviceFactory) {
@POST
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage")
public void create(
@Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse)
throws JsonProcessingException, SQLException {
Expand Down Expand Up @@ -80,10 +92,37 @@ private int determineStatusCode(Throwable e) {
@GET
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage")
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
throws ExecutionException, InterruptedException {
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
return Response.ok(lineageService.lineage(nodeId, depth)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Path("/events/lineage")
@Produces(APPLICATION_JSON)
public Response getLineageEvents(
@QueryParam("before") @DefaultValue("2030-01-01T00:00:00+00:00") ZonedDateTimeParam before,
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
@QueryParam("after") @DefaultValue("1970-01-01T00:00:00+00:00") ZonedDateTimeParam after,
@QueryParam("sortDirection") @DefaultValue("desc") SortDirection sortDirection,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit) {
List<LineageEvent> events = Collections.emptyList();
switch (sortDirection) {
case DESC -> events =
openLineageDao.getAllLineageEventsDesc(before.get(), after.get(), limit);
case ASC -> events = openLineageDao.getAllLineageEventsAsc(before.get(), after.get(), limit);
}
return Response.ok(new Events(events)).build();
}

/**
 * Response body wrapper around a list of lineage events; the list is exposed under the JSON
 * property {@code "events"} and must not be null.
 */
@Value
static class Events {
@NonNull
@JsonProperty("events")
List<LineageEvent> value;
}
}
12 changes: 12 additions & 0 deletions api/src/main/java/marquez/api/models/SortDirection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package marquez.api.models;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * Direction in which results are ordered. Each constant carries its lower-case string form,
 * available both through the public {@link #value} field and through {@link #getValue()}.
 */
public enum SortDirection {
  DESC("desc"),
  ASC("asc");

  /** Lower-case string form of this direction. */
  public final String value;

  SortDirection(final String value) {
    this.value = value;
  }

  /**
   * Returns the lower-case string form of this direction.
   *
   * @return the same string held in {@link #value}
   */
  public String getValue() {
    return value;
  }
}
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ private Columns() {}
public static final String RUN_UUID = "run_uuid";
public static final String STATE = "state";

/* LINEAGE EVENT ROW COLUMNS */
public static final String EVENT = "event";

public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
return null;
Expand Down
23 changes: 22 additions & 1 deletion api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -86,7 +87,27 @@ void createLineageEvent(
String producer);

@SqlQuery("SELECT event FROM lineage_events WHERE run_uuid = :runUuid")
List<LineageEvent> findOlEventsByRunUuid(UUID runUuid);
List<LineageEvent> findLineageEventsByRunUuid(UUID runUuid);

/**
 * Selects the {@code event} column of rows in {@code lineage_events} whose {@code event_time} is
 * at or after {@code after} and strictly before {@code before}, ordered by {@code event_time}
 * descending.
 *
 * <p>NOTE(review): the query orders by {@code event_time} only, so the relative order of rows
 * sharing the same {@code event_time} is not fixed by this statement.
 *
 * @param before exclusive upper bound on {@code event_time}
 * @param after inclusive lower bound on {@code event_time}
 * @param limit value bound to the SQL {@code LIMIT} clause
 * @return the selected events, latest {@code event_time} first
 */
@SqlQuery(
"""
SELECT event
FROM lineage_events le
WHERE (le.event_time < :before
AND le.event_time >= :after)
ORDER BY le.event_time DESC
LIMIT :limit""")
List<LineageEvent> getAllLineageEventsDesc(ZonedDateTime before, ZonedDateTime after, int limit);

/**
 * Selects the {@code event} column of rows in {@code lineage_events} whose {@code event_time} is
 * at or after {@code after} and strictly before {@code before}, ordered by {@code event_time}
 * ascending.
 *
 * <p>NOTE(review): the query orders by {@code event_time} only, so the relative order of rows
 * sharing the same {@code event_time} is not fixed by this statement.
 *
 * @param before exclusive upper bound on {@code event_time}
 * @param after inclusive lower bound on {@code event_time}
 * @param limit value bound to the SQL {@code LIMIT} clause
 * @return the selected events, earliest {@code event_time} first
 */
@SqlQuery(
"""
SELECT event
FROM lineage_events le
WHERE (le.event_time < :before
AND le.event_time >= :after)
ORDER BY le.event_time ASC
LIMIT :limit""")
List<LineageEvent> getAllLineageEventsAsc(ZonedDateTime before, ZonedDateTime after, int limit);

default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Adds the index lineage_events_event_time on lineage_events.event_time, declared descending.
-- NOTE(review): presumably backs queries that filter and sort lineage events by event_time; confirm.
-- NOTE(review): PostgreSQL rejects CREATE INDEX CONCURRENTLY inside a transaction block;
-- confirm the migration tool executes this script outside of one.
CREATE INDEX CONCURRENTLY lineage_events_event_time
on lineage_events(event_time DESC);
Loading