Skip to content

Commit

Permalink
Runless events - refactor job_versions_io_mapping (#2654)
Browse files Browse the repository at this point in the history
* get lineage from job_versions_io_mapping table only

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

* add made_current_at field to job_versions

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>

---------

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski authored Dec 13, 2023
1 parent a5a0e55 commit b73fb15
Show file tree
Hide file tree
Showing 10 changed files with 617 additions and 71 deletions.
97 changes: 71 additions & 26 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,40 +192,73 @@ ExtendedJobVersionRow upsertJobVersion(
String namespaceName);

/**
* Used to link an input dataset to a given job version.
* Used to upsert an input or output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param datasetUuid The unique ID of the output dataset
* @param ioType The {@link IoType} of the dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT);
}
@SqlUpdate(
"""
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type, job_uuid, job_symlink_target_uuid, is_current_job_version, made_current_at)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE, NOW())
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO NOTHING
""")
void upsertCurrentInputOrOutputDatasetFor(
UUID jobVersionUuid,
UUID datasetUuid,
UUID jobUuid,
UUID symlinkTargetJobUuid,
IoType ioType);

@SqlUpdate(
"""
UPDATE job_versions_io_mapping
SET is_current_job_version = FALSE
WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid)
AND job_version_uuid != :jobVersionUuid
AND io_type = :ioType
AND is_current_job_version = TRUE;
""")
void markInputOrOutputDatasetAsPreviousFor(UUID jobVersionUuid, UUID jobUuid, IoType ioType);

@SqlUpdate(
"""
UPDATE job_versions_io_mapping
SET is_current_job_version = FALSE
WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid)
AND io_type = :ioType
AND is_current_job_version = TRUE;
""")
void markInputOrOutputDatasetAsPreviousFor(UUID jobUuid, IoType ioType);

/**
* Used to link an output dataset to a given job version.
* Used to link an input dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT);
default void upsertInputDatasetFor(
UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) {
markInputOrOutputDatasetAsPreviousFor(jobVersionUuid, jobUuid, IoType.INPUT);
upsertCurrentInputOrOutputDatasetFor(
jobVersionUuid, inputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.INPUT);
}

/**
* Used to upsert an input or output dataset to a given job version.
* Used to link an output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param datasetUuid The unique ID of the output dataset
* @param ioType The {@link IoType} of the dataset.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param jobUuid The unique ID of the job.
*/
@SqlUpdate(
"""
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type)
VALUES (:jobVersionUuid, :datasetUuid, :ioType)
ON CONFLICT DO NOTHING
""")
void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType);
default void upsertOutputDatasetFor(
UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) {
markInputOrOutputDatasetAsPreviousFor(jobVersionUuid, jobUuid, IoType.OUTPUT);
upsertCurrentInputOrOutputDatasetFor(
jobVersionUuid, outputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.OUTPUT);
}

/**
* Returns the input datasets to a given job version.
Expand Down Expand Up @@ -366,14 +399,20 @@ default BagOfJobVersionInfo upsertRunlessJobVersion(
inputs.forEach(
i -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid());
jobVersionRow.getUuid(),
i.getDatasetVersionRow().getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the output datasets to the job version.
outputs.forEach(
o -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid());
jobVersionRow.getUuid(),
o.getDatasetVersionRow().getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid());
Expand Down Expand Up @@ -468,14 +507,20 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
jobVersionInputs.forEach(
jobVersionInput -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid());
jobVersionRow.getUuid(),
jobVersionInput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the output datasets to the job version.
jobVersionOutputs.forEach(
jobVersionOutput -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid());
jobVersionRow.getUuid(),
jobVersionOutput.getDatasetUuid(),
jobVersionRow.getJobUuid(),
jobRow.getSymlinkTargetId());
});

// Link the job version to the run.
Expand Down
76 changes: 39 additions & 37 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,43 +56,45 @@ public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary inpu
@SqlQuery(
"""
WITH RECURSIVE
-- Find the current version of a job or its symlink target if the target has no
-- current_version_uuid. This ensures that we don't lose lineage for a job after it is
-- symlinked to another job but before that target job has run successfully.
job_current_version AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
FROM jobs j
LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
WHERE s.current_version_uuid IS NULL
),
job_io AS (
SELECT j.job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
GROUP BY j.job_uuid
),
lineage(job_uuid, inputs, outputs) AS (
SELECT v.job_uuid AS job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs j
INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid;
job_io AS (
SELECT
io.job_uuid AS job_uuid,
io.job_symlink_target_uuid AS job_symlink_target_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
WHERE io.is_current_job_version = TRUE
GROUP BY io.job_symlink_target_uuid, io.job_uuid
),
lineage(job_uuid, job_symlink_target_uuid, inputs, outputs) AS (
SELECT job_uuid,
job_symlink_target_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM job_io
WHERE job_uuid IN (<jobIds>) OR job_symlink_target_uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.job_symlink_target_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io, lineage l
WHERE (io.job_uuid != l.job_uuid) AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth),
lineage_outside_job_io(job_uuid) AS (
SELECT
param_jobs.param_job_uuid as job_uuid,
j.symlink_target_uuid,
Array[]::uuid[] AS inputs,
Array[]::uuid[] AS outputs,
0 AS depth
FROM (SELECT unnest(ARRAY[<jobIds>]::UUID[]) AS param_job_uuid) param_jobs
LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid
INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid
WHERE l.job_uuid IS NULL
)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2
INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uuid)
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

Expand Down
11 changes: 9 additions & 2 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import marquez.common.models.SourceType;
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.JobVersionDao.IoType;
import marquez.db.RunDao.RunUpsert;
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
import marquez.db.mappers.LineageEventMapper;
Expand Down Expand Up @@ -362,27 +363,33 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetInputs = null;
if (event.getInputs() != null) {
if (event.getInputs() != null && !event.getInputs().isEmpty()) {
datasetInputs = new ArrayList<>();
for (Dataset dataset : event.getInputs()) {
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true);
datasetInputs.add(record);
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT);
}
bag.setInputs(Optional.ofNullable(datasetInputs));

// RunInput list uses null as a sentinel value
List<DatasetRecord> datasetOutputs = null;
if (event.getOutputs() != null) {
if (event.getOutputs() != null && !event.getOutputs().isEmpty()) {
datasetOutputs = new ArrayList<>();
for (Dataset dataset : event.getOutputs()) {
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false);
datasetOutputs.add(record);
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
// mark job_versions_io_mapping as obsolete
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT);
}

bag.setOutputs(Optional.ofNullable(datasetOutputs));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.migrations;

import lombok.extern.slf4j.Slf4j;
import org.flywaydb.core.api.MigrationVersion;
import org.flywaydb.core.api.migration.Context;
import org.flywaydb.core.api.migration.JavaMigration;
import org.jdbi.v3.core.Jdbi;

@Slf4j
public class V67_2_JobVersionsIOMappingBackfillJob implements JavaMigration {

public static final String UPDATE_QUERY =
"""
UPDATE job_versions_io_mapping
SET
job_uuid = j.uuid,
job_symlink_target_uuid = j.symlink_target_uuid,
is_current_job_version = (jv.uuid = j.current_version_uuid)::BOOLEAN,
made_current_at = NOW()
FROM job_versions jv
INNER JOIN jobs_view j ON j.uuid = jv.job_uuid
WHERE jv.uuid = job_versions_io_mapping.job_version_uuid
""";

@Override
public MigrationVersion getVersion() {
return MigrationVersion.fromVersion("67.2");
}

@Override
public void migrate(Context context) throws Exception {
Jdbi jdbi = Jdbi.create(context.getConnection());
jdbi.withHandle(h -> h.createUpdate(UPDATE_QUERY).execute());
}

@Override
public String getDescription() {
return "Back fill job_uuid and is_current_job_version in job_versions_io_mapping table";
}

@Override
public Integer getChecksum() {
return null;
}

@Override
public boolean isUndo() {
return false;
}

@Override
public boolean canExecuteInTransaction() {
return false;
}

@Override
public boolean isBaselineMigration() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ BEGIN
LEFT JOIN aliases a ON a.link_target_uuid = j.uuid
) j
WHERE jobs.uuid=j.uuid;
UPDATE job_versions_io_mapping
SET job_symlink_target_uuid=j.symlink_target_uuid
FROM jobs j
WHERE job_versions_io_mapping.job_uuid=j.uuid AND j.uuid = NEW.uuid;
END IF;
SELECT * INTO inserted_job FROM jobs_view
WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN job_symlink_target_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN is_current_job_version boolean DEFAULT FALSE;
ALTER TABLE job_versions_io_mapping ADD COLUMN made_current_at TIMESTAMP;

-- To add job_uuid to the unique constraint, we first drop the primary key, then recreate it; note given that job_version_uuid can be NULL, we need to check that job_version_uuid != NULL before inserting (duplicate columns otherwise)
ALTER TABLE job_versions_io_mapping DROP CONSTRAINT job_versions_io_mapping_pkey;
ALTER TABLE job_versions_io_mapping ALTER COLUMN job_version_uuid DROP NOT NULL;

CREATE INDEX job_versions_io_mapping_job_uuid_job_symlink_target_uuid ON job_versions_io_mapping (job_uuid, job_symlink_target_uuid);

ALTER TABLE job_versions_io_mapping ADD CONSTRAINT job_versions_io_mapping_mapping_pkey UNIQUE (job_version_uuid, dataset_uuid, io_type, job_uuid);
Loading

0 comments on commit b73fb15

Please sign in to comment.