Update SQL in backfill script for facet tables to improve performance #2461

Merged · 1 commit merged on Mar 30, 2023
api/src/main/java/marquez/db/migrations/V57_1__BackfillFacets.java
176 changes: 68 additions & 108 deletions

@@ -5,15 +5,12 @@

 package marquez.db.migrations;

-import java.time.Instant;
-import java.util.Optional;
-import java.util.UUID;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import marquez.db.Columns;
 import org.flywaydb.core.api.MigrationVersion;
 import org.flywaydb.core.api.migration.Context;
 import org.flywaydb.core.api.migration.JavaMigration;
+import org.jdbi.v3.core.Handle;
 import org.jdbi.v3.core.Jdbi;
 @Slf4j

@@ -23,72 +20,62 @@ public class V57_1__BackfillFacets implements JavaMigration {

   private static int BASIC_MIGRATION_LIMIT = 100000;

-  private static final String GET_CURRENT_LOCK_SQL =
-      """
-      SELECT * FROM facet_migration_lock
-      ORDER BY created_at ASC, run_uuid ASC
-      LIMIT 1
-      """;
+  private static final String CREATE_TEMP_EVENT_RUNS_TABLE =
+      """
+      CREATE TEMP TABLE lineage_event_runs AS
+      SELECT DISTINCT ON (run_uuid) run_uuid,
+        COALESCE(created_at, event_time) AS created_at
+      FROM lineage_events
+      """;

-  private static final String GET_FINISHING_LOCK_SQL =
-      """
-      SELECT run_uuid, created_at FROM lineage_events
-      ORDER BY
-        COALESCE(created_at, event_time) ASC,
-        run_uuid ASC
-      LIMIT 1
-      """;
-
-  private static final String GET_INITIAL_LOCK_SQL =
-      """
-      SELECT
-        run_uuid,
-        COALESCE(created_at, event_time, NOW()) + INTERVAL '1 MILLISECONDS' as created_at
-      FROM lineage_events ORDER BY COALESCE(created_at, event_time) DESC, run_uuid DESC LIMIT 1
-      """;
+  private static final String CREATE_INDEX_EVENT_RUNS_TABLE =
+      """
+      CREATE INDEX ON lineage_event_runs (created_at DESC) INCLUDE (run_uuid)
+      """;

   private static final String COUNT_LINEAGE_EVENTS_SQL =
       """
-      SELECT count(*) as cnt FROM lineage_events
+      SELECT COUNT(*) FROM lineage_events;
       """;

-  private static final String COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL =
-      """
-      SELECT count(*) as cnt FROM lineage_events e
-      WHERE
-        COALESCE(e.created_at, e.event_time) < :createdAt
-        OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
-      """;
+  private static final String ESTIMATE_COUNT_LINEAGE_EVENTS_SQL =
+      """
+      SELECT reltuples AS cnt FROM pg_class WHERE relname = 'lineage_events';
+      """;

   private String getBackFillFacetsSQL() {
     return String.format(
         """
-        WITH events_chunk AS (
-          SELECT e.* FROM lineage_events e
-          WHERE
-            COALESCE(e.created_at, e.event_time) < :createdAt
-            OR (COALESCE(e.created_at, e.event_time) = :createdAt AND e.run_uuid < :runUuid)
-          ORDER BY COALESCE(e.created_at, e.event_time) DESC, e.run_uuid DESC
-          LIMIT :chunkSize
-        ),
-        insert_datasets AS (
-          INSERT INTO dataset_facets %s
-        ),
-        insert_runs AS (
-          INSERT INTO run_facets %s
-        ),
-        insert_jobs AS (
-          INSERT INTO job_facets %s
-        )
-        INSERT INTO facet_migration_lock
-        SELECT events_chunk.created_at, events_chunk.run_uuid
-        FROM events_chunk
-        ORDER BY
-          COALESCE(events_chunk.created_at, events_chunk.event_time) ASC,
-          events_chunk.run_uuid ASC
-        LIMIT 1
-        RETURNING created_at, run_uuid;
+        WITH queued_runs AS (
+          SELECT created_at, run_uuid
+          FROM lineage_event_runs
+          ORDER BY created_at DESC, run_uuid
+          LIMIT :chunkSize
+        ),
+        processed_runs AS (
+          DELETE FROM lineage_event_runs
+          USING queued_runs qe
+          WHERE lineage_event_runs.run_uuid=qe.run_uuid
+          RETURNING lineage_event_runs.run_uuid
+        ),
+        events_chunk AS (
+          SELECT e.*
+          FROM lineage_events e
+          WHERE run_uuid IN (SELECT run_uuid FROM processed_runs)
+        ),
+        insert_datasets AS (
+          INSERT INTO dataset_facets %s
+        ),
+        insert_runs AS (
+          INSERT INTO run_facets %s
+        ),
+        insert_jobs AS (
+          INSERT INTO job_facets %s
+        )
+        INSERT INTO facet_migration_lock
+        SELECT events_chunk.created_at, events_chunk.run_uuid
+        FROM events_chunk
         """,
         V56_1__FacetViews.getDatasetFacetsDefinitionSQL("events_chunk"),
         V56_1__FacetViews.getRunFacetsDefinitionSQL("events_chunk"),
         V56_1__FacetViews.getJobFacetsDefinitionSQL("events_chunk"));
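
The rewritten statement consumes the temp table as a queue: queued_runs selects the next :chunkSize runs, processed_runs removes them via DELETE ... USING ... RETURNING so no run is picked twice, and events_chunk restricts the expensive facet inserts to exactly those runs. A stripped-down sketch of just the queue-popping step, against the demo_event_runs table from the earlier example (the literal LIMIT stands in for the bound :chunkSize parameter):

-- Pop a chunk of runs off the queue; each execution processes a disjoint
-- set of rows, so repeated runs drain the table to empty.
WITH queued_runs AS (
  SELECT created_at, run_uuid
  FROM demo_event_runs
  ORDER BY created_at DESC, run_uuid
  LIMIT 1000
)
DELETE FROM demo_event_runs q
USING queued_runs
WHERE q.run_uuid = queued_runs.run_uuid
RETURNING q.run_uuid;

Because the top-level statement is the INSERT INTO facet_migration_lock, Jdbi's createUpdate(...).execute() in backFillChunk reports its row count, which is what lets the loop in migrate stop once a chunk comes back empty.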
@@ -140,17 +127,17 @@ public void migrate(Context context) throws Exception {
       jdbi = Jdbi.create(context.getConnection());
     }

-    if (getLock(GET_INITIAL_LOCK_SQL).isEmpty()) {
+    int estimatedEventsCount = estimateCountLineageEvents();
+    log.info("Estimating {} events in lineage_events table", estimatedEventsCount);
+    if (estimatedEventsCount == 0 && countLineageEvents() == 0) {
       // lineage_events table is empty -> no need to run migration
       // anyway. we need to create lock to mark that no data requires migration
       execute("INSERT INTO facet_migration_lock VALUES (NOW(), null)");

       createTargetViews();
       return;
     }
-    Optional<MigrationLock> lastExpectedLock = getLock(GET_FINISHING_LOCK_SQL);

-    if (!manual && countLineageEvents() >= BASIC_MIGRATION_LIMIT) {
+    if (!manual && estimatedEventsCount >= BASIC_MIGRATION_LIMIT) {
       log.warn(
           """
           ==================================================
@@ -168,14 +155,21 @@ public void migrate(Context context) throws Exception {
       return;
     }

log.info("Configured chunkSize is {}", getChunkSize());
MigrationLock lock = getLock(GET_CURRENT_LOCK_SQL).orElse(getLock(GET_INITIAL_LOCK_SQL).get());
while (!lock.equals(lastExpectedLock.get())) {
lock = backFillChunk(lock);
log.info(
"Migrating chunk finished. Still having {} records to migrate.",
countLineageEventsToProcess(lock));
}
jdbi.withHandle(
h -> {
h.createUpdate(CREATE_TEMP_EVENT_RUNS_TABLE).execute();
h.createUpdate(CREATE_INDEX_EVENT_RUNS_TABLE).execute();
log.info("Configured chunkSize is {}", getChunkSize());
boolean doMigration = true;
while (doMigration) {
int results = backFillChunk(h);
log.info("Migrating chunk finished processing {} records.", results);
if (results < 1) {
doMigration = false;
}
}
return null;
});

createTargetViews();
log.info("All records migrated");
@@ -195,51 +189,17 @@ private void execute(String sql) {
     jdbi.inTransaction(handle -> handle.execute(sql));
   }

-  private MigrationLock backFillChunk(MigrationLock lock) {
+  private int backFillChunk(Handle h) {
     String backFillQuery = getBackFillFacetsSQL();
-    return jdbi.withHandle(
-        h ->
-            h.createQuery(backFillQuery)
-                .bind("chunkSize", getChunkSize())
-                .bind("createdAt", lock.created_at)
-                .bind("runUuid", lock.run_uuid)
-                .map(
-                    rs ->
-                        new MigrationLock(
-                            rs.getColumn(Columns.RUN_UUID, UUID.class),
-                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
-                .one());
+    return h.createUpdate(backFillQuery).bind("chunkSize", getChunkSize()).execute();
   }

-  private Optional<MigrationLock> getLock(String sql) {
+  private int estimateCountLineageEvents() {
     return jdbi.withHandle(
-        h ->
-            h.createQuery(sql)
-                .map(
-                    rs ->
-                        new MigrationLock(
-                            rs.getColumn(Columns.RUN_UUID, UUID.class),
-                            rs.getColumn(Columns.CREATED_AT, Instant.class)))
-                .findFirst());
+        h -> h.createQuery(ESTIMATE_COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one());
   }

   private int countLineageEvents() {
-    return jdbi.withHandle(
-        h ->
-            h.createQuery(COUNT_LINEAGE_EVENTS_SQL)
-                .map(rs -> rs.getColumn("cnt", Integer.class))
-                .one());
+    return jdbi.withHandle(h -> h.createQuery(COUNT_LINEAGE_EVENTS_SQL).mapTo(Integer.class).one());
   }

-  private int countLineageEventsToProcess(MigrationLock lock) {
-    return jdbi.withHandle(
-        h ->
-            h.createQuery(COUNT_LINEAGE_EVENTS_TO_PROCESS_SQL)
-                .bind("createdAt", lock.created_at)
-                .bind("runUuid", lock.run_uuid)
-                .map(rs -> rs.getColumn("cnt", Integer.class))
-                .one());
-  }
-
-  private record MigrationLock(UUID run_uuid, Instant created_at) {}
 }
@@ -165,44 +165,6 @@ public void testMigrateForMultipleChunks() throws Exception {
     }
   }

-  @Test
-  public void testWhenCurrentLockIsAvailable() throws Exception {
-    FacetTestUtils.createLineageWithFacets(openLineageDao);
-    FacetTestUtils.createLineageWithFacets(openLineageDao);
-    lineageRow =
-        FacetTestUtils.createLineageWithFacets(
-            openLineageDao); // point migration_lock to only match the latest lineage event
-
-    jdbi.withHandle(
-        h ->
-            h.execute(
-                """
-                INSERT INTO facet_migration_lock
-                SELECT created_at, run_uuid FROM lineage_events
-                ORDER by created_at DESC LIMIT 1
-                """)); // last lineage row should be skipped
-
-    jdbi.withHandle(
-        h ->
-            h.execute(
-                """
-                INSERT INTO facet_migration_lock
-                SELECT created_at, run_uuid FROM lineage_events
-                ORDER by created_at DESC LIMIT 1 OFFSET 1
-                """)); // middle lineage row should be skipped
-
-    try (MockedStatic<Jdbi> jdbiMockedStatic = Mockito.mockStatic(Jdbi.class)) {
-      when(Jdbi.create(connection)).thenReturn(jdbi);
-      subject.setChunkSize(1);
-
-      // clear migration lock and dataset_facets table
-      jdbi.inTransaction(handle -> handle.execute("DELETE FROM dataset_facets"));
-      subject.migrate(flywayContext);
-
-      assertThat(countDatasetFacets(jdbi)).isEqualTo(15);
-    }
-  }

   @Test
   public void testMigrateForLineageWithNoDatasets() throws Exception {
     LineageEvent.JobFacet jobFacet =