Skip to content

Commit

Permalink
add raw event API
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Aug 12, 2022
1 parent 7c6ddd8 commit cf37442
Show file tree
Hide file tree
Showing 19 changed files with 450 additions and 1 deletion.
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies {
implementation "io.prometheus:simpleclient_hotspot:${prometheusVersion}"
implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}"
implementation "org.jdbi:jdbi3-core:${jdbi3Version}"
implementation "org.jdbi:jdbi3-jackson2:${jdbi3Version}"
implementation "org.jdbi:jdbi3-postgres:${jdbi3Version}"
implementation "org.jdbi:jdbi3-sqlobject:${jdbi3Version}"
implementation 'com.google.guava:guava:31.1-jre'
Expand Down
13 changes: 12 additions & 1 deletion api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.Getter;
import lombok.NonNull;
import marquez.api.DatasetResource;
import marquez.api.EventResource;
import marquez.api.JobResource;
import marquez.api.NamespaceResource;
import marquez.api.OpenLineageResource;
Expand All @@ -25,6 +26,7 @@
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
import marquez.db.EventDao;
import marquez.db.JobContextDao;
import marquez.db.JobDao;
import marquez.db.JobVersionDao;
Expand All @@ -42,6 +44,7 @@
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
import marquez.service.EventService;
import marquez.service.JobService;
import marquez.service.LineageService;
import marquez.service.NamespaceService;
Expand Down Expand Up @@ -71,6 +74,7 @@ public final class MarquezContext {
@Getter private final OpenLineageDao openLineageDao;
@Getter private final LineageDao lineageDao;
@Getter private final SearchDao searchDao;
@Getter private final EventDao eventDao;

@Getter private final List<RunTransitionListener> runTransitionListeners;

Expand All @@ -82,6 +86,7 @@ public final class MarquezContext {
@Getter private final RunService runService;
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;
@Getter private final EventService eventService;

@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
Expand All @@ -90,6 +95,7 @@ public final class MarquezContext {
@Getter private final TagResource tagResource;
@Getter private final OpenLineageResource openLineageResource;
@Getter private final SearchResource searchResource;
@Getter private final EventResource eventResource;

@Getter private final ImmutableList<Object> resources;
@Getter private final JdbiExceptionExceptionMapper jdbiException;
Expand Down Expand Up @@ -119,6 +125,7 @@ private MarquezContext(
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
this.lineageDao = jdbi.onDemand(LineageDao.class);
this.searchDao = jdbi.onDemand(SearchDao.class);
this.eventDao = jdbi.onDemand(EventDao.class);
this.runTransitionListeners = runTransitionListeners;

this.namespaceService = new NamespaceService(baseDao);
Expand All @@ -131,6 +138,7 @@ private MarquezContext(
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.eventService = new EventService(baseDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
Expand All @@ -144,6 +152,7 @@ private MarquezContext(
.lineageService(lineageService)
.datasetFieldService(new DatasetFieldService(baseDao))
.datasetVersionService(new DatasetVersionService(baseDao))
.eventService(eventService)
.build();
this.namespaceResource = new NamespaceResource(serviceFactory);
this.sourceResource = new SourceResource(serviceFactory);
Expand All @@ -152,6 +161,7 @@ private MarquezContext(
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory);
this.searchResource = new SearchResource(searchDao);
this.eventResource = new EventResource(serviceFactory);

this.resources =
ImmutableList.of(
Expand All @@ -162,7 +172,8 @@ private MarquezContext(
tagResource,
jdbiException,
openLineageResource,
searchResource);
searchResource,
eventResource);

final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder();
this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi));
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/api/BaseResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
import marquez.service.EventService;
import marquez.service.JobService;
import marquez.service.LineageService;
import marquez.service.NamespaceService;
Expand All @@ -50,6 +51,7 @@ public class BaseResource {
protected DatasetVersionService datasetVersionService;
protected DatasetFieldService datasetFieldService;
protected LineageService lineageService;
protected EventService eventService;

public BaseResource(ServiceFactory serviceFactory) {
this.serviceFactory = serviceFactory;
Expand All @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) {
this.datasetVersionService = serviceFactory.getDatasetVersionService();
this.datasetFieldService = serviceFactory.getDatasetFieldService();
this.lineageService = serviceFactory.getLineageService();
this.eventService = serviceFactory.getEventService();
}

void throwIfNotExists(@NonNull NamespaceName namespaceName) {
Expand Down
75 changes: 75 additions & 0 deletions api/src/main/java/marquez/api/EventResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package marquez.api;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.NonNull;
import lombok.Value;
import marquez.api.exceptions.NamespaceNotFoundException;
import marquez.common.models.NamespaceName;
import marquez.service.ServiceFactory;
import marquez.service.models.LineageEvent;
import marquez.service.models.Namespace;
import marquez.service.models.NamespaceMeta;
import marquez.service.models.RawLineageEvent;

import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;

import java.util.List;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

/**
 * JAX-RS resource rooted at {@code /api/v1} that serves stored lineage events as {@link
 * RawLineageEvent} objects.
 *
 * <p>Both endpoints are read-only, paginated with {@code limit}/{@code offset} query parameters
 * and answer with a JSON object whose single {@code events} property holds the page.
 */
@Path("/api/v1")
public class EventResource extends BaseResource {
  /**
   * Creates the resource.
   *
   * @param serviceFactory factory handed to {@link BaseResource}, which pulls the event service
   *     (and the other services) out of it
   */
  public EventResource(@NonNull final ServiceFactory serviceFactory) {
    super(serviceFactory);
  }

  /**
   * {@code GET /api/v1/events}: returns one page of events from {@code eventService.getAll}.
   *
   * @param limit maximum number of events to return; defaults to 100, must be &gt;= 0
   * @param offset number of events to skip; defaults to 0, must be &gt;= 0
   * @return a 200 response wrapping the page in an {@link Events} envelope
   */
  @Timed
  @ResponseMetered
  @ExceptionMetered
  @GET
  @Path("/events")
  @Produces(APPLICATION_JSON)
  public Response get(
      @QueryParam("limit") @DefaultValue("100") @Min(0) int limit,
      @QueryParam("offset") @DefaultValue("0") @Min(0) int offset) {
    final Events page = new Events(eventService.getAll(limit, offset));
    return Response.ok(page).build();
  }

  /**
   * {@code GET /api/v1/events/{namespace}}: returns one page of events from {@code
   * eventService.getByNamespace} for the given namespace.
   *
   * @param namespace namespace taken verbatim from the request path
   * @param limit maximum number of events to return; defaults to 100, must be &gt;= 0
   * @param offset number of events to skip; defaults to 0, must be &gt;= 0
   * @return a 200 response wrapping the page in an {@link Events} envelope
   */
  @Timed
  @ResponseMetered
  @ExceptionMetered
  @GET
  @Path("/events/{namespace}")
  @Produces(APPLICATION_JSON)
  public Response getByNamespace(
      @PathParam("namespace") String namespace,
      @QueryParam("limit") @DefaultValue("100") @Min(0) int limit,
      @QueryParam("offset") @DefaultValue("0") @Min(0) int offset) {
    final Events page = new Events(eventService.getByNamespace(namespace, limit, offset));
    return Response.ok(page).build();
  }

  /** Response envelope: serializes the wrapped list under the JSON property {@code events}. */
  @Value
  static class Events {
    @NonNull
    @JsonProperty("events")
    List<RawLineageEvent> value;
  }
}
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ public interface BaseDao extends SqlObject {

@CreateSqlObject
OpenLineageDao createOpenLineageDao();

@CreateSqlObject
EventDao createEventDao();
}
18 changes: 18 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -126,6 +128,13 @@ private Columns() {}
public static final String RUN_UUID = "run_uuid";
public static final String STATE = "state";

/* LINEAGE EVENT ROW COLUMNS */

public static final String EVENT_TIME = "event_time";
public static final String EVENT = "event";
public static final String EVENT_TYPE = "event_type";
public static final String PRODUCER = "producer";

public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
return null;
Expand Down Expand Up @@ -156,6 +165,15 @@ public static Instant timestampOrThrow(final ResultSet results, final String col
return results.getTimestamp(column).toInstant();
}

/**
 * Reads a timestamp column that is required to be non-null and returns it as a {@link
 * ZonedDateTime} in the {@code UTC} zone.
 *
 * @param results result set positioned on the row to read
 * @param column label of the timestamp column
 * @return the column value converted to an instant and placed in zone {@code UTC}
 * @throws IllegalArgumentException if the column value is SQL {@code NULL}; the message names
 *     the offending column so the failing mapping can be identified from the stack trace
 * @throws SQLException if the underlying driver fails to read the column
 */
public static ZonedDateTime zonedDateTimeOrThrow(final ResultSet results, final String column)
    throws SQLException {
  if (results.getObject(column) == null) {
    throw new IllegalArgumentException("Column '" + column + "' must not be null");
  }
  return results.getTimestamp(column).toInstant().atZone(ZoneId.of("UTC"));
}


public static String stringOrNull(final ResultSet results, final String column)
throws SQLException {
if (results.getObject(column) == null) {
Expand Down
35 changes: 35 additions & 0 deletions api/src/main/java/marquez/db/EventDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package marquez.db;


import marquez.db.mappers.RawLineageEventMapper;
import marquez.service.models.RawLineageEvent;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import java.util.List;

/**
 * Read-only queries over the {@code lineage_events} table. Rows are converted to {@link
 * RawLineageEvent} by {@link RawLineageEventMapper}.
 */
@RegisterRowMapper(RawLineageEventMapper.class)
public interface EventDao extends BaseDao {

  /**
   * Returns one page of all stored lineage events, newest {@code event_time} first.
   *
   * @param limit maximum number of rows to return
   * @param offset number of rows to skip before the first returned row
   * @return the requested page; empty when the offset is past the last row
   */
  @SqlQuery(
      """
      SELECT *
      FROM lineage_events
      ORDER BY event_time DESC
      LIMIT :limit
      OFFSET :offset""")
  List<RawLineageEvent> getAll(@Bind("limit") int limit, @Bind("offset") int offset);

  /**
   * Returns one page of lineage events that touch the given namespace: either the event's {@code
   * job_namespace} equals it, or at least one element of the event's {@code inputs} or {@code
   * outputs} JSON arrays has that {@code namespace}.
   *
   * <p>Reading the dataset namespace through {@code jsonb_array_elements} and {@code ->>} is a
   * "hack" to get the inputs/outputs namespace from the jsonb column: <a
   * href="https://github.com/jdbi/jdbi/issues/1510#issuecomment-485423083">explanation</a>
   *
   * <p>The dataset check is an {@code EXISTS} sub-query rather than a cross join so that each
   * event is returned at most once and events without any inputs/outputs still match on {@code
   * job_namespace}. Rows are ordered by {@code event_time} (newest first) so that {@code
   * LIMIT}/{@code OFFSET} pagination is stable and consistent with {@link #getAll(int, int)}.
   *
   * @param namespace namespace to match against the job and its input/output datasets
   * @param limit maximum number of rows to return
   * @param offset number of rows to skip before the first returned row
   * @return the requested page; empty when nothing matches
   */
  @SqlQuery(
      """
      SELECT le.*
      FROM lineage_events le
      WHERE le.job_namespace = :namespace
      OR EXISTS (
        SELECT 1
        FROM jsonb_array_elements(coalesce(le.event -> 'inputs', '[]'::jsonb) || coalesce(le.event -> 'outputs', '[]'::jsonb)) AS ds
        WHERE ds ->> 'namespace' = :namespace
      )
      ORDER BY le.event_time DESC
      LIMIT :limit
      OFFSET :offset""")
  List<RawLineageEvent> getByNamespace(
      @Bind("namespace") String namespace, @Bind("limit") int limit, @Bind("offset") int offset);
}
53 changes: 53 additions & 0 deletions api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package marquez.db.mappers;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
import marquez.db.Columns;
import marquez.service.models.LineageEvent;
import marquez.service.models.RawLineageEvent;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.zonedDateTimeOrThrow;

/**
 * Maps a {@code lineage_events} row to a {@link RawLineageEvent}.
 *
 * <p>{@code event_time}, {@code event_type} and {@code producer} are read from their own columns;
 * {@code run}, {@code job}, {@code inputs} and {@code outputs} are extracted from the JSON stored
 * in the {@code event} column. Extraction is best-effort: if the JSON cannot be parsed, or a
 * section is absent, {@code null} or of an unexpected JSON type, that section is mapped to an
 * empty map/list instead of failing the whole query.
 */
@Slf4j
public class RawLineageEventMapper implements RowMapper<RawLineageEvent> {
  /**
   * Builds the event for the current row.
   *
   * @param rs result set positioned on the row to map
   * @param ctx statement context (unused)
   * @return the mapped event
   * @throws SQLException if a column cannot be read
   */
  @Override
  public RawLineageEvent map(ResultSet rs, StatementContext ctx) throws SQLException {
    final Map<String, Object> event = parseEvent(stringOrThrow(rs, Columns.EVENT));

    return RawLineageEvent.builder()
        .eventTime(zonedDateTimeOrThrow(rs, Columns.EVENT_TIME))
        .eventType(stringOrThrow(rs, Columns.EVENT_TYPE))
        .run(asMap(event.get("run")))
        .job(asMap(event.get("job")))
        .inputs(asList(event.get("inputs")))
        .outputs(asList(event.get("outputs")))
        .producer(stringOrThrow(rs, Columns.PRODUCER))
        .build();
  }

  /**
   * Parses the raw event JSON into a map; returns an empty map when the payload is not a JSON
   * object (parse failure is logged) or is the JSON literal {@code null}.
   */
  @SuppressWarnings("unchecked") // JSON object keys are always strings
  private static Map<String, Object> parseEvent(String rawEvent) {
    try {
      final Map<String, Object> parsed = Utils.getMapper().readValue(rawEvent, Map.class);
      return parsed != null ? parsed : Collections.emptyMap();
    } catch (JsonProcessingException e) {
      log.error("Failed to process json", e);
      return Collections.emptyMap();
    }
  }

  /** Returns {@code value} as a JSON object map, or an empty map if it is anything else. */
  @SuppressWarnings("unchecked") // guarded by instanceof; JSON object keys are always strings
  private static Map<String, Object> asMap(Object value) {
    if (value instanceof Map) {
      return (Map<String, Object>) value;
    }
    return Collections.emptyMap();
  }

  /** Returns {@code value} as a JSON array list, or an empty list if it is anything else. */
  @SuppressWarnings("unchecked") // guarded by instanceof; element type is Object
  private static List<Object> asList(Object value) {
    if (value instanceof List) {
      return (List<Object>) value;
    }
    return Collections.emptyList();
  }
}
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/service/DelegatingDaos.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
import marquez.db.EventDao;
import marquez.db.JobContextDao;
import marquez.db.JobDao;
import marquez.db.JobVersionDao;
Expand Down Expand Up @@ -98,4 +99,10 @@ public static class DelegatingTagDao implements TagDao {
public static class DelegatingLineageDao implements LineageDao {
@Delegate private final LineageDao delegate;
}

/**
 * {@link EventDao} implementation that forwards every call to a wrapped {@link EventDao}.
 *
 * <p>Lombok generates the forwarding methods ({@code @Delegate}) and a constructor taking the
 * delegate ({@code @RequiredArgsConstructor}).
 */
@RequiredArgsConstructor
public static class DelegatingEventDao implements EventDao {
@Delegate private final EventDao delegate;
}

}
10 changes: 10 additions & 0 deletions api/src/main/java/marquez/service/EventService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package marquez.service;

import lombok.NonNull;
import marquez.db.BaseDao;

/**
 * Service facade for raw lineage events. It adds no logic of its own: all {@code EventDao}
 * methods are inherited from {@link DelegatingDaos.DelegatingEventDao} and forwarded to the DAO
 * created in the constructor.
 */
public class EventService extends DelegatingDaos.DelegatingEventDao {
/**
 * Creates the service backed by an {@code EventDao} obtained from the given base DAO.
 *
 * @param baseDao DAO used to create the delegate via {@code createEventDao()}
 */
public EventService(@NonNull BaseDao baseDao) {
super(baseDao.createEventDao());
}
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/ServiceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public class ServiceFactory {
@NonNull DatasetVersionService datasetVersionService;
@NonNull DatasetFieldService datasetFieldService;
@NonNull LineageService lineageService;
@NonNull EventService eventService;
}
Loading

0 comments on commit cf37442

Please sign in to comment.