Skip to content

Commit

Permalink
remove namespace endpoint
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Sep 15, 2022
1 parent b31503c commit c9271b8
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 143 deletions.
31 changes: 12 additions & 19 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
Expand All @@ -35,6 +34,7 @@
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import marquez.api.models.SortDirection;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.LineageEvent;
Expand Down Expand Up @@ -107,28 +107,21 @@ public Response getLineage(
public Response getLineageEvents(
@QueryParam("before") @DefaultValue("2030-01-01T00:00:00+00:00") ZonedDateTimeParam before,
@QueryParam("after") @DefaultValue("1970-01-01T00:00:00+00:00") ZonedDateTimeParam after,
@QueryParam("sortDirection") @DefaultValue("desc") SortDirection sortDirection,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit) {
final List<LineageEvent> events =
openLineageDao.getAllLineageEvents(before.get(), after.get(), limit);
final List<LineageEvent> events;
if (sortDirection.getValue().equalsIgnoreCase("desc")) {
events = openLineageDao.getAllLineageEventsDesc(before.get(), after.get(), limit);
} else if (sortDirection.getValue().equalsIgnoreCase("asc")) {
events = openLineageDao.getAllLineageEventsAsc(before.get(), after.get(), limit);
} else {
return Response.status(BAD_REQUEST)
.entity(String.format("%s should be either 'asc' or 'desc", sortDirection.getValue()))
.build();
}
return Response.ok(new Events(events)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Path("/namespace/{namespace}/events/lineage")
@Produces(APPLICATION_JSON)
public Response getLineageEventsByNamespace(
@PathParam("namespace") String namespace,
@QueryParam("before") @DefaultValue("2030-01-01T00:00:00+00:00") ZonedDateTimeParam before,
@QueryParam("after") @DefaultValue("1970-01-01T00:00:00+00:00") ZonedDateTimeParam after,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit) {
final List<LineageEvent> event =
openLineageDao.getLineageEventsByNamespace(namespace, before.get(), after.get(), limit);
return Response.ok(new Events(event)).build();
}

@Value
static class Events {
@NonNull
Expand Down
12 changes: 12 additions & 0 deletions api/src/main/java/marquez/api/models/SortDirection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package marquez.api.models;

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
public enum SortDirection {
DESC("desc"),
ASC("asc");

@Getter public final String value;
}
46 changes: 8 additions & 38 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import marquez.service.models.LineageEvent.SchemaDatasetFacet;
import marquez.service.models.LineageEvent.SchemaField;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
import org.jdbi.v3.sqlobject.statement.SqlUpdate;
import org.postgresql.util.PGobject;
Expand Down Expand Up @@ -98,46 +97,17 @@ void createLineageEvent(
AND le.event_time >= :after)
ORDER BY le.event_time DESC
LIMIT :limit""")
List<LineageEvent> getAllLineageEvents(ZonedDateTime before, ZonedDateTime after, int limit);
List<LineageEvent> getAllLineageEventsDesc(ZonedDateTime before, ZonedDateTime after, int limit);

/**
* This is a "hack" to get inputs/outputs namespace from jsonb column: <a
* href="https://github.com/jdbi/jdbi/issues/1510#issuecomment-485423083">explanation</a>
*/
@SqlQuery(
"""
WITH job_events AS (
SELECT le.event
FROM lineage_events le
WHERE le.job_namespace = :namespace
AND (le.event_time < :before
AND le.event_time >= :after)
ORDER BY le.event_time DESC
), dataset_events AS (
SELECT le.event, le.event_time
FROM lineage_events le
JOIN dataset_versions dv on le.run_uuid = dv.run_uuid
JOIN datasets ds on dv.dataset_uuid = ds.uuid
JOIN namespaces n on ds.namespace_uuid = n.uuid
WHERE n.name = :namespace
AND (le.event_time < :before
AND le.event_time >= :after)
ORDER BY le.event_time DESC
)
SELECT le.event
FROM (
SELECT * FROM dataset_events
UNION ALL
SELECT * FROM job_events
) le
ORDER BY le.event_time
LIMIT :limit
""")
List<LineageEvent> getLineageEventsByNamespace(
@Bind("namespace") String namespace,
@Bind("before") ZonedDateTime before,
@Bind("after") ZonedDateTime after,
@Bind("limit") int limit);
SELECT event
FROM lineage_events le
WHERE (le.event_time < :before
AND le.event_time >= :after)
ORDER BY le.event_time ASC
LIMIT :limit""")
List<LineageEvent> getAllLineageEventsAsc(ZonedDateTime before, ZonedDateTime after, int limit);

default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,2 @@
CREATE INDEX CONCURRENTLY lineage_events_event_time
on lineage_events(event_time DESC);

CREATE INDEX CONCURRENTLY lineage_events_namespace_event_time
on lineage_events(job_namespace, event_time DESC);
109 changes: 79 additions & 30 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -36,6 +37,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import marquez.client.MarquezClient;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetVersion;
import marquez.client.models.Job;
Expand Down Expand Up @@ -414,7 +416,7 @@ public void testSendEventAndGetItBack() {
}

@Test
public void testFindEventByDatasetNamespace() {
public void testFindEventIsSortedByTime() {
marquez.service.models.LineageEvent.Run run =
new marquez.service.models.LineageEvent.Run(
UUID.randomUUID().toString(),
Expand All @@ -426,50 +428,44 @@ public void testFindEventByDatasetNamespace() {
.build();

ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC"));
marquez.service.models.LineageEvent.Dataset dataset =
marquez.service.models.LineageEvent.Dataset.builder()
.namespace(NAMESPACE_NAME)
.name(DB_TABLE_NAME)
.build();

marquez.service.models.LineageEvent.LineageEventBuilder builder =
marquez.service.models.LineageEvent.builder()
.producer("testFindEventByDatasetNamespace")
.eventType("COMPLETE")
.producer("testFindEventIsSortedByTime")
.run(run)
.job(job)
.eventTime(time)
.inputs(Collections.emptyList());
.inputs(Collections.emptyList())
.outputs(Collections.singletonList(dataset));

for (int i = 0; i < 10; i++) {
marquez.service.models.LineageEvent.Dataset dataset =
marquez.service.models.LineageEvent.Dataset.builder()
.namespace(String.format("namespace%d", i))
.name(DB_TABLE_NAME)
.build();
marquez.service.models.LineageEvent firstEvent =
builder.eventTime(time).eventType("START").build();

marquez.service.models.LineageEvent event =
builder.outputs(Collections.singletonList(dataset)).build();
CompletableFuture<Integer> resp = sendEvent(firstEvent);
assertThat(resp.join()).isEqualTo(201);

final CompletableFuture<Integer> resp = sendEvent(event);
assertThat(resp.join()).isEqualTo(201);
}
marquez.service.models.LineageEvent secondEvent =
builder.eventTime(time.plusSeconds(10)).eventType("COMPLETE").build();

List<LineageEvent> rawEvents = client.listLineageEvents("namespace3");
resp = sendEvent(secondEvent);
assertThat(resp.join()).isEqualTo(201);

marquez.service.models.LineageEvent thirdEvent =
builder
.outputs(
Collections.singletonList(
marquez.service.models.LineageEvent.Dataset.builder()
.namespace(String.format("namespace3"))
.name(DB_TABLE_NAME)
.build()))
.build();
List<LineageEvent> rawEvents = client.listLineageEvents();

assertThat(rawEvents.size()).isEqualTo(1);
assertThat(rawEvents.size()).isEqualTo(2);
ObjectMapper mapper = Utils.getMapper();
assertThat((JsonNode) mapper.valueToTree(thirdEvent))
assertThat((JsonNode) mapper.valueToTree(firstEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(1)));
assertThat((JsonNode) mapper.valueToTree(secondEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(0)));
}

@Test
public void testFindEventIsSortedByTime() {
public void testFindEventIsSortedByTimeAsc() {
marquez.service.models.LineageEvent.Run run =
new marquez.service.models.LineageEvent.Run(
UUID.randomUUID().toString(),
Expand Down Expand Up @@ -507,12 +503,65 @@ public void testFindEventIsSortedByTime() {
resp = sendEvent(secondEvent);
assertThat(resp.join()).isEqualTo(201);

List<LineageEvent> rawEvents = client.listLineageEvents(NAMESPACE_NAME);
List<LineageEvent> rawEvents = client.listLineageEvents(MarquezClient.SortDirection.ASC, 10);

assertThat(rawEvents.size()).isEqualTo(2);
ObjectMapper mapper = Utils.getMapper();
assertThat((JsonNode) mapper.valueToTree(firstEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(0)));
assertThat((JsonNode) mapper.valueToTree(secondEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(1)));
}

@Test
public void testFindEventBeforeAfterTime() {
marquez.service.models.LineageEvent.Run run =
new marquez.service.models.LineageEvent.Run(
UUID.randomUUID().toString(),
marquez.service.models.LineageEvent.RunFacet.builder().build());
marquez.service.models.LineageEvent.Job job =
marquez.service.models.LineageEvent.Job.builder()
.namespace(NAMESPACE_NAME)
.name(JOB_NAME)
.build();

ZonedDateTime after = ZonedDateTime.of(2021, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC"));
ZonedDateTime before = ZonedDateTime.of(2022, 1, 1, 0, 0, 0, 0, ZoneId.of("UTC"));

marquez.service.models.LineageEvent.Dataset dataset =
marquez.service.models.LineageEvent.Dataset.builder()
.namespace(NAMESPACE_NAME)
.name(DB_TABLE_NAME)
.build();

marquez.service.models.LineageEvent.LineageEventBuilder builder =
marquez.service.models.LineageEvent.builder()
.producer("testFindEventIsSortedByTime")
.run(run)
.job(job)
.inputs(Collections.emptyList())
.outputs(Collections.singletonList(dataset));

marquez.service.models.LineageEvent firstEvent =
builder.eventTime(after.minus(1, ChronoUnit.YEARS)).eventType("START").build();

CompletableFuture<Integer> resp = sendEvent(firstEvent);
assertThat(resp.join()).isEqualTo(201);

marquez.service.models.LineageEvent secondEvent =
builder.eventTime(after.plusSeconds(10)).eventType("COMPLETE").build();

resp = sendEvent(secondEvent);
assertThat(resp.join()).isEqualTo(201);

marquez.service.models.LineageEvent thirdEvent =
builder.eventTime(before.plusSeconds(10)).eventType("COMPLETE").build();

List<LineageEvent> rawEvents =
client.listLineageEvents(MarquezClient.SortDirection.ASC, before, after, 10);

assertThat(rawEvents.size()).isEqualTo(1);
ObjectMapper mapper = Utils.getMapper();
assertThat((JsonNode) mapper.valueToTree(secondEvent))
.isEqualTo(mapper.valueToTree(rawEvents.get(0)));
}
Expand Down
13 changes: 13 additions & 0 deletions api/src/test/java/marquez/api/OpenLineageResourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.dropwizard.testing.junit5.DropwizardExtensionsSupport;
import io.dropwizard.testing.junit5.ResourceExtension;
import java.util.Map;
import javax.ws.rs.core.Response;
import marquez.common.Utils;
import marquez.db.OpenLineageDao;
import marquez.service.LineageService;
Expand Down Expand Up @@ -63,4 +64,16 @@ public void testGetLineage() {

assertEquals(lineage, LINEAGE);
}

@Test
public void testGetLineageEventsBadSort() {
final Response response =
UNDER_TEST
.target("/api/v1/events/lineage")
.queryParam("sortDirection", "asdf")
.request()
.get();

assertEquals(response.getStatus(), 400);
}
}
23 changes: 15 additions & 8 deletions clients/java/src/main/java/marquez/client/MarquezClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import java.io.InputStream;
import java.net.URL;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -89,21 +91,26 @@ public MarquezClient(final URL baseUrl, @Nullable final String apiKey) {
}

public List<LineageEvent> listLineageEvents() {
return listLineageEvents(DEFAULT_LIMIT, DEFAULT_OFFSET);
return listLineageEvents(SortDirection.DESC, DEFAULT_LIMIT);
}

public List<LineageEvent> listLineageEvents(int limit, int offset) {
final String bodyAsJson = http.get(url.toEventUrl(limit, offset));
public List<LineageEvent> listLineageEvents(MarquezClient.SortDirection sort, int limit) {
final String bodyAsJson = http.get(url.toEventUrl(sort, limit));
return Events.fromJson(bodyAsJson).getValue();
}

public List<LineageEvent> listLineageEvents(String namespaceName) {
return listLineageEvents(namespaceName, DEFAULT_LIMIT, DEFAULT_OFFSET);
public List<LineageEvent> listLineageEvents(
MarquezClient.SortDirection sort, ZonedDateTime before, ZonedDateTime after, int limit) {
final String bodyAsJson = http.get(url.toEventUrl(sort, before, after, limit));
return Events.fromJson(bodyAsJson).getValue();
}

public List<LineageEvent> listLineageEvents(String namespaceName, int limit, int offset) {
final String bodyAsJson = http.get(url.toEventUrl(namespaceName, limit, offset));
return Events.fromJson(bodyAsJson).getValue();
@AllArgsConstructor
public enum SortDirection {
DESC("desc"),
ASC("asc");

@Getter public final String value;
}

public Namespace createNamespace(
Expand Down
4 changes: 0 additions & 4 deletions clients/java/src/main/java/marquez/client/MarquezPathV1.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@ static String lineageEventPath() {
return path("/events/lineage");
}

static String lineageEventPath(String namespaceName) {
return path("/namespace/%s/events/lineage", namespaceName);
}

static String sourcePath(String sourceName) {
return path("/sources/%s", sourceName);
}
Expand Down
Loading

0 comments on commit c9271b8

Please sign in to comment.