From 5158fc2f461c131f7ace5fcba38bf811193a925b Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Thu, 26 Oct 2023 15:35:20 +0200 Subject: [PATCH] Runless events - consume job event Signed-off-by: Pawel Leszczynski --- .../java/marquez/api/OpenLineageResource.java | 5 + api/src/main/java/marquez/db/JobDao.java | 21 +-- .../main/java/marquez/db/JobFacetsDao.java | 53 +++++- .../main/java/marquez/db/JobVersionDao.java | 106 +++++++++++- .../main/java/marquez/db/OpenLineageDao.java | 160 ++++++++++++++---- .../java/marquez/db/models/ModelDaos.java | 9 + .../marquez/service/OpenLineageService.java | 27 +++ .../db/migration/V66__job_facets_changes.sql | 3 + ...V67__job_facets_fill_job_versions_uuid.sql | 7 + .../marquez/OpenLineageIntegrationTest.java | 35 ++-- .../java/marquez/db/LineageTestUtils.java | 43 ++++- .../java/marquez/db/OpenLineageDaoTest.java | 63 ++++--- .../OpenLineageServiceIntegrationTest.java | 24 +++ 13 files changed, 464 insertions(+), 92 deletions(-) create mode 100644 api/src/main/resources/marquez/db/migration/V66__job_facets_changes.sql create mode 100644 api/src/main/resources/marquez/db/migration/V67__job_facets_fill_job_versions_uuid.sql diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 253c8c46a7..3b10334bd9 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -40,6 +40,7 @@ import marquez.service.ServiceFactory; import marquez.service.models.BaseEvent; import marquez.service.models.DatasetEvent; +import marquez.service.models.JobEvent; import marquez.service.models.LineageEvent; import marquez.service.models.NodeId; @@ -73,6 +74,10 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon openLineageService .createAsync((DatasetEvent) event) .whenComplete((result, err) -> onComplete(result, err, asyncResponse)); + } else if (event instanceof JobEvent) { + 
openLineageService + .createAsync((JobEvent) event) + .whenComplete((result, err) -> onComplete(result, err, asyncResponse)); } else { log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName()); diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java index c1a92680df..00ff3a8293 100644 --- a/api/src/main/java/marquez/db/JobDao.java +++ b/api/src/main/java/marquez/db/JobDao.java @@ -56,21 +56,14 @@ SELECT EXISTS ( @SqlQuery( """ - SELECT j.*, f.facets + WITH job_versions_facets AS ( + SELECT job_version_uuid, JSON_AGG(facet) as facets + FROM job_facets + GROUP BY job_version_uuid + ) + SELECT j.*, facets FROM jobs_view j - LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid - LEFT OUTER JOIN ( - SELECT run_uuid, JSON_AGG(e.facet) AS facets - FROM ( - SELECT jf.run_uuid, jf.facet - FROM job_facets_view AS jf - INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid - INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid - WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName - ORDER BY lineage_event_time ASC - ) e - GROUP BY e.run_uuid - ) f ON f.run_uuid=jv.latest_run_uuid + LEFT OUTER JOIN job_versions_facets f ON j.current_version_uuid = f.job_version_uuid WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases)) """) Optional findJobByName(String namespaceName, String jobName); diff --git a/api/src/main/java/marquez/db/JobFacetsDao.java b/api/src/main/java/marquez/db/JobFacetsDao.java index 53518681aa..a800a3782f 100644 --- a/api/src/main/java/marquez/db/JobFacetsDao.java +++ b/api/src/main/java/marquez/db/JobFacetsDao.java @@ -11,6 +11,7 @@ import java.util.Spliterators; import java.util.UUID; import java.util.stream.StreamSupport; +import javax.annotation.Nullable; import lombok.NonNull; import marquez.common.Utils; import marquez.db.mappers.JobFacetsMapper; @@ -55,6 +56,32 @@ void insertJobFacet( String name, PGobject 
facet); + @SqlUpdate( + """ + INSERT INTO job_facets ( + created_at, + job_uuid, + job_version_uuid, + lineage_event_time, + name, + facet + ) VALUES ( + :createdAt, + :jobUuid, + :jobVersionUuid, + :lineageEventTime, + :name, + :facet + ) + """) + void insertJobFacet( + Instant createdAt, + UUID jobUuid, + UUID jobVersionUuid, + Instant lineageEventTime, + String name, + PGobject facet); + @SqlQuery( """ SELECT @@ -72,9 +99,31 @@ void insertJobFacet( @Transaction default void insertJobFacetsFor( @NonNull UUID jobUuid, - @NonNull UUID runUuid, + @NonNull UUID jobVersionUuid, + @NonNull Instant lineageEventTime, + @NonNull LineageEvent.JobFacet jobFacet) { + final Instant now = Instant.now(); + + JsonNode jsonNode = Utils.getMapper().valueToTree(jobFacet); + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false) + .forEach( + fieldName -> + insertJobFacet( + now, + jobUuid, + jobVersionUuid, + lineageEventTime, + fieldName, + FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName)))); + } + + @Transaction + default void insertJobFacetsFor( + @NonNull UUID jobUuid, + @Nullable UUID runUuid, @NonNull Instant lineageEventTime, - @NonNull String lineageEventType, + @Nullable String lineageEventType, @NonNull LineageEvent.JobFacet jobFacet) { final Instant now = Instant.now(); diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 597414d2e8..3779d9d2c9 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; import lombok.NonNull; import lombok.Value; import marquez.api.models.JobVersion; @@ -24,11 +25,13 @@ import marquez.common.models.Version; import marquez.db.mappers.ExtendedJobVersionRowMapper; import marquez.db.mappers.JobVersionMapper; +import 
marquez.db.models.DatasetVersionRow; import marquez.db.models.ExtendedDatasetVersionRow; import marquez.db.models.ExtendedJobVersionRow; import marquez.db.models.JobRow; import marquez.db.models.JobVersionRow; import marquez.db.models.NamespaceRow; +import marquez.db.models.UpdateLineageRow.DatasetRecord; import marquez.service.models.Run; import org.jdbi.v3.sqlobject.config.RegisterRowMapper; import org.jdbi.v3.sqlobject.statement.SqlQuery; @@ -283,6 +286,82 @@ default List findOutputDatasetsFor(UUID jobVersionUuid) { @SqlQuery("SELECT COUNT(*) FROM job_versions") int count(); + /** + * Links facets of the given run to the given job version. + * + * @param runUuid + * @param jobVersionUuid + */ + @SqlUpdate( + """ + UPDATE job_facets + SET job_version_uuid = :jobVersionUuid + WHERE run_uuid = :runUuid + """) + void linkJobFacetsToJobVersion(UUID runUuid, UUID jobVersionUuid); + + /** + * Used to upsert an immutable {@link JobVersionRow}. A {@link Version} is generated using {@link + * Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet, String)} based on + * the job's inputs and outputs, source code location, and context. + * + * @param jobRow The job. + * @return A {@link BagOfJobVersionInfo} object. + */ + default BagOfJobVersionInfo upsertJobVersion( + @NonNull JobRow jobRow, List inputs, List outputs) { + // Get the job. + final JobDao jobDao = createJobDao(); + + // Get the namespace for the job. + final NamespaceRow namespaceRow = + createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get(); + + // Generate the version for the job; the version may already exist. + final Version jobVersion = + Utils.newJobVersionFor( + NamespaceName.of(jobRow.getNamespaceName()), + JobName.of( + Optional.ofNullable(jobRow.getParentJobName()) + .map(pn -> pn + "." 
+ jobRow.getSimpleName()) + .orElse(jobRow.getName())), + toDatasetIds( + inputs.stream().map(i -> i.getDatasetVersionRow()).collect(Collectors.toList())), + toDatasetIds( + outputs.stream().map(i -> i.getDatasetVersionRow()).collect(Collectors.toList())), + jobRow.getLocation()); + + // Add the job version. + final JobVersionDao jobVersionDao = createJobVersionDao(); + final JobVersionRow jobVersionRow = + jobVersionDao.upsertJobVersion( + UUID.randomUUID(), + jobRow.getCreatedAt(), + jobRow.getUuid(), + jobRow.getLocation(), + jobVersion.getValue(), + jobRow.getName(), + namespaceRow.getUuid(), + jobRow.getNamespaceName()); + + // Link the input datasets to the job version. + inputs.forEach( + i -> { + jobVersionDao.upsertInputDatasetFor( + jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid()); + }); + + // Link the output datasets to the job version. + outputs.forEach( + o -> { + jobVersionDao.upsertOutputDatasetFor( + jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid()); + }); + + jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid()); + return new BagOfJobVersionInfo(jobRow, jobVersionRow, null, null); // TODO: extendedJobVersion can be present here & can be used within tests + } + /** * Used to upsert an immutable {@link JobVersionRow} object when a {@link Run} has transitioned. A * {@link Version} is generated using {@link Utils#newJobVersionFor(NamespaceName, JobName, @@ -323,8 +402,14 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( Optional.ofNullable(jobRow.getParentJobName()) .map(pn -> pn + "." 
+ jobRow.getSimpleName()) .orElse(jobRow.getName())), - toDatasetIds(jobVersionInputs), - toDatasetIds(jobVersionOutputs), + toDatasetIds( + jobVersionInputs.stream() + .map(i -> (DatasetVersionRow) i) + .collect(Collectors.toList())), + toDatasetIds( + jobVersionOutputs.stream() + .map(o -> (DatasetVersionRow) o) + .collect(Collectors.toList())), jobRow.getLocation()); // Add the job version. @@ -360,6 +445,9 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( // Link the run to the job version; multiple run instances may be linked to a job version. jobVersionDao.updateLatestRunFor(jobVersionRow.getUuid(), transitionedAt, runUuid); + // Link the job facets to this job version + jobVersionDao.linkJobFacetsToJobVersion(runUuid, jobVersionRow.getUuid()); + // Link the job version to the job only if the run is marked done and has transitioned into one // of the following states: COMPLETED, ABORTED, or FAILED. if (runState.isDone()) { @@ -371,17 +459,19 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( /** Returns the specified {@link ExtendedDatasetVersionRow}s as {@link DatasetId}s. */ default ImmutableSortedSet toDatasetIds( - @NonNull final List datasetVersionRows) { + @NonNull final List datasetVersionRows) { final ImmutableSortedSet.Builder datasetIds = ImmutableSortedSet.naturalOrder(); - for (final ExtendedDatasetVersionRow datasetVersionRow : datasetVersionRows) { - datasetIds.add( - new DatasetId( - NamespaceName.of(datasetVersionRow.getNamespaceName()), - DatasetName.of(datasetVersionRow.getDatasetName()))); + for (final DatasetVersionRow datasetVersionRow : datasetVersionRows) { + datasetIds.add(toDatasetId(datasetVersionRow)); } return datasetIds.build(); } + private DatasetId toDatasetId(DatasetVersionRow dataset) { + return new DatasetId( + NamespaceName.of(dataset.getNamespaceName()), DatasetName.of(dataset.getDatasetName())); + } + /** A container class for job version info. 
*/ @Value class BagOfJobVersionInfo { diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index d3a2d7f56b..a0e76c1f20 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -53,6 +53,7 @@ import marquez.db.models.UpdateLineageRow.DatasetRecord; import marquez.service.models.BaseEvent; import marquez.service.models.DatasetEvent; +import marquez.service.models.JobEvent; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; @@ -107,6 +108,17 @@ void createLineageEvent( + "VALUES (?, ?, ?)") void createDatasetEvent(Instant eventTime, PGobject event, String producer); + @SqlUpdate( + "INSERT INTO lineage_events (" + + "event_time, " + + "job_name, " + + "job_namespace, " + + "event, " + + "producer) " + + "VALUES (?, ?, ?, ?, ?)") + void createJobEvent( + Instant eventTime, String jobName, String jobNamespace, PGobject event, String producer); + @SqlQuery("SELECT event FROM lineage_events WHERE run_uuid = :runUuid") List findLineageEventsByRunUuid(UUID runUuid); @@ -180,6 +192,81 @@ default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper map return bag; } + default UpdateLineageRow updateMarquezModel(JobEvent event, ObjectMapper mapper) { + daos.initBaseDao(this); + Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); + + UpdateLineageRow bag = new UpdateLineageRow(); + NamespaceRow namespace = + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + formatNamespaceName(event.getJob().getNamespace()), + DEFAULT_NAMESPACE_OWNER); + bag.setNamespace(namespace); + + JobRow job = + buildJobFromEvent( + event.getJob(), + event.getEventTime(), + null, + Collections.emptyList(), + mapper, + daos.getJobDao(), + now, + namespace, + null, + null, + Optional.empty()); + + 
bag.setJob(job); + + // Unlike the LineageEvent path, inputs are collected into a list that stays empty (never null) when the event has no inputs + List datasetInputs = new ArrayList<>(); + if (event.getInputs() != null) { + for (Dataset dataset : event.getInputs()) { + DatasetRecord record = upsertLineageDataset(dataset, now, null, true); + datasetInputs.add(record); + insertDatasetFacets(dataset, record, null, null, now); + insertInputDatasetFacets(dataset, record, null, null, now); + } + } + bag.setInputs(Optional.ofNullable(datasetInputs)); + + // Unlike the LineageEvent path, outputs are collected into a list that stays empty (never null) when the event has no outputs + List datasetOutputs = new ArrayList<>(); + if (event.getOutputs() != null) { + for (Dataset dataset : event.getOutputs()) { + DatasetRecord record = upsertLineageDataset(dataset, now, null, false); + datasetOutputs.add(record); + insertDatasetFacets(dataset, record, null, null, now); + insertOutputDatasetFacets(dataset, record, null, null, now); + } + } + + bag.setOutputs(Optional.ofNullable(datasetOutputs)); + + // write job versions row and link job facets to job version + BagOfJobVersionInfo bagOfJobVersionInfo = + daos.getJobVersionDao().upsertJobVersion(job, datasetInputs, datasetOutputs); + + // save job facets - need to be saved after job version is upserted + // Add the job facets, if present, keyed to the upserted job version 
+ Optional.ofNullable(event.getJob().getFacets()) + .ifPresent( + jobFacet -> + daos.getJobFacetsDao() + .insertJobFacetsFor( + job.getUuid(), + bagOfJobVersionInfo.getJobVersionRow().getUuid(), + now, + event.getJob().getFacets())); + + bag.setJobVersionBag(bagOfJobVersionInfo); + return bag; + } + default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) { daos.initBaseDao(this); Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); @@ -203,7 +290,10 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper JobRow job = buildJobFromEvent( - event, + event.getJob(), + event.getEventTime(), + event.getEventType(), + event.getInputs(), mapper, daos.getJobDao(), now, @@ -256,7 +346,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper } } - insertJobFacets(event, job.getUuid(), runUuid, now); + insertJobFacets(event.getJob(), event.getEventType(), job.getUuid(), runUuid, now); // RunInput list uses null as a sentinel value List datasetInputs = null; @@ -313,14 +403,13 @@ private void insertRunFacets(LineageEvent event, UUID runUuid, Instant now) { runUuid, now, event.getEventType(), event.getRun().getFacets())); } - private void insertJobFacets(LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) { + private void insertJobFacets(Job job, String eventType, UUID jobUuid, UUID runUuid, Instant now) { // Add ... 
- Optional.ofNullable(event.getJob().getFacets()) + Optional.ofNullable(job.getFacets()) .ifPresent( jobFacet -> daos.getJobFacetsDao() - .insertJobFacetsFor( - jobUuid, runUuid, now, event.getEventType(), event.getJob().getFacets())); + .insertJobFacetsFor(jobUuid, runUuid, now, eventType, job.getFacets())); } private void insertDatasetFacets( @@ -372,7 +461,10 @@ private void insertOutputDatasetFacets( } private JobRow buildJobFromEvent( - LineageEvent event, + Job job, + ZonedDateTime eventTime, + String eventType, + List inputs, ObjectMapper mapper, JobDao jobDao, Instant now, @@ -382,13 +474,13 @@ private JobRow buildJobFromEvent( Optional parentRun) { Logger log = LoggerFactory.getLogger(OpenLineageDao.class); String description = - Optional.ofNullable(event.getJob().getFacets()) + Optional.ofNullable(job.getFacets()) .map(JobFacet::getDocumentation) .map(DocumentationJobFacet::getDescription) .orElse(null); String location = - Optional.ofNullable(event.getJob().getFacets()) + Optional.ofNullable(job.getFacets()) .flatMap(f -> Optional.ofNullable(f.getSourceCodeLocation())) .flatMap(s -> Optional.ofNullable(s.getUrl())) .orElse(null); @@ -398,7 +490,9 @@ private JobRow buildJobFromEvent( parentUuid.map( uuid -> findParentJobRow( - event, + job, + eventTime, + eventType, namespace, location, nominalStartTime, @@ -412,17 +506,17 @@ private JobRow buildJobFromEvent( parentJob .map( p -> { - if (event.getJob().getName().startsWith(p.getName() + '.')) { - return event.getJob().getName().substring(p.getName().length() + 1); + if (job.getName().startsWith(p.getName() + '.')) { + return job.getName().substring(p.getName().length() + 1); } else { - return event.getJob().getName(); + return job.getName(); } }) - .orElse(event.getJob().getName()); + .orElse(job.getName()); log.debug( "Calculated job name {} from job {} with parent {}", jobName, - event.getJob().getName(), + job.getName(), parentJob.map(JobRow::getName)); return parentJob .map( @@ -430,7 +524,7 @@ 
private JobRow buildJobFromEvent( jobDao.upsertJob( UUID.randomUUID(), parent.getUuid(), - getJobType(event.getJob()), + getJobType(job), now, namespace.getUuid(), namespace.getName(), @@ -438,12 +532,12 @@ private JobRow buildJobFromEvent( description, location, null, - jobDao.toJson(toDatasetId(event.getInputs()), mapper))) + jobDao.toJson(toDatasetId(inputs), mapper))) .orElseGet( () -> jobDao.upsertJob( UUID.randomUUID(), - getJobType(event.getJob()), + getJobType(job), now, namespace.getUuid(), namespace.getName(), @@ -451,11 +545,13 @@ private JobRow buildJobFromEvent( description, location, null, - jobDao.toJson(toDatasetId(event.getInputs()), mapper))); + jobDao.toJson(toDatasetId(inputs), mapper))); } private JobRow findParentJobRow( - LineageEvent event, + Job job, + ZonedDateTime eventTime, + String eventType, NamespaceRow namespace, String location, Instant nominalStartTime, @@ -474,7 +570,7 @@ private JobRow findParentJobRow( .map( j -> { String parentJobName = - facet.getJob().getName().equals(event.getJob().getName()) + facet.getJob().getName().equals(job.getName()) ? 
Utils.parseParentJobName(facet.getJob().getName()) : facet.getJob().getName(); if (j.getNamespaceName().equals(facet.getJob().getNamespace()) @@ -496,7 +592,9 @@ private JobRow findParentJobRow( facet.getJob().getName(), parentRunUuid); return createParentJobRunRecord( - event, + job, + eventTime, + eventType, namespace, location, nominalStartTime, @@ -509,7 +607,9 @@ private JobRow findParentJobRow( .orElseGet( () -> createParentJobRunRecord( - event, + job, + eventTime, + eventType, namespace, location, nominalStartTime, @@ -525,7 +625,9 @@ private JobRow findParentJobRow( } private JobRow createParentJobRunRecord( - LineageEvent event, + Job job, + ZonedDateTime eventTime, + String eventType, NamespaceRow namespace, String location, Instant nominalStartTime, @@ -533,17 +635,17 @@ private JobRow createParentJobRunRecord( UUID uuid, ParentRunFacet facet, PGobject inputs) { - Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); + Instant now = eventTime.withZoneSameInstant(ZoneId.of("UTC")).toInstant(); Logger log = LoggerFactory.getLogger(OpenLineageDao.class); String parentJobName = - facet.getJob().getName().equals(event.getJob().getName()) + facet.getJob().getName().equals(job.getName()) ? 
Utils.parseParentJobName(facet.getJob().getName()) : facet.getJob().getName(); JobRow newParentJobRow = createJobDao() .upsertJob( UUID.randomUUID(), - getJobType(event.getJob()), + getJobType(job), now, namespace.getUuid(), namespace.getName(), @@ -558,7 +660,7 @@ private JobRow createParentJobRunRecord( createRunArgsDao() .upsertRunArgs(UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of())); - Optional runState = Optional.ofNullable(event.getEventType()).map(this::getRunState); + Optional runState = Optional.ofNullable(eventType).map(this::getRunState); RunDao runDao = createRunDao(); RunRow newRow = runDao.upsert( @@ -786,7 +888,7 @@ default DatasetRecord upsertLineageDataset( } daos.getDatasetFieldDao().updateFieldMapping(datasetFieldMappings); - if (isInput) { + if (isInput && runUuid != null) { daos.getRunDao().updateInputMapping(runUuid, datasetVersionRow.getUuid()); // TODO - this is a short term fix until diff --git a/api/src/main/java/marquez/db/models/ModelDaos.java b/api/src/main/java/marquez/db/models/ModelDaos.java index 1e2f2f8f85..54b2e1da84 100644 --- a/api/src/main/java/marquez/db/models/ModelDaos.java +++ b/api/src/main/java/marquez/db/models/ModelDaos.java @@ -14,6 +14,7 @@ import marquez.db.DatasetVersionDao; import marquez.db.JobDao; import marquez.db.JobFacetsDao; +import marquez.db.JobVersionDao; import marquez.db.NamespaceDao; import marquez.db.RunArgsDao; import marquez.db.RunDao; @@ -37,6 +38,7 @@ public final class ModelDaos { private static ColumnLineageDao columnLineageDao = null; private static JobDao jobDao = null; private static JobFacetsDao jobFacetsDao = null; + private static JobVersionDao jobVersionDao = null; private static RunArgsDao runArgsDao = null; private static RunStateDao runStateDao = null; private static RunFacetsDao runFacetsDao = null; @@ -143,4 +145,11 @@ public RunFacetsDao getRunFacetsDao() { } return runFacetsDao; } + + public JobVersionDao getJobVersionDao() { + if (jobVersionDao == null) { + 
jobVersionDao = baseDao.createJobVersionDao(); + } + return jobVersionDao; + } } diff --git a/api/src/main/java/marquez/service/OpenLineageService.java b/api/src/main/java/marquez/service/OpenLineageService.java index ebeb0ff10c..5c88ebe3ff 100644 --- a/api/src/main/java/marquez/service/OpenLineageService.java +++ b/api/src/main/java/marquez/service/OpenLineageService.java @@ -46,6 +46,7 @@ import marquez.service.RunTransitionListener.RunOutput; import marquez.service.RunTransitionListener.RunTransition; import marquez.service.models.DatasetEvent; +import marquez.service.models.JobEvent; import marquez.service.models.LineageEvent; import marquez.service.models.RunMeta; @@ -92,6 +93,32 @@ public CompletableFuture createAsync(DatasetEvent event) { return CompletableFuture.allOf(marquez, openLineage); } + public CompletableFuture createAsync(JobEvent event) { + CompletableFuture openLineage = + CompletableFuture.runAsync( + withSentry( + withMdc( + () -> + createJobEvent( + event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(), + event.getJob().getName(), + event.getJob().getNamespace(), + createJsonArray(event, mapper), + event.getProducer()))), + executor); + + CompletableFuture marquez = + CompletableFuture.runAsync( + withSentry( + withMdc( + () -> { + updateMarquezModel(event, mapper); + })), + executor); + + return CompletableFuture.allOf(marquez, openLineage); + } + public CompletableFuture createAsync(LineageEvent event) { UUID runUuid = runUuidFromEvent(event.getRun()); CompletableFuture openLineage = diff --git a/api/src/main/resources/marquez/db/migration/V66__job_facets_changes.sql b/api/src/main/resources/marquez/db/migration/V66__job_facets_changes.sql new file mode 100644 index 0000000000..e7e21d3776 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V66__job_facets_changes.sql @@ -0,0 +1,3 @@ +ALTER TABLE job_facets ALTER COLUMN lineage_event_type DROP NOT NULL; +ALTER TABLE job_facets DROP CONSTRAINT 
job_facets_run_uuid_fkey; +ALTER TABLE job_facets ADD COLUMN job_version_uuid uuid REFERENCES job_versions (uuid); \ No newline at end of file diff --git a/api/src/main/resources/marquez/db/migration/V67__job_facets_fill_job_versions_uuid.sql b/api/src/main/resources/marquez/db/migration/V67__job_facets_fill_job_versions_uuid.sql new file mode 100644 index 0000000000..e96f59e90a --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V67__job_facets_fill_job_versions_uuid.sql @@ -0,0 +1,7 @@ +UPDATE job_facets +SET job_version_uuid = job_versions.uuid +FROM job_versions +INNER JOIN runs r ON r.uuid=job_versions.latest_run_uuid +WHERE job_facets.run_uuid = r.uuid + +-- TODO: write some test for this migration \ No newline at end of file diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index fec39865c7..84b1711257 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -55,7 +55,6 @@ import marquez.client.models.Run; import marquez.common.Utils; import marquez.db.LineageTestUtils; -import marquez.service.models.JobEvent; import org.assertj.core.api.InstanceOfAssertFactories; import org.jdbi.v3.core.Jdbi; import org.jetbrains.annotations.NotNull; @@ -1421,11 +1420,13 @@ public void testSendDatasetEvent() throws IOException { } @Test - public void testSendJobEventIsDecoded() throws IOException { + public void testSendJobEvent() throws IOException { final String openLineageEventAsString = Resources.toString(Resources.getResource(EVENT_JOB_EVENT), Charset.defaultCharset()); + final JsonNode openLineageEventAsJson = + Utils.fromJson(openLineageEventAsString, new TypeReference() {}); - // (2) Send OpenLineage event. + // (1) Send OpenLineage event. 
final CompletableFuture> resp = this.sendLineage(openLineageEventAsString) .thenApply(r -> Collections.singletonMap(r.statusCode(), r.body())) @@ -1438,21 +1439,25 @@ public void testSendJobEventIsDecoded() throws IOException { // Ensure the event was received. Map respMap = resp.join(); + assertThat(respMap.containsKey(201)).isTrue(); - assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201 - - // (3) Convert the OpenLineage event to Json. - JobEvent jobEvent = - marquez.client.Utils.fromJson(respMap.get(200), new TypeReference() {}); - assertThat(jobEvent.getJob().getNamespace()).isEqualTo("my-scheduler-namespace"); - assertThat(jobEvent.getJob().getName()).isEqualTo("myjob"); + // (2) Verify the job facets associated with the OpenLineage event. + final JsonNode jobAsJson = openLineageEventAsJson.path("job"); + final String jobNamespace = jobAsJson.path("namespace").asText(); + final String jobName = jobAsJson.path("name").asText(); + final JsonNode jobFacetsAsJson = jobAsJson.path("facets"); - assertThat(jobEvent.getInputs().get(0).getNamespace()).isEqualTo("my-datasource-namespace"); - assertThat(jobEvent.getInputs().get(0).getName()).isEqualTo("instance.schema.input-1"); + final Job job = client.getJob(jobNamespace, jobName); + LoggerFactory.getLogger(getClass()).info("Got job from server {}", job); + if (!jobFacetsAsJson.isMissingNode()) { + final JsonNode facetsForRunAsJson = + Utils.getMapper().convertValue(job.getFacets(), JsonNode.class); + assertThat(facetsForRunAsJson).isEqualTo(jobFacetsAsJson); + } else { + assertThat(job.getFacets()).isEmpty(); + } - assertThat(jobEvent.getOutputs().get(0).getNamespace()).isEqualTo("my-datasource-namespace"); - assertThat(jobEvent.getOutputs().get(0).getName()).isEqualTo("instance.schema.output-1"); - assertThat(jobEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001"); + // TODO: test dataset input /output + its facets } private void validateDatasetFacets(JsonNode json) 
{ diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index c45a1a5bca..22c20997e9 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -25,6 +25,7 @@ import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; import marquez.service.models.DatasetEvent; +import marquez.service.models.JobEvent; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; @@ -175,14 +176,13 @@ public static UpdateLineageRow createLineageRow( } /** - * Create an {@link UpdateLineageRow} from the input job details and datasets. + * Create an {@link UpdateLineageRow} from dataset. * * @param dao * @param dataset * @return */ public static UpdateLineageRow createLineageRow(OpenLineageDao dao, Dataset dataset) { - DatasetEvent event = DatasetEvent.builder() .eventTime(Instant.now().atZone(LOCAL_ZONE)) @@ -212,6 +212,45 @@ public static UpdateLineageRow createLineageRow(OpenLineageDao dao, Dataset data return updateLineageRow; } + /** + * Create an {@link UpdateLineageRow} from job. 
+ * + * @param dao + * @param job + * @return + */ + public static UpdateLineageRow createLineageRow(OpenLineageDao dao, Job job) { + JobEvent event = + JobEvent.builder() + .eventTime(Instant.now().atZone(LOCAL_ZONE)) + .job(job) + .producer(PRODUCER_URL.toString()) + .build(); + + // emulate an OpenLineage JobEvent + event + .getProperties() + .put( + "_schemaURL", + "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/RunEvent"); + UpdateLineageRow updateLineageRow = dao.updateMarquezModel(event, Utils.getMapper()); + PGobject jsonObject = new PGobject(); + jsonObject.setType("json"); + try { + jsonObject.setValue(Utils.toJson(event)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + dao.createJobEvent( + event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(), + event.getJob().getName(), + event.getJob().getNamespace(), + jsonObject, + event.getProducer()); + + return updateLineageRow; + } + public static DatasetFacets newDatasetFacet(SchemaField... 
fields) { return newDatasetFacet(EMPTY_MAP, fields); } diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index 2648d253e9..c67fda3185 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -5,6 +5,7 @@ package marquez.db; +import static marquez.db.LineageTestUtils.NAMESPACE; import static marquez.db.LineageTestUtils.PRODUCER_URL; import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.assertThat; @@ -23,6 +24,8 @@ import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; +import marquez.service.models.LineageEvent.DocumentationJobFacet; +import marquez.service.models.LineageEvent.Job; import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; @@ -78,7 +81,7 @@ void testUpdateMarquezModel() { "COMPLETE", jobFacet, Arrays.asList(), - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets))); + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets))); UpdateLineageRow readJob = LineageTestUtils.createLineageRow( @@ -86,7 +89,7 @@ void testUpdateMarquezModel() { READ_JOB_NAME, "COMPLETE", jobFacet, - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets)), + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets)), Arrays.asList()); assertThat(writeJob.getJob().getLocation()).isNull(); @@ -99,24 +102,42 @@ void testUpdateMarquezModel() { @Test void testUpdateMarquezModelWithDatasetEvent() { UpdateLineageRow datasetEventRow = - LineageTestUtils.createLineageRow( - dao, new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets)); + LineageTestUtils.createLineageRow(dao, new Dataset(NAMESPACE, 
DATASET_NAME, datasetFacets)); assertThat(datasetEventRow.getOutputs()).isPresent(); assertThat(datasetEventRow.getOutputs().get()).hasSize(1).first(); assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetRow()) .hasFieldOrPropertyWithValue("name", DATASET_NAME) - .hasFieldOrPropertyWithValue("namespaceName", LineageTestUtils.NAMESPACE); + .hasFieldOrPropertyWithValue("namespaceName", NAMESPACE); assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetVersionRow()) .hasNoNullFieldsOrPropertiesExcept("runUuid"); } + @Test + void testUpdateMarquezModelWithJobEvent() { + JobFacet jobFacet = + new JobFacet( + DocumentationJobFacet.builder().description("documentation").build(), + null, + null, + LineageTestUtils.EMPTY_MAP); + + Job job = new Job(NAMESPACE, READ_JOB_NAME, jobFacet); + + UpdateLineageRow jobEventRow = LineageTestUtils.createLineageRow(dao, job); + + assertThat(jobEventRow.getJob().getNamespaceName()).isEqualTo(NAMESPACE); + assertThat(jobEventRow.getJob().getName()).isEqualTo(READ_JOB_NAME); + assertThat(jobEventRow.getJob().getDescription().get()).isEqualTo("documentation"); + assertThat(jobEventRow.getJob().getLocation()).isNull(); + } + @Test void testUpdateMarquezModelLifecycleStateChangeFacet() { Dataset dataset = new Dataset( - LineageTestUtils.NAMESPACE, + NAMESPACE, DATASET_NAME, LineageEvent.DatasetFacets.builder() .lifecycleStateChange( @@ -206,7 +227,7 @@ void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenInputFieldDoesNotExi void testUpdateMarquezModelDatasetWithColumnLineageFacetWhenOutputFieldDoesNotExist() { Dataset outputDatasetWithoutOutputFieldSchema = new Dataset( - LineageTestUtils.NAMESPACE, + NAMESPACE, DATASET_NAME, LineageEvent.DatasetFacets.builder() // schema is missing .columnLineage( @@ -252,7 +273,7 @@ void testUpsertColumnLineageData() { Dataset updateDataset = new Dataset( - LineageTestUtils.NAMESPACE, + NAMESPACE, DATASET_NAME, LineageEvent.DatasetFacets.builder() .schema( @@ -314,7 +335,7 @@ void 
testUpsertColumnLineageData() { void testUpdateMarquezModelDatasetWithSymlinks() { Dataset dataset = new Dataset( - LineageTestUtils.NAMESPACE, + NAMESPACE, DATASET_NAME, LineageEvent.DatasetFacets.builder() .symlinks( @@ -373,7 +394,7 @@ void testUpdateMarquezModelWithInputOnlyDataset() { WRITE_JOB_NAME, "RUNNING", jobFacet, - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets)), + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets)), Arrays.asList()); assertThat(writeJob.getInputs()) @@ -398,7 +419,7 @@ void testUpdateMarquezModelWithNonMatchingReadSchema() { "COMPLETE", jobFacet, Arrays.asList(), - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets))); + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets))); DatasetFacets overrideFacet = new DatasetFacets( @@ -422,7 +443,7 @@ void testUpdateMarquezModelWithNonMatchingReadSchema() { READ_JOB_NAME, "COMPLETE", jobFacet, - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, overrideFacet)), + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, overrideFacet)), Arrays.asList()); assertThat(writeJob.getOutputs()).isPresent().get().asList().size().isEqualTo(1); @@ -445,14 +466,14 @@ void testUpdateMarquezModelWithPriorWrites() { "COMPLETE", jobFacet, Arrays.asList(), - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets))); + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets))); UpdateLineageRow readJob1 = LineageTestUtils.createLineageRow( dao, READ_JOB_NAME, "COMPLETE", jobFacet, - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets)), + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets)), Arrays.asList()); UpdateLineageRow writeJob2 = @@ -462,7 +483,7 @@ void testUpdateMarquezModelWithPriorWrites() { "COMPLETE", jobFacet, Arrays.asList(), - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, 
datasetFacets))); + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets))); UpdateLineageRow writeJob3 = LineageTestUtils.createLineageRow( dao, @@ -470,7 +491,7 @@ void testUpdateMarquezModelWithPriorWrites() { "COMPLETE", jobFacet, Arrays.asList(), - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets))); + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets))); UpdateLineageRow readJob2 = LineageTestUtils.createLineageRow( @@ -478,7 +499,7 @@ void testUpdateMarquezModelWithPriorWrites() { READ_JOB_NAME, "COMPLETE", jobFacet, - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets)), + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets)), Arrays.asList()); // verify readJob1 read the version written by writeJob1 @@ -518,7 +539,7 @@ void testGetOpenLineageEvents() { "COMPLETE", jobFacet, Arrays.asList(), - Arrays.asList(new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets))); + Arrays.asList(new Dataset(NAMESPACE, DATASET_NAME, datasetFacets))); List lineageEvents = dao.findLineageEventsByRunUuid(writeJob.getRun().getUuid()); assertThat(lineageEvents).hasSize(1); @@ -526,9 +547,7 @@ void testGetOpenLineageEvents() { assertThat(lineageEvents.get(0).getEventType()).isEqualTo("COMPLETE"); LineageEvent.Job job = lineageEvents.get(0).getJob(); - assertThat(job) - .extracting("namespace", "name") - .contains(LineageTestUtils.NAMESPACE, WRITE_JOB_NAME); + assertThat(job).extracting("namespace", "name").contains(NAMESPACE, WRITE_JOB_NAME); } @Test @@ -609,7 +628,7 @@ private Dataset getInputDataset() { private Dataset getOutputDatasetWithColumnLineage() { return new Dataset( - LineageTestUtils.NAMESPACE, + NAMESPACE, DATASET_NAME, LineageEvent.DatasetFacets.builder() .schema( diff --git a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java index 
e1f9cc42ce..60b41d3342 100644 --- a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java +++ b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java @@ -52,10 +52,13 @@ import marquez.service.models.Dataset; import marquez.service.models.DatasetEvent; import marquez.service.models.Job; +import marquez.service.models.JobEvent; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.DatasetFacets; import marquez.service.models.LineageEvent.DatasourceDatasetFacet; +import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.RunFacet; +import marquez.service.models.LineageEvent.SQLJobFacet; import marquez.service.models.LineageEvent.SchemaDatasetFacet; import marquez.service.models.LineageEvent.SchemaField; import marquez.service.models.Run; @@ -482,6 +485,27 @@ void testDatasetEvent() throws ExecutionException, InterruptedException { .hasFieldOrPropertyWithValue("type", "STRING"); } + @Test + void testJobEvent() throws ExecutionException, InterruptedException { + String query = "select * from table"; + LineageEvent.Job job = + LineageEvent.Job.builder() + .name(JOB_NAME) + .namespace(NAMESPACE) + .facets(JobFacet.builder().sql(SQLJobFacet.builder().query(query).build()).build()) + .build(); + + lineageService + .createAsync(JobEvent.builder().eventTime(Instant.now().atZone(TIMEZONE)).job(job).build()) + .get(); + + // TODO: add input / output dataset to a job & test their facets + + Optional jobByName = jobDao.findJobByName(NAMESPACE, JOB_NAME); + assertThat(jobByName).isPresent().map(Job::getCurrentVersion).isPresent(); + assertThat(jobByName.get().getFacets().get("sql").toString()).contains(query); + } + private void checkExists(LineageEvent.Dataset ds) { DatasetService datasetService = new DatasetService(openLineageDao, runService);