Skip to content

Commit

Permalink
Runless events - fix listLineage API
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Nov 1, 2023
1 parent 151ed6b commit 84f4670
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
27 changes: 20 additions & 7 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@
public interface OpenLineageDao extends BaseDao {
String DEFAULT_SOURCE_NAME = "default";
String DEFAULT_NAMESPACE_OWNER = "anonymous";

enum SpecEventType {
RUN_EVENT,
DATASET_EVENT,
JOB_EVENT;
}

ModelDaos daos = new ModelDaos();

@SqlUpdate(
Expand All @@ -89,8 +96,9 @@ public interface OpenLineageDao extends BaseDao {
+ "job_name, "
+ "job_namespace, "
+ "event, "
+ "producer) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?)")
+ "producer, "
+ "spec_event_type) "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, 'RUN_EVENT')")
void createLineageEvent(
String eventType,
Instant eventTime,
Expand All @@ -104,8 +112,9 @@ void createLineageEvent(
"INSERT INTO lineage_events ("
+ "event_time, "
+ "event, "
+ "producer) "
+ "VALUES (?, ?, ?)")
+ "producer, "
+ "spec_event_type) "
+ "VALUES (?, ?, ?, 'DATASET_EVENT')")
void createDatasetEvent(Instant eventTime, PGobject event, String producer);

@SqlUpdate(
Expand All @@ -114,12 +123,14 @@ void createLineageEvent(
+ "job_name, "
+ "job_namespace, "
+ "event, "
+ "producer) "
+ "VALUES (?, ?, ?, ?, ?)")
+ "producer, "
+ "spec_event_type) "
+ "VALUES (?, ?, ?, ?, ?, 'JOB_EVENT')")
void createJobEvent(
Instant eventTime, String jobName, String jobNamespace, PGobject event, String producer);

@SqlQuery("SELECT event FROM lineage_events WHERE run_uuid = :runUuid")
@SqlQuery(
"SELECT event FROM lineage_events WHERE run_uuid = :runUuid AND spec_event_type='RUN_EVENT'")
List<LineageEvent> findLineageEventsByRunUuid(UUID runUuid);

@SqlQuery(
Expand All @@ -128,6 +139,7 @@ void createJobEvent(
FROM lineage_events le
WHERE (le.event_time < :before
AND le.event_time >= :after)
AND le.spec_event_type='RUN_EVENT'
ORDER BY le.event_time DESC
LIMIT :limit OFFSET :offset""")
List<LineageEvent> getAllLineageEventsDesc(
Expand All @@ -139,6 +151,7 @@ List<LineageEvent> getAllLineageEventsDesc(
FROM lineage_events le
WHERE (le.event_time < :before
AND le.event_time >= :after)
AND le.spec_event_type='RUN_EVENT'
ORDER BY le.event_time ASC
LIMIT :limit OFFSET :offset""")
List<LineageEvent> getAllLineageEventsAsc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ ALTER TABLE job_facets ALTER COLUMN lineage_event_type DROP NOT NULL;
ALTER TABLE job_facets DROP CONSTRAINT job_facets_run_uuid_fkey;
ALTER TABLE job_facets ADD COLUMN job_version_uuid uuid REFERENCES job_versions (uuid);

CREATE INDEX job_facets_job_version_uuid ON job_facets (job_version_uuid);
CREATE INDEX job_facets_job_version_uuid ON job_facets (job_version_uuid);

ALTER TABLE lineage_events ADD COLUMN spec_event_type VARCHAR(64);
UPDATE lineage_events SET spec_event_type = 'RunEvent';
3 changes: 3 additions & 0 deletions api/src/test/java/marquez/OpenLineageIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,9 @@ public void testSendJobEvent() throws IOException {
.hasFieldOrPropertyWithValue("namespace", jobNamespace)
.hasFieldOrPropertyWithValue("name", jobName);
assertThat(jobVersion.getInputs()).isNotEmpty();

// (6) verify list lineage endpoint responds correctly with no events returned
assertThat(client.listLineageEvents()).hasSize(0);
}

private void validateDatasetFacets(JsonNode json) {
Expand Down

0 comments on commit 84f4670

Please sign in to comment.