From 84f46702a05f62ae1c3399ad51d876113006f6e4 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Wed, 1 Nov 2023 13:23:22 +0100 Subject: [PATCH] Runless events - fix listLineage API Signed-off-by: Pawel Leszczynski --- .../main/java/marquez/db/OpenLineageDao.java | 27 ++++++++++++++----- .../migration/V66.1__job_facets_changes.sql | 5 +++- .../marquez/OpenLineageIntegrationTest.java | 3 +++ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 68db5961cc..d37a4fe145 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -79,6 +79,13 @@ public interface OpenLineageDao extends BaseDao { String DEFAULT_SOURCE_NAME = "default"; String DEFAULT_NAMESPACE_OWNER = "anonymous"; + + enum SpecEventType { + RUN_EVENT, + DATASET_EVENT, + JOB_EVENT; + } + ModelDaos daos = new ModelDaos(); @SqlUpdate( @@ -89,8 +96,9 @@ public interface OpenLineageDao extends BaseDao { + "job_name, " + "job_namespace, " + "event, " - + "producer) " - + "VALUES (?, ?, ?, ?, ?, ?, ?)") + + "producer, " + + "spec_event_type) " + + "VALUES (?, ?, ?, ?, ?, ?, ?, 'RUN_EVENT')") void createLineageEvent( String eventType, Instant eventTime, @@ -104,8 +112,9 @@ void createLineageEvent( "INSERT INTO lineage_events (" + "event_time, " + "event, " - + "producer) " - + "VALUES (?, ?, ?)") + + "producer, " + + "spec_event_type) " + + "VALUES (?, ?, ?, 'DATASET_EVENT')") void createDatasetEvent(Instant eventTime, PGobject event, String producer); @SqlUpdate( @@ -114,12 +123,14 @@ void createLineageEvent( + "job_name, " + "job_namespace, " + "event, " - + "producer) " - + "VALUES (?, ?, ?, ?, ?)") + + "producer, " + + "spec_event_type) " + + "VALUES (?, ?, ?, ?, ?, 'JOB_EVENT')") void createJobEvent( Instant eventTime, String jobName, String jobNamespace, PGobject event, String producer); - @SqlQuery("SELECT event FROM lineage_events WHERE run_uuid = :runUuid") + @SqlQuery( + "SELECT event FROM lineage_events WHERE run_uuid = :runUuid AND spec_event_type='RUN_EVENT'") List findLineageEventsByRunUuid(UUID runUuid); @SqlQuery( @@ -128,6 +139,7 @@ void createJobEvent( FROM lineage_events le WHERE (le.event_time < :before AND le.event_time >= :after) + AND le.spec_event_type='RUN_EVENT' ORDER BY le.event_time DESC LIMIT :limit OFFSET :offset""") List getAllLineageEventsDesc( @@ -139,6 +151,7 @@ List getAllLineageEventsDesc( FROM lineage_events le WHERE (le.event_time < :before AND le.event_time >= :after) + AND le.spec_event_type='RUN_EVENT' ORDER BY le.event_time ASC LIMIT :limit OFFSET :offset""") List getAllLineageEventsAsc( diff --git a/api/src/main/resources/marquez/db/migration/V66.1__job_facets_changes.sql b/api/src/main/resources/marquez/db/migration/V66.1__job_facets_changes.sql index d804167b8a..6452449ee1 100644 --- a/api/src/main/resources/marquez/db/migration/V66.1__job_facets_changes.sql +++ b/api/src/main/resources/marquez/db/migration/V66.1__job_facets_changes.sql @@ -2,4 +2,7 @@ ALTER TABLE job_facets ALTER COLUMN lineage_event_type DROP NOT NULL; ALTER TABLE job_facets DROP CONSTRAINT job_facets_run_uuid_fkey; ALTER TABLE job_facets ADD COLUMN job_version_uuid uuid REFERENCES job_versions (uuid); -CREATE INDEX job_facets_job_version_uuid ON job_facets (job_version_uuid); \ No newline at end of file +CREATE INDEX job_facets_job_version_uuid ON job_facets (job_version_uuid); + +ALTER TABLE lineage_events ADD COLUMN spec_event_type VARCHAR(64); +UPDATE lineage_events SET spec_event_type = 'RunEvent'; \ No newline at end of file diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index bbdab90d14..f0d0fe8aaa 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -1490,6 +1490,9 @@ public void testSendJobEvent() throws IOException { .hasFieldOrPropertyWithValue("namespace", jobNamespace) .hasFieldOrPropertyWithValue("name", jobName); assertThat(jobVersion.getInputs()).isNotEmpty(); + + // (6) verify list lineage endpoint responds correctly with no events returned + assertThat(client.listLineageEvents()).hasSize(0); } private void validateDatasetFacets(JsonNode json) {