Skip to content

Commit

Permalink
Willy code review changes
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Sep 12, 2022
1 parent d4a204d commit d0bf768
Show file tree
Hide file tree
Showing 24 changed files with 197 additions and 247 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.25.0...HEAD)
* Added new API for retrieving OpenLineage events [`#2070`](https://github.com/MarquezProject/marquez/pull/2070) [@mobuchowski](https://github.com/mobuchowski)

## [0.25.0](https://github.com/MarquezProject/marquez/compare/0.24.0...0.25.0) - 2022-08-08

Expand Down
6 changes: 5 additions & 1 deletion api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.flywaydb.core.api.FlywayException;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.core.statement.SqlLogger;
import org.jdbi.v3.jackson2.Jackson2Config;
import org.jdbi.v3.jackson2.Jackson2Plugin;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

Expand Down Expand Up @@ -154,12 +156,14 @@ private MarquezContext buildMarquezContext(
factory
.build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin());
.installPlugin(new PostgresPlugin())
.installPlugin(new Jackson2Plugin());
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
if (isSentryEnabled(config)) {
sqlLogger = new TracingSQLLogger(sqlLogger);
}
jdbi.setSqlLogger(sqlLogger);
jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());

final MarquezContext context =
MarquezContext.builder().jdbi(jdbi).tags(config.getTags()).build();
Expand Down
18 changes: 2 additions & 16 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import lombok.Getter;
import lombok.NonNull;
import marquez.api.DatasetResource;
import marquez.api.EventResource;
import marquez.api.JobResource;
import marquez.api.NamespaceResource;
import marquez.api.OpenLineageResource;
Expand All @@ -26,7 +25,6 @@
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
import marquez.db.EventDao;
import marquez.db.JobContextDao;
import marquez.db.JobDao;
import marquez.db.JobVersionDao;
Expand All @@ -44,7 +42,6 @@
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
import marquez.service.EventService;
import marquez.service.JobService;
import marquez.service.LineageService;
import marquez.service.NamespaceService;
Expand Down Expand Up @@ -74,8 +71,6 @@ public final class MarquezContext {
@Getter private final OpenLineageDao openLineageDao;
@Getter private final LineageDao lineageDao;
@Getter private final SearchDao searchDao;
@Getter private final EventDao eventDao;

@Getter private final List<RunTransitionListener> runTransitionListeners;

@Getter private final NamespaceService namespaceService;
Expand All @@ -86,17 +81,13 @@ public final class MarquezContext {
@Getter private final RunService runService;
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;
@Getter private final EventService eventService;

@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
@Getter private final JobResource jobResource;
@Getter private final TagResource tagResource;
@Getter private final OpenLineageResource openLineageResource;
@Getter private final SearchResource searchResource;
@Getter private final EventResource eventResource;

@Getter private final ImmutableList<Object> resources;
@Getter private final JdbiExceptionExceptionMapper jdbiException;
@Getter private final GraphQLHttpServlet graphqlServlet;
Expand Down Expand Up @@ -125,7 +116,6 @@ private MarquezContext(
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
this.lineageDao = jdbi.onDemand(LineageDao.class);
this.searchDao = jdbi.onDemand(SearchDao.class);
this.eventDao = jdbi.onDemand(EventDao.class);
this.runTransitionListeners = runTransitionListeners;

this.namespaceService = new NamespaceService(baseDao);
Expand All @@ -138,7 +128,6 @@ private MarquezContext(
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.eventService = new EventService(baseDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
Expand All @@ -152,16 +141,14 @@ private MarquezContext(
.lineageService(lineageService)
.datasetFieldService(new DatasetFieldService(baseDao))
.datasetVersionService(new DatasetVersionService(baseDao))
.eventService(eventService)
.build();
this.namespaceResource = new NamespaceResource(serviceFactory);
this.sourceResource = new SourceResource(serviceFactory);
this.datasetResource = new DatasetResource(serviceFactory);
this.jobResource = new JobResource(serviceFactory, jobVersionDao);
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
this.searchResource = new SearchResource(searchDao);
this.eventResource = new EventResource(serviceFactory);

this.resources =
ImmutableList.of(
Expand All @@ -172,8 +159,7 @@ private MarquezContext(
tagResource,
jdbiException,
openLineageResource,
searchResource,
eventResource);
searchResource);

final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder();
this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi));
Expand Down
3 changes: 0 additions & 3 deletions api/src/main/java/marquez/api/BaseResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
import marquez.service.EventService;
import marquez.service.JobService;
import marquez.service.LineageService;
import marquez.service.NamespaceService;
Expand All @@ -51,7 +50,6 @@ public class BaseResource {
protected DatasetVersionService datasetVersionService;
protected DatasetFieldService datasetFieldService;
protected LineageService lineageService;
protected EventService eventService;

public BaseResource(ServiceFactory serviceFactory) {
this.serviceFactory = serviceFactory;
Expand All @@ -65,7 +63,6 @@ public BaseResource(ServiceFactory serviceFactory) {
this.datasetVersionService = serviceFactory.getDatasetVersionService();
this.datasetFieldService = serviceFactory.getDatasetFieldService();
this.lineageService = serviceFactory.getLineageService();
this.eventService = serviceFactory.getEventService();
}

void throwIfNotExists(@NonNull NamespaceName namespaceName) {
Expand Down
62 changes: 0 additions & 62 deletions api/src/main/java/marquez/api/EventResource.java

This file was deleted.

59 changes: 54 additions & 5 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,45 @@
import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.dropwizard.jersey.jsr310.ZonedDateTimeParam;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

@Slf4j
@Path("/api/v1/lineage")
@Path("/api/v1")
public class OpenLineageResource extends BaseResource {
private static final String DEFAULT_DEPTH = "20";

public OpenLineageResource(@NonNull final ServiceFactory serviceFactory) {
private final OpenLineageDao openLineageDao;

public OpenLineageResource(
@NonNull final ServiceFactory serviceFactory, @NonNull final OpenLineageDao openLineageDao) {
super(serviceFactory);
this.openLineageDao = openLineageDao;
}

@Timed
Expand All @@ -49,6 +59,7 @@ public OpenLineageResource(@NonNull final ServiceFactory serviceFactory) {
@POST
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage")
public void create(
@Valid @NotNull LineageEvent event, @Suspended final AsyncResponse asyncResponse)
throws JsonProcessingException, SQLException {
Expand Down Expand Up @@ -80,10 +91,48 @@ private int determineStatusCode(Throwable e) {
@GET
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON)
@Path("/lineage")
public Response getLineage(
@QueryParam("nodeId") @NotNull NodeId nodeId,
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth)
throws ExecutionException, InterruptedException {
@QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
return Response.ok(lineageService.lineage(nodeId, depth)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Path("/events/lineage")
@Produces(APPLICATION_JSON)
public Response getLineageEvents(
@QueryParam("before") @DefaultValue("2030-01-01T00:00:00+00:00") ZonedDateTimeParam before,
@QueryParam("after") @DefaultValue("1970-01-01T00:00:00+00:00") ZonedDateTimeParam after,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit) {
final List<LineageEvent> events =
openLineageDao.getAllLineageEvents(before.get(), after.get(), limit);
return Response.ok(new Events(events)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Path("/namespace/{namespace}/events/lineage")
@Produces(APPLICATION_JSON)
public Response getLineageEventsByNamespace(
@PathParam("namespace") String namespace,
@QueryParam("before") @DefaultValue("2030-01-01T00:00:00+00:00") ZonedDateTimeParam before,
@QueryParam("after") @DefaultValue("1970-01-01T00:00:00+00:00") ZonedDateTimeParam after,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit) {
final List<LineageEvent> event =
openLineageDao.getLineageEventsByNamespace(namespace, before.get(), after.get(), limit);
return Response.ok(new Events(event)).build();
}

@Value
static class Events {
@NonNull
@JsonProperty("events")
List<LineageEvent> value;
}
}
3 changes: 0 additions & 3 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,4 @@ public interface BaseDao extends SqlObject {

@CreateSqlObject
OpenLineageDao createOpenLineageDao();

@CreateSqlObject
EventDao createEventDao();
}
37 changes: 0 additions & 37 deletions api/src/main/java/marquez/db/EventDao.java

This file was deleted.

Loading

0 comments on commit d0bf768

Please sign in to comment.