From b73fb150e4051d9eaf98211ccdf6fdf1a40896ec Mon Sep 17 00:00:00 2001 From: "pawel.leszczynski" Date: Wed, 13 Dec 2023 08:58:18 +0100 Subject: [PATCH 1/7] Runless events - refactor job_versions_io_mapping (#2654) * get lineage from job_versions_io_mapping table only Signed-off-by: Pawel Leszczynski * add made_current_at field to job_versions Signed-off-by: Pawel Leszczynski --------- Signed-off-by: Pawel Leszczynski --- .../main/java/marquez/db/JobVersionDao.java | 97 +++++++--- api/src/main/java/marquez/db/LineageDao.java | 76 ++++---- .../main/java/marquez/db/OpenLineageDao.java | 11 +- ...V67_2_JobVersionsIOMappingBackfillJob.java | 65 +++++++ .../R__1_Jobs_view_and_rewrite_function.sql | 4 + ..._versions_io_mapping_add_job_reference.sql | 12 ++ .../java/marquez/db/BackfillTestUtils.java | 104 ++++++++++- .../java/marquez/db/JobVersionDaoTest.java | 137 +++++++++++++- api/src/test/java/marquez/db/TestingDb.java | 6 +- ...V67_2_JobFacetsBackfillJobVersionTest.java | 176 ++++++++++++++++++ 10 files changed, 617 insertions(+), 71 deletions(-) create mode 100644 api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java create mode 100644 api/src/main/resources/marquez/db/migration/V67.1__job_versions_io_mapping_add_job_reference.sql create mode 100644 api/src/test/java/marquez/db/migrations/V67_2_JobFacetsBackfillJobVersionTest.java diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 9173c8109f..23f7a9f118 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -192,40 +192,73 @@ ExtendedJobVersionRow upsertJobVersion( String namespaceName); /** - * Used to link an input dataset to a given job version. + * Used to upsert an input or output dataset to a given job version. * * @param jobVersionUuid The unique ID of the job version. - * @param inputDatasetUuid The unique ID of the input dataset. + * @param datasetUuid The unique ID of the output dataset + * @param ioType The {@link IoType} of the dataset. + * @param jobUuid The unique ID of the job. */ - default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) { - upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT); - } + @SqlUpdate( + """ + INSERT INTO job_versions_io_mapping ( + job_version_uuid, dataset_uuid, io_type, job_uuid, job_symlink_target_uuid, is_current_job_version, made_current_at) + VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE, NOW()) + ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO NOTHING + """) + void upsertCurrentInputOrOutputDatasetFor( + UUID jobVersionUuid, + UUID datasetUuid, + UUID jobUuid, + UUID symlinkTargetJobUuid, + IoType ioType); + + @SqlUpdate( + """ + UPDATE job_versions_io_mapping + SET is_current_job_version = FALSE + WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid) + AND job_version_uuid != :jobVersionUuid + AND io_type = :ioType + AND is_current_job_version = TRUE; + """) + void markInputOrOutputDatasetAsPreviousFor(UUID jobVersionUuid, UUID jobUuid, IoType ioType); + + @SqlUpdate( + """ + UPDATE job_versions_io_mapping + SET is_current_job_version = FALSE + WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid) + AND io_type = :ioType + AND is_current_job_version = TRUE; + """) + void markInputOrOutputDatasetAsPreviousFor(UUID jobUuid, IoType ioType); /** - * Used to link an output dataset to a given job version. 
+ * Used to link an input dataset to a given job version. * - * @param jobVersionUuid The unique ID of the job version. - * @param outputDatasetUuid The unique ID of the output dataset. + * @param inputDatasetUuid The unique ID of the input dataset. + * @param jobUuid The unique ID of the job. */ - default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) { - upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT); + default void upsertInputDatasetFor( + UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) { + markInputOrOutputDatasetAsPreviousFor(jobVersionUuid, jobUuid, IoType.INPUT); + upsertCurrentInputOrOutputDatasetFor( + jobVersionUuid, inputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.INPUT); } /** - * Used to upsert an input or output dataset to a given job version. + * Used to link an output dataset to a given job version. * - * @param jobVersionUuid The unique ID of the job version. - * @param datasetUuid The unique ID of the output dataset - * @param ioType The {@link IoType} of the dataset. + * @param outputDatasetUuid The unique ID of the output dataset. + * @param jobUuid The unique ID of the job. */ - @SqlUpdate( - """ - INSERT INTO job_versions_io_mapping ( - job_version_uuid, dataset_uuid, io_type) - VALUES (:jobVersionUuid, :datasetUuid, :ioType) - ON CONFLICT DO NOTHING - """) - void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType); + default void upsertOutputDatasetFor( + UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) { + markInputOrOutputDatasetAsPreviousFor(jobVersionUuid, jobUuid, IoType.OUTPUT); + upsertCurrentInputOrOutputDatasetFor( + jobVersionUuid, outputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.OUTPUT); + } /** * Returns the input datasets to a given job version. @@ -366,14 +399,20 @@ default BagOfJobVersionInfo upsertRunlessJobVersion( inputs.forEach( i -> { jobVersionDao.upsertInputDatasetFor( - jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid()); + jobVersionRow.getUuid(), + i.getDatasetVersionRow().getDatasetUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); }); // Link the output datasets to the job version. outputs.forEach( o -> { jobVersionDao.upsertOutputDatasetFor( - jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid()); + jobVersionRow.getUuid(), + o.getDatasetVersionRow().getDatasetUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); }); jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid()); @@ -468,14 +507,20 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( jobVersionInputs.forEach( jobVersionInput -> { jobVersionDao.upsertInputDatasetFor( - jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid()); + jobVersionRow.getUuid(), + jobVersionInput.getDatasetUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); }); // Link the output datasets to the job version. jobVersionOutputs.forEach( jobVersionOutput -> { jobVersionDao.upsertOutputDatasetFor( - jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid()); + jobVersionRow.getUuid(), + jobVersionOutput.getDatasetUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); }); // Link the job version to the run. 
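With the columns this patch adds (job_uuid, job_symlink_target_uuid, is_current_job_version, made_current_at), job_versions_io_mapping can by itself answer "what does this job currently read and write": each upsert first flips is_current_job_version to FALSE for the job's previous mappings, then inserts the new row as current. A minimal sketch of such a lookup with plain Jdbi follows; the CurrentIoSketch class and currentInputsFor helper are illustrative only, not part of this patch:

```java
import java.util.List;
import java.util.UUID;
import org.jdbi.v3.core.Handle;

class CurrentIoSketch {
  // Illustrative only: list the datasets that are *currently* inputs of a job,
  // using the same predicate the DAO statements above maintain.
  static List<UUID> currentInputsFor(Handle handle, UUID jobUuid) {
    return handle
        .createQuery(
            """
            SELECT dataset_uuid
              FROM job_versions_io_mapping
             WHERE (job_uuid = :jobUuid OR job_symlink_target_uuid = :jobUuid)
               AND io_type = 'INPUT'
               AND is_current_job_version = TRUE
            """)
        .bind("jobUuid", jobUuid)
        .mapTo(UUID.class)
        .list();
  }
}
```

This is the shape of the predicate the reworked LineageDao query below leans on, which is what lets it drop the join through job_versions.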
diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java
index 5e520b22a6..6a550a04ba 100644
--- a/api/src/main/java/marquez/db/LineageDao.java
+++ b/api/src/main/java/marquez/db/LineageDao.java
@@ -56,43 +56,45 @@ public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary inpu
   @SqlQuery(
       """
       WITH RECURSIVE
-          -- Find the current version of a job or its symlink target if the target has no
-          -- current_version_uuid. This ensures that we don't lose lineage for a job after it is
-          -- symlinked to another job but before that target job has run successfully.
-          job_current_version AS (
-            SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
-                   COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
-            FROM jobs j
-            LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
-            WHERE s.current_version_uuid IS NULL
-          ),
-          job_io AS (
-            SELECT j.job_uuid,
-                   ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
-                   ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
-            FROM job_versions_io_mapping io
-            INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
-            GROUP BY j.job_uuid
-          ),
-          lineage(job_uuid, inputs, outputs) AS (
-            SELECT v.job_uuid AS job_uuid,
-                   COALESCE(inputs, Array[]::uuid[]) AS inputs,
-                   COALESCE(outputs, Array[]::uuid[]) AS outputs,
-                   0 AS depth
-            FROM jobs j
-            INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
-            LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
-            WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
-            UNION
-            SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
-            FROM job_io io,
-                 lineage l
-            WHERE io.job_uuid != l.job_uuid AND
-              array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
-              AND depth < :depth)
-      SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
-      FROM lineage l2
-      INNER JOIN jobs_view j ON j.uuid=l2.job_uuid;
+          job_io AS (
+            SELECT
+                io.job_uuid AS job_uuid,
+                io.job_symlink_target_uuid AS job_symlink_target_uuid,
+                ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
+                ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
+            FROM job_versions_io_mapping io
+            WHERE io.is_current_job_version = TRUE
+            GROUP BY io.job_symlink_target_uuid, io.job_uuid
+          ),
+          lineage(job_uuid, job_symlink_target_uuid, inputs, outputs) AS (
+            SELECT job_uuid,
+                   job_symlink_target_uuid,
+                   COALESCE(inputs, Array[]::uuid[]) AS inputs,
+                   COALESCE(outputs, Array[]::uuid[]) AS outputs,
+                   0 AS depth
+            FROM job_io
+            WHERE job_uuid IN (<jobIds>) OR job_symlink_target_uuid IN (<jobIds>)
+            UNION
+            SELECT io.job_uuid, io.job_symlink_target_uuid, io.inputs, io.outputs, l.depth + 1
+            FROM job_io io, lineage l
+            WHERE (io.job_uuid != l.job_uuid) AND
+              array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
+              AND depth < :depth),
+          lineage_outside_job_io(job_uuid) AS (
+            SELECT
+                param_jobs.param_job_uuid as job_uuid,
+                j.symlink_target_uuid,
+                Array[]::uuid[] AS inputs,
+                Array[]::uuid[] AS outputs,
+                0 AS depth
+            FROM (SELECT unnest(ARRAY[<jobIds>]::UUID[]) AS param_job_uuid) param_jobs
+            LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid
+            INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid
+            WHERE l.job_uuid IS NULL
+          )
+      SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
+      FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2
+ 
INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uuid) """) Set getLineage(@BindList Set jobIds, int depth); diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 2663d7ed7e..a1512e755d 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -33,6 +33,7 @@ import marquez.common.models.SourceType; import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; +import marquez.db.JobVersionDao.IoType; import marquez.db.RunDao.RunUpsert; import marquez.db.RunDao.RunUpsert.RunUpsertBuilder; import marquez.db.mappers.LineageEventMapper; @@ -362,7 +363,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper // RunInput list uses null as a sentinel value List datasetInputs = null; - if (event.getInputs() != null) { + if (event.getInputs() != null && !event.getInputs().isEmpty()) { datasetInputs = new ArrayList<>(); for (Dataset dataset : event.getInputs()) { DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true); @@ -370,12 +371,15 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); } + } else { + // mark job_versions_io_mapping as obsolete + daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT); } bag.setInputs(Optional.ofNullable(datasetInputs)); // RunInput list uses null as a sentinel value List datasetOutputs = null; - if (event.getOutputs() != null) { + if (event.getOutputs() != null && !event.getOutputs().isEmpty()) { datasetOutputs = new ArrayList<>(); for (Dataset dataset : event.getOutputs()) { DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false); @@ -383,6 +387,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); } + } else { + // mark job_versions_io_mapping as obsolete + daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT); } bag.setOutputs(Optional.ofNullable(datasetOutputs)); diff --git a/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java b/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java new file mode 100644 index 0000000000..0e6690d708 --- /dev/null +++ b/api/src/main/java/marquez/db/migrations/V67_2_JobVersionsIOMappingBackfillJob.java @@ -0,0 +1,65 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.migrations; + +import lombok.extern.slf4j.Slf4j; +import org.flywaydb.core.api.MigrationVersion; +import org.flywaydb.core.api.migration.Context; +import org.flywaydb.core.api.migration.JavaMigration; +import org.jdbi.v3.core.Jdbi; + +@Slf4j +public class V67_2_JobVersionsIOMappingBackfillJob implements JavaMigration { + + public static final String UPDATE_QUERY = + """ + UPDATE job_versions_io_mapping + SET + job_uuid = j.uuid, + job_symlink_target_uuid = j.symlink_target_uuid, + is_current_job_version = (jv.uuid = j.current_version_uuid)::BOOLEAN, + made_current_at = NOW() + FROM job_versions jv 
+ INNER JOIN jobs_view j ON j.uuid = jv.job_uuid + WHERE jv.uuid = job_versions_io_mapping.job_version_uuid + """; + + @Override + public MigrationVersion getVersion() { + return MigrationVersion.fromVersion("67.2"); + } + + @Override + public void migrate(Context context) throws Exception { + Jdbi jdbi = Jdbi.create(context.getConnection()); + jdbi.withHandle(h -> h.createUpdate(UPDATE_QUERY).execute()); + } + + @Override + public String getDescription() { + return "Back fill job_uuid and is_current_job_version in job_versions_io_mapping table"; + } + + @Override + public Integer getChecksum() { + return null; + } + + @Override + public boolean isUndo() { + return false; + } + + @Override + public boolean canExecuteInTransaction() { + return false; + } + + @Override + public boolean isBaselineMigration() { + return false; + } +} diff --git a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql index eb390b9d5c..8f22f987a2 100644 --- a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql +++ b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql @@ -111,6 +111,10 @@ BEGIN LEFT JOIN aliases a ON a.link_target_uuid = j.uuid ) j WHERE jobs.uuid=j.uuid; + UPDATE job_versions_io_mapping + SET job_symlink_target_uuid=j.symlink_target_uuid + FROM jobs j + WHERE job_versions_io_mapping.job_uuid=j.uuid AND j.uuid = NEW.uuid; END IF; SELECT * INTO inserted_job FROM jobs_view WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid); diff --git a/api/src/main/resources/marquez/db/migration/V67.1__job_versions_io_mapping_add_job_reference.sql b/api/src/main/resources/marquez/db/migration/V67.1__job_versions_io_mapping_add_job_reference.sql new file mode 100644 index 0000000000..3491add251 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V67.1__job_versions_io_mapping_add_job_reference.sql @@ -0,0 +1,12 @@ +ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE; +ALTER TABLE job_versions_io_mapping ADD COLUMN job_symlink_target_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE; +ALTER TABLE job_versions_io_mapping ADD COLUMN is_current_job_version boolean DEFAULT FALSE; +ALTER TABLE job_versions_io_mapping ADD COLUMN made_current_at TIMESTAMP; + +-- To add job_uuid to the unique constraint, we first drop the primary key, then recreate it; note given that job_version_uuid can be NULL, we need to check that job_version_uuid != NULL before inserting (duplicate columns otherwise) +ALTER TABLE job_versions_io_mapping DROP CONSTRAINT job_versions_io_mapping_pkey; +ALTER TABLE job_versions_io_mapping ALTER COLUMN job_version_uuid DROP NOT NULL; + +CREATE INDEX job_versions_io_mapping_job_uuid_job_symlink_target_uuid ON job_versions_io_mapping (job_uuid, job_symlink_target_uuid); + +ALTER TABLE job_versions_io_mapping ADD CONSTRAINT job_versions_io_mapping_mapping_pkey UNIQUE (job_version_uuid, dataset_uuid, io_type, job_uuid); \ No newline at end of file diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index 07b798ed95..16dd9ccfed 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -17,9 +17,12 @@ import java.util.Optional; import java.util.UUID; import marquez.common.Utils; +import 
marquez.common.models.DatasetType;
+import marquez.db.models.DatasetRow;
 import marquez.db.models.NamespaceRow;
 import marquez.db.models.RunArgsRow;
 import marquez.db.models.RunRow;
+import marquez.db.models.SourceRow;
 import marquez.service.models.LineageEvent;
 import marquez.service.models.LineageEvent.JobFacet;
 import marquez.service.models.LineageEvent.JobLink;
@@ -115,8 +118,14 @@ INSERT INTO job_versions (uuid, created_at, updated_at, job_uuid, version, locat
         .job(
             new LineageEvent.Job(
                 NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP)))
-        .inputs(Collections.emptyList())
-        .outputs(Collections.emptyList())
+        .inputs(
+            Collections.singletonList(
+                new LineageEvent.Dataset(
+                    "namespace", "dataset_a", LineageEvent.DatasetFacets.builder().build())))
+        .outputs(
+            Collections.singletonList(
+                new LineageEvent.Dataset(
+                    "namespace", "dataset_b", LineageEvent.DatasetFacets.builder().build())))
         .producer(PRODUCER_URL.toString())
         .build();
     PGobject eventJson = new PGobject();
@@ -158,4 +167,95 @@ INSERT INTO jobs (uuid, type, created_at, updated_at, namespace_uuid, name, name
             .first();
         });
   }
+
+  public static UUID writeJobVersion(
+      Jdbi jdbi, UUID jobUuid, String location, String jobName, NamespaceRow namespace)
+      throws SQLException {
+    return jdbi.withHandle(
+        h -> {
+          return h.createQuery(
+                  """
+                  INSERT INTO job_versions (
+                    uuid,
+                    created_at,
+                    updated_at,
+                    job_uuid,
+                    location,
+                    version,
+                    job_name,
+                    namespace_uuid,
+                    namespace_name
+                  )
+                  VALUES (
+                    :uuid,
+                    :created_at,
+                    :updated_at,
+                    :job_uuid,
+                    :location,
+                    :version,
+                    :job_name,
+                    :namespace_uuid,
+                    :namespace_name
+                  )
+                  RETURNING uuid
+                  """)
+              .bind("uuid", UUID.randomUUID())
+              .bind("created_at", Instant.now())
+              .bind("updated_at", Instant.now())
+              .bind("job_uuid", jobUuid)
+              .bind("location", location)
+              .bind("version", UUID.randomUUID())
+              .bind("job_name", jobName)
+              .bind("namespace_uuid", namespace.getUuid())
+              .bind("namespace_name", namespace.getName())
+              .mapTo(UUID.class)
+              .first();
+        });
+  }
+
+  public static DatasetRow writeDataset(Jdbi jdbi, NamespaceRow namespaceRow, String datasetName) {
+    DatasetDao datasetDao = jdbi.onDemand(DatasetDao.class);
+
+    SourceRow sourceRow =
+        jdbi.onDemand(SourceDao.class)
+            .upsert(UUID.randomUUID(), "type", Instant.now(), "name", "http://a");
+
+    return datasetDao.upsert(
+        UUID.randomUUID(),
+        DatasetType.DB_TABLE,
+        Instant.now(),
+        namespaceRow.getUuid(),
+        namespaceRow.getName(),
+        sourceRow.getUuid(),
+        "sourceName",
+        datasetName,
+        "",
+        "",
+        false);
+  }
+
+  public static UUID writeJobIOMapping(Jdbi jdbi, UUID jobUuid, UUID datasetUuid)
+      throws SQLException {
+    return jdbi.withHandle(
+        h -> {
+          return h.createQuery(
+                  """
+                  INSERT INTO job_versions_io_mapping (
+                    job_version_uuid,
+                    dataset_uuid,
+                    io_type,
+                    job_uuid,
+                    is_current_job_version
+                  )
+                  VALUES (:job_version_uuid, :dataset_uuid, :io_type, :job_uuid, TRUE)
+                  RETURNING job_version_uuid
+                  """)
+              .bind("job_version_uuid", UUID.randomUUID())
+              .bind("dataset_uuid", datasetUuid)
+              .bind("io_type", "OUTPUT")
+              .bind("job_uuid", jobUuid)
+              .mapTo(UUID.class)
+              .first();
+        });
+  }
 }
diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java
index b36f505693..aa0caaf4fc 100644
--- a/api/src/test/java/marquez/db/JobVersionDaoTest.java
+++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java
@@ -6,12 +6,16 @@
 package marquez.db;
 
 import static marquez.Generator.newTimestamp;
+import static 
marquez.common.models.CommonModelGenerator.newDescription; import static marquez.common.models.CommonModelGenerator.newJobName; +import static marquez.common.models.CommonModelGenerator.newJobType; import static marquez.common.models.CommonModelGenerator.newLocation; import static marquez.common.models.CommonModelGenerator.newVersion; import static marquez.db.JobVersionDao.BagOfJobVersionInfo; import static marquez.db.models.DbModelGenerator.newRowUuid; +import static marquez.service.models.ServiceModelGenerator.newInputsWith; import static marquez.service.models.ServiceModelGenerator.newJobMetaWith; +import static marquez.service.models.ServiceModelGenerator.newOutputsWith; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -52,6 +56,7 @@ public class JobVersionDaoTest extends BaseIntegrationTest { static Jdbi jdbiForTesting; static DatasetVersionDao datasetVersionDao; + static DatasetDao datasetDao; static JobDao jobDao; static RunDao runDao; static OpenLineageDao openLineageDao; @@ -63,6 +68,7 @@ public class JobVersionDaoTest extends BaseIntegrationTest { @BeforeAll public static void setUpOnce(final Jdbi jdbi) { jdbiForTesting = jdbi; + datasetDao = jdbiForTesting.onDemand(DatasetDao.class); datasetVersionDao = jdbiForTesting.onDemand(DatasetVersionDao.class); jobDao = jdbi.onDemand(JobDao.class); runDao = jdbi.onDemand(RunDao.class); @@ -190,7 +196,11 @@ public void testGetJobVersion() { .orElseThrow( () -> new IllegalStateException("Can't find test dataset " + ds.getName())); - jobVersionDao.upsertInputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid()); + jobVersionDao.upsertInputDatasetFor( + jobVersionRow.getUuid(), + dataset.getUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); } for (DatasetId ds : jobMeta.getOutputs()) { DatasetRow dataset = @@ -199,7 +209,11 @@ public void testGetJobVersion() { .orElseThrow( () -> new IllegalStateException("Can't find test dataset " + ds.getName())); - jobVersionDao.upsertOutputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid()); + jobVersionDao.upsertOutputDatasetFor( + jobVersionRow.getUuid(), + dataset.getUuid(), + jobVersionRow.getJobUuid(), + jobRow.getSymlinkTargetId()); } Optional jobVersion = jobVersionDao.findJobVersion(namespaceRow.getName(), jobRow.getName(), version.getValue()); @@ -425,4 +439,123 @@ public void testUpsertRunlessJobVersion() { .extracting(JobVersion::getInputs, InstanceOfAssertFactories.list(UUID.class)) .isNotEmpty(); } + + @Test + public void testUpsertDatasetMarksOtherRowsObsolete() { + // (1) Add a new job; the input and output datasets for the job will also be added. 
+ final JobMeta jobMeta = + new JobMeta( + newJobType(), + newInputsWith(NamespaceName.of(namespaceRow.getName()), 1), + newOutputsWith(NamespaceName.of(namespaceRow.getName()), 1), + newLocation(), + newDescription(), + null); + + final JobRow jobRow = + DbTestUtils.newJobWith( + jdbiForTesting, namespaceRow.getName(), newJobName().getValue(), jobMeta); + + // (2) Get UUID of the datasets + DatasetId inputDatasetId = jobMeta.getInputs().stream().findFirst().get(); + DatasetId outputDatasetId = jobMeta.getOutputs().stream().findFirst().get(); + + UUID inputDatasetUuid = + datasetDao + .getUuid(inputDatasetId.getNamespace().getValue(), inputDatasetId.getName().getValue()) + .get() + .getUuid(); + UUID outputDatasetUuid = + datasetDao + .getUuid( + outputDatasetId.getNamespace().getValue(), outputDatasetId.getName().getValue()) + .get() + .getUuid(); + + // (3) Upsert job version row + UUID jobVersionUuid = + jobVersionDao + .upsertJobVersion( + newRowUuid(), + newTimestamp(), + jobRow.getUuid(), + newLocation().toString(), + UUID.randomUUID(), + jobRow.getName(), + namespaceRow.getUuid(), + namespaceRow.getName()) + .getUuid(); + + // (4) upsert job_versions_io rows for each dataset + jobVersionDao.upsertInputDatasetFor( + jobVersionUuid, inputDatasetUuid, jobRow.getUuid(), jobRow.getSymlinkTargetId()); + jobVersionDao.upsertOutputDatasetFor( + jobVersionUuid, outputDatasetUuid, jobRow.getUuid(), jobRow.getSymlinkTargetId()); + + // (5) there should be 2 rows in job_versions_io_mapping + assertThat( + jdbiForTesting + .withHandle( + h -> + h.createQuery( + "SELECT count(*) as cnt FROM job_versions_io_mapping WHERE job_uuid = :jobUuid AND is_current_job_version = TRUE") + .bind("jobUuid", jobRow.getUuid()) + .map(rv -> rv.getColumn("cnt", Integer.class)) + .one()) + .intValue()) + .isEqualTo(2); + + // (2) Modify job - create a new version of it + UUID newJobVersion = UUID.randomUUID(); + ExtendedJobVersionRow newVersionRow = + DbTestUtils.newJobVersion( + jdbiForTesting, + jobRow.getUuid(), + newJobVersion, + jobRow.getName(), + namespaceRow.getUuid(), + namespaceRow.getName()); + + // (4) upsert job_versions_io rows for each dataset + jobVersionDao.upsertInputDatasetFor( + newVersionRow.getUuid(), + inputDatasetUuid, + jobRow.getUuid(), + jobRow.getUuid()); // for testing use symlink job uuid same as job uuid + jobVersionDao.upsertOutputDatasetFor( + newVersionRow.getUuid(), + outputDatasetUuid, + jobRow.getUuid(), + jobRow.getUuid()); // for testing use symlink job uuid same as job uuid + + // (5) Verify input and output datasets if they are the current ones + assertThat( + jdbiForTesting + .withHandle( + h -> + h.createQuery( + "SELECT count(*) as cnt FROM job_versions_io_mapping WHERE job_uuid = :jobUuid") + .bind("jobUuid", jobRow.getUuid()) + .map(rv -> rv.getColumn("cnt", Integer.class)) + .one()) + .intValue()) + .isEqualTo(4); + + assertThat( + jdbiForTesting + .withHandle( + h -> + h.createQuery( + """ + SELECT count(*) as cnt FROM job_versions_io_mapping + WHERE job_uuid = :jobUuid AND is_current_job_version = TRUE + AND job_symlink_target_uuid = :symlinkTargetId + """) + .bind("jobUuid", jobRow.getUuid()) + .bind("symlinkTargetId", jobRow.getUuid()) + .map(rv -> rv.getColumn("cnt", Integer.class)) + .one()) + .intValue()) + .isEqualTo(2); + } } diff --git a/api/src/test/java/marquez/db/TestingDb.java b/api/src/test/java/marquez/db/TestingDb.java index 0ccb2af65f..a1718d6cc5 100644 --- a/api/src/test/java/marquez/db/TestingDb.java +++ b/api/src/test/java/marquez/db/TestingDb.java 
@@ -186,8 +186,10 @@ JobVersionRow upsert(@NonNull JobVersionRow row) {
             row.getJobName(),
             row.getNamespaceUuid(),
             row.getNamespaceName());
-    row.getInputUuids().forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in));
-    row.getInputUuids().forEach(out -> dao.upsertInputDatasetFor(row.getUuid(), out));
+    row.getInputUuids()
+        .forEach(in -> dao.upsertInputDatasetFor(row.getUuid(), in, row.getJobUuid(), null));
+    row.getOutputUuids()
+        .forEach(out -> dao.upsertOutputDatasetFor(row.getUuid(), out, row.getJobUuid(), null));
     // ...
     delegate.onDemand(JobDao.class).updateVersionFor(row.getJobUuid(), NOW, upserted.getUuid());
     return upserted;
diff --git a/api/src/test/java/marquez/db/migrations/V67_2_JobFacetsBackfillJobVersionTest.java b/api/src/test/java/marquez/db/migrations/V67_2_JobFacetsBackfillJobVersionTest.java
new file mode 100644
index 0000000000..a47a82c91b
--- /dev/null
+++ b/api/src/test/java/marquez/db/migrations/V67_2_JobFacetsBackfillJobVersionTest.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2018-2022 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez.db.migrations;
+
+import static marquez.db.BackfillTestUtils.writeDataset;
+import static marquez.db.BackfillTestUtils.writeJob;
+import static marquez.db.BackfillTestUtils.writeJobVersion;
+import static marquez.db.LineageTestUtils.NAMESPACE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import marquez.db.JobVersionDao.IoType;
+import marquez.db.NamespaceDao;
+import marquez.db.OpenLineageDao;
+import marquez.db.models.DatasetRow;
+import marquez.db.models.NamespaceRow;
+import marquez.jdbi.JdbiExternalPostgresExtension.FlywaySkipRepeatable;
+import marquez.jdbi.JdbiExternalPostgresExtension.FlywayTarget;
+import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
+import org.flywaydb.core.api.configuration.Configuration;
+import org.flywaydb.core.api.migration.Context;
+import org.jdbi.v3.core.Jdbi;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Test to validate that job_uuid, job_symlink_target_uuid, and is_current_job_version are filled
+ * properly within the job_versions_io_mapping table
+ */
+@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
+@FlywayTarget("67.2")
+@FlywaySkipRepeatable()
+@Slf4j
+public class V67_2_JobFacetsBackfillJobVersionTest {
+
+  private static V67_2_JobVersionsIOMappingBackfillJob migration =
+      new V67_2_JobVersionsIOMappingBackfillJob();
+  static Jdbi jdbi;
+  private static OpenLineageDao openLineageDao;
+
+  @BeforeAll
+  public static void setUpOnce(Jdbi jdbi) {
+    V67_2_JobFacetsBackfillJobVersionTest.jdbi = jdbi;
+    openLineageDao = jdbi.onDemand(OpenLineageDao.class);
+  }
+
+  @Test
+  public void testBackFill() throws SQLException, JsonProcessingException {
+    NamespaceDao namespaceDao = jdbi.onDemand(NamespaceDao.class);
+    Instant now = Instant.now();
+    NamespaceRow namespace =
+        namespaceDao.upsertNamespaceRow(UUID.randomUUID(), now, NAMESPACE, "me");
+
+    // (1) Write a job
+    UUID symlinkJobUuid = writeJob(jdbi, "symlink", now, namespace);
+    UUID jobUuid = writeJob(jdbi, "job", now, namespace);
+
+    // (2) Write a job version
+    UUID oldJobVersion = writeJobVersion(jdbi, jobUuid, "location", "job", namespace);
+    UUID currentJobVersion = 
writeJobVersion(jdbi, jobUuid, "location", "job", namespace); + + jdbi.withHandle( + h -> + h.createUpdate( + """ + UPDATE jobs + SET current_version_uuid = :current_version_uuid, symlink_target_uuid = :symlink_target_uuid + WHERE uuid = :job_uuid + """) + .bind("current_version_uuid", currentJobVersion) + .bind("job_uuid", jobUuid) + .bind("symlink_target_uuid", symlinkJobUuid) + .execute()); + + // (3) Write a dataset + DatasetRow dataset = writeDataset(jdbi, namespace, "some_dataset"); + + // (4) Write a job io mapping + insertJobIOMapping(oldJobVersion, dataset); + insertJobIOMapping(currentJobVersion, dataset); + + // (5) Run Migration + runMigration(); + + // (4) Verify job_version column in job_facets table is updated + assertThat( + jdbi.withHandle( + h -> + h.createQuery( + """ + SELECT count(*) FROM job_versions_io_mapping + WHERE job_version_uuid = :job_version_uuid + AND job_uuid = :job_uuid + AND is_current_job_version = TRUE + AND job_symlink_target_uuid = :symlink_target_uuid + """) + .bind("job_version_uuid", currentJobVersion) + .bind("job_uuid", jobUuid) + .bind("symlink_target_uuid", symlinkJobUuid) + .mapTo(Integer.class) + .findFirst()) + .get()) + .isEqualTo(1); + + assertThat( + jdbi.withHandle( + h -> + h.createQuery( + """ + SELECT count(*) FROM job_versions_io_mapping + WHERE job_version_uuid = :job_version_uuid + AND job_uuid = :job_uuid + AND is_current_job_version = FALSE + AND job_symlink_target_uuid = :symlink_target_uuid + """) + .bind("job_version_uuid", oldJobVersion) + .bind("job_uuid", jobUuid) + .bind("symlink_target_uuid", symlinkJobUuid) + .mapTo(Integer.class) + .findFirst()) + .get()) + .isEqualTo(1); + } + + private static void insertJobIOMapping(UUID jobVersion, DatasetRow dataset) { + jdbi.withHandle( + h -> { + return h.createQuery( + """ + INSERT INTO job_versions_io_mapping ( + job_version_uuid, dataset_uuid, io_type) + VALUES (:job_version_uuid, :dataset_uuid, :io_type) + ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_current_job_version = TRUE + RETURNING job_version_uuid + """) + .bind("job_version_uuid", jobVersion) + .bind("dataset_uuid", dataset.getUuid()) + .bind("io_type", IoType.OUTPUT) + .mapTo(UUID.class) + .first(); + }); + } + + private static void runMigration() { + jdbi.useHandle( + handle -> { + try { + Context context = + new Context() { + @Override + public Configuration getConfiguration() { + return null; + } + + @Override + public Connection getConnection() { + return handle.getConnection(); + } + }; + // apply migrations in order + new V67_2_JobVersionsIOMappingBackfillJob().migrate(context); + } catch (Exception e) { + throw new AssertionError("Unable to execute migration", e); + } + }); + } +} From 2c8613a3bda9e417d22ef23643ca3d882d2d5062 Mon Sep 17 00:00:00 2001 From: "pawel.leszczynski" Date: Thu, 14 Dec 2023 14:01:43 +0100 Subject: [PATCH 2/7] Support streaming jobs in Marquez (#2682) * Support streaming jobs in Marquez Signed-off-by: Pawel Leszczynski * write job versions on running Signed-off-by: Pawel Leszczynski * refactor isStreamingJob method Signed-off-by: Pawel Leszczynski --------- Signed-off-by: Pawel Leszczynski --- CHANGELOG.md | 2 + .../main/java/marquez/db/JobVersionDao.java | 129 +++++++------ .../main/java/marquez/db/OpenLineageDao.java | 72 ++++++-- .../java/marquez/db/mappers/JobMapper.java | 74 ++++++-- .../marquez/service/OpenLineageService.java | 6 +- .../main/java/marquez/service/RunService.java | 10 +- .../main/java/marquez/service/models/Job.java | 6 +- 
.../marquez/service/models/LineageEvent.java | 53 +++++- .../marquez/ColumnLineageIntegrationTest.java | 4 +- .../java/marquez/db/BackfillTestUtils.java | 4 +- .../marquez/db/ColumnLineageTestUtils.java | 4 +- .../test/java/marquez/db/DatasetDaoTest.java | 2 +- .../java/marquez/db/DatasetFacetsDaoTest.java | 13 +- .../test/java/marquez/db/FacetTestUtils.java | 23 ++- .../java/marquez/db/JobFacetsDaoTest.java | 31 ++-- .../java/marquez/db/JobVersionDaoTest.java | 10 +- .../test/java/marquez/db/LineageDaoTest.java | 12 +- .../java/marquez/db/OpenLineageDaoTest.java | 32 ++-- api/src/test/java/marquez/db/RunDaoTest.java | 10 +- .../java/marquez/db/RunFacetsDaoTest.java | 20 +- .../marquez/db/mappers/JobMapperTest.java | 27 +++ .../migrations/V57_1__BackfillFacetsTest.java | 4 +- .../service/ColumnLineageServiceTest.java | 3 +- .../marquez/service/LineageServiceTest.java | 59 +++++- .../OpenLineageServiceIntegrationTest.java | 174 +++++++++++++++++- .../service/models/LineageEventTest.java | 14 ++ .../resources/open_lineage/event_full.json | 7 + 27 files changed, 634 insertions(+), 171 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bfa3356906..e37062a68f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ *Save into Marquez model datasets sent via `DatasetEvent` event type * API: support `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) *Save into Marquez model jobs and datasets sent via `JobEvent` event type. +* API: support streaming jobs [`#2682`](https://github.com/MarquezProject/marquez/pull/2682) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Creates job version and reference rows at the beginning of the job instead of on complete. Updates job version within the run if anything changes. ## [0.42.0](https://github.com/MarquezProject/marquez/compare/0.41.0...0.42.0) - 2023-10-17 ### Added diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 23f7a9f118..b4be634f3f 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -280,6 +280,14 @@ default List findOutputDatasetsFor(UUID jobVersionUuid) { return findInputOrOutputDatasetsFor(jobVersionUuid, IoType.OUTPUT); } + /** + * Verifies if a job with a specified job version is present in table. + * + * @param version Version identifier + */ + @SqlQuery("SELECT EXISTS (SELECT 1 FROM job_versions WHERE version = :version)") + boolean versionExists(UUID version); + /** * Returns the input or output datasets for a given job version. * @@ -447,98 +455,73 @@ private static ExtendedDatasetVersionRow toExtendedDatasetVersionRow(DatasetReco * and context. A version for a given job is created only when a {@link Run} transitions * into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state. * - * @param jobRow The job. - * @param runUuid The unique ID of the run associated with the job version. + * @param jobRowRunDetails The job row with run details. * @param runState The current run state. * @param transitionedAt The timestamp of the run state transition. * @return A {@link BagOfJobVersionInfo} object. 
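+   * @param linkJobToJobVersion Whether to also link the new job version to the job as its current
+   *     version.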
*/ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( - @NonNull JobRow jobRow, - @NonNull UUID runUuid, + @NonNull JobRowRunDetails jobRowRunDetails, @NonNull RunState runState, - @NonNull Instant transitionedAt) { + @NonNull Instant transitionedAt, + boolean linkJobToJobVersion) { // Get the job. final JobDao jobDao = createJobDao(); - // Get the inputs and outputs dataset versions for the run associated with the job version. - final DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); - final List jobVersionInputs = - datasetVersionDao.findInputDatasetVersionsFor(runUuid); - final List jobVersionOutputs = - datasetVersionDao.findOutputDatasetVersionsFor(runUuid); - - // Get the namespace for the job. - final NamespaceRow namespaceRow = - createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get(); - - // Generate the version for the job; the version may already exist. - final Version jobVersion = - Utils.newJobVersionFor( - NamespaceName.of(jobRow.getNamespaceName()), - JobName.of( - Optional.ofNullable(jobRow.getParentJobName()) - .map(pn -> pn + "." + jobRow.getSimpleName()) - .orElse(jobRow.getName())), - toDatasetIds( - jobVersionInputs.stream() - .map(i -> (DatasetVersionRow) i) - .collect(Collectors.toList())), - toDatasetIds( - jobVersionOutputs.stream() - .map(o -> (DatasetVersionRow) o) - .collect(Collectors.toList())), - jobRow.getLocation()); - // Add the job version. final JobVersionDao jobVersionDao = createJobVersionDao(); + final JobVersionRow jobVersionRow = jobVersionDao.upsertJobVersion( UUID.randomUUID(), transitionedAt, // Use the timestamp of when the run state transitioned. - jobRow.getUuid(), - jobRow.getLocation(), - jobVersion.getValue(), - jobRow.getName(), - namespaceRow.getUuid(), - jobRow.getNamespaceName()); + jobRowRunDetails.jobRow.getUuid(), + jobRowRunDetails.jobRow.getLocation(), + jobRowRunDetails.jobVersion.getValue(), + jobRowRunDetails.jobRow.getName(), + jobRowRunDetails.namespaceRow.getUuid(), + jobRowRunDetails.jobRow.getNamespaceName()); // Link the input datasets to the job version. - jobVersionInputs.forEach( + jobRowRunDetails.jobVersionInputs.forEach( jobVersionInput -> { jobVersionDao.upsertInputDatasetFor( jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid(), jobVersionRow.getJobUuid(), - jobRow.getSymlinkTargetId()); + jobRowRunDetails.jobRow.getSymlinkTargetId()); }); // Link the output datasets to the job version. - jobVersionOutputs.forEach( + jobRowRunDetails.jobVersionOutputs.forEach( jobVersionOutput -> { jobVersionDao.upsertOutputDatasetFor( jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid(), jobVersionRow.getJobUuid(), - jobRow.getSymlinkTargetId()); + jobRowRunDetails.jobRow.getSymlinkTargetId()); }); // Link the job version to the run. - createRunDao().updateJobVersion(runUuid, jobVersionRow.getUuid()); + createRunDao().updateJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid()); // Link the run to the job version; multiple run instances may be linked to a job version. 
- jobVersionDao.updateLatestRunFor(jobVersionRow.getUuid(), transitionedAt, runUuid); + jobVersionDao.updateLatestRunFor( + jobVersionRow.getUuid(), transitionedAt, jobRowRunDetails.runUuid); // Link the job facets to this job version - jobVersionDao.linkJobFacetsToJobVersion(runUuid, jobVersionRow.getUuid()); + jobVersionDao.linkJobFacetsToJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid()); - // Link the job version to the job only if the run is marked done and has transitioned into one - // of the following states: COMPLETED, ABORTED, or FAILED. - if (runState.isDone()) { - jobDao.updateVersionFor(jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid()); + if (linkJobToJobVersion) { + jobDao.updateVersionFor( + jobRowRunDetails.jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid()); } - return new BagOfJobVersionInfo(jobRow, jobVersionRow, jobVersionInputs, jobVersionOutputs); + return new BagOfJobVersionInfo( + jobRowRunDetails.jobRow, + jobVersionRow, + jobRowRunDetails.jobVersionInputs, + jobRowRunDetails.jobVersionOutputs); } /** Returns the specified {@link ExtendedDatasetVersionRow}s as {@link DatasetId}s. */ @@ -556,6 +539,40 @@ private DatasetId toDatasetId(DatasetVersionRow dataset) { NamespaceName.of(dataset.getNamespaceName()), DatasetName.of(dataset.getDatasetName())); } + default JobRowRunDetails loadJobRowRunDetails(JobRow jobRow, UUID runUuid) { + // Get the inputs and outputs dataset versions for the run associated with the job version. + final DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); + final List jobVersionInputs = + datasetVersionDao.findInputDatasetVersionsFor(runUuid); + final List jobVersionOutputs = + datasetVersionDao.findOutputDatasetVersionsFor(runUuid); + + // Get the namespace for the job. + final NamespaceRow namespaceRow = + createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get(); + + // Generate the version for the job; the version may already exist. + final Version jobVersion = + Utils.newJobVersionFor( + NamespaceName.of(jobRow.getNamespaceName()), + JobName.of( + Optional.ofNullable(jobRow.getParentJobName()) + .map(pn -> pn + "." + jobRow.getSimpleName()) + .orElse(jobRow.getName())), + toDatasetIds( + jobVersionInputs.stream() + .map(i -> (DatasetVersionRow) i) + .collect(Collectors.toList())), + toDatasetIds( + jobVersionOutputs.stream() + .map(o -> (DatasetVersionRow) o) + .collect(Collectors.toList())), + jobRow.getLocation()); + + return new JobRowRunDetails( + jobRow, runUuid, namespaceRow, jobVersionInputs, jobVersionOutputs, jobVersion); + } + /** A container class for job version info. 
*/ @Value class BagOfJobVersionInfo { @@ -567,6 +584,14 @@ class BagOfJobVersionInfo { record JobDataset(String namespace, String name, IoType ioType) {} + record JobRowRunDetails( + JobRow jobRow, + UUID runUuid, + NamespaceRow namespaceRow, + List jobVersionInputs, + List jobVersionOutputs, + Version jobVersion) {} + class JobDatasetMapper implements RowMapper { @Override public JobDataset map(ResultSet rs, StatementContext ctx) throws SQLException { diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index a1512e755d..2963f56ce2 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -27,13 +27,13 @@ import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.DatasetType; -import marquez.common.models.JobType; import marquez.common.models.NamespaceName; import marquez.common.models.RunState; import marquez.common.models.SourceType; import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; import marquez.db.JobVersionDao.IoType; +import marquez.db.JobVersionDao.JobRowRunDetails; import marquez.db.RunDao.RunUpsert; import marquez.db.RunDao.RunUpsert.RunUpsertBuilder; import marquez.db.mappers.LineageEventMapper; @@ -167,9 +167,13 @@ SELECT count(*) default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) { UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper); RunState runState = getRunState(event.getEventType()); - if (event.getEventType() != null && runState.isDone()) { + + if (event.getJob() != null && event.getJob().isStreamingJob()) { + updateMarquezOnStreamingJob(event, updateLineageRow, runState); + } else if (event.getEventType() != null && runState.isDone()) { updateMarquezOnComplete(event, updateLineageRow, runState); } + return updateLineageRow; } @@ -559,7 +563,7 @@ private JobRow buildJobFromEvent( jobDao.upsertJob( UUID.randomUUID(), parent.getUuid(), - getJobType(job), + job.type(), now, namespace.getUuid(), namespace.getName(), @@ -572,7 +576,7 @@ private JobRow buildJobFromEvent( () -> jobDao.upsertJob( UUID.randomUUID(), - getJobType(job), + job.type(), now, namespace.getUuid(), namespace.getName(), @@ -680,7 +684,7 @@ private JobRow createParentJobRunRecord( createJobDao() .upsertJob( UUID.randomUUID(), - getJobType(job), + job.type(), now, namespace.getUuid(), namespace.getName(), @@ -745,16 +749,58 @@ default Set toDatasetId(List datasets) { default void updateMarquezOnComplete( LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) { + final JobVersionDao jobVersionDao = createJobVersionDao(); + // Link the job version to the job only if the run is marked done and has transitioned into one + // of the following states: COMPLETED, ABORTED, or FAILED. 
+ final boolean linkJobToJobVersion = runState.isDone(); + BagOfJobVersionInfo bagOfJobVersionInfo = - createJobVersionDao() - .upsertJobVersionOnRunTransition( - updateLineageRow.getJob(), - updateLineageRow.getRun().getUuid(), - runState, - event.getEventTime().toInstant()); + jobVersionDao.upsertJobVersionOnRunTransition( + jobVersionDao.loadJobRowRunDetails( + updateLineageRow.getJob(), updateLineageRow.getRun().getUuid()), + runState, + event.getEventTime().toInstant(), + linkJobToJobVersion); updateLineageRow.setJobVersionBag(bagOfJobVersionInfo); } + /** + * A separate method is used as the logic to update Marquez model differs for streaming and batch. + * The assumption for batch is that the job version is created when task is done and cumulative + * list of input and output datasets from all the events is used to compute the job version UUID. + * However, this wouldn't make sense for streaming jobs, which are mostly long living and produce + * output before completing. + * + *
<p>
In this case, a job version is created based on the list of input and output datasets
+   * referenced by this job. If a job starts with inputs:{A,B} and outputs:{C}, a new job version is
+   * created immediately at job start. If a following event produces inputs:{A} and outputs:{C}, the
+   * union of all datasets registered within this job does not change, and thus the job version is
+   * not modified. Likewise, if another event arrives with no inputs nor outputs, the job version
+   * still does not change, as its hash is evaluated based on the datasets attached to the run.
+   *
+   *
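+   * <p>Concretely, the version here is the value {@code Utils.newJobVersionFor} derives from the
+   * cumulative input and output dataset IDs and the job location, which is why
+   * {@code versionExists} below can cheaply decide whether a new job version row must be written.
+   *
+   *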
<p>
However, in case of event with inputs:{A,B,D} and outputs:{C}, new hash gets computed and + * new job version row is inserted into the table. + * + * @param event + * @param updateLineageRow + * @param runState + */ + default void updateMarquezOnStreamingJob( + LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) { + final JobVersionDao jobVersionDao = createJobVersionDao(); + JobRowRunDetails jobRowRunDetails = + jobVersionDao.loadJobRowRunDetails( + updateLineageRow.getJob(), updateLineageRow.getRun().getUuid()); + + if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) { + // need to insert new job version + BagOfJobVersionInfo bagOfJobVersionInfo = + jobVersionDao.upsertJobVersionOnRunTransition( + jobRowRunDetails, runState, event.getEventTime().toInstant(), true); + updateLineageRow.setJobVersionBag(bagOfJobVersionInfo); + } + } + default String getUrlOrNull(String uri) { try { return new URI(uri).toASCIIString(); @@ -772,10 +818,6 @@ default String formatNamespaceName(String namespace) { return namespace.replaceAll("[^a-z:/A-Z0-9\\-_.@+]", "_"); } - default JobType getJobType(Job job) { - return JobType.BATCH; - } - default DatasetRecord upsertLineageDataset( ModelDaos daos, Dataset ds, Instant now, UUID runUuid, boolean isInput) { daos.initBaseDao(this); diff --git a/api/src/main/java/marquez/db/mappers/JobMapper.java b/api/src/main/java/marquez/db/mappers/JobMapper.java index c331dc413b..1592f9247e 100644 --- a/api/src/main/java/marquez/db/mappers/JobMapper.java +++ b/api/src/main/java/marquez/db/mappers/JobMapper.java @@ -15,9 +15,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableList.Builder; +import com.google.common.collect.ImmutableMap; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashSet; +import java.util.Map; +import java.util.Optional; import java.util.Set; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; @@ -36,29 +41,35 @@ @Slf4j public final class JobMapper implements RowMapper { private static final ObjectMapper MAPPER = Utils.getMapper(); + private static final String JOB_TYPE_FACET_NAME = "jobType"; @Override public Job map(@NonNull ResultSet results, @NonNull StatementContext context) throws SQLException { - return new Job( - new JobId( - NamespaceName.of(stringOrThrow(results, Columns.NAMESPACE_NAME)), - JobName.of(stringOrThrow(results, Columns.NAME))), - JobType.valueOf(stringOrThrow(results, Columns.TYPE)), - JobName.of(stringOrThrow(results, Columns.NAME)), - stringOrThrow(results, Columns.SIMPLE_NAME), - stringOrNull(results, Columns.PARENT_JOB_NAME), - timestampOrThrow(results, Columns.CREATED_AT), - timestampOrThrow(results, Columns.UPDATED_AT), - getDatasetFromJsonOrNull(results, "current_inputs"), - new HashSet<>(), - urlOrNull(results, "current_location"), - stringOrNull(results, Columns.DESCRIPTION), - // Latest Run is resolved in the JobDao. 
This can be brought in via a join and - // and a jsonb but custom deserializers will need to be introduced - null, - toFacetsOrNull(results, Columns.FACETS), - uuidOrNull(results, Columns.CURRENT_VERSION_UUID)); + ImmutableMap facetsOrNull = toFacetsOrNull(results, Columns.FACETS); + Job job = + new Job( + new JobId( + NamespaceName.of(stringOrThrow(results, Columns.NAMESPACE_NAME)), + JobName.of(stringOrThrow(results, Columns.NAME))), + JobType.valueOf( + stringOrThrow(results, Columns.TYPE)), // TODO: store job type in a table + JobName.of(stringOrThrow(results, Columns.NAME)), + stringOrThrow(results, Columns.SIMPLE_NAME), + stringOrNull(results, Columns.PARENT_JOB_NAME), + timestampOrThrow(results, Columns.CREATED_AT), + timestampOrThrow(results, Columns.UPDATED_AT), + getDatasetFromJsonOrNull(results, "current_inputs"), + new HashSet<>(), + urlOrNull(results, "current_location"), + stringOrNull(results, Columns.DESCRIPTION), + // Latest Run is resolved in the JobDao. This can be brought in via a join and + // and a jsonb but custom deserializers will need to be introduced + null, + facetsOrNull, + uuidOrNull(results, Columns.CURRENT_VERSION_UUID), + getLabels(facetsOrNull)); + return job; } Set getDatasetFromJsonOrNull(@NonNull ResultSet results, String column) @@ -78,4 +89,29 @@ Set getDatasetFromJsonOrNull(@NonNull ResultSet results, String colum return new HashSet<>(); } } + + private ImmutableList getLabels(ImmutableMap facetsOrNull) { + Builder builder = ImmutableList.builder(); + + if (facetsOrNull == null) { + return builder.build(); + } + + Optional.ofNullable(getJobTypeFacetField(facetsOrNull, "jobType")) + .ifPresent(e -> builder.add(e)); + + Optional.ofNullable(getJobTypeFacetField(facetsOrNull, "integration")) + .ifPresent(e -> builder.add(e)); + + return builder.build(); + } + + private String getJobTypeFacetField(ImmutableMap facetsOrNull, String field) { + return Optional.ofNullable(facetsOrNull.get(JOB_TYPE_FACET_NAME)) + .filter(o -> o instanceof Map) + .map(m -> (Map) m) + .filter(m -> m.containsKey(field)) + .map(m -> m.get(field).toString()) + .orElse(null); + } } diff --git a/api/src/main/java/marquez/service/OpenLineageService.java b/api/src/main/java/marquez/service/OpenLineageService.java index 5c88ebe3ff..4f2ec6f09e 100644 --- a/api/src/main/java/marquez/service/OpenLineageService.java +++ b/api/src/main/java/marquez/service/OpenLineageService.java @@ -142,7 +142,11 @@ public CompletableFuture createAsync(LineageEvent event) { .thenAccept( (update) -> { if (event.getEventType() != null) { - if (event.getEventType().equalsIgnoreCase("COMPLETE")) { + boolean isStreaming = + Optional.ofNullable(event.getJob()) + .map(j -> j.isStreamingJob()) + .orElse(false); + if (event.getEventType().equalsIgnoreCase("COMPLETE") || isStreaming) { buildJobOutputUpdate(update).ifPresent(runService::notify); } buildJobInputUpdate(update).ifPresent(runService::notify); diff --git a/api/src/main/java/marquez/service/RunService.java b/api/src/main/java/marquez/service/RunService.java index 27144e1833..05c3957360 100644 --- a/api/src/main/java/marquez/service/RunService.java +++ b/api/src/main/java/marquez/service/RunService.java @@ -84,14 +84,14 @@ public void markRunAs( runStateDao.updateRunStateFor(runId.getValue(), runState, transitionedAt); if (runState.isDone()) { + JobRow jobRow = + jobDao.findJobByNameAsRow(runRow.getNamespaceName(), runRow.getJobName()).orElseThrow(); BagOfJobVersionInfo bagOfJobVersionInfo = jobVersionDao.upsertJobVersionOnRunTransition( - jobDao - 
.findJobByNameAsRow(runRow.getNamespaceName(), runRow.getJobName()) - .orElseThrow(), - runRow.getUuid(), + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), runState, - transitionedAt); + transitionedAt, + true); // TODO: We should also notify that the outputs have been updated when a run is in a done // state to be consistent with existing job versioning logic. We'll want to add testing to diff --git a/api/src/main/java/marquez/service/models/Job.java b/api/src/main/java/marquez/service/models/Job.java index 4ed2d3fbd4..bcdaee1bfb 100644 --- a/api/src/main/java/marquez/service/models/Job.java +++ b/api/src/main/java/marquez/service/models/Job.java @@ -5,6 +5,7 @@ package marquez.service.models; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.net.URL; import java.time.Instant; @@ -41,6 +42,7 @@ public final class Job { @Nullable @Setter private Run latestRun; @Getter private final ImmutableMap facets; @Nullable private UUID currentVersion; + @Getter @Nullable private ImmutableList labels; public Job( @NonNull final JobId id, @@ -56,7 +58,8 @@ public Job( @Nullable final String description, @Nullable final Run latestRun, @Nullable final ImmutableMap facets, - @Nullable UUID currentVersion) { + @Nullable UUID currentVersion, + @Nullable ImmutableList labels) { this.id = id; this.type = type; this.name = name; @@ -72,6 +75,7 @@ public Job( this.latestRun = latestRun; this.facets = (facets == null) ? ImmutableMap.of() : facets; this.currentVersion = currentVersion; + this.labels = (labels == null) ? ImmutableList.of() : labels; } public Optional getLocation() { diff --git a/api/src/main/java/marquez/service/models/LineageEvent.java b/api/src/main/java/marquez/service/models/LineageEvent.java index 6117e1fc3d..05820cd53d 100644 --- a/api/src/main/java/marquez/service/models/LineageEvent.java +++ b/api/src/main/java/marquez/service/models/LineageEvent.java @@ -15,6 +15,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import javax.annotation.Nullable; import javax.validation.Valid; import javax.validation.constraints.NotNull; @@ -24,6 +25,7 @@ import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; +import marquez.common.models.JobType; /** * Requires jackson serialization features: mapper.registerModule(new JavaTimeModule()); @@ -206,6 +208,25 @@ public static class Job extends BaseJsonModel { @NotNull private String namespace; @NotNull private String name; @Valid private JobFacet facets; + + /** + * Verifies if a job is a streaming job. + * + * @return + */ + @JsonIgnore + public boolean isStreamingJob() { + return Optional.ofNullable(this.facets) + .map(JobFacet::getJobType) + .map(JobTypeJobFacet::getProcessingType) + .filter(type -> type.equalsIgnoreCase("STREAMING")) + .isPresent(); + } + + @JsonIgnore + public JobType type() { + return isStreamingJob() ? 
JobType.STREAM : JobType.BATCH; + } } @Builder @@ -214,12 +235,13 @@ public static class Job extends BaseJsonModel { @Setter @Valid @ToString - @JsonPropertyOrder({"documentation", "sourceCodeLocation", "sql", "description"}) + @JsonPropertyOrder({"documentation", "sourceCodeLocation", "sql", "description", "jobType"}) public static class JobFacet { @Valid private DocumentationJobFacet documentation; @Valid private SourceCodeLocationJobFacet sourceCodeLocation; @Valid private SQLJobFacet sql; + @Valid private JobTypeJobFacet jobType; @Builder.Default @JsonIgnore private Map additional = new LinkedHashMap<>(); @JsonAnySetter @@ -240,6 +262,10 @@ public SourceCodeLocationJobFacet getSourceCodeLocation() { return sourceCodeLocation; } + public JobTypeJobFacet getJobType() { + return jobType; + } + public SQLJobFacet getSql() { return sql; } @@ -297,6 +323,31 @@ public SQLJobFacet(@NotNull URI _producer, @NotNull URI _schemaURL, @NotNull Str } } + @NoArgsConstructor + @Getter + @Setter + @Valid + @ToString + public static class JobTypeJobFacet extends BaseFacet { + + @NotNull private String processingType; + @NotNull private String integration; + @NotNull private String jobType; + + @Builder + public JobTypeJobFacet( + @NotNull URI _producer, + @NotNull URI _schemaURL, + @NotNull String processingType, + @NotNull String integration, + @NotNull String jobType) { + super(_producer, _schemaURL); + this.processingType = processingType; + this.integration = integration; + this.jobType = jobType; + } + } + @Builder @AllArgsConstructor @NoArgsConstructor diff --git a/api/src/test/java/marquez/ColumnLineageIntegrationTest.java b/api/src/test/java/marquez/ColumnLineageIntegrationTest.java index c161c2e672..bca7686e73 100644 --- a/api/src/test/java/marquez/ColumnLineageIntegrationTest.java +++ b/api/src/test/java/marquez/ColumnLineageIntegrationTest.java @@ -23,6 +23,7 @@ import marquez.db.OpenLineageDao; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -37,8 +38,7 @@ public class ColumnLineageIntegrationTest extends BaseIntegrationTest { public void setup(Jdbi jdbi) { OpenLineageDao openLineageDao = jdbi.onDemand(OpenLineageDao.class); - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); LineageEvent.Dataset dataset_A = getDatasetA(); LineageEvent.Dataset dataset_B = getDatasetB(); diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index 16dd9ccfed..fe3509b663 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -115,9 +115,7 @@ INSERT INTO job_versions (uuid, created_at, updated_at, job_uuid, version, locat nominalTimeRunFacet, parentRun.orElse(null), ImmutableMap.of("airflow_version", ImmutableMap.of("version", "abc"))))) - .job( - new LineageEvent.Job( - NAMESPACE, jobName, new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP))) + .job(new LineageEvent.Job(NAMESPACE, jobName, JobFacet.builder().build())) .inputs( Collections.singletonList( new LineageEvent.Dataset( diff --git a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java index 571704c5c3..813b07b29a 
100644 --- a/api/src/test/java/marquez/db/ColumnLineageTestUtils.java +++ b/api/src/test/java/marquez/db/ColumnLineageTestUtils.java @@ -14,6 +14,7 @@ import marquez.api.JdbiUtils; import marquez.db.models.UpdateLineageRow; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.jdbi.v3.core.Jdbi; public class ColumnLineageTestUtils { @@ -110,8 +111,7 @@ public static LineageEvent.Dataset getDatasetC() { public static UpdateLineageRow createLineage( OpenLineageDao openLineageDao, LineageEvent.Dataset input, LineageEvent.Dataset output) { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); return LineageTestUtils.createLineageRow( openLineageDao, "job_" + UUID.randomUUID(), diff --git a/api/src/test/java/marquez/db/DatasetDaoTest.java b/api/src/test/java/marquez/db/DatasetDaoTest.java index 930cf1a4b4..b27324c343 100644 --- a/api/src/test/java/marquez/db/DatasetDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetDaoTest.java @@ -41,7 +41,7 @@ class DatasetDaoTest { private static DatasetDao datasetDao; private static OpenLineageDao openLineageDao; - private final JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + private final JobFacet jobFacet = JobFacet.builder().build(); static Jdbi jdbi; diff --git a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java index 6a4e9e0ed3..01d3c3bd19 100644 --- a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java @@ -20,6 +20,7 @@ import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; +import marquez.service.models.LineageEvent.JobFacet; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -308,8 +309,7 @@ public void testInsertDatasetFacetsForUnknownTypeFacet() { @Test public void testInsertOutputDatasetFacetsFor() { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( @@ -340,8 +340,7 @@ public void testInsertOutputDatasetFacetsFor() { @Test public void testInsertInputDatasetFacetsFor() { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( @@ -372,8 +371,7 @@ public void testInsertInputDatasetFacetsFor() { private UpdateLineageRow createLineageRowWithInputDataset( LineageEvent.DatasetFacets.DatasetFacetsBuilder inputDatasetFacetsbuilder) { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); return LineageTestUtils.createLineageRow( openLineageDao, @@ -389,8 +387,7 @@ private UpdateLineageRow createLineageRowWithInputDataset( private UpdateLineageRow createLineageRowWithOutputDataset( LineageEvent.DatasetFacets.DatasetFacetsBuilder outputDatasetFacetsbuilder) { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + 
LineageEvent.JobFacet jobFacet = JobFacet.builder().build(); return LineageTestUtils.createLineageRow( openLineageDao, diff --git a/api/src/test/java/marquez/db/FacetTestUtils.java b/api/src/test/java/marquez/db/FacetTestUtils.java index 5e78e9b689..ea516ab679 100644 --- a/api/src/test/java/marquez/db/FacetTestUtils.java +++ b/api/src/test/java/marquez/db/FacetTestUtils.java @@ -9,6 +9,7 @@ import java.util.UUID; import marquez.db.models.UpdateLineageRow; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.apache.commons.lang3.StringUtils; import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; @@ -20,14 +21,20 @@ public class FacetTestUtils { public static UpdateLineageRow createLineageWithFacets(OpenLineageDao openLineageDao) { LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet( - new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"), - new LineageEvent.SourceCodeLocationJobFacet( - PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), - new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"), - Map.of( - "ownership", "some-owner", - "sourceCode", "some-code")); + JobFacet.builder() + .documentation( + new LineageEvent.DocumentationJobFacet( + PRODUCER_URL, SCHEMA_URL, "some-documentation")) + .sourceCodeLocation( + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git")) + .sql(new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query")) + .additional( + Map.of( + "ownership", "some-owner", + "sourceCode", "some-code")) + .build(); + return LineageTestUtils.createLineageRow( openLineageDao, "job_" + UUID.randomUUID(), diff --git a/api/src/test/java/marquez/db/JobFacetsDaoTest.java b/api/src/test/java/marquez/db/JobFacetsDaoTest.java index e6b48e7d7d..91a60c1157 100644 --- a/api/src/test/java/marquez/db/JobFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/JobFacetsDaoTest.java @@ -18,6 +18,7 @@ import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -63,12 +64,14 @@ public void insertJobFacets() { openLineageDao, "job_" + UUID.randomUUID(), "COMPLETE", - new LineageEvent.JobFacet( - null, - new LineageEvent.SourceCodeLocationJobFacet( - PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), - null, - LineageTestUtils.EMPTY_MAP), + JobFacet.builder() + .sourceCodeLocation( + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, + SCHEMA_URL, + "git", + "git@github.com:OpenLineage/OpenLineage.git")) + .build(), Collections.emptyList(), Collections.emptyList()); @@ -88,12 +91,16 @@ public void insertJobFacets() { @Test public void testGetFacetsByRunUuid() { LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet( - new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"), - new LineageEvent.SourceCodeLocationJobFacet( - PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), - new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"), - null); + JobFacet.builder() + .documentation( + new LineageEvent.DocumentationJobFacet( + PRODUCER_URL, SCHEMA_URL, "some-documentation")) + 
.sourceCodeLocation( + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git")) + .sql(new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query")) + .build(); + UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( openLineageDao, diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java index aa0caaf4fc..b2d3a6b91a 100644 --- a/api/src/test/java/marquez/db/JobVersionDaoTest.java +++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java @@ -242,7 +242,10 @@ public void testGetJobVersions() { jdbiForTesting, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs()); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), + RunState.COMPLETED, + Instant.now(), + true); List jobVersions = jobVersionDao.findAllJobVersions(namespaceRow.getName(), jobRow.getName(), 10, 0); @@ -311,7 +314,10 @@ public void testUpsertJobVersionOnRunTransition() { // (6) Add a new job version on the run state transition to COMPLETED. final BagOfJobVersionInfo bagOfJobVersionInfo = jobVersionDao.upsertJobVersionOnRunTransition( - jobRow, runRow.getUuid(), RunState.COMPLETED, newTimestamp()); + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), + RunState.COMPLETED, + newTimestamp(), + true); // Ensure the job version is associated with the latest run. final RunRow latestRunRowForJobVersion = runDao.findRunByUuidAsRow(runRow.getUuid()).get(); diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index aed23901a8..006275508f 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -67,7 +67,7 @@ public class LineageDaoTest { new SchemaField("firstname", "string", "the first name"), new SchemaField("lastname", "string", "the last name"), new SchemaField("birthdate", "date", "the date of birth"))); - private final JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + private final JobFacet jobFacet = JobFacet.builder().build(); static Jdbi jdbi; @@ -393,7 +393,7 @@ public void testGetLineageWithJobThatSharesNoDatasets() { /** A failed consumer job doesn't show up in the datasets out edges */ @Test public void testGetLineageWithFailedConsumer() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -513,7 +513,7 @@ public void testGetInputDatasetsWithJobThatHasMultipleVersions() { /** A failed producer job doesn't show up in the lineage */ @Test public void testGetLineageWithFailedProducer() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -542,7 +542,7 @@ public void testGetLineageWithFailedProducer() { /** A failed producer job doesn't show up in the lineage */ @Test public void testGetLineageChangedJobVersion() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -568,7 +568,7 @@ public void testGetLineageChangedJobVersion() { @Test public void 
testGetJobFromInputOrOutput() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( @@ -592,7 +592,7 @@ public void testGetJobFromInputOrOutput() { @Test public void testGetJobFromInputOrOutputPrefersRecentOutputJob() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); // add some consumer jobs prior to the write so we know that the sort isn't simply picking // the first job created diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index e21a1070f3..aaca26b6b0 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -73,7 +73,7 @@ public static void setUpOnce(Jdbi jdbi) { /** When reading a dataset, the version is assumed to be the version last written */ @Test void testUpdateMarquezModel() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -117,11 +117,9 @@ void testUpdateMarquezModelWithDatasetEvent() { @Test void testUpdateMarquezModelWithJobEvent() { JobFacet jobFacet = - new JobFacet( - DocumentationJobFacet.builder().description("documentation").build(), - null, - null, - LineageTestUtils.EMPTY_MAP); + JobFacet.builder() + .documentation(DocumentationJobFacet.builder().description("documentation").build()) + .build(); Job job = new Job(NAMESPACE, READ_JOB_NAME, jobFacet); @@ -183,7 +181,7 @@ void testUpdateMarquezModelLifecycleStateChangeFacet() { PRODUCER_URL, SCHEMA_URL, "TRUNCATE")) .build()); - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, WRITE_JOB_NAME, "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList(dataset)); @@ -195,7 +193,7 @@ void testUpdateMarquezModelLifecycleStateChangeFacet() { @Test void testUpdateMarquezModelDatasetWithColumnLineageFacet() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -247,7 +245,7 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacet() { @Test void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenInputFieldDoesNotExist() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -283,7 +281,7 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenOutputFieldDoesNotEx TRANSFORMATION_TYPE))))) .build()); - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -334,7 +332,7 @@ void testUpsertColumnLineageData() { UPDATED_TRANSFORMATION_TYPE))))) .build()); - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob1 = LineageTestUtils.createLineageRow( dao, @@ -385,7 +383,7 @@ void testUpdateMarquezModelDatasetWithSymlinks() { 
"symlinkNamespace", "symlinkName", "some-type")))) .build()); - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, WRITE_JOB_NAME, "COMPLETE", jobFacet, Arrays.asList(), Arrays.asList(dataset)); @@ -425,7 +423,7 @@ void testUpdateMarquezModelDatasetWithSymlinks() { */ @Test void testUpdateMarquezModelWithInputOnlyDataset() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -449,7 +447,7 @@ void testUpdateMarquezModelWithInputOnlyDataset() { */ @Test void testUpdateMarquezModelWithNonMatchingReadSchema() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -496,7 +494,7 @@ void testUpdateMarquezModelWithNonMatchingReadSchema() { */ @Test void testUpdateMarquezModelWithPriorWrites() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob1 = LineageTestUtils.createLineageRow( dao, @@ -569,7 +567,7 @@ void testUpdateMarquezModelWithPriorWrites() { @Test void testGetOpenLineageEvents() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow writeJob = LineageTestUtils.createLineageRow( dao, @@ -590,7 +588,7 @@ void testGetOpenLineageEvents() { @Test void testInputOutputDatasetFacets() { - JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + JobFacet jobFacet = JobFacet.builder().build(); UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( dao, diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index 9eb1fc2bcd..ce6d9f5329 100644 --- a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -83,7 +83,10 @@ public void getRun() { jdbi, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs()); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), + RunState.COMPLETED, + Instant.now(), + true); Optional run = runDao.findRunByUuid(runRow.getUuid()); assertThat(run) @@ -198,7 +201,10 @@ private Stream createRunsForJob( jdbi, runRow.getUuid(), RunState.COMPLETED, outputs); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); + jobVersionDao.loadJobRowRunDetails(jobRow, runRow.getUuid()), + RunState.COMPLETED, + Instant.now(), + true); return runRow; }); } diff --git a/api/src/test/java/marquez/db/RunFacetsDaoTest.java b/api/src/test/java/marquez/db/RunFacetsDaoTest.java index 9dfa40c85a..965423a80d 100644 --- a/api/src/test/java/marquez/db/RunFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/RunFacetsDaoTest.java @@ -58,7 +58,7 @@ public void setup(Jdbi jdbi) { openLineageDao, "job_" + UUID.randomUUID(), "COMPLETE", - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP), + JobFacet.builder().build(), Collections.emptyList(), Collections.emptyList(), new LineageEvent.ParentRunFacet( @@ -112,7 +112,7 @@ public void 
testInsertRunFacetsForSparkLogicalPlanWhenPlanAlreadyPresent() { openLineageDao, "job_" + UUID.randomUUID(), "COMPLETE", - new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP), + JobFacet.builder().build(), Collections.emptyList(), Collections.emptyList(), new ParentRunFacet( @@ -164,12 +164,16 @@ public void testInsertRunFacetsForCustomFacet() { @Test public void testGetFacetsByRunUuid() { LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet( - new LineageEvent.DocumentationJobFacet(PRODUCER_URL, SCHEMA_URL, "some-documentation"), - new LineageEvent.SourceCodeLocationJobFacet( - PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git"), - new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query"), - null); + JobFacet.builder() + .documentation( + new LineageEvent.DocumentationJobFacet( + PRODUCER_URL, SCHEMA_URL, "some-documentation")) + .sourceCodeLocation( + new LineageEvent.SourceCodeLocationJobFacet( + PRODUCER_URL, SCHEMA_URL, "git", "git@github.com:OpenLineage/OpenLineage.git")) + .sql(new LineageEvent.SQLJobFacet(PRODUCER_URL, SCHEMA_URL, "some sql query")) + .build(); + UpdateLineageRow lineageRow = LineageTestUtils.createLineageRow( openLineageDao, diff --git a/api/src/test/java/marquez/db/mappers/JobMapperTest.java b/api/src/test/java/marquez/db/mappers/JobMapperTest.java index 6dc8bed3f7..b88941761f 100644 --- a/api/src/test/java/marquez/db/mappers/JobMapperTest.java +++ b/api/src/test/java/marquez/db/mappers/JobMapperTest.java @@ -18,6 +18,7 @@ import java.util.TimeZone; import java.util.UUID; import marquez.common.Utils; +import marquez.common.models.JobType; import marquez.db.Columns; import marquez.service.models.Job; import org.jdbi.v3.core.statement.StatementContext; @@ -31,6 +32,11 @@ class JobMapperTest { private static ResultSet resultSet; private static TimeZone defaultTZ = TimeZone.getDefault(); + private static String JOB_FACET = + """ + [{"jobType": {"jobType": "QUERY", "integration": "FLINK", "processingType": "STREAMING"}}] + """; + @BeforeAll public static void setUp() throws SQLException, MalformedURLException { TimeZone.setDefault(TimeZone.getTimeZone("UTC")); @@ -87,4 +93,25 @@ void shouldMapFullJob() throws SQLException { Job actual = underTest.map(resultSet, mock(StatementContext.class)); assertThat(actual).isEqualTo(expected); } + + @Test + void testMapJobTypeJobFacet() throws SQLException { + ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class); + + when(resultSet.getString(Columns.TYPE)).thenReturn("STREAM"); + when(resultSet.getObject(Columns.TYPE)).thenReturn("STREAM"); + + when(resultSet.getMetaData()).thenReturn(resultSetMetaData); + when(resultSetMetaData.getColumnCount()).thenReturn(1); + when(resultSetMetaData.getColumnName(1)).thenReturn(Columns.FACETS); + + when(resultSet.getString(Columns.FACETS)).thenReturn(JOB_FACET); + when(resultSet.getObject(Columns.FACETS)).thenReturn(JOB_FACET); + JobMapper underTest = new JobMapper(); + + Job actual = underTest.map(resultSet, mock(StatementContext.class)); + + assertThat(actual.getType()).isEqualTo(JobType.STREAM); + assertThat(actual.getLabels()).containsExactly("QUERY", "FLINK"); + } } diff --git a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java index b5f98d12e4..adcfa95552 100644 --- a/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java +++ b/api/src/test/java/marquez/db/migrations/V57_1__BackfillFacetsTest.java @@ -28,6 
+28,7 @@ import marquez.db.models.UpdateLineageRow; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import org.flywaydb.core.api.migration.Context; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.AfterEach; @@ -167,8 +168,7 @@ public void testMigrateForMultipleChunks() throws Exception { @Test public void testMigrateForLineageWithNoDatasets() throws Exception { - LineageEvent.JobFacet jobFacet = - new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + LineageEvent.JobFacet jobFacet = LineageEvent.JobFacet.builder().build(); LineageTestUtils.createLineageRow( openLineageDao, "job_" + UUID.randomUUID(), diff --git a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java index 7f5634e6a5..62d7355f6f 100644 --- a/api/src/test/java/marquez/service/ColumnLineageServiceTest.java +++ b/api/src/test/java/marquez/service/ColumnLineageServiceTest.java @@ -41,6 +41,7 @@ import marquez.service.models.Dataset; import marquez.service.models.Lineage; import marquez.service.models.LineageEvent; +import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.Node; import marquez.service.models.NodeId; import org.jdbi.v3.core.Jdbi; @@ -70,7 +71,7 @@ public static void setUpOnce(Jdbi jdbi) { fieldDao = jdbi.onDemand(DatasetFieldDao.class); datasetDao = jdbi.onDemand(DatasetDao.class); lineageService = new ColumnLineageService(dao, fieldDao); - jobFacet = new LineageEvent.JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); + jobFacet = JobFacet.builder().build(); } @AfterEach diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 97f6b9ce02..9366bbb8b9 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -12,11 +12,13 @@ import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; import marquez.api.JdbiUtils; +import marquez.common.models.DatasetId; import marquez.common.models.DatasetName; import marquez.common.models.InputDatasetVersion; import marquez.common.models.JobId; @@ -39,6 +41,7 @@ import marquez.service.models.Lineage; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.JobFacet; +import marquez.service.models.LineageEvent.JobTypeJobFacet; import marquez.service.models.LineageEvent.SchemaField; import marquez.service.models.Node; import marquez.service.models.NodeId; @@ -71,8 +74,7 @@ public class LineageServiceTest { new SchemaField("firstname", "string", "the first name"), new SchemaField("lastname", "string", "the last name"), new SchemaField("birthdate", "date", "the date of birth"))); - private final JobFacet jobFacet = new JobFacet(null, null, null, LineageTestUtils.EMPTY_MAP); - + private final JobFacet jobFacet = JobFacet.builder().build(); static Jdbi jdbi; @BeforeAll @@ -425,6 +427,59 @@ public void testLineageWithWithCycle() { .matches(n -> n.isJobType() && n.asJobId().getName().getValue().equals("writeJob")); } + @Test + public void testGetLineageForRunningStreamingJob() { + Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build(); + Dataset output = 
Dataset.builder().name("output-dataset").namespace(NAMESPACE).build(); + + // (1) Run batch job which outputs input-dataset + LineageTestUtils.createLineageRow( + openLineageDao, + "someInputBatchJob", + "COMPLETE", + jobFacet, + Collections.emptyList(), + Arrays.asList(input)); + + // (2) Run streaming job on the reading output of this job + LineageTestUtils.createLineageRow( + openLineageDao, + "streamingjob", + "RUNNING", + JobFacet.builder() + .jobType(JobTypeJobFacet.builder().processingType("STREAMING").build()) + .build(), + Arrays.asList(input), + Arrays.asList(output)); + + // (3) Run batch job that reads output of streaming job (Note: streaming job is still running) + LineageTestUtils.createLineageRow( + openLineageDao, + "someOutputBatchJob", + "COMPLETE", + jobFacet, + Arrays.asList(output), + Collections.emptyList()); + + // (4) lineage on output dataset shall be same as lineage on input dataset + Lineage lineageFromInput = + lineageService.lineage( + NodeId.of( + new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("input-dataset"))), + 5, + true); + + Lineage lineageFromOutput = + lineageService.lineage( + NodeId.of( + new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))), + 5, + true); + + assertThat(lineageFromInput.getGraph()).hasSize(5); // 2 datasets + 3 jobs + assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph()); + } + @Test public void testLineageForOrphanedDataset() { UpdateLineageRow writeJob = diff --git a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java index d80ba94b00..dfef90fa29 100644 --- a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java +++ b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java @@ -36,6 +36,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetVersionDao; import marquez.db.JobDao; +import marquez.db.JobVersionDao; import marquez.db.NamespaceDao; import marquez.db.OpenLineageDao; import marquez.db.RunArgsDao; @@ -57,6 +58,8 @@ import marquez.service.models.LineageEvent.DatasetFacets; import marquez.service.models.LineageEvent.DatasourceDatasetFacet; import marquez.service.models.LineageEvent.JobFacet; +import marquez.service.models.LineageEvent.JobTypeJobFacet; +import marquez.service.models.LineageEvent.LineageEventBuilder; import marquez.service.models.LineageEvent.RunFacet; import marquez.service.models.LineageEvent.SQLJobFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; @@ -84,8 +87,9 @@ public class OpenLineageServiceIntegrationTest { private JobService jobService; private OpenLineageDao openLineageDao; - + private Jdbi jdbi; private JobDao jobDao; + private JobVersionDao jobVersionDao; private DatasetDao datasetDao; private DatasetVersionDao datasetVersionDao; private ArgumentCaptor runInputListener; @@ -148,9 +152,11 @@ public ExpectedResults( @BeforeEach public void setup(Jdbi jdbi) throws SQLException { + this.jdbi = jdbi; openLineageDao = jdbi.onDemand(OpenLineageDao.class); datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class); jobDao = jdbi.onDemand(JobDao.class); + jobVersionDao = jdbi.onDemand(JobVersionDao.class); runService = mock(RunService.class); jobService = new JobService(jobDao, runService); runInputListener = ArgumentCaptor.forClass(JobInputUpdate.class); @@ -548,6 +554,172 @@ void testJobEvent() throws ExecutionException, InterruptedException { .isEqualTo("theDatasource"); } + @Test + void 
testStreamingJob() throws ExecutionException, InterruptedException { + // (1) Create output + LineageEvent.Dataset input = + LineageEvent.Dataset.builder().name(DATASET_NAME).namespace(NAMESPACE).build(); + + LineageEvent.Dataset output = + LineageEvent.Dataset.builder() + .name(DATASET_NAME) + .namespace(NAMESPACE) + .facets( + DatasetFacets.builder() + .schema( + new SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new SchemaField("col", "STRING", "my name")))) + .build()) + .build(); + + // (2) Streaming job not followed by a COMPLETE event writing to a output + UUID firstRunId = UUID.randomUUID(); + lineageService + .createAsync( + LineageEvent.builder() + .eventType("RUNNING") + .run(new LineageEvent.Run(firstRunId.toString(), RunFacet.builder().build())) + .job( + LineageEvent.Job.builder() + .name("streaming_job_name") + .namespace(NAMESPACE) + .facets( + JobFacet.builder() + .jobType( + JobTypeJobFacet.builder() + .processingType("STREAMING") + .integration("FLINK") + .jobType("JOB") + .build()) + .build()) + .build()) + .eventTime(Instant.now().atZone(TIMEZONE)) + .inputs(Collections.singletonList(input)) + .outputs(Collections.singletonList(output)) + .build()) + .get(); + Optional datasetRow = datasetDao.findDatasetByName(NAMESPACE, DATASET_NAME); + + // (3) Assert that output is present and has dataset_version written + assertThat(datasetRow).isPresent().flatMap(Dataset::getCurrentVersion).isPresent(); + + // (4) Assert that job is present and its current version is present + Job job = jobDao.findJobByName(NAMESPACE, "streaming_job_name").get(); + assertThat(job.getInputs()).hasSize(1); + assertThat(job.getCurrentVersion()).isPresent(); + assertThat(job.getType()).isEqualTo(JobType.STREAM); + assertThat(job.getLabels()).containsExactly("JOB", "FLINK"); + + UUID initialJobVersion = job.getCurrentVersion().get(); + Instant updatedAt = + jdbi.withHandle( + h -> + h.createQuery("SELECT max(updated_at) FROM job_versions") + .mapTo(Instant.class) + .first()); + + // (5) Send COMPLETE event streaming job + lineageService + .createAsync( + LineageEvent.builder() + .eventType("COMPLETE") + .run(new LineageEvent.Run(firstRunId.toString(), RunFacet.builder().build())) + .job( + LineageEvent.Job.builder() + .name("streaming_job_name") + .namespace(NAMESPACE) + .facets( + JobFacet.builder() + .jobType( + JobTypeJobFacet.builder() + .processingType("STREAMING") + .integration("FLINK") + .jobType("JOB") + .build()) + .build()) + .build()) + .eventTime(Instant.now().atZone(TIMEZONE)) + .inputs(Collections.emptyList()) + .outputs(Collections.emptyList()) + .build()) + .get(); + + // (6) Assert job version exists + job = jobDao.findJobByName(NAMESPACE, "streaming_job_name").get(); + assertThat(job.getCurrentVersion()).isPresent(); + assertThat(job.getType()).isEqualTo(JobType.STREAM); + assertThat(job.getLabels()).containsExactly("JOB", "FLINK"); + assertThat(job.getCurrentVersion().get()).isEqualTo(initialJobVersion); + + // (7) Make sure updated_at in job version did not change + Instant lastUpdatedAt = + jdbi.withHandle( + h -> + h.createQuery("SELECT max(updated_at) FROM job_versions") + .mapTo(Instant.class) + .first()); + assertThat(updatedAt).isEqualTo(lastUpdatedAt); + } + + @Test + void testStreamingJobCreateSingleJobAndDatasetVersion() + throws ExecutionException, InterruptedException { + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder().name(DATASET_NAME).namespace(NAMESPACE).build(); + UUID firstRunId = UUID.randomUUID(); + LineageEventBuilder 
eventBuilder = + LineageEvent.builder() + .eventType("RUNNING") + .run(new LineageEvent.Run(firstRunId.toString(), RunFacet.builder().build())) + .job( + LineageEvent.Job.builder() + .name("streaming_job_name") + .namespace(NAMESPACE) + .facets( + JobFacet.builder() + .jobType(JobTypeJobFacet.builder().processingType("STREAMING").build()) + .build()) + .build()) + .eventTime(Instant.now().atZone(TIMEZONE)); + LineageEvent lineageEvent = eventBuilder.outputs(Collections.singletonList(dataset)).build(); + + // (1) Emit running event + lineageService.createAsync(lineageEvent).get(); + + UUID datasetVersionUuid = + datasetDao.findDatasetAsRow(NAMESPACE, DATASET_NAME).get().getCurrentVersionUuid().get(); + int initialJobVersionsCount = + jobVersionDao.findAllJobVersions(NAMESPACE, "streaming_job_name", 10, 0).size(); + + // (2) Emit other running event + lineageService.createAsync(lineageEvent).get(); + + // (3) Emit running event with no input nor output datasets + lineageService.createAsync(eventBuilder.build()).get(); + assertThat(jobVersionDao.findAllJobVersions(NAMESPACE, "streaming_job_name", 10, 0).size()) + .isEqualTo(initialJobVersionsCount); + + // (4) Emit event with other input dataset + LineageEvent.Dataset otherdataset = + LineageEvent.Dataset.builder().name("otherDataset").namespace(NAMESPACE).build(); + lineageService + .createAsync(eventBuilder.inputs(Collections.singletonList(otherdataset)).build()) + .get(); + assertThat(jobVersionDao.findAllJobVersions(NAMESPACE, "streaming_job_name", 10, 0).size()) + .isEqualTo(initialJobVersionsCount + 1); + + // (5) Verify dataset's version has not changed + assertThat( + datasetDao + .findDatasetAsRow(NAMESPACE, DATASET_NAME) + .get() + .getCurrentVersionUuid() + .get()) + .isEqualTo(datasetVersionUuid); + } + private void checkExists(LineageEvent.Dataset ds) { DatasetService datasetService = new DatasetService(openLineageDao, runService); diff --git a/api/src/test/java/marquez/service/models/LineageEventTest.java b/api/src/test/java/marquez/service/models/LineageEventTest.java index b06fe2946d..a78bc98f57 100644 --- a/api/src/test/java/marquez/service/models/LineageEventTest.java +++ b/api/src/test/java/marquez/service/models/LineageEventTest.java @@ -24,6 +24,8 @@ import java.util.List; import marquez.common.Utils; import marquez.common.models.FlexibleDateTimeDeserializer; +import marquez.service.models.LineageEvent.JobTypeJobFacet; +import org.junit.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -95,4 +97,16 @@ public void testSerialization(ObjectMapper mapper, String expectedFile) throws I JsonNode actualNode = mapper.readTree(serialized); assertThat(actualNode).isEqualTo(expectedNode); } + + @Test + public void testJobTypeJobFacetSerialization() throws IOException { + URL expectedResource = Resources.getResource(EVENT_FULL); + LineageEvent deserialized = + (LineageEvent) Utils.newObjectMapper().readValue(expectedResource, BaseEvent.class); + JobTypeJobFacet facet = deserialized.getJob().getFacets().getJobType(); + + assertThat(facet.getJobType()).isEqualTo("QUERY"); + assertThat(facet.getIntegration()).isEqualTo("FLINK"); + assertThat(facet.getProcessingType()).isEqualTo("STREAMING"); + } } diff --git a/api/src/test/resources/open_lineage/event_full.json b/api/src/test/resources/open_lineage/event_full.json index 6aa5109b97..2aab96d3b6 100644 --- a/api/src/test/resources/open_lineage/event_full.json +++ b/api/src/test/resources/open_lineage/event_full.json @@ -61,6 
+61,13 @@ }, "query": "SELECT * FROM foo" }, + "jobType": { + "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", + "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#MyCustomJobFacet", + "jobType": "QUERY", + "integration": "FLINK", + "processingType": "STREAMING" + }, "additionalProp1": { "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client", "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.yml#MyCustomJobFacet", From 033cea31e8e8dcbca2583169ad402242a0137a7f Mon Sep 17 00:00:00 2001 From: "pawel.leszczynski" Date: Fri, 15 Dec 2023 14:49:40 +0100 Subject: [PATCH 3/7] remove unnecessary lombok dep with delombok plugin (#2701) Signed-off-by: Pawel Leszczynski --- api/src/main/java/marquez/db/OpenLineageDao.java | 3 +-- build.gradle | 8 +------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 2963f56ce2..9a5364fa93 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -35,7 +35,6 @@ import marquez.db.JobVersionDao.IoType; import marquez.db.JobVersionDao.JobRowRunDetails; import marquez.db.RunDao.RunUpsert; -import marquez.db.RunDao.RunUpsert.RunUpsertBuilder; import marquez.db.mappers.LineageEventMapper; import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetFieldRow; @@ -330,7 +329,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper final UUID runUuid = runToUuid(event.getRun().getRunId()); RunRow run; - RunUpsertBuilder runUpsertBuilder = + RunUpsert.RunUpsertBuilder runUpsertBuilder = RunUpsert.builder() .runUuid(runUuid) .parentRunUuid(parentUuid.orElse(null)) diff --git a/build.gradle b/build.gradle index b685ae5175..711eb87b1a 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,6 @@ buildscript { classpath 'com.adarshr:gradle-test-logger-plugin:3.2.0' classpath 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2' classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.22.0' - classpath "io.freefair.gradle:lombok-plugin:8.4" } } @@ -40,7 +39,6 @@ subprojects { apply plugin: 'com.github.johnrengelman.shadow' apply plugin: "com.diffplug.spotless" apply plugin: "pmd" - apply plugin: "io.freefair.lombok" project(':api') { apply plugin: 'application' @@ -97,11 +95,7 @@ subprojects { archiveClassifier.set("sources") } - task delombokJavadocs(type: Javadoc) { - source = delombok - } - - task javadocJar(type: Jar, dependsOn: delombokJavadocs) { + task javadocJar(type: Jar) { from javadoc.destinationDir archiveClassifier.set("javadoc") } From 4194d284087e856b8d1b30675bca65adac5199c1 Mon Sep 17 00:00:00 2001 From: David Sharp <34074888+davidsharp7@users.noreply.github.com> Date: Sat, 16 Dec 2023 07:08:57 +1300 Subject: [PATCH 4/7] add DELETE end point for dataset tags (#2698) * add DELETE end point for dataset tags Signed-off-by: sharpd * ran code linter to re-format Signed-off-by: sharpd * add integration tests and Marquez client Signed-off-by: sharpd --------- Signed-off-by: sharpd Co-authored-by: sharpd --- .../java/marquez/api/DatasetResource.java | 27 ++++++ api/src/main/java/marquez/db/DatasetDao.java | 27 ++++++ .../api/BaseResourceIntegrationTest.java | 83 ++++++++++++++++++- .../api/TagResourceIntegrationTest.java | 26 ++++++ .../java/marquez/client/MarquezClient.java | 6 ++ .../marquez/client/MarquezClientTest.java | 
13 +++ 6 files changed, 180 insertions(+), 2 deletions(-) diff --git a/api/src/main/java/marquez/api/DatasetResource.java b/api/src/main/java/marquez/api/DatasetResource.java index 49c9e68813..bcf8a07931 100644 --- a/api/src/main/java/marquez/api/DatasetResource.java +++ b/api/src/main/java/marquez/api/DatasetResource.java @@ -198,6 +198,33 @@ public Response tag( return Response.ok(dataset).build(); } + @Timed + @ResponseMetered + @ExceptionMetered + @DELETE + @Path("/{dataset}/tags/{tag}") + @Produces(APPLICATION_JSON) + public Response deleteDatasetTag( + @PathParam("namespace") NamespaceName namespaceName, + @PathParam("dataset") DatasetName datasetName, + @PathParam("tag") TagName tagName) { + throwIfNotExists(namespaceName); + throwIfNotExists(namespaceName, datasetName); + + log.info( + "Deleted tag '{}' from dataset '{}' on namespace '{}'", + tagName.getValue(), + datasetName.getValue(), + namespaceName.getValue()); + datasetService.deleteDatasetTag( + namespaceName.getValue(), datasetName.getValue(), tagName.getValue()); + Dataset dataset = + datasetService + .findDatasetByName(namespaceName.getValue(), datasetName.getValue()) + .orElseThrow(() -> new DatasetNotFoundException(datasetName)); + return Response.ok(dataset).build(); + } + @Timed @ResponseMetered @ExceptionMetered diff --git a/api/src/main/java/marquez/db/DatasetDao.java b/api/src/main/java/marquez/db/DatasetDao.java index 5ec88a6222..c96e59d6d8 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -272,6 +272,33 @@ DatasetRow upsert( """) Optional delete(String namespaceName, String name); + @SqlUpdate( + """ + DELETE FROM datasets_tag_mapping dtm + WHERE EXISTS ( + SELECT 1 + FROM + datasets d + JOIN + tags t + ON + d.uuid = dtm.dataset_uuid + AND + t.uuid = dtm.tag_uuid + JOIN + namespaces n + ON + d.namespace_uuid = n.uuid + WHERE + d.name = :datasetName + AND + t.name = :tagName + AND + n.name = :namespaceName + ); + """) + void deleteDatasetTag(String namespaceName, String datasetName, String tagName); + @Transaction default Dataset upsertDatasetMeta( NamespaceName namespaceName, DatasetName datasetName, DatasetMeta datasetMeta) { diff --git a/api/src/test/java/marquez/api/BaseResourceIntegrationTest.java b/api/src/test/java/marquez/api/BaseResourceIntegrationTest.java index 8483f5e63a..76f858d246 100644 --- a/api/src/test/java/marquez/api/BaseResourceIntegrationTest.java +++ b/api/src/test/java/marquez/api/BaseResourceIntegrationTest.java @@ -5,9 +5,16 @@ package marquez.api; +import static marquez.common.models.CommonModelGenerator.newConnectionUrlFor; +import static marquez.common.models.CommonModelGenerator.newDatasetName; +import static marquez.common.models.CommonModelGenerator.newDbSourceType; +import static marquez.common.models.CommonModelGenerator.newDescription; import static marquez.common.models.CommonModelGenerator.newNamespaceName; +import static marquez.common.models.CommonModelGenerator.newOwnerName; +import static marquez.common.models.CommonModelGenerator.newSourceName; import static marquez.db.DbTest.POSTGRES_14; +import com.google.common.collect.ImmutableSet; import io.dropwizard.testing.ConfigOverride; import io.dropwizard.testing.ResourceHelpers; import io.dropwizard.testing.junit5.DropwizardAppExtension; @@ -16,11 +23,17 @@ import io.openlineage.client.transports.HttpTransport; import java.net.URI; import java.net.URL; +import java.util.Set; import marquez.MarquezApp; import marquez.MarquezConfig; import 
marquez.client.MarquezClient; import marquez.client.Utils; +import marquez.client.models.DatasetId; +import marquez.client.models.DbTableMeta; +import marquez.client.models.NamespaceMeta; +import marquez.client.models.SourceMeta; import marquez.client.models.Tag; +import marquez.common.models.SourceType; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.testcontainers.containers.PostgreSQLContainer; @@ -56,17 +69,61 @@ abstract class BaseResourceIntegrationTest { static final Tag PII = new Tag("PII", "Personally identifiable information"); static final Tag SENSITIVE = new Tag("SENSITIVE", "Contains sensitive information"); + // Namespace static String NAMESPACE_NAME; - static DropwizardAppExtension MARQUEZ_APP; + static String NAMESPACE_DESCRIPTION; + static String OWNER_NAME; + + // Source + static String SOURCE_TYPE; + static String SOURCE_NAME; + static URI CONNECTION_URL; + static String SOURCE_DESCRIPTION; + + // Dataset + static String DB_TABLE_SOURCE_TYPE; + static String DB_TABLE_SOURCE_NAME; + static URI DB_TABLE_CONNECTION_URL; + static String DB_TABLE_SOURCE_DESCRIPTION; + static DatasetId DB_TABLE_ID; + static String DB_TABLE_NAME; + static String DB_TABLE_PHYSICAL_NAME; + static String DB_TABLE_DESCRIPTION; + static Set DB_TABLE_TAGS; + static DbTableMeta DB_TABLE_META; + static DropwizardAppExtension MARQUEZ_APP; static OpenLineage OL; static OpenLineageClient OL_CLIENT; static MarquezClient MARQUEZ_CLIENT; @BeforeAll public static void setUpOnce() throws Exception { - // (1) Use a randomly generated namespace. + // (1) Use a randomly generated namespace/dataset NAMESPACE_NAME = newNamespaceName().getValue(); + NAMESPACE_DESCRIPTION = newDescription(); + OWNER_NAME = newOwnerName().getValue(); + + SOURCE_TYPE = newDbSourceType().getValue(); + SOURCE_NAME = newSourceName().getValue(); + SOURCE_DESCRIPTION = newDescription(); + + DB_TABLE_SOURCE_TYPE = SourceType.of("POSTGRESQL").getValue(); + DB_TABLE_SOURCE_NAME = SOURCE_NAME; + DB_TABLE_SOURCE_DESCRIPTION = newDescription(); + DB_TABLE_ID = newDatasetIdWith(NAMESPACE_NAME); + DB_TABLE_NAME = DB_TABLE_ID.getName(); + DB_TABLE_PHYSICAL_NAME = DB_TABLE_NAME; + DB_TABLE_DESCRIPTION = newDescription(); + DB_TABLE_TAGS = ImmutableSet.of(PII.getName()); + DB_TABLE_CONNECTION_URL = newConnectionUrlFor(SourceType.of("POSTGRESQL")); + DB_TABLE_META = + DbTableMeta.builder() + .physicalName(DB_TABLE_PHYSICAL_NAME) + .sourceName(DB_TABLE_SOURCE_NAME) + .tags(DB_TABLE_TAGS) + .description(DB_TABLE_DESCRIPTION) + .build(); // (2) Configure Marquez application using test configuration and database. 
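    //     (The application configured below is what the tag-deletion tests hit
    //     through MarquezClient; the same DELETE route added in DatasetResource
    //     can also be exercised by hand. A minimal sketch, assuming a local API
    //     on port 5000, the default /api/v1 base path, and placeholder
    //     namespace and dataset names:
    //
    //       curl -X DELETE \
    //         "http://localhost:5000/api/v1/namespaces/my_namespace/datasets/my_table/tags/PII"
    //
    //     The response is the updated dataset as JSON, with the tag removed,
    //     mirroring the existing POST tag endpoint.)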
MARQUEZ_APP = @@ -89,6 +146,28 @@ public static void setUpOnce() throws Exception { MARQUEZ_CLIENT = MarquezClient.builder().baseUrl(url).build(); } + protected void createNamespace(String namespaceName) { + // (1) Create namespace for db table + final NamespaceMeta namespaceMeta = + NamespaceMeta.builder().ownerName(OWNER_NAME).description(NAMESPACE_DESCRIPTION).build(); + + MARQUEZ_CLIENT.createNamespace(namespaceName, namespaceMeta); + } + + protected static DatasetId newDatasetIdWith(final String namespaceName) { + return new DatasetId(namespaceName, newDatasetName().getValue()); + } + + protected void createSource(String sourceName) { + final SourceMeta sourceMeta = + SourceMeta.builder() + .type(DB_TABLE_SOURCE_TYPE) + .connectionUrl(DB_TABLE_CONNECTION_URL) + .description(DB_TABLE_SOURCE_DESCRIPTION) + .build(); + MARQUEZ_CLIENT.createSource(sourceName, sourceMeta); + } + @AfterAll public static void cleanUp() { MARQUEZ_APP.after(); diff --git a/api/src/test/java/marquez/api/TagResourceIntegrationTest.java b/api/src/test/java/marquez/api/TagResourceIntegrationTest.java index 47808da31d..4588e3eaea 100644 --- a/api/src/test/java/marquez/api/TagResourceIntegrationTest.java +++ b/api/src/test/java/marquez/api/TagResourceIntegrationTest.java @@ -10,10 +10,12 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Set; +import marquez.client.models.Dataset; import marquez.client.models.Tag; import org.junit.jupiter.api.Test; public class TagResourceIntegrationTest extends BaseResourceIntegrationTest { + @Test public void testApp_createTag() { // (1) List tags. @@ -36,4 +38,28 @@ public void testApp_listTags() { // (2) Ensure tags 'PII', 'SENSITIVE' defined in 'config.test.yml' are present. assertThat(tags).contains(PII, SENSITIVE); } + + @Test + public void testApp_testDatasetTagDelete() { + // Create Namespace + createNamespace(NAMESPACE_NAME); + // create a source + createSource(DB_TABLE_SOURCE_NAME); + // Create Dataset + MARQUEZ_CLIENT.createDataset(NAMESPACE_NAME, DB_TABLE_NAME, DB_TABLE_META); + + // Tag Dataset with TESTDATASETTAG tag + Dataset taggedDataset = + MARQUEZ_CLIENT.tagDatasetWith(NAMESPACE_NAME, DB_TABLE_NAME, "TESTDATASETTAG"); + assertThat(taggedDataset.getTags()).contains("TESTDATASETTAG"); + + // Test that the tag TESTDATASETTAG is deleted from the dataset + Dataset taggedDeleteDataset = + MARQUEZ_CLIENT.deleteDatasetTag(NAMESPACE_NAME, DB_TABLE_NAME, "TESTDATASETTAG"); + assertThat(taggedDeleteDataset.getTags()).doesNotContain("TESTDATASETTAG"); + // assert the number of tags should be 1 + assertThat(taggedDeleteDataset.getTags()).hasSize(1); + // assert that only PII remains + assertThat(taggedDeleteDataset.getTags()).containsExactly("PII"); + } } diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index 3d55eac28f..962f176e9f 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -240,6 +240,12 @@ public Dataset tagDatasetWith( return Dataset.fromJson(bodyAsJson); } + public Dataset deleteDatasetTag( + @NonNull String namespaceName, @NonNull String datasetName, @NonNull String tagName) { + final String bodyAsJson = http.delete(url.toDatasetTagUrl(namespaceName, datasetName, tagName)); + return Dataset.fromJson(bodyAsJson); + } + public Dataset tagFieldWith( @NonNull String namespaceName, @NonNull String datasetName, diff --git 
a/clients/java/src/test/java/marquez/client/MarquezClientTest.java b/clients/java/src/test/java/marquez/client/MarquezClientTest.java index 5395337bfa..8e80b5e0ef 100644 --- a/clients/java/src/test/java/marquez/client/MarquezClientTest.java +++ b/clients/java/src/test/java/marquez/client/MarquezClientTest.java @@ -989,6 +989,19 @@ public void testTagDataset() throws Exception { assertThat(dataset).isEqualTo(DB_TABLE); } + @Test + public void testDeleteDatasetTag() throws Exception { + final URL url = + buildUrlFor( + "/namespaces/%s/datasets/%s/tags/%s", NAMESPACE_NAME, DB_TABLE_NAME, "tag_name"); + + final String runAsJson = Utils.getMapper().writeValueAsString(DB_TABLE); + when(http.delete(url)).thenReturn(runAsJson); + + final Dataset dataset = client.deleteDatasetTag(NAMESPACE_NAME, DB_TABLE_NAME, "tag_name"); + assertThat(dataset).isEqualTo(DB_TABLE); + } + @Test public void testTagField() throws Exception { final URL url = From beb00b82c8b1631828116538305818ef75e01c4b Mon Sep 17 00:00:00 2001 From: Michael Robinson <68482867+merobi-hub@users.noreply.github.com> Date: Fri, 15 Dec 2023 14:33:01 -0500 Subject: [PATCH 5/7] Update changelog for 0.43.0 release. (#2702) Signed-off-by: Michael Robinson --- CHANGELOG.md | 37 +++++++++--- dev/get_changes.py | 140 ++++++++++++++++++++++++--------------------- 2 files changed, 106 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e37062a68f..cfa0adf0b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,14 +1,37 @@ # Changelog -## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.42.0...HEAD) +## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.43.0...HEAD) +## [0.43.0](https://github.com/MarquezProject/marquez/compare/0.42.0...0.43.0) - 2023-12-15 ### Added -* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) - *Save into Marquez model datasets sent via `DatasetEvent` event type -* API: support `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) - *Save into Marquez model jobs and datasets sent via `JobEvent` event type. -* API: support streaming jobs [`#2682`](https://github.com/MarquezProject/marquez/pull/2682) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) - *Creates job version and reference rows at the beginning of the job instead of on complete. Updates job version within the run if anything changes. +* API: refactor the `RunDao` SQL query [`#2685`](https://github.com/MarquezProject/marquez/pull/2685) [@sophiely](https://github.com/sophiely) + *Improves the performance of the SQL query used for listing all runs.* +* API: refactor dataset version query [`#2683`](https://github.com/MarquezProject/marquez/pull/2683) [@sophiely](https://github.com/sophiely) + *Improves the performance of the SQL query used for the dataset version.* +* API: add support for a `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [`#2654`](https://github.com/MarquezProject/marquez/pull/2654) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski) + *Adds a feature for saving into the Marquez model datasets sent via the `DatasetEvent` event type. 
Includes optimization of the lineage query.*
+* API: add support for a `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
+  *Adds a feature for saving into the Marquez model jobs and datasets sent via the `JobEvent` event type.*
+* API: add support for streaming jobs [`#2682`](https://github.com/MarquezProject/marquez/pull/2682) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
+  *Creates job version and reference rows at the beginning of the job instead of on complete. Updates the job version within the run if anything changes.*
+* API/spec: implement upstream run-level lineage [`#2658`](https://github.com/MarquezProject/marquez/pull/2658) [@julienledem](https://github.com/julienledem)
+  *Returns the version of each job and dataset a run depends on.*
+* API: add `DELETE` endpoint for dataset tags [`#2698`](https://github.com/MarquezProject/marquez/pull/2698) [@davidsharp7](https://github.com/davidsharp7)
+  *Creates a new endpoint for removing the linkage between a dataset and a tag in `datasets_tag_mapping`, supplying a way to delete a tag from a dataset via the API.*
+* Web: add a dataset drawer [`#2672`](https://github.com/MarquezProject/marquez/pull/2672) [@davidsharp7](https://github.com/davidsharp7)
+  *Adds a drawer to the dataset column view in the GUI.*
+
+### Fixed
+* Client/Java: change url path encoding to match jersey decoding [`#2693`](https://github.com/MarquezProject/marquez/pull/2693) [@davidjgoss](https://github.com/davidjgoss)
+  *Swaps out the implementation of `MarquezPathV1::encode` to use the `UrlEscapers` path segment escaper, which does proper URI encoding.*
+* Web: fix pagination in the Jobs route [`#2655`](https://github.com/MarquezProject/marquez/pull/2655) [@merobi-hub](https://github.com/merobi-hub)
+  *Hides job pagination in the case of no jobs.*
+* Web: fix empty search experience [`#2679`](https://github.com/MarquezProject/marquez/pull/2679) [@phixMe](https://github.com/phixMe)
+  *Use of the previous search value was resulting in a bad request for the first character of a search.*
+
+### Removed
+* Client/Java: remove maven-archiver dependency from the Java client [`#2695`](https://github.com/MarquezProject/marquez/pull/2695) [@davidjgoss](https://github.com/davidjgoss)
+  *Removes a dependency from `build.gradle` that was bringing some transitive vulnerabilities.*
 
 ## [0.42.0](https://github.com/MarquezProject/marquez/compare/0.41.0...0.42.0) - 2023-10-17
 ### Added
diff --git a/dev/get_changes.py b/dev/get_changes.py
index e69d9ab006..9b3c10b004 100755
--- a/dev/get_changes.py
+++ b/dev/get_changes.py
@@ -1,116 +1,128 @@
-#!/bin/bash
+#!/usr/bin/env python3
 #
 # Copyright 2018-2023 contributors to the Marquez project
 # SPDX-License-Identifier: Apache-2.0
 
-from github import Github
-import rich_click as click
 from datetime import date
 from typing import TYPE_CHECKING
+
+import rich_click as click
+from github import Github
+
 if TYPE_CHECKING:
     from github.PullRequest import PullRequest
 
-class GetChanges:
 
+class GetChanges:
     def __init__(self, github_token: str, previous: str, current: str, path: str):
         self.github_token = github_token
         self.previous = previous
         self.current = current
         self.path = path
         self.pulls: list[PullRequest] = []
-        self.rel_title_str: str = ''
+        self.rel_title_str: str = ""
         self.text: list[str] = []
         self.new_contributors: dict[str, str] = {}
 
     def get_pulls(self):
-        print('Working on it...')
+        print("Working on it...")
         g = Github(self.github_token)
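        # Every commit reachable since the previous release tag is mapped back
        # to its pull requests below via PyGithub's Commit.get_pulls(). A
        # minimal invocation sketch for the script as a whole (GITHUB_TOKEN and
        # the version tags are placeholders for real values):
        #
        #   python3 dev/get_changes.py --github_token "$GITHUB_TOKEN" \
        #       --previous 0.42.0 --current 0.43.0 --path ../marquez/CHANGELOG.md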
 ## [0.42.0](https://github.com/MarquezProject/marquez/compare/0.41.0...0.42.0) - 2023-10-17
 ### Added

diff --git a/dev/get_changes.py b/dev/get_changes.py
index e69d9ab006..9b3c10b004 100755
--- a/dev/get_changes.py
+++ b/dev/get_changes.py
@@ -1,116 +1,128 @@
-#!/bin/bash
+#!/usr/bin/env python3
 #
 # Copyright 2018-2023 contributors to the Marquez project
 # SPDX-License-Identifier: Apache-2.0

-from github import Github
-import rich_click as click
 from datetime import date
 from typing import TYPE_CHECKING
+
+import rich_click as click
+from github import Github
+
 if TYPE_CHECKING:
     from github.PullRequest import PullRequest

-class GetChanges:
+class GetChanges:
     def __init__(self, github_token: str, previous: str, current: str, path: str):
         self.github_token = github_token
         self.previous = previous
         self.current = current
         self.path = path
         self.pulls: list[PullRequest] = []
-        self.rel_title_str: str = ''
+        self.rel_title_str: str = ""
         self.text: list[str] = []
         self.new_contributors: dict[str:str] = {}

     def get_pulls(self):
-        print('Working on it...')
+        print("Working on it...")
         g = Github(self.github_token)
         repo = g.get_repo("MarquezProject/marquez")
         prev_date = repo.get_release(self.previous).created_at
         commits = repo.get_commits(since=prev_date)
         self.pulls = [pull for commit in commits for pull in commit.get_pulls()]
-
+
     def write_title(self):
-        self.rel_title_str = f'## [{self.current}](https://github.com/MarquezProject/marquez/compare/{self.previous}...{self.current}) - {date.today()}'
-
+        self.rel_title_str = f"## [{self.current}](https://github.com/MarquezProject/marquez/compare/{self.previous}...{self.current}) - {date.today()}"  # noqa: E501
+
     def describe_changes(self):
         for pull in self.pulls:
-            """ Assembles change description with PR and user URLs """
             entry = []
-            if pull.user.login != 'dependabot[bot]':
+            if pull.user.login != "dependabot[bot]":
                 labels = []
                 for label in pull.labels:
-                    if label.name != 'documentation':
+                    if label.name != "documentation":
                         labels.append(label.name)
-                change_str = f'* **{labels[0]}: {pull.title}** [`#{pull.number}`]({pull.html_url}) [@{pull.user.login}]({pull.user.html_url}) '
-
+                try:
+                    change_str = f"* **{labels[0]}: {pull.title}** [`#{pull.number}`]({pull.html_url}) [@{pull.user.login}]({pull.user.html_url}) "  # noqa: E501
+                except Exception:
+                    continue
                 """ Extracts one-line description if present """
-                beg = pull.body.find('One-line summary:') + 18
-                if beg == 17:
-                    change_descrip_str = ' **'
-                else:
-                    test = pull.body.find('### Checklist')
-                    if test == -1:
-                        end = beg + 75
+                try:
+                    beg = pull.body.find("One-line summary:") + 18
+                    if beg == 17:  # noqa: PLR2004
+                        change_descrip_str = " **"
                     else:
-                        end = test - 1
-                    descrip = pull.body[beg:end].split()
-                    descrip_str = ' '.join(descrip)
-                    change_descrip_str = f' *{descrip_str}*'
-
-                entry.append(change_str + '\n')
-                entry.append(change_descrip_str + '\n')
+                        test = pull.body.find("### Checklist")
+                        end = beg + 75 if test == -1 else test - 1
+                        descrip = pull.body[beg:end].split()
+                        descrip_str = " ".join(descrip)
+                        change_descrip_str = f" *{descrip_str}*"
+                except Exception:
+                    continue
+
+                """ Checks for new contributor """
+                self.check_new_contributor(pull)
+
+                entry.append(change_str + "\n")
+                entry.append(change_descrip_str + "\n")
                 self.text.append(entry)

-    def get_new_contributors(self):
-        for pull in self.pulls:
-            comments = pull.get_issue_comments()
-            for comment in comments:
-                if 'Thanks for opening your' in comment.body:
-                    self.new_contributors[pull.user.login] = pull.user.url
+    def check_new_contributor(self, pull):
+        comments = pull.get_issue_comments()
+        for comment in comments:
+            if "Thanks for opening your" in comment.body:
+                self.new_contributors[pull.user.login] = pull.user.url
+
+    def print_new_contributors(self):
         if self.new_contributors:
-            print('New contributors:')
+            print("New contributors:")
             for k, v in self.new_contributors.items():
-                print(f'@{k}: {v}')
+                print(f"@{k}: {v}")
         else:
-            print('Note: no new contributors were found.')
+            print("Note: no new contributors were identified.")

     def update_changelog(self):
-        f = open('changes.txt', 'w+')
-        f = open('changes.txt', 'a')
-        f.write(self.rel_title_str + '\n')
-        for entry in self.text:
-            for line in entry:
-                f.write(line)
-        f.close()
-
-        with open('changes.txt', 'r+') as f:
+        with open("changes.txt", "a") as f:
+            f.write(self.rel_title_str + "\n")
+            for entry in self.text:
+                for line in entry:
+                    f.write(line)
+            f.close()
+
+        with open("changes.txt", "r+") as f:
             new_changes = f.read()
-        with open(self.path, 'r') as contents:
+        with open(self.path) as contents:
             save = contents.read()
-        with open(self.path, 'w') as contents:
+        with open(self.path, "w") as contents:
             contents.write(new_changes)
-        with open(self.path, 'a') as contents:
+        with open(self.path, "a") as contents:
             contents.write(save)
+

 @click.command()
 @click.option(
-    '--github_token', type=str, default=''
+    "--github_token",
+    type=str,
+    default="",
 )
 @click.option(
-    "--previous", type=str, default=''
+    "--previous",
+    type=str,
+    default="",
 )
 @click.option(
-    "--current", type=str, default=''
+    "--current",
+    type=str,
+    default="",
 )
 @click.option(
-    "--path",
-    type=str,
-    default='',
-    help='absolute path to changelog',
+    "--path",
+    type=str,
+    default="../marquez/CHANGELOG.md",
+    help="path to changelog",
 )
-
 def main(
     github_token: str,
     previous: str,
@@ -118,18 +130,18 @@ def main(
     path: str,
 ):
     c = GetChanges(
-        github_token=github_token,
-        previous=previous,
-        current=current,
-        path=path
+        github_token=github_token,
+        previous=previous,
+        current=current,
+        path=path,
    )
     c.get_pulls()
     c.describe_changes()
     c.write_title()
     c.update_changelog()
-    c.get_new_contributors()
-    print('...done!')
+    c.print_new_contributors()
+    print("...done!")
+

 if __name__ == "__main__":
     main()
-
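The rewritten script is driven by `click`, but the class can also be exercised directly. Below is a hypothetical driver equivalent to what `main()` wires up; the module import, token, and tags are placeholders, assuming `dev/get_changes.py` is importable as `get_changes`.

```python
# Hypothetical driver (not part of the patch), mirroring main() above.
# Assumes dev/get_changes.py is on the import path as `get_changes` and
# that a GitHub token with read access to the repo is supplied.
from get_changes import GetChanges

changes = GetChanges(
    github_token="<github-token>",   # placeholder
    previous="0.42.0",               # last released tag
    current="0.43.0",                # tag being released
    path="../marquez/CHANGELOG.md",  # changelog to prepend to
)
changes.get_pulls()         # collect PRs merged since the previous release
changes.describe_changes()  # build entries, skipping dependabot PRs
changes.write_title()       # "## [0.43.0](...compare...) - <date>"
changes.update_changelog()  # prepend the new section to CHANGELOG.md
changes.print_new_contributors()
```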
From 15cdf061fe489fdac4954facc693d32c4ad86f83 Mon Sep 17 00:00:00 2001
From: Michael Robinson
Date: Fri, 15 Dec 2023 14:34:01 -0500
Subject: [PATCH 6/7] Prepare for release 0.43.0

Signed-off-by: Michael Robinson
---
 .circleci/db-migration.sh |  2 +-
 .env.example              |  2 +-
 chart/Chart.yaml          |  2 +-
 chart/values.yaml         |  4 ++--
 clients/java/README.md    |  4 ++--
 docker/up.sh              |  4 ++--
 docs/openapi.html         | 18 ++++++++++--------
 gradle.properties         |  2 +-
 spec/openapi.yml          |  2 +-
 9 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/.circleci/db-migration.sh b/.circleci/db-migration.sh
index 68884dedde..96c5f5ffbf 100755
--- a/.circleci/db-migration.sh
+++ b/.circleci/db-migration.sh
@@ -13,7 +13,7 @@
 # Version of PostgreSQL
 readonly POSTGRES_VERSION="14"
 # Version of Marquez
-readonly MARQUEZ_VERSION=0.42.0
+readonly MARQUEZ_VERSION=0.43.0
 # Build version of Marquez
 readonly MARQUEZ_BUILD_VERSION="$(git log --pretty=format:'%h' -n 1)" # SHA1
diff --git a/.env.example b/.env.example
index 2e638efa5c..5d404c8616 100644
--- a/.env.example
+++ b/.env.example
@@ -1,4 +1,4 @@
 API_PORT=5000
 API_ADMIN_PORT=5001
 WEB_PORT=3000
-TAG=0.42.0
+TAG=0.43.0
diff --git a/chart/Chart.yaml b/chart/Chart.yaml
index a10ba5d669..ca8f69f7b5 100644
--- a/chart/Chart.yaml
+++ b/chart/Chart.yaml
@@ -29,4 +29,4 @@ name: marquez
 sources:
   - https://github.com/MarquezProject/marquez
   - https://marquezproject.github.io/marquez/
-version: 0.42.0
+version: 0.43.0
diff --git a/chart/values.yaml b/chart/values.yaml
index 0dbb395028..10dd64fdba 100644
--- a/chart/values.yaml
+++ b/chart/values.yaml
@@ -17,7 +17,7 @@ marquez:
   image:
     registry: docker.io
     repository: marquezproject/marquez
-    tag: 0.42.0
+    tag: 0.43.0
     pullPolicy: IfNotPresent
   ## Name of the existing secret containing credentials for the Marquez installation.
   ## When this is specified, it will take precedence over the values configured in the 'db' section.
@@ -75,7 +75,7 @@ web:
   image:
     registry: docker.io
     repository: marquezproject/marquez-web
-    tag: 0.42.0
+    tag: 0.43.0
     pullPolicy: IfNotPresent
   ## Marquez website will run on this port
   ##
diff --git a/clients/java/README.md b/clients/java/README.md
index 987aee6afc..619717bc35 100644
--- a/clients/java/README.md
+++ b/clients/java/README.md
@@ -10,14 +10,14 @@ Maven:
 <dependency>
     <groupId>io.github.marquezproject</groupId>
     <artifactId>marquez-java</artifactId>
-    <version>0.42.0</version>
+    <version>0.43.0</version>
 </dependency>
 ```

 or Gradle:

 ```groovy
-implementation 'io.github.marquezproject:marquez-java:0.42.0'
+implementation 'io.github.marquezproject:marquez-java:0.43.0'
 ```

 ## Usage
diff --git a/docker/up.sh b/docker/up.sh
index b40da59549..46e66b7ece 100755
--- a/docker/up.sh
+++ b/docker/up.sh
@@ -8,9 +8,9 @@
 set -e

 # Version of Marquez
-readonly VERSION=0.42.0
+readonly VERSION=0.43.0
 # Build version of Marquez
-readonly BUILD_VERSION=0.42.0
+readonly BUILD_VERSION=0.43.0

 title() {
   echo -e "\033[1m${1}\033[0m"
diff --git a/docs/openapi.html b/docs/openapi.html
index 4b5b824501..3bb59e6427 100644
--- a/docs/openapi.html
+++ b/docs/openapi.html

[Rendered-HTML residue of the regenerated docs/openapi.html diff elided; the raw hunks are minified markup. The recoverable changes: the reference title is bumped from "Marquez (0.42.0)" to "Marquez (0.43.0)", a "Get the upstream lineage for a given run" section is added, and request-URL samples (http://localhost:5000/api/v1/lineage, /api/v1/column-lineage, /api/v1/tags/{tag}, /api/v1/tags, /api/v1/search) are added to the lineage, column-lineage, tag, and search endpoint docs. The unchanged context documents namespace creation, the nodeId formats (dataset:<namespace>:<name>, datasetField:<namespace>:<dataset>:<field>, job:<namespace>:<name>), the lineage depth parameter (default 20), withDownstream (default false), tag creation and listing, and the search parameters (q, filter, sort, limit, offset, after=YYYY-MM-DD).]
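Since the regenerated reference centers on the lineage read path, a quick smoke test against it might look like the following sketch. It assumes a local Marquez API on port 5000 and reuses the `nodeId` example and `depth` default quoted in the docs above.

```python
# Smoke-test sketch for the lineage endpoints documented above.
# Assumptions: a local Marquez API on port 5000; the example dataset node
# comes from the docs, and `depth` defaults to 20 on the server side.
import requests

BASE_URL = "http://localhost:5000/api/v1"
NODE_ID = "dataset:food_delivery:public.delivery_7_days"

# Dataset/job lineage graph around the node, up to 20 hops deep.
lineage = requests.get(f"{BASE_URL}/lineage", params={"nodeId": NODE_ID, "depth": 20})
lineage.raise_for_status()
print(len(lineage.json()["graph"]), "nodes in the lineage graph")

# Column-level lineage for the same dataset node.
columns = requests.get(f"{BASE_URL}/column-lineage", params={"nodeId": NODE_ID})
columns.raise_for_status()
print(len(columns.json()["graph"]), "nodes in the column-lineage graph")
```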