From 46e26da80aba9f11f7418e36168806ead2831dd7 Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Thu, 5 Oct 2023 15:35:02 +0200 Subject: [PATCH] Runless events - refactor job_versions_io_mapping Signed-off-by: Pawel Leszczynski --- .../main/java/marquez/db/JobVersionDao.java | 51 +++++++++++---- api/src/main/java/marquez/db/LineageDao.java | 63 ++++++++++--------- .../main/java/marquez/db/OpenLineageDao.java | 13 +++- .../R__1_Jobs_view_and_rewrite_function.sql | 4 ++ ..._versions_io_mapping_add_job_reference.sql | 15 +++++ ..._versions_io_mapping_add_job_reference.sql | 6 ++ .../java/marquez/db/JobVersionDaoTest.java | 6 +- .../test/java/marquez/db/LineageDaoTest.java | 1 - api/src/test/java/marquez/db/TestingDb.java | 6 +- 9 files changed, 117 insertions(+), 48 deletions(-) create mode 100644 api/src/main/resources/marquez/db/migration/V66__job_versions_io_mapping_add_job_reference.sql create mode 100644 api/src/main/resources/marquez/db/migration/V67__job_versions_io_mapping_add_job_reference.sql diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 597414d2e8..df3b2fb15c 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -183,38 +183,65 @@ ExtendedJobVersionRow upsertJobVersion( /** * Used to link an input dataset to a given job version. * - * @param jobVersionUuid The unique ID of the job version. * @param inputDatasetUuid The unique ID of the input dataset. + * @param jobUuid The unique ID of the job. */ - default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) { - upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT); + default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid) { + markVersionIOMappingNotCurrent(jobVersionUuid, jobUuid, IoType.INPUT); + upsertCurrentInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, jobUuid, IoType.INPUT); + // TODO: include this in test -> check if jobUuid is set } /** * Used to link an output dataset to a given job version. * - * @param jobVersionUuid The unique ID of the job version. * @param outputDatasetUuid The unique ID of the output dataset. + * @param jobUuid The unique ID of the job. */ - default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) { - upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT); + default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid) { + markVersionIOMappingNotCurrent(jobVersionUuid, jobUuid, IoType.OUTPUT); + upsertCurrentInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, jobUuid, IoType.OUTPUT); + // TODO: include this in test -> check if jobUuid is set } + @SqlUpdate( + """ + UPDATE job_versions_io_mapping + SET is_job_version_current = FALSE + WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid) + AND job_version_uuid != :jobVersionUuid + AND io_type = :ioType + AND is_job_version_current = TRUE; + """) + void markVersionIOMappingNotCurrent(UUID jobVersionUuid, UUID jobUuid, IoType ioType); + + @SqlUpdate( + """ + UPDATE job_versions_io_mapping + SET is_job_version_current = FALSE + WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid) + AND io_type = :ioType + AND is_job_version_current = TRUE; + """) + void markVersionIOMappingNotCurrent(UUID jobUuid, IoType ioType); + /** * Used to upsert an input or output dataset to a given job version. * * @param jobVersionUuid The unique ID of the job version. * @param datasetUuid The unique ID of the output dataset * @param ioType The {@link IoType} of the dataset. + * @param jobUuid The unique ID of the job. */ @SqlUpdate( """ INSERT INTO job_versions_io_mapping ( - job_version_uuid, dataset_uuid, io_type) - VALUES (:jobVersionUuid, :datasetUuid, :ioType) - ON CONFLICT DO NOTHING + job_version_uuid, dataset_uuid, io_type, job_uuid, is_job_version_current) + VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, TRUE) + ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_job_version_current = TRUE """) - void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType); + void upsertCurrentInputOrOutputDatasetFor( + UUID jobVersionUuid, UUID datasetUuid, UUID jobUuid, IoType ioType); /** * Returns the input datasets to a given job version. @@ -344,14 +371,14 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( jobVersionInputs.forEach( jobVersionInput -> { jobVersionDao.upsertInputDatasetFor( - jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid()); + jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid(), jobRow.getUuid()); }); // Link the output datasets to the job version. jobVersionOutputs.forEach( jobVersionOutput -> { jobVersionDao.upsertOutputDatasetFor( - jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid()); + jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid(), jobRow.getUuid()); }); // Link the job version to the run. diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index c45a06e5a9..febc68a84f 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -39,46 +39,51 @@ public interface LineageDao { @SqlQuery( """ WITH RECURSIVE - -- Find the current version of a job or its symlink target if the target has no - -- current_version_uuid. This ensures that we don't lose lineage for a job after it is - -- symlinked to another job but before that target job has run successfully. - job_current_version AS ( - SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid, - COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid - FROM jobs j - LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid - WHERE s.current_version_uuid IS NULL - ), - job_io AS ( - SELECT j.job_uuid, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs, - ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs + job_io AS ( + SELECT + io.job_uuid AS job_uuid, + io.symlink_target_job_uuid AS symlink_target_job_uuid, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs, + ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs FROM job_versions_io_mapping io - INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid - GROUP BY j.job_uuid + WHERE io.is_job_version_current = TRUE + GROUP BY io.symlink_target_job_uuid, io.job_uuid ), - lineage(job_uuid, inputs, outputs) AS ( - SELECT v.job_uuid AS job_uuid, + lineage(job_uuid, symlink_target_job_uuid, inputs, outputs) AS ( + SELECT job_uuid, + symlink_target_job_uuid, COALESCE(inputs, Array[]::uuid[]) AS inputs, COALESCE(outputs, Array[]::uuid[]) AS outputs, 0 AS depth - FROM jobs j - INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid - LEFT JOIN job_io io ON io.job_uuid=v.job_uuid - WHERE j.uuid IN () OR j.symlink_target_uuid IN () + FROM job_io + WHERE job_uuid IN () OR symlink_target_job_uuid IN () UNION - SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1 - FROM job_io io, - lineage l - WHERE io.job_uuid != l.job_uuid AND + SELECT io.job_uuid, io.symlink_target_job_uuid, io.inputs, io.outputs, l.depth + 1 + FROM job_io io, lineage l + WHERE (io.job_uuid != l.job_uuid) AND array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs) - AND depth < :depth) + AND depth < :depth), + lineage_outside_job_io(job_uuid) AS ( + SELECT + param_jobs.param_job_uuid as job_uuid, + j.symlink_target_uuid, + Array[]::uuid[] AS inputs, + Array[]::uuid[] AS outputs, + 0 AS depth + FROM (SELECT unnest(ARRAY[]::UUID[]) AS param_job_uuid) param_jobs + LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid + INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid + WHERE l.job_uuid IS NULL + ) SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids - FROM lineage l2 - INNER JOIN jobs_view j ON j.uuid=l2.job_uuid; + FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2 + INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.symlink_target_job_uuid) """) Set getLineage(@BindList Set jobIds, int depth); + // TODO: verify size of the lineage without (DISCTINCT -> recursion is growing but should not + // happen) + @SqlQuery( """ SELECT ds.*, dv.fields, dv.lifecycle_state diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 41111ae55f..033a7eccfc 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -33,6 +33,7 @@ import marquez.common.models.SourceType; import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; +import marquez.db.JobVersionDao.IoType; import marquez.db.mappers.LineageEventMapper; import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetFieldRow; @@ -225,6 +226,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper DatasetDao datasetDao = createDatasetDao(); SourceDao sourceDao = createSourceDao(); JobDao jobDao = createJobDao(); + JobVersionDao jobVersionDao = createJobVersionDao(); JobFacetsDao jobFacetsDao = createJobFacetsDao(); DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); DatasetFieldDao datasetFieldDao = createDatasetFieldDao(); @@ -342,7 +344,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper // RunInput list uses null as a sentinel value List datasetInputs = null; - if (event.getInputs() != null) { + if (event.getInputs() != null && !event.getInputs().isEmpty()) { datasetInputs = new ArrayList<>(); for (Dataset dataset : event.getInputs()) { DatasetRecord record = @@ -385,11 +387,15 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper event.getEventType(), facets)); } + } else { + // mark job_versions_io_mapping as non-current + jobVersionDao.markVersionIOMappingNotCurrent(job.getUuid(), IoType.INPUT); } + bag.setInputs(Optional.ofNullable(datasetInputs)); // RunInput list uses null as a sentinel value List datasetOutputs = null; - if (event.getOutputs() != null) { + if (event.getOutputs() != null && !event.getOutputs().isEmpty()) { datasetOutputs = new ArrayList<>(); for (Dataset dataset : event.getOutputs()) { DatasetRecord record = @@ -432,6 +438,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper event.getEventType(), facets)); } + } else { + // mark job_versions_io_mapping as non-current + jobVersionDao.markVersionIOMappingNotCurrent(job.getUuid(), IoType.OUTPUT); } bag.setOutputs(Optional.ofNullable(datasetOutputs)); diff --git a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql index eb390b9d5c..c5484f2359 100644 --- a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql +++ b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql @@ -111,6 +111,10 @@ BEGIN LEFT JOIN aliases a ON a.link_target_uuid = j.uuid ) j WHERE jobs.uuid=j.uuid; + UPDATE job_versions_io_mapping + SET symlink_target_job_uuid=j.symlink_target_uuid + FROM jobs j + WHERE job_versions_io_mapping.job_uuid=j.uuid AND j.uuid = NEW.uuid; END IF; SELECT * INTO inserted_job FROM jobs_view WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid); diff --git a/api/src/main/resources/marquez/db/migration/V66__job_versions_io_mapping_add_job_reference.sql b/api/src/main/resources/marquez/db/migration/V66__job_versions_io_mapping_add_job_reference.sql new file mode 100644 index 0000000000..8aa74599e0 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V66__job_versions_io_mapping_add_job_reference.sql @@ -0,0 +1,15 @@ +ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE; +ALTER TABLE job_versions_io_mapping ADD COLUMN symlink_target_job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE; +ALTER TABLE job_versions_io_mapping ADD COLUMN is_job_version_current boolean DEFAULT FALSE; +-- TODO: create index for lineage query and update + +CREATE INDEX job_versions_io_mapping_job_uuid ON job_versions_io_mapping (job_uuid); + +-- To add job_uuid to the unique constraint, we first drop the primary key, then recreate it; note given that job_version_uuid can be NULL, we need to check that job_version_uuid != NULL before inserting (duplicate columns otherwise) +ALTER TABLE job_versions_io_mapping DROP CONSTRAINT job_versions_io_mapping_pkey; +ALTER TABLE job_versions_io_mapping ADD CONSTRAINT job_versions_io_mapping_pkey UNIQUE (job_version_uuid,dataset_uuid,io_type,job_uuid); + +-- TODO: add a test which verifies correctness for UNIQUE <- adds multiple rows to job_versions_io_mapping + +-- TODO: take care of is_current +-- TODO: take care of symlink_job_uuid diff --git a/api/src/main/resources/marquez/db/migration/V67__job_versions_io_mapping_add_job_reference.sql b/api/src/main/resources/marquez/db/migration/V67__job_versions_io_mapping_add_job_reference.sql new file mode 100644 index 0000000000..fe2bd6bf30 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V67__job_versions_io_mapping_add_job_reference.sql @@ -0,0 +1,6 @@ +UPDATE job_versions_io_mapping +SET job_uuid = job_versions.job_uuid + FROM job_versions +WHERE job_versions_io_mapping.job_version_uuid = job_versions.uuid; + +-- TODO: include a test for that, can be a migration test \ No newline at end of file diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java index 45cdbbfa56..56237d54e7 100644 --- a/api/src/test/java/marquez/db/JobVersionDaoTest.java +++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java @@ -172,7 +172,8 @@ public void testGetJobVersion() { .orElseThrow( () -> new IllegalStateException("Can't find test dataset " + ds.getName())); - jobVersionDao.upsertInputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid()); + jobVersionDao.upsertInputDatasetFor( + jobVersionRow.getUuid(), dataset.getUuid(), jobRow.getUuid()); } for (DatasetId ds : jobMeta.getOutputs()) { DatasetRow dataset = @@ -181,7 +182,8 @@ public void testGetJobVersion() { .orElseThrow( () -> new IllegalStateException("Can't find test dataset " + ds.getName())); - jobVersionDao.upsertOutputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid()); + jobVersionDao.upsertOutputDatasetFor( + jobVersionRow.getUuid(), dataset.getUuid(), jobVersionRow.getJobUuid()); } Optional jobVersion = jobVersionDao.findJobVersion(namespaceRow.getName(), jobRow.getName(), version.getValue()); diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 354ab495dc..a2557cb03b 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -169,7 +169,6 @@ public void testGetLineage() { @Test public void testGetLineageForSymlinkedJob() throws SQLException { - UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( openLineageDao, diff --git a/api/src/test/java/marquez/db/TestingDb.java b/api/src/test/java/marquez/db/TestingDb.java index 0ccb2af65f..b01fb07a2d 100644 --- a/api/src/test/java/marquez/db/TestingDb.java +++ b/api/src/test/java/marquez/db/TestingDb.java @@ -186,8 +186,10 @@ JobVersionRow upsert(@NonNull JobVersionRow row) { row.getJobName(), row.getNamespaceUuid(), row.getNamespaceName()); - row.getInputUuids().forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in)); - row.getInputUuids().forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out)); + row.getInputUuids() + .forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in, row.getJobUuid())); + row.getInputUuids() + .forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out, row.getJobUuid())); // ... delegate.onDemand(JobDao.class).updateVersionFor(row.getJobUuid(), NOW, upserted.getUuid()); return upserted;