diff --git a/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java b/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java index 52a40fe784..bc76e591fd 100644 --- a/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java +++ b/api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java @@ -5,15 +5,12 @@ package marquez.db.migrations; -import java.time.Instant; -import java.util.Optional; -import java.util.UUID; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import marquez.db.Columns; import org.flywaydb.core.api.MigrationVersion; import org.flywaydb.core.api.migration.Context; import org.flywaydb.core.api.migration.JavaMigration; +import org.jdbi.v3.core.Handle; import org.jdbi.v3.core.Jdbi; @Slf4j @@ -23,72 +20,62 @@ public class V57_1__BackfillFacets implements JavaMigration { private static int BASIC_MIGRATION_LIMIT = 100000; - private static final String GET_CURRENT_LOCK_SQL = + private static final String CREATE_TEMP_EVENT_RUNS_TABLE = """ - SELECT * FROM facet_migration_lock - ORDER BY created_at ASC, run_uuid ASC - LIMIT 1 - """; + CREATE TEMP TABLE lineage_event_runs AS + SELECT DISTINCT ON (run_uuid) run_uuid, + COALESCE(created_at, event_time) AS created_at + FROM lineage_events + """; - private static final String GET_FINISHING_LOCK_SQL = + private static final String CREATE_INDEX_EVENT_RUNS_TABLE = """ - SELECT run_uuid, created_at FROM lineage_events - ORDER BY - COALESCE(created_at, event_time) ASC, - run_uuid ASC - LIMIT 1 - """; - - private static final String GET_INITIAL_LOCK_SQL = - """ - SELECT - run_uuid, - COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at - FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1 - """; + CREATE INDEX ON lineage_event_runs (created_at DESC) INCLUDE (run_uuid) + """; private static final String COUNT_LINEAGE_EVENTS_SQL = """ - SELECT count(*) as cnt FROM lineage_events - """; + SELECT COUNT(*) FROM lineage_events; + """; - private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL = + private static final String ESTIMATE_COUNT_LINEAGE_EVENTS_SQL = """ - SELECT count(*) as cnt FROM lineage_events e - WHERE - COALESCE(e.created_at, e.event_time) < :createdAt - OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid) + SELECT reltuples AS cnt FROM pg_class WHERE relname = 'lineage_events'; """; private String getBackFillFacetsSQL() { return String.format( """ - WITH events_chunk AS ( - SELECT e.* FROM lineage_events e - WHERE - COALESCE(e.created_at, e.event_time) < :createdAt - OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid) - ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC - LIMIT :chunkSize - ), - insert_datasets AS ( - INSERT INTO dataset_facets %s - ), - insert_runs AS ( - INSERT INTO run_facets %s - ), - insert_jobs AS ( - INSERT INTO job_facets %s - ) - INSERT INTO facet_migration_lock - SELECT events_chunk.created_at, events_chunk.run_uuid - FROM events_chunk - ORDER BY - COALESCE(events_chunk.created_at, events_chunk.event_time) ASC, - events_chunk.run_uuid ASC - LIMIT 1 - RETURNING created_at, run_uuid; - """, + WITH queued_runs AS ( + SELECT created_at, run_uuid + FROM lineage_event_runs + ORDER BY created_at DESC, run_uuid + LIMIT :chunkSize + ), + processed_runs AS ( + DELETE FROM lineage_event_runs + USING queued_runs qe + WHERE lineage_event_runs.run_uuid=qe.run_uuid + RETURNING lineage_event_runs.run_uuid + ), + events_chunk AS ( + SELECT e.* + FROM lineage_events e + WHERE run_uuid IN (SELECT run_uuid FROM processed_runs) + ), + insert_datasets AS ( + INSERT INTO dataset_facets %s + ), + insert_runs AS ( + INSERT INTO run_facets %s + ), + insert_jobs AS ( + INSERT INTO job_facets %s + ) + INSERT INTO facet_migration_lock + SELECT events_chunk.created_at, events_chunk.run_uuid + FROM events_chunk + """, V56_1__FacetViews.getDatasetFacetsDefinitionSQL("events_chunk"), V56_1__FacetViews.getRunFacetsDefinitionSQL("events_chunk"), V56_1__FacetViews.getJobFacetsDefinitionSQL("events_chunk")); @@ -140,7 +127,9 @@ public void migrate(Context context) throws Exception { jdbi = Jdbi.create(context.getConnection()); } - if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) { + int estimatedEventsCount = estimateCountLineageEvents(); + log.info("Estimating {} events in lineage_events table", estimatedEventsCount); + if (estimatedEventsCount == 0 && countLineageEvents() == 0) { // lineage_events table is empty -> no need to run migration // anyway. we need to create lock to mark that no data requires migration execute("INSERT INTO facet_migration_lock VALUES (NOW(), null)"); @@ -148,9 +137,7 @@ public void migrate(Context context) throws Exception { createTargetViews(); return; } - Optional lastExpectedLock = getLock(GET_FINISHING_LOCK_SQL); - - if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) { + if (!manual && estimatedEventsCount >= BASIC_MIGRATION_LIMIT) { log.warn( """ ================================================== @@ -168,14 +155,21 @@ public void migrate(Context context) throws Exception { return; } - log.info("Configured chunkSize is {}", getChunkSize()); - MigrationLock lock = getLock(GET_CURRENT_LOCK_SQL).orElse(getLock(GET_INITIAL_LOCK_SQL).get()); - while (!lock.equals(lastExpectedLock.get())) { - lock = backFillChunk(lock); - log.info( - "Migrating chunk finished. Still having {} records to migrate.", - countLineageEventsToProcess(lock)); - } + jdbi.withHandle( + h -> { + h.createUpdate(CREATE_TEMP_EVENT_RUNS_TABLE).execute(); + h.createUpdate(CREATE_INDEX_EVENT_RUNS_TABLE).execute(); + log.info("Configured chunkSize is {}", getChunkSize()); + boolean doMigration = true; + while (doMigration) { + int results = backFillChunk(h); + log.info("Migrating chunk finished processing {} records.", results); + if (results < 1) { + doMigration = false; + } + } + return null; + }); createTargetViews(); log.info("All records migrated"); @@ -195,51 +189,17 @@ private void execute(String sql) { jdbi.inTransaction(handle -> handle.execute(sql)); } - private MigrationLock backFillChunk(MigrationLock lock) { + private int backFillChunk(Handle h) { String backFillQuery = getBackFillFacetsSQL(); - return jdbi.withHandle( - h -> - h.createQuery(backFillQuery) - .bind("chunkSize", getChunkSize()) - .bind("createdAt", lock.created_at) - .bind("runUuid", lock.run_uuid) - .map( - rs -> - new MigrationLock( - rs.getColumn(Columns.RUN_UUID, UUID.class), - rs.getColumn(Columns.CREATED_AT, Instant.class))) - .one()); + return h.createUpdate(backFillQuery).bind("chunkSize", getChunkSize()).execute(); } - private Optional getLock(String sql) { + private int estimateCountLineageEvents() { return jdbi.withHandle( - h -> - h.createQuery(sql) - .map( - rs -> - new MigrationLock( - rs.getColumn(Columns.RUN_UUID, UUID.class), - rs.getColumn(Columns.CREATED_AT, Instant.class))) - .findFirst()); + h -> h.createQuery(ESTIMATE_COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one()); } private int countLineageEvents() { - return jdbi.withHandle( - h -> - h.createQuery(COUNT_LINEAGE_EVENTS_SQL) - .map(rs -> rs.getColumn("cnt", Integer.class)) - .one()); + return jdbi.withHandle(h -> h.createQuery(COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one()); } - - private int countLineageEventsToProcess(MigrationLock lock) { - return jdbi.withHandle( - h -> - h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL) - .bind("createdAt", lock.created_at) - .bind("runUuid", lock.run_uuid) - .map(rs -> rs.getColumn("cnt", Integer.class)) - .one()); - } - - private record MigrationLock(UUID run_uuid, Instant created_at) {} } diff --git a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java index 0118f8ad36..b5f98d12e4 100644 --- a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java +++ b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java @@ -165,44 +165,6 @@ public void testMigrateForMultipleChunks() throws Exception { } } - @Test - public void testWhenCurrentLockIsAvailable() throws Exception { - FacetTestUtils.createLineageWithFacets(openLineageDao); - FacetTestUtils.createLineageWithFacets(openLineageDao); - lineageRow = - FacetTestUtils.createLineageWithFacets( - openLineageDao); // point migration_lock to only match the latest lineage event - - jdbi.withHandle( - h -> - h.execute( - """ - INSERT INTO facet_migration_lock - SELECT created_at, run_uuid FROM lineage_events - ORDER by created_at DESC LIMIT 1 - """)); // last lineage row should be skipped - - jdbi.withHandle( - h -> - h.execute( - """ - INSERT INTO facet_migration_lock - SELECT created_at, run_uuid FROM lineage_events - ORDER by created_at DESC LIMIT 1 OFFSET 1 - """)); // middle lineage row should be skipped - - try (MockedStatic jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) { - when(Jdbi.create(connection)).thenReturn(jdbi); - subject.setChunkSize(1); - - // clear migration lock and dataset_facets table - jdbi.inTransaction(handle -> handle.execute("DELETE FROM dataset_facets")); - subject.migrate(flywayContext); - - assertThat(countDatasetFacets(jdbi)).isEqualTo(15); - } - } - @Test public void testMigrateForLineageWithNoDatasets() throws Exception { LineageEvent.JobFacet jobFacet =