diff --git a/api/build.gradle b/api/build.gradle index 9db5e79e84..376fb60f8b 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -40,6 +40,7 @@ dependencies { implementation "io.prometheus:simpleclient_hotspot:${prometheusVersion}" implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}" implementation "org.jdbi:jdbi3-core:${jdbi3Version}" + implementation "org.jdbi:jdbi3-jackson2:${jdbi3Version}" implementation "org.jdbi:jdbi3-postgres:${jdbi3Version}" implementation "org.jdbi:jdbi3-sqlobject:${jdbi3Version}" implementation 'com.google.guava:guava:31.1-jre' diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index 1459f3041e..771f58559c 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -14,6 +14,7 @@ import lombok.Getter; import lombok.NonNull; import marquez.api.DatasetResource; +import marquez.api.EventResource; import marquez.api.JobResource; import marquez.api.NamespaceResource; import marquez.api.OpenLineageResource; @@ -25,6 +26,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; +import marquez.db.EventDao; import marquez.db.JobContextDao; import marquez.db.JobDao; import marquez.db.JobVersionDao; @@ -42,6 +44,7 @@ import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; +import marquez.service.EventService; import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.NamespaceService; @@ -71,6 +74,7 @@ public final class MarquezContext { @Getter private final OpenLineageDao openLineageDao; @Getter private final LineageDao lineageDao; @Getter private final SearchDao searchDao; + @Getter private final EventDao eventDao; @Getter private final List runTransitionListeners; @@ -82,6 +86,7 @@ public final class MarquezContext { @Getter private final RunService runService; @Getter private final OpenLineageService openLineageService; @Getter private final LineageService lineageService; + @Getter private final EventService eventService; @Getter private final NamespaceResource namespaceResource; @Getter private final SourceResource sourceResource; @@ -90,6 +95,7 @@ public final class MarquezContext { @Getter private final TagResource tagResource; @Getter private final OpenLineageResource openLineageResource; @Getter private final SearchResource searchResource; + @Getter private final EventResource eventResource; @Getter private final ImmutableList resources; @Getter private final JdbiExceptionExceptionMapper jdbiException; @@ -119,6 +125,7 @@ private MarquezContext( this.openLineageDao = jdbi.onDemand(OpenLineageDao.class); this.lineageDao = jdbi.onDemand(LineageDao.class); this.searchDao = jdbi.onDemand(SearchDao.class); + this.eventDao = jdbi.onDemand(EventDao.class); this.runTransitionListeners = runTransitionListeners; this.namespaceService = new NamespaceService(baseDao); @@ -131,6 +138,7 @@ private MarquezContext( this.tagService.init(tags); this.openLineageService = new OpenLineageService(baseDao, runService); this.lineageService = new LineageService(lineageDao, jobDao); + this.eventService = new EventService(baseDao); this.jdbiException = new JdbiExceptionExceptionMapper(); final ServiceFactory serviceFactory = ServiceFactory.builder() @@ -144,6 +152,7 @@ private MarquezContext( .lineageService(lineageService) .datasetFieldService(new DatasetFieldService(baseDao)) .datasetVersionService(new DatasetVersionService(baseDao)) + .eventService(eventService) .build(); this.namespaceResource = new NamespaceResource(serviceFactory); this.sourceResource = new SourceResource(serviceFactory); @@ -152,6 +161,7 @@ private MarquezContext( this.tagResource = new TagResource(serviceFactory); this.openLineageResource = new OpenLineageResource(serviceFactory); this.searchResource = new SearchResource(searchDao); + this.eventResource = new EventResource(serviceFactory); this.resources = ImmutableList.of( @@ -162,7 +172,8 @@ private MarquezContext( tagResource, jdbiException, openLineageResource, - searchResource); + searchResource, + eventResource); final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder(); this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi)); diff --git a/api/src/main/java/marquez/api/BaseResource.java b/api/src/main/java/marquez/api/BaseResource.java index ce15d31ab3..d499c481bf 100644 --- a/api/src/main/java/marquez/api/BaseResource.java +++ b/api/src/main/java/marquez/api/BaseResource.java @@ -28,6 +28,7 @@ import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; +import marquez.service.EventService; import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.NamespaceService; @@ -50,6 +51,7 @@ public class BaseResource { protected DatasetVersionService datasetVersionService; protected DatasetFieldService datasetFieldService; protected LineageService lineageService; + protected EventService eventService; public BaseResource(ServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) { this.datasetVersionService = serviceFactory.getDatasetVersionService(); this.datasetFieldService = serviceFactory.getDatasetFieldService(); this.lineageService = serviceFactory.getLineageService(); + this.eventService = serviceFactory.getEventService(); } void throwIfNotExists(@NonNull NamespaceName namespaceName) { diff --git a/api/src/main/java/marquez/api/EventResource.java b/api/src/main/java/marquez/api/EventResource.java new file mode 100644 index 0000000000..b1273b4589 --- /dev/null +++ b/api/src/main/java/marquez/api/EventResource.java @@ -0,0 +1,62 @@ +package marquez.api; + +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; + +import com.codahale.metrics.annotation.ExceptionMetered; +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import javax.validation.constraints.Min; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; +import lombok.NonNull; +import lombok.Value; +import marquez.service.ServiceFactory; + +@Path("/api/v1") +public class EventResource extends BaseResource { + public EventResource(@NonNull final ServiceFactory serviceFactory) { + super(serviceFactory); + } + + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Path("/events") + @Produces(APPLICATION_JSON) + public Response get( + @QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit, + @QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) { + final List events = eventService.getAll(limit, offset); + return Response.ok(new Events(events)).build(); + } + + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Path("/events/{namespace}") + @Produces(APPLICATION_JSON) + public Response getByNamespace( + @PathParam("namespace") String namespace, + @QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit, + @QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) { + final List event = eventService.getByNamespace(namespace, limit, offset); + return Response.ok(new Events(event)).build(); + } + + @Value + static class Events { + @NonNull + @JsonProperty("events") + List value; + } +} diff --git a/api/src/main/java/marquez/db/BaseDao.java b/api/src/main/java/marquez/db/BaseDao.java index 848372d863..9e8b3d37ac 100644 --- a/api/src/main/java/marquez/db/BaseDao.java +++ b/api/src/main/java/marquez/db/BaseDao.java @@ -50,4 +50,7 @@ public interface BaseDao extends SqlObject { @CreateSqlObject OpenLineageDao createOpenLineageDao(); + + @CreateSqlObject + EventDao createEventDao(); } diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index 5af8e60079..0cfe4d1e3a 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -126,6 +126,9 @@ private Columns() {} public static final String RUN_UUID = "run_uuid"; public static final String STATE = "state"; + /* LINEAGE EVENT ROW COLUMNS */ + public static final String EVENT = "event"; + public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException { if (results.getObject(column) == null) { return null; diff --git a/api/src/main/java/marquez/db/EventDao.java b/api/src/main/java/marquez/db/EventDao.java new file mode 100644 index 0000000000..5ab67a5b98 --- /dev/null +++ b/api/src/main/java/marquez/db/EventDao.java @@ -0,0 +1,37 @@ +package marquez.db; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import marquez.db.mappers.RawLineageEventMapper; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; + +@RegisterRowMapper(RawLineageEventMapper.class) +public interface EventDao extends BaseDao { + + @SqlQuery( + """ + SELECT event + FROM lineage_events + ORDER BY event_time DESC + LIMIT :limit + OFFSET :offset""") + List getAll(int limit, int offset); + + /** + * This is a "hack" to get inputs/outputs namespace from jsonb column: explanation + */ + @SqlQuery( + """ + SELECT le.event + FROM lineage_events le, jsonb_array_elements(coalesce(le.event -> 'inputs', '[]'::jsonb) || coalesce(le.event -> 'outputs', '[]'::jsonb)) AS ds + WHERE le.job_namespace = :namespace + OR ds ->> 'namespace' = :namespace + ORDER BY event_time DESC + LIMIT :limit + OFFSET :offset""") + List getByNamespace( + @Bind("namespace") String namespace, @Bind("limit") int limit, @Bind("offset") int offset); +} diff --git a/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java b/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java new file mode 100644 index 0000000000..5209f034fc --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java @@ -0,0 +1,30 @@ +package marquez.db.mappers; + +import static marquez.db.Columns.stringOrThrow; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.ResultSet; +import java.sql.SQLException; +import lombok.extern.slf4j.Slf4j; +import marquez.common.Utils; +import marquez.db.Columns; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +@Slf4j +public class RawLineageEventMapper implements RowMapper { + @Override + public JsonNode map(ResultSet rs, StatementContext ctx) throws SQLException { + String rawEvent = stringOrThrow(rs, Columns.EVENT); + + try { + ObjectMapper mapper = Utils.getMapper(); + return mapper.readTree(rawEvent); + } catch (JsonProcessingException e) { + log.error("Failed to process json", e); + } + return null; + } +} diff --git a/api/src/main/java/marquez/service/DelegatingDaos.java b/api/src/main/java/marquez/service/DelegatingDaos.java index 8184200ab4..e1f91e7c5c 100644 --- a/api/src/main/java/marquez/service/DelegatingDaos.java +++ b/api/src/main/java/marquez/service/DelegatingDaos.java @@ -10,6 +10,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; +import marquez.db.EventDao; import marquez.db.JobContextDao; import marquez.db.JobDao; import marquez.db.JobVersionDao; @@ -98,4 +99,9 @@ public static class DelegatingTagDao implements TagDao { public static class DelegatingLineageDao implements LineageDao { @Delegate private final LineageDao delegate; } + + @RequiredArgsConstructor + public static class DelegatingEventDao implements EventDao { + @Delegate private final EventDao delegate; + } } diff --git a/api/src/main/java/marquez/service/EventService.java b/api/src/main/java/marquez/service/EventService.java new file mode 100644 index 0000000000..9da1afe672 --- /dev/null +++ b/api/src/main/java/marquez/service/EventService.java @@ -0,0 +1,10 @@ +package marquez.service; + +import lombok.NonNull; +import marquez.db.BaseDao; + +public class EventService extends DelegatingDaos.DelegatingEventDao { + public EventService(@NonNull BaseDao baseDao) { + super(baseDao.createEventDao()); + } +} diff --git a/api/src/main/java/marquez/service/ServiceFactory.java b/api/src/main/java/marquez/service/ServiceFactory.java index 5a4b51465b..4a8ded0a0a 100644 --- a/api/src/main/java/marquez/service/ServiceFactory.java +++ b/api/src/main/java/marquez/service/ServiceFactory.java @@ -22,4 +22,5 @@ public class ServiceFactory { @NonNull DatasetVersionService datasetVersionService; @NonNull DatasetFieldService datasetFieldService; @NonNull LineageService lineageService; + @NonNull EventService eventService; } diff --git a/api/src/main/resources/marquez/db/migration/V46__add_lineage_event_indexes.sql b/api/src/main/resources/marquez/db/migration/V46__add_lineage_event_indexes.sql new file mode 100644 index 0000000000..32249c6a84 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V46__add_lineage_event_indexes.sql @@ -0,0 +1,5 @@ +CREATE INDEX lineage_events_event_time + on lineage_events(event_time DESC); + +CREATE INDEX lineage_events_namespace_event_time + on lineage_events(job_namespace, event_time DESC); diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index b3783e39ec..bcca459446 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import io.dropwizard.util.Resources; import io.openlineage.client.OpenLineage; @@ -39,6 +40,7 @@ import marquez.client.models.DatasetVersion; import marquez.client.models.Job; import marquez.client.models.JobId; +import marquez.client.models.RawLineageEvent; import marquez.client.models.Run; import marquez.common.Utils; import marquez.db.LineageTestUtils; @@ -320,6 +322,141 @@ public void testOpenLineageJobHierarchySparkAndAirflow() assertThat(runsList).isNotEmpty().hasSize(1); } + @Test + public void testSendEventAndGetItBack() { + LineageEvent.Run run = + new LineageEvent.Run(UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()); + LineageEvent.Job job = + LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build(); + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder().namespace(NAMESPACE_NAME).name(DB_TABLE_NAME).build(); + + // We're losing zone info on write, so I have to UTC it here to compare later + ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC")); + + final LineageEvent lineageEvent = + LineageEvent.builder() + .producer("testSendEventAndGetItBack") + .eventType("COMPLETE") + .run(run) + .job(job) + .eventTime(time) + .inputs(Collections.emptyList()) + .outputs(Collections.singletonList(dataset)) + .build(); + + final CompletableFuture resp = sendEvent(lineageEvent); + assertThat(resp.join()).isEqualTo(201); + + List events = client.listEvents(); + + assertThat(events.size()).isEqualTo(1); + + ObjectMapper mapper = Utils.getMapper(); + JsonNode prev = mapper.valueToTree(events.get(0)); + assertThat(prev).isEqualTo(mapper.valueToTree(lineageEvent)); + } + + @Test + public void testFindEventByDatasetNamespace() { + LineageEvent.Run run = + new LineageEvent.Run(UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()); + LineageEvent.Job job = + LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build(); + + ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC")); + + LineageEvent.LineageEventBuilder builder = + LineageEvent.builder() + .producer("testFindEventByDatasetNamespace") + .eventType("COMPLETE") + .run(run) + .job(job) + .eventTime(time) + .inputs(Collections.emptyList()); + + for (int i = 0; i < 10; i++) { + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder() + .namespace(String.format("namespace%d", i)) + .name(DB_TABLE_NAME) + .build(); + + LineageEvent event = builder.outputs(Collections.singletonList(dataset)).build(); + + final CompletableFuture resp = sendEvent(event); + assertThat(resp.join()).isEqualTo(201); + } + + List rawEvents = client.listEvents("namespace3"); + + LineageEvent thirdEvent = + builder + .outputs( + Collections.singletonList( + LineageEvent.Dataset.builder() + .namespace(String.format("namespace3")) + .name(DB_TABLE_NAME) + .build())) + .build(); + + assertThat(rawEvents.size()).isEqualTo(1); + ObjectMapper mapper = Utils.getMapper(); + assertThat((JsonNode) mapper.valueToTree(thirdEvent)) + .isEqualTo(mapper.valueToTree(rawEvents.get(0))); + } + + @Test + public void testFindEventIsSortedByTime() { + LineageEvent.Run run = + new LineageEvent.Run(UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()); + LineageEvent.Job job = + LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build(); + + ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC")); + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder().namespace(NAMESPACE_NAME).name(DB_TABLE_NAME).build(); + + LineageEvent.LineageEventBuilder builder = + LineageEvent.builder() + .producer("testFindEventIsSortedByTime") + .run(run) + .job(job) + .inputs(Collections.emptyList()) + .outputs(Collections.singletonList(dataset)); + + LineageEvent firstEvent = builder.eventTime(time).eventType("START").build(); + + CompletableFuture resp = sendEvent(firstEvent); + assertThat(resp.join()).isEqualTo(201); + + LineageEvent secondEvent = + builder.eventTime(time.plusSeconds(10)).eventType("COMPLETE").build(); + + resp = sendEvent(secondEvent); + assertThat(resp.join()).isEqualTo(201); + + List rawEvents = client.listEvents(NAMESPACE_NAME); + + assertThat(rawEvents.size()).isEqualTo(2); + ObjectMapper mapper = Utils.getMapper(); + assertThat((JsonNode) mapper.valueToTree(firstEvent)) + .isEqualTo(mapper.valueToTree(rawEvents.get(1))); + assertThat((JsonNode) mapper.valueToTree(secondEvent)) + .isEqualTo(mapper.valueToTree(rawEvents.get(0))); + } + + private CompletableFuture sendEvent(LineageEvent event) { + return this.sendLineage(Utils.toJson(event)) + .thenApply(HttpResponse::statusCode) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + } + private CompletableFuture sendAllEvents(RunEvent... events) { return Arrays.stream(events) .reduce( diff --git a/api/src/test/java/marquez/api/ApiTestUtils.java b/api/src/test/java/marquez/api/ApiTestUtils.java index f9e181cf79..8cacedbd5b 100644 --- a/api/src/test/java/marquez/api/ApiTestUtils.java +++ b/api/src/test/java/marquez/api/ApiTestUtils.java @@ -11,6 +11,7 @@ import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; +import marquez.service.EventService; import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.NamespaceService; @@ -53,6 +54,8 @@ public static ServiceFactory mockServiceFactory(Map mocks) { (SourceService) mocks.getOrDefault(SourceService.class, (mock(SourceService.class)))) .datasetService( (DatasetService) mocks.getOrDefault(DatasetService.class, (mock(DatasetService.class)))) + .eventService( + (EventService) mocks.getOrDefault(EventService.class, (mock(EventService.class)))) .build(); } } diff --git a/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java b/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java index 3673313444..0a1d4acf0d 100644 --- a/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java +++ b/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java @@ -7,6 +7,7 @@ import javax.sql.DataSource; import marquez.PostgresContainer; +import org.jdbi.v3.jackson2.Jackson2Plugin; import org.jdbi.v3.postgres.PostgresPlugin; import org.jdbi.v3.sqlobject.SqlObjectPlugin; import org.postgresql.ds.PGSimpleDataSource; @@ -34,6 +35,7 @@ public class MarquezJdbiExternalPostgresExtension extends JdbiExternalPostgresEx database = POSTGRES.getDatabaseName(); plugins.add(new SqlObjectPlugin()); plugins.add(new PostgresPlugin()); + plugins.add(new Jackson2Plugin()); migration = Migration.before().withPaths("marquez/db/migration", "classpath:marquez/db/migrations"); } diff --git a/build.gradle b/build.gradle index bfcbe65dd1..fee4e13905 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,7 @@ buildscript { dependencies { classpath 'com.adarshr:gradle-test-logger-plugin:3.2.0' classpath 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2' - classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.6.1' + classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.7.2' } } diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index bc617a4f2a..67262879e2 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -40,6 +40,7 @@ import marquez.client.models.JobVersion; import marquez.client.models.Namespace; import marquez.client.models.NamespaceMeta; +import marquez.client.models.RawLineageEvent; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; @@ -87,6 +88,24 @@ public MarquezClient(final URL baseUrl, @Nullable final String apiKey) { this.http = http; } + public List listEvents() { + return listEvents(DEFAULT_LIMIT, DEFAULT_OFFSET); + } + + public List listEvents(int limit, int offset) { + final String bodyAsJson = http.get(url.toEventUrl(limit, offset)); + return Events.fromJson(bodyAsJson).getValue(); + } + + public List listEvents(String namespaceName) { + return listEvents(namespaceName, DEFAULT_LIMIT, DEFAULT_OFFSET); + } + + public List listEvents(String namespaceName, int limit, int offset) { + final String bodyAsJson = http.get(url.toEventUrl(namespaceName, limit, offset)); + return Events.fromJson(bodyAsJson).getValue(); + } + public Namespace createNamespace( @NonNull String namespaceName, @NonNull NamespaceMeta namespaceMeta) { final String bodyAsJson = http.put(url.toNamespaceUrl(namespaceName), namespaceMeta.toJson()); @@ -528,6 +547,21 @@ static Datasets fromJson(final String json) { } } + @Value + @EqualsAndHashCode(callSuper = false) + public static class Events extends ResultsPage { + @Getter List value; + + @JsonCreator + Events(@JsonProperty("events") final List value) { + this.value = ImmutableList.copyOf(value); + } + + static Events fromJson(final String json) { + return Utils.fromJson(json, new TypeReference() {}); + } + } + @Value static class DatasetVersions { @Getter List value; diff --git a/clients/java/src/main/java/marquez/client/MarquezPathV1.java b/clients/java/src/main/java/marquez/client/MarquezPathV1.java index a029e9ac4a..ac8fc34d23 100644 --- a/clients/java/src/main/java/marquez/client/MarquezPathV1.java +++ b/clients/java/src/main/java/marquez/client/MarquezPathV1.java @@ -72,6 +72,14 @@ static String namespacePath(String namespaceName) { return path("/namespaces/%s", namespaceName); } + static String eventPath() { + return path("/events"); + } + + static String eventPath(String namespaceName) { + return path("/events/%s", namespaceName); + } + static String sourcePath(String sourceName) { return path("/sources/%s", sourceName); } diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java index 48197c5485..2bbedccbcf 100644 --- a/clients/java/src/main/java/marquez/client/MarquezUrl.java +++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java @@ -12,6 +12,7 @@ import static marquez.client.MarquezPathV1.datasetPath; import static marquez.client.MarquezPathV1.datasetTagPath; import static marquez.client.MarquezPathV1.datasetVersionPath; +import static marquez.client.MarquezPathV1.eventPath; import static marquez.client.MarquezPathV1.fieldTagPath; import static marquez.client.MarquezPathV1.jobPath; import static marquez.client.MarquezPathV1.jobVersionPath; @@ -89,6 +90,14 @@ URL toNamespaceUrl(String namespaceName) { return from(namespacePath(namespaceName)); } + URL toEventUrl(int limit, int offset) { + return from(eventPath(), newQueryParamsWith(limit, offset)); + } + + URL toEventUrl(String namespaceName, int limit, int offset) { + return from(eventPath(namespaceName), newQueryParamsWith(limit, offset)); + } + URL toSourceUrl(String sourceName) { return from(sourcePath(sourceName)); } diff --git a/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java b/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java new file mode 100644 index 0000000000..0773f79eff --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java @@ -0,0 +1,22 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; +import lombok.Value; + +@Value +public class RawLineageEvent { + String eventType; + ZonedDateTime eventTime; + Map run; + Map job; + List inputs; + List outputs; + String producer; +} diff --git a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index df56e638b3..32d23983aa 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -47,6 +47,8 @@ import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -60,6 +62,7 @@ import lombok.Value; import marquez.client.MarquezClient.DatasetVersions; import marquez.client.MarquezClient.Datasets; +import marquez.client.MarquezClient.Events; import marquez.client.MarquezClient.Jobs; import marquez.client.MarquezClient.Namespaces; import marquez.client.MarquezClient.Runs; @@ -79,6 +82,7 @@ import marquez.client.models.JsonGenerator; import marquez.client.models.Namespace; import marquez.client.models.NamespaceMeta; +import marquez.client.models.RawLineageEvent; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; @@ -160,6 +164,18 @@ public class MarquezClientTest { DB_FACETS, CURRENT_VERSION); + // RAW LINEAGE EVENT + + private static final RawLineageEvent RAW_LINEAGE_EVENT = + new RawLineageEvent( + "START", + ZonedDateTime.now(ZoneId.of("UTC")), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + "MQZ-CLIENT"); + // STREAM DATASET private static final DatasetId STREAM_ID = newDatasetIdWith(NAMESPACE_NAME); private static final String STREAM_NAME = STREAM_ID.getName(); @@ -678,6 +694,26 @@ public void testListDatasetVersions() throws Exception { assertThat(datasetVersions).asList().containsExactly(DB_TABLE_VERSION); } + @Test + public void testListEvents() throws Exception { + Events events = new Events(Collections.singletonList(RAW_LINEAGE_EVENT)); + when(http.get(buildUrlFor("/events?limit=10&offset=0"))) + .thenReturn( + Utils.toJson(new ResultsPage<>("events", events.getValue(), events.getValue().size()))); + final List listEvents = client.listEvents(10, 0); + assertThat(listEvents).asList().containsExactly(RAW_LINEAGE_EVENT); + } + + @Test + public void testListEventsWithNamespace() throws Exception { + Events events = new Events(Collections.singletonList(RAW_LINEAGE_EVENT)); + when(http.get(buildUrlFor("/events/%s?limit=10&offset=0", NAMESPACE_NAME))) + .thenReturn( + Utils.toJson(new ResultsPage<>("events", events.getValue(), events.getValue().size()))); + final List listEvents = client.listEvents(NAMESPACE_NAME, 10, 0); + assertThat(listEvents).asList().containsExactly(RAW_LINEAGE_EVENT); + } + @Test public void testCreateJob() throws Exception { final URL url = buildUrlFor("/namespaces/%s/jobs/%s", NAMESPACE_NAME, JOB_NAME); diff --git a/clients/java/src/test/java/marquez/client/MarquezPathV1Test.java b/clients/java/src/test/java/marquez/client/MarquezPathV1Test.java index f427a36041..69fffd5267 100644 --- a/clients/java/src/test/java/marquez/client/MarquezPathV1Test.java +++ b/clients/java/src/test/java/marquez/client/MarquezPathV1Test.java @@ -36,6 +36,12 @@ void testPath_datasetUrl() { MarquezPathV1.datasetPath("s3://buckets", "source-file.json")); } + @Test + void testPath_eventUrl() { + Assertions.assertEquals( + "/api/v1/events/s3%3A%2F%2Fbuckets", MarquezPathV1.eventPath("s3://buckets")); + } + @Test void testPath_placeholderReplacement() { Assertions.assertEquals(