From cf37442e70991e542c137525bf44b7b28e4d1db8 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Fri, 12 Aug 2022 14:51:24 +0200 Subject: [PATCH] add raw event API Signed-off-by: Maciej Obuchowski --- api/build.gradle | 1 + api/src/main/java/marquez/MarquezContext.java | 13 ++- .../main/java/marquez/api/BaseResource.java | 3 + .../main/java/marquez/api/EventResource.java | 75 ++++++++++++ api/src/main/java/marquez/db/BaseDao.java | 3 + api/src/main/java/marquez/db/Columns.java | 18 +++ api/src/main/java/marquez/db/EventDao.java | 35 ++++++ .../db/mappers/RawLineageEventMapper.java | 53 +++++++++ .../java/marquez/service/DelegatingDaos.java | 7 ++ .../java/marquez/service/EventService.java | 10 ++ .../java/marquez/service/ServiceFactory.java | 1 + .../service/models/RawLineageEvent.java | 35 ++++++ .../V47__add_lineage_event_indexes.sql | 5 + .../marquez/OpenLineageIntegrationTest.java | 109 ++++++++++++++++++ .../MarquezJdbiExternalPostgresExtension.java | 2 + .../java/marquez/client/MarquezClient.java | 26 +++++ .../java/marquez/client/MarquezPathV1.java | 6 + .../main/java/marquez/client/MarquezUrl.java | 8 ++ .../client/models/RawLineageEvent.java | 41 +++++++ 19 files changed, 450 insertions(+), 1 deletion(-) create mode 100644 api/src/main/java/marquez/api/EventResource.java create mode 100644 api/src/main/java/marquez/db/EventDao.java create mode 100644 api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java create mode 100644 api/src/main/java/marquez/service/EventService.java create mode 100644 api/src/main/java/marquez/service/models/RawLineageEvent.java create mode 100644 api/src/main/resources/marquez/db/migration/V47__add_lineage_event_indexes.sql create mode 100644 clients/java/src/main/java/marquez/client/models/RawLineageEvent.java diff --git a/api/build.gradle b/api/build.gradle index 9db5e79e84..376fb60f8b 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -40,6 +40,7 @@ dependencies { implementation "io.prometheus:simpleclient_hotspot:${prometheusVersion}" implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}" implementation "org.jdbi:jdbi3-core:${jdbi3Version}" + implementation "org.jdbi:jdbi3-jackson2:${jdbi3Version}" implementation "org.jdbi:jdbi3-postgres:${jdbi3Version}" implementation "org.jdbi:jdbi3-sqlobject:${jdbi3Version}" implementation 'com.google.guava:guava:31.1-jre' diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index 1459f3041e..771f58559c 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -14,6 +14,7 @@ import lombok.Getter; import lombok.NonNull; import marquez.api.DatasetResource; +import marquez.api.EventResource; import marquez.api.JobResource; import marquez.api.NamespaceResource; import marquez.api.OpenLineageResource; @@ -25,6 +26,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; +import marquez.db.EventDao; import marquez.db.JobContextDao; import marquez.db.JobDao; import marquez.db.JobVersionDao; @@ -42,6 +44,7 @@ import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; +import marquez.service.EventService; import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.NamespaceService; @@ -71,6 +74,7 @@ public final class MarquezContext { @Getter private final OpenLineageDao openLineageDao; @Getter private final LineageDao lineageDao; @Getter private final SearchDao searchDao; + @Getter private final EventDao eventDao; @Getter private final List runTransitionListeners; @@ -82,6 +86,7 @@ public final class MarquezContext { @Getter private final RunService runService; @Getter private final OpenLineageService openLineageService; @Getter private final LineageService lineageService; + @Getter private final EventService eventService; @Getter private final NamespaceResource namespaceResource; @Getter private final SourceResource sourceResource; @@ -90,6 +95,7 @@ public final class MarquezContext { @Getter private final TagResource tagResource; @Getter private final OpenLineageResource openLineageResource; @Getter private final SearchResource searchResource; + @Getter private final EventResource eventResource; @Getter private final ImmutableList resources; @Getter private final JdbiExceptionExceptionMapper jdbiException; @@ -119,6 +125,7 @@ private MarquezContext( this.openLineageDao = jdbi.onDemand(OpenLineageDao.class); this.lineageDao = jdbi.onDemand(LineageDao.class); this.searchDao = jdbi.onDemand(SearchDao.class); + this.eventDao = jdbi.onDemand(EventDao.class); this.runTransitionListeners = runTransitionListeners; this.namespaceService = new NamespaceService(baseDao); @@ -131,6 +138,7 @@ private MarquezContext( this.tagService.init(tags); this.openLineageService = new OpenLineageService(baseDao, runService); this.lineageService = new LineageService(lineageDao, jobDao); + this.eventService = new EventService(baseDao); this.jdbiException = new JdbiExceptionExceptionMapper(); final ServiceFactory serviceFactory = ServiceFactory.builder() @@ -144,6 +152,7 @@ private MarquezContext( .lineageService(lineageService) .datasetFieldService(new DatasetFieldService(baseDao)) .datasetVersionService(new DatasetVersionService(baseDao)) + .eventService(eventService) .build(); this.namespaceResource = new NamespaceResource(serviceFactory); this.sourceResource = new SourceResource(serviceFactory); @@ -152,6 +161,7 @@ private MarquezContext( this.tagResource = new TagResource(serviceFactory); this.openLineageResource = new OpenLineageResource(serviceFactory); this.searchResource = new SearchResource(searchDao); + this.eventResource = new EventResource(serviceFactory); this.resources = ImmutableList.of( @@ -162,7 +172,8 @@ private MarquezContext( tagResource, jdbiException, openLineageResource, - searchResource); + searchResource, + eventResource); final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder(); this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi)); diff --git a/api/src/main/java/marquez/api/BaseResource.java b/api/src/main/java/marquez/api/BaseResource.java index ce15d31ab3..d499c481bf 100644 --- a/api/src/main/java/marquez/api/BaseResource.java +++ b/api/src/main/java/marquez/api/BaseResource.java @@ -28,6 +28,7 @@ import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; +import marquez.service.EventService; import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.NamespaceService; @@ -50,6 +51,7 @@ public class BaseResource { protected DatasetVersionService datasetVersionService; protected DatasetFieldService datasetFieldService; protected LineageService lineageService; + protected EventService eventService; public BaseResource(ServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) { this.datasetVersionService = serviceFactory.getDatasetVersionService(); this.datasetFieldService = serviceFactory.getDatasetFieldService(); this.lineageService = serviceFactory.getLineageService(); + this.eventService = serviceFactory.getEventService(); } void throwIfNotExists(@NonNull NamespaceName namespaceName) { diff --git a/api/src/main/java/marquez/api/EventResource.java b/api/src/main/java/marquez/api/EventResource.java new file mode 100644 index 0000000000..b20b4051e6 --- /dev/null +++ b/api/src/main/java/marquez/api/EventResource.java @@ -0,0 +1,75 @@ +package marquez.api; + +import com.codahale.metrics.annotation.ExceptionMetered; +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.NonNull; +import lombok.Value; +import marquez.api.exceptions.NamespaceNotFoundException; +import marquez.common.models.NamespaceName; +import marquez.service.ServiceFactory; +import marquez.service.models.LineageEvent; +import marquez.service.models.Namespace; +import marquez.service.models.NamespaceMeta; +import marquez.service.models.RawLineageEvent; + +import javax.validation.Valid; +import javax.validation.constraints.Min; +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; + +import java.util.List; + +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; + +@Path("/api/v1") +public class EventResource extends BaseResource { + public EventResource(@NonNull final ServiceFactory serviceFactory) { + super(serviceFactory); + } + + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Path("/events") + @Produces(APPLICATION_JSON) + public Response get( + @QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit, + @QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) { + final List events = + eventService.getAll(limit, offset); + return Response.ok(new Events(events)).build(); + } + + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Path("/events/{namespace}") + @Produces(APPLICATION_JSON) + public Response getByNamespace( + @PathParam("namespace") String namespace, + @QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit, + @QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) { + final List event = + eventService.getByNamespace(namespace, limit, offset); + return Response.ok(new Events(event)).build(); + } + + @Value + static class Events { + @NonNull + @JsonProperty("events") + List value; + } + +} \ No newline at end of file diff --git a/api/src/main/java/marquez/db/BaseDao.java b/api/src/main/java/marquez/db/BaseDao.java index 848372d863..9e8b3d37ac 100644 --- a/api/src/main/java/marquez/db/BaseDao.java +++ b/api/src/main/java/marquez/db/BaseDao.java @@ -50,4 +50,7 @@ public interface BaseDao extends SqlObject { @CreateSqlObject OpenLineageDao createOpenLineageDao(); + + @CreateSqlObject + EventDao createEventDao(); } diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index 5af8e60079..27bc2cd62b 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -17,6 +17,8 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -126,6 +128,13 @@ private Columns() {} public static final String RUN_UUID = "run_uuid"; public static final String STATE = "state"; + /* LINEAGE EVENT ROW COLUMNS */ + + public static final String EVENT_TIME = "event_time"; + public static final String EVENT = "event"; + public static final String EVENT_TYPE = "event_type"; + public static final String PRODUCER = "producer"; + public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException { if (results.getObject(column) == null) { return null; @@ -156,6 +165,15 @@ public static Instant timestampOrThrow(final ResultSet results, final String col return results.getTimestamp(column).toInstant(); } + public static ZonedDateTime zonedDateTimeOrThrow(final ResultSet results, final String column) + throws SQLException { + if (results.getObject(column) == null) { + throw new IllegalArgumentException(); + } + return results.getTimestamp(column).toInstant().atZone(ZoneId.of("UTC")); + } + + public static String stringOrNull(final ResultSet results, final String column) throws SQLException { if (results.getObject(column) == null) { diff --git a/api/src/main/java/marquez/db/EventDao.java b/api/src/main/java/marquez/db/EventDao.java new file mode 100644 index 0000000000..054fe1d923 --- /dev/null +++ b/api/src/main/java/marquez/db/EventDao.java @@ -0,0 +1,35 @@ +package marquez.db; + + +import marquez.db.mappers.RawLineageEventMapper; +import marquez.service.models.RawLineageEvent; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; +import java.util.List; + +@RegisterRowMapper(RawLineageEventMapper.class) +public interface EventDao extends BaseDao { + + @SqlQuery(""" + SELECT * + FROM lineage_events + ORDER BY event_time DESC + LIMIT :limit + OFFSET :offset""") + List getAll(int limit, int offset); + + + /** + * This is a "hack" to get inputs/outputs namespace from jsonb column: + * explanation + */ + @SqlQuery(""" + SELECT le.* + FROM lineage_events le, jsonb_array_elements(coalesce(le.event -> 'inputs', '[]'::jsonb) || coalesce(le.event -> 'outputs', '[]'::jsonb)) AS ds + WHERE le.job_namespace = :namespace + OR ds ->> 'namespace' = :namespace + LIMIT :limit + OFFSET :offset""") + List getByNamespace(@Bind("namespace") String namespace, @Bind("limit") int limit, @Bind("offset") int offset); +} \ No newline at end of file diff --git a/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java b/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java new file mode 100644 index 0000000000..4377c7022c --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java @@ -0,0 +1,53 @@ +package marquez.db.mappers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import marquez.common.Utils; +import marquez.db.Columns; +import marquez.service.models.LineageEvent; +import marquez.service.models.RawLineageEvent; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static marquez.db.Columns.stringOrThrow; +import static marquez.db.Columns.zonedDateTimeOrThrow; + +@Slf4j +public class RawLineageEventMapper implements RowMapper { + @Override + public RawLineageEvent map(ResultSet rs, StatementContext ctx) throws SQLException { + String rawEvent = stringOrThrow(rs, Columns.EVENT); + Map run = Collections.emptyMap(); + Map job = Collections.emptyMap(); + + List inputs = Collections.emptyList(); + List outputs = Collections.emptyList(); + try { + ObjectMapper mapper = Utils.getMapper(); + Map event = mapper.readValue(rawEvent, Map.class); + run = (Map) event.getOrDefault("run", Collections.emptyMap()); + job = (Map) event.getOrDefault("job", Collections.emptyMap()); + inputs = (List) event.getOrDefault("inputs", Collections.emptyList()); + outputs = (List) event.getOrDefault("outputs", Collections.emptyList()); + } catch (JsonProcessingException e) { + log.error("Failed to process json", e); + } + + return RawLineageEvent.builder() + .eventTime(zonedDateTimeOrThrow(rs, Columns.EVENT_TIME)) + .eventType(stringOrThrow(rs, Columns.EVENT_TYPE)) + .run(run) + .job(job) + .inputs(inputs) + .outputs(outputs) + .producer(stringOrThrow(rs, Columns.PRODUCER)) + .build(); + } +} diff --git a/api/src/main/java/marquez/service/DelegatingDaos.java b/api/src/main/java/marquez/service/DelegatingDaos.java index 8184200ab4..0bea6dc184 100644 --- a/api/src/main/java/marquez/service/DelegatingDaos.java +++ b/api/src/main/java/marquez/service/DelegatingDaos.java @@ -10,6 +10,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; +import marquez.db.EventDao; import marquez.db.JobContextDao; import marquez.db.JobDao; import marquez.db.JobVersionDao; @@ -98,4 +99,10 @@ public static class DelegatingTagDao implements TagDao { public static class DelegatingLineageDao implements LineageDao { @Delegate private final LineageDao delegate; } + + @RequiredArgsConstructor + public static class DelegatingEventDao implements EventDao { + @Delegate private final EventDao delegate; + } + } diff --git a/api/src/main/java/marquez/service/EventService.java b/api/src/main/java/marquez/service/EventService.java new file mode 100644 index 0000000000..9da1afe672 --- /dev/null +++ b/api/src/main/java/marquez/service/EventService.java @@ -0,0 +1,10 @@ +package marquez.service; + +import lombok.NonNull; +import marquez.db.BaseDao; + +public class EventService extends DelegatingDaos.DelegatingEventDao { + public EventService(@NonNull BaseDao baseDao) { + super(baseDao.createEventDao()); + } +} diff --git a/api/src/main/java/marquez/service/ServiceFactory.java b/api/src/main/java/marquez/service/ServiceFactory.java index 5a4b51465b..4a8ded0a0a 100644 --- a/api/src/main/java/marquez/service/ServiceFactory.java +++ b/api/src/main/java/marquez/service/ServiceFactory.java @@ -22,4 +22,5 @@ public class ServiceFactory { @NonNull DatasetVersionService datasetVersionService; @NonNull DatasetFieldService datasetFieldService; @NonNull LineageService lineageService; + @NonNull EventService eventService; } diff --git a/api/src/main/java/marquez/service/models/RawLineageEvent.java b/api/src/main/java/marquez/service/models/RawLineageEvent.java new file mode 100644 index 0000000000..4090d2806f --- /dev/null +++ b/api/src/main/java/marquez/service/models/RawLineageEvent.java @@ -0,0 +1,35 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service.models; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Setter +@Getter +@Valid +@ToString +public class RawLineageEvent extends BaseJsonModel { + @NotNull private String eventType; + @NotNull private ZonedDateTime eventTime; + @NotNull private Map run; + @NotNull private Map job; + private List inputs; + private List outputs; + @NotNull private String producer; +} diff --git a/api/src/main/resources/marquez/db/migration/V47__add_lineage_event_indexes.sql b/api/src/main/resources/marquez/db/migration/V47__add_lineage_event_indexes.sql new file mode 100644 index 0000000000..32249c6a84 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V47__add_lineage_event_indexes.sql @@ -0,0 +1,5 @@ +CREATE INDEX lineage_events_event_time + on lineage_events(event_time DESC); + +CREATE INDEX lineage_events_namespace_event_time + on lineage_events(job_namespace, event_time DESC); diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index b3783e39ec..032a5ff5e1 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -6,11 +6,13 @@ package marquez; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.map; import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import io.dropwizard.util.Resources; import io.openlineage.client.OpenLineage; @@ -26,6 +28,7 @@ import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.temporal.ChronoField; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -39,6 +42,7 @@ import marquez.client.models.DatasetVersion; import marquez.client.models.Job; import marquez.client.models.JobId; +import marquez.client.models.RawLineageEvent; import marquez.client.models.Run; import marquez.common.Utils; import marquez.db.LineageTestUtils; @@ -320,6 +324,111 @@ public void testOpenLineageJobHierarchySparkAndAirflow() assertThat(runsList).isNotEmpty().hasSize(1); } + @Test + public void testSendEventAndGetItBack() { + LineageEvent.Run run = new LineageEvent.Run( + UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()); + LineageEvent.Job job = LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build(); + LineageEvent.Dataset dataset = LineageEvent.Dataset.builder() + .namespace(NAMESPACE_NAME) + .name(DB_TABLE_NAME) + .build(); + + // We're losing zone info on write, so I have to UTC it here to compare later + ZonedDateTime time = ZonedDateTime.now().withZoneSameInstant(ZoneId.of("UTC")); + + final LineageEvent lineageEvent = + LineageEvent.builder() + .producer("testSendEventAndGetItBack") + .eventType("COMPLETE") + .run(run) + .job(job) + .eventTime(time) + .inputs(Collections.emptyList()) + .outputs(Collections.singletonList(dataset)) + .build(); + + final CompletableFuture resp = + this.sendLineage(Utils.toJson(lineageEvent)) + .thenApply(HttpResponse::statusCode) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + assertThat(resp.join()).isEqualTo(201); + + List events = client.listEvents(); + + assertThat(events.size()).isEqualTo(1); + + ObjectMapper mapper = Utils.getMapper(); + JsonNode prev = mapper.valueToTree(events.get(0)); + assertThat(prev).isEqualTo(mapper.valueToTree(lineageEvent)); +// .hasFieldOrPropertyWithValue("producer", "testSendEventAndGetItBack") +// .hasFieldOrPropertyWithValue("eventType", "COMPLETE") +// .hasFieldOrPropertyWithValue("eventTime", time) +// .hasFieldOrPropertyWithValue("run", run) +// .hasFieldOrPropertyWithValue("job", job) +// .hasFieldOrPropertyWithValue("inputs", Collections.emptyList()) +// .hasFieldOrPropertyWithValue("outputs", Collections.singletonList(dataset)); + + } + + @Test + public void testFindEventByDatasetNamespace() { + LineageEvent.Run run = new LineageEvent.Run( + UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()); + LineageEvent.Job job = LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build(); + + ZonedDateTime time = ZonedDateTime.now(); + + for (int i = 0; i < 10; i++) { + LineageEvent.Dataset dataset = LineageEvent.Dataset.builder() + .namespace(String.format("namespace%d", i)) + .name(DB_TABLE_NAME) + .build(); + + LineageEvent event = LineageEvent.builder() + .producer("testSendEventAndGetItBack") + .eventType("COMPLETE") + .run(run) + .job(job) + .eventTime(time) + .inputs(Collections.emptyList()) + .outputs(Collections.singletonList(dataset)) + .build(); + + final CompletableFuture resp = + this.sendLineage(Utils.toJson(event)) + .thenApply(HttpResponse::statusCode) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + assertThat(resp.join()).isEqualTo(201); + } + + List rawEvents = client.listEvents("namespace3"); + + assertThat(rawEvents.size()).isEqualTo(1); + ObjectMapper mapper = Utils.getMapper(); + JsonNode prev = mapper.valueToTree(rawEvents.get(0)); + +// assertThat(events.get(0)).isNotNull() +// .hasFieldOrPropertyWithValue("producer", "testSendEventAndGetItBack") +// .hasFieldOrPropertyWithValue("eventType", "COMPLETE") +// .hasFieldOrPropertyWithValue("eventTime", time) +// .hasFieldOrPropertyWithValue("run", run) +// .hasFieldOrPropertyWithValue("job", job) +// .hasFieldOrPropertyWithValue("inputs", Collections.emptyList()) +// .hasFieldOrPropertyWithValue("outputs", Collections.singletonList(dataset)); + + } + private CompletableFuture sendAllEvents(RunEvent... events) { return Arrays.stream(events) .reduce( diff --git a/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java b/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java index 3673313444..0a1d4acf0d 100644 --- a/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java +++ b/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java @@ -7,6 +7,7 @@ import javax.sql.DataSource; import marquez.PostgresContainer; +import org.jdbi.v3.jackson2.Jackson2Plugin; import org.jdbi.v3.postgres.PostgresPlugin; import org.jdbi.v3.sqlobject.SqlObjectPlugin; import org.postgresql.ds.PGSimpleDataSource; @@ -34,6 +35,7 @@ public class MarquezJdbiExternalPostgresExtension extends JdbiExternalPostgresEx database = POSTGRES.getDatabaseName(); plugins.add(new SqlObjectPlugin()); plugins.add(new PostgresPlugin()); + plugins.add(new Jackson2Plugin()); migration = Migration.before().withPaths("marquez/db/migration", "classpath:marquez/db/migrations"); } diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index bc617a4f2a..6376ef3f46 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -40,6 +40,7 @@ import marquez.client.models.JobVersion; import marquez.client.models.Namespace; import marquez.client.models.NamespaceMeta; +import marquez.client.models.RawLineageEvent; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; @@ -87,6 +88,16 @@ public MarquezClient(final URL baseUrl, @Nullable final String apiKey) { this.http = http; } + public List listEvents() { + final String bodyAsJson = http.get(url.toEventUrl()); + return Events.fromJson(bodyAsJson).getValue(); + } + + public List listEvents(String namespaceName) { + final String bodyAsJson = http.get(url.toEventUrl(namespaceName)); + return Events.fromJson(bodyAsJson).getValue(); + } + public Namespace createNamespace( @NonNull String namespaceName, @NonNull NamespaceMeta namespaceMeta) { final String bodyAsJson = http.put(url.toNamespaceUrl(namespaceName), namespaceMeta.toJson()); @@ -528,6 +539,21 @@ static Datasets fromJson(final String json) { } } + @Value + @EqualsAndHashCode(callSuper = false) + static class Events extends ResultsPage { + @Getter List value; + + @JsonCreator + Events(@JsonProperty("events") final List value) { + this.value = ImmutableList.copyOf(value); + } + + static Events fromJson(final String json) { + return Utils.fromJson(json, new TypeReference() {}); + } + } + @Value static class DatasetVersions { @Getter List value; diff --git a/clients/java/src/main/java/marquez/client/MarquezPathV1.java b/clients/java/src/main/java/marquez/client/MarquezPathV1.java index a029e9ac4a..96f53d1418 100644 --- a/clients/java/src/main/java/marquez/client/MarquezPathV1.java +++ b/clients/java/src/main/java/marquez/client/MarquezPathV1.java @@ -72,6 +72,12 @@ static String namespacePath(String namespaceName) { return path("/namespaces/%s", namespaceName); } + static String eventPath() { + return path("/events"); + } + static String eventPath(String namespaceName) { + return path("/events/%s", namespaceName); + } static String sourcePath(String sourceName) { return path("/sources/%s", sourceName); } diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java index 48197c5485..e65de4441b 100644 --- a/clients/java/src/main/java/marquez/client/MarquezUrl.java +++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java @@ -12,6 +12,7 @@ import static marquez.client.MarquezPathV1.datasetPath; import static marquez.client.MarquezPathV1.datasetTagPath; import static marquez.client.MarquezPathV1.datasetVersionPath; +import static marquez.client.MarquezPathV1.eventPath; import static marquez.client.MarquezPathV1.fieldTagPath; import static marquez.client.MarquezPathV1.jobPath; import static marquez.client.MarquezPathV1.jobVersionPath; @@ -89,6 +90,13 @@ URL toNamespaceUrl(String namespaceName) { return from(namespacePath(namespaceName)); } + URL toEventUrl() { + return from(eventPath()); + } + + URL toEventUrl(String namespaceName) { + return from(eventPath(namespaceName)); + } URL toSourceUrl(String sourceName) { return from(sourcePath(sourceName)); } diff --git a/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java b/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java new file mode 100644 index 0000000000..eb46b0fba4 --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java @@ -0,0 +1,41 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.core.type.TypeReference; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.Setter; +import lombok.ToString; +import marquez.client.Utils; + +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Setter +@Getter +@ToString +public class RawLineageEvent{ + private String eventType; + private ZonedDateTime eventTime; + private Map run; + private Map job; + private List inputs; + private List outputs; + private String producer; + + public static RawLineageEvent fromJson(@NonNull final String json) { + return Utils.fromJson(json, new TypeReference() {}); + } + +}