diff --git a/CHANGELOG.md b/CHANGELOG.md index b98d53ea3a..3fe533c057 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,10 @@ * Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://github.com/MarquezProject/marquez/pull/2647) [@merobi-hub](https://github.com/merobi-hub) *Fixes the issue of the GUI displaying Unix epoch time (midnight on January 1, 1970) in the case of running jobs/null `endedAt` values.* +### Added +* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski) + *Saves datasets sent via the `DatasetEvent` event type into the Marquez model.* + ## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20 ### Added * API: add support for the following parameters in the `SearchDao` [`#2556`](https://github.com/MarquezProject/marquez/pull/2556) [@tati](https://github.com/tati) [@wslulciuc](https://github.com/wslulciuc) diff --git a/api/src/main/java/marquez/api/OpenLineageResource.java b/api/src/main/java/marquez/api/OpenLineageResource.java index 259a500a53..253c8c46a7 100644 --- a/api/src/main/java/marquez/api/OpenLineageResource.java +++ b/api/src/main/java/marquez/api/OpenLineageResource.java @@ -39,6 +39,7 @@ import marquez.db.OpenLineageDao; import marquez.service.ServiceFactory; import marquez.service.models.BaseEvent; +import marquez.service.models.DatasetEvent; import marquez.service.models.LineageEvent; import marquez.service.models.NodeId; @@ -67,15 +68,11 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon if (event instanceof LineageEvent) { openLineageService .createAsync((LineageEvent) event) - .whenComplete( - (result, err) -> { - if (err != null) { - log.error("Unexpected error while processing request", err); - asyncResponse.resume(Response.status(determineStatusCode(err)).build()); - } else { - asyncResponse.resume(Response.status(201).build()); - } - }); + 
.whenComplete((result, err) -> onComplete(result, err, asyncResponse)); + } else if (event instanceof DatasetEvent) { + openLineageService + .createAsync((DatasetEvent) event) + .whenComplete((result, err) -> onComplete(result, err, asyncResponse)); } else { log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName()); @@ -84,6 +81,15 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon } } + private void onComplete(Void result, Throwable err, AsyncResponse asyncResponse) { + if (err != null) { + log.error("Unexpected error while processing request", err); + asyncResponse.resume(Response.status(determineStatusCode(err)).build()); + } else { + asyncResponse.resume(Response.status(201).build()); + } + } + private int determineStatusCode(Throwable e) { if (e instanceof CompletionException) { return determineStatusCode(e.getCause()); diff --git a/api/src/main/java/marquez/db/DatasetFacetsDao.java b/api/src/main/java/marquez/db/DatasetFacetsDao.java index 679a9bfaa3..ce4a2556cd 100644 --- a/api/src/main/java/marquez/db/DatasetFacetsDao.java +++ b/api/src/main/java/marquez/db/DatasetFacetsDao.java @@ -12,6 +12,7 @@ import java.util.Spliterators; import java.util.UUID; import java.util.stream.StreamSupport; +import javax.annotation.Nullable; import lombok.NonNull; import marquez.common.Utils; import marquez.service.models.LineageEvent; @@ -126,9 +127,9 @@ void insertDatasetFacet( default void insertDatasetFacetsFor( @NonNull UUID datasetUuid, @NonNull UUID datasetVersionUuid, - @NonNull UUID runUuid, + @Nullable UUID runUuid, @NonNull Instant lineageEventTime, - @NonNull String lineageEventType, + @Nullable String lineageEventType, @NonNull LineageEvent.DatasetFacets datasetFacets) { final Instant now = Instant.now(); diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index c6f7559cec..7519e200ef 100644 --- 
a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -33,6 +33,8 @@ import marquez.common.models.SourceType; import marquez.db.DatasetFieldDao.DatasetFieldMapping; import marquez.db.JobVersionDao.BagOfJobVersionInfo; +import marquez.db.RunDao.RunUpsert; +import marquez.db.RunDao.RunUpsert.RunUpsertBuilder; import marquez.db.mappers.LineageEventMapper; import marquez.db.models.ColumnLineageRow; import marquez.db.models.DatasetFieldRow; @@ -41,6 +43,7 @@ import marquez.db.models.DatasetVersionRow; import marquez.db.models.InputFieldData; import marquez.db.models.JobRow; +import marquez.db.models.ModelDaos; import marquez.db.models.NamespaceRow; import marquez.db.models.RunArgsRow; import marquez.db.models.RunRow; @@ -48,6 +51,8 @@ import marquez.db.models.SourceRow; import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; +import marquez.service.models.BaseEvent; +import marquez.service.models.DatasetEvent; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; @@ -93,6 +98,14 @@ void createLineageEvent( PGobject event, String producer); + @SqlUpdate( + "INSERT INTO lineage_events (" + + "event_time, " + + "event, " + + "producer) " + + "VALUES (?, ?, ?)") + void createDatasetEvent(Instant eventTime, PGobject event, String producer); + @SqlQuery("SELECT event FROM lineage_events WHERE run_uuid = :runUuid") List findLineageEventsByRunUuid(UUID runUuid); @@ -135,45 +148,55 @@ default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper map return updateLineageRow; } - default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) { - NamespaceDao namespaceDao = createNamespaceDao(); - DatasetSymlinkDao datasetSymlinkDao = createDatasetSymlinkDao(); - DatasetDao datasetDao = createDatasetDao(); - SourceDao sourceDao = 
createSourceDao(); - JobDao jobDao = createJobDao(); - JobFacetsDao jobFacetsDao = createJobFacetsDao(); - DatasetVersionDao datasetVersionDao = createDatasetVersionDao(); - DatasetFieldDao datasetFieldDao = createDatasetFieldDao(); - DatasetFacetsDao datasetFacetsDao = createDatasetFacetsDao(); - RunDao runDao = createRunDao(); - RunArgsDao runArgsDao = createRunArgsDao(); - RunStateDao runStateDao = createRunStateDao(); - ColumnLineageDao columnLineageDao = createColumnLineageDao(); - RunFacetsDao runFacetsDao = createRunFacetsDao(); + default UpdateLineageRow updateMarquezModel(DatasetEvent event, ObjectMapper mapper) { + ModelDaos daos = new ModelDaos(); + daos.initBaseDao(this); + Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); + + UpdateLineageRow bag = new UpdateLineageRow(); + NamespaceRow namespace = + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + formatNamespaceName(event.getDataset().getNamespace()), + DEFAULT_NAMESPACE_OWNER); + bag.setNamespace(namespace); + + Dataset dataset = event.getDataset(); + List datasetOutputs = new ArrayList<>(); + DatasetRecord record = upsertLineageDataset(daos, dataset, now, null, false); + datasetOutputs.add(record); + insertDatasetFacets(daos, dataset, record, null, null, now); + insertOutputDatasetFacets(daos, dataset, record, null, null, now); + + daos.getDatasetDao() + .updateVersion( + record.getDatasetVersionRow().getDatasetUuid(), + Instant.now(), + record.getDatasetVersionRow().getUuid()); + + bag.setOutputs(Optional.ofNullable(datasetOutputs)); + return bag; + } + default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper mapper) { + ModelDaos daos = new ModelDaos(); + daos.initBaseDao(this); Instant now = event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(); UpdateLineageRow bag = new UpdateLineageRow(); NamespaceRow namespace = - namespaceDao.upsertNamespaceRow( - UUID.randomUUID(), - now, - 
formatNamespaceName(event.getJob().getNamespace()), - DEFAULT_NAMESPACE_OWNER); + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + formatNamespaceName(event.getJob().getNamespace()), + DEFAULT_NAMESPACE_OWNER); bag.setNamespace(namespace); - Instant nominalStartTime = - Optional.ofNullable(event.getRun().getFacets()) - .flatMap(f -> Optional.ofNullable(f.getNominalTime())) - .map(NominalTimeRunFacet::getNominalStartTime) - .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) - .orElse(null); - Instant nominalEndTime = - Optional.ofNullable(event.getRun().getFacets()) - .flatMap(f -> Optional.ofNullable(f.getNominalTime())) - .map(NominalTimeRunFacet::getNominalEndTime) - .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) - .orElse(null); + Instant nominalStartTime = getNominalStartTime(event); + Instant nominalEndTime = getNominalEndTime(event); Optional parentRun = Optional.ofNullable(event.getRun()).map(Run::getFacets).map(RunFacet::getParent); @@ -181,177 +204,188 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper JobRow job = buildJobFromEvent( - event, mapper, jobDao, now, namespace, nominalStartTime, nominalEndTime, parentRun); + event, + mapper, + daos.getJobDao(), + now, + namespace, + nominalStartTime, + nominalEndTime, + parentRun); bag.setJob(job); Map runArgsMap = createRunArgs(event); RunArgsRow runArgs = - runArgsDao.upsertRunArgs( - UUID.randomUUID(), now, Utils.toJson(runArgsMap), Utils.checksumFor(runArgsMap)); + daos.getRunArgsDao() + .upsertRunArgs( + UUID.randomUUID(), now, Utils.toJson(runArgsMap), Utils.checksumFor(runArgsMap)); bag.setRunArgs(runArgs); final UUID runUuid = runToUuid(event.getRun().getRunId()); - RunRow run; + RunUpsertBuilder runUpsertBuilder = + RunUpsert.builder() + .runUuid(runUuid) + .parentRunUuid(parentUuid.orElse(null)) + .externalId(event.getRun().getRunId()) + .now(now) + .jobUuid(job.getUuid()) + .jobVersionUuid(null) + 
.runArgsUuid(runArgs.getUuid()) + .namespaceName(namespace.getName()) + .jobName(job.getName()) + .location(job.getLocation()); + if (event.getEventType() != null) { - RunState runStateType = getRunState(event.getEventType()); - run = - runDao.upsert( - runUuid, - parentUuid.orElse(null), - event.getRun().getRunId(), - now, - job.getUuid(), - null, - runArgs.getUuid(), - nominalStartTime, - nominalEndTime, - runStateType, - now, - namespace.getName(), - job.getName(), - job.getLocation()); - // Add ... - Optional.ofNullable(event.getRun().getFacets()) - .ifPresent( - runFacet -> - runFacetsDao.insertRunFacetsFor( - runUuid, now, event.getEventType(), event.getRun().getFacets())); - } else { - run = - runDao.upsert( - runUuid, - parentUuid.orElse(null), - event.getRun().getRunId(), - now, - job.getUuid(), - null, - runArgs.getUuid(), - nominalStartTime, - nominalEndTime, - namespace.getName(), - job.getName(), - job.getLocation()); + runUpsertBuilder.runStateType(getRunState(event.getEventType())).runStateTime(now); } + run = daos.getRunDao().upsert(runUpsertBuilder.build()); + insertRunFacets(daos, event, runUuid, now); bag.setRun(run); if (event.getEventType() != null) { RunState runStateType = getRunState(event.getEventType()); RunStateRow runState = - runStateDao.upsert(UUID.randomUUID(), now, run.getUuid(), runStateType); + daos.getRunStateDao().upsert(UUID.randomUUID(), now, run.getUuid(), runStateType); bag.setRunState(runState); if (runStateType.isDone()) { - runDao.updateEndState(run.getUuid(), now, runState.getUuid()); + daos.getRunDao().updateEndState(run.getUuid(), now, runState.getUuid()); } else if (runStateType.isStarting()) { - runDao.updateStartState(run.getUuid(), now, runState.getUuid()); + daos.getRunDao().updateStartState(run.getUuid(), now, runState.getUuid()); } } - // Add ... 
- Optional.ofNullable(event.getJob().getFacets()) - .ifPresent( - jobFacet -> - jobFacetsDao.insertJobFacetsFor( - job.getUuid(), runUuid, now, event.getEventType(), event.getJob().getFacets())); + insertJobFacets(daos, event, job.getUuid(), runUuid, now); // RunInput list uses null as a sentinel value List datasetInputs = null; if (event.getInputs() != null) { datasetInputs = new ArrayList<>(); for (Dataset dataset : event.getInputs()) { - DatasetRecord record = - upsertLineageDataset( - dataset, - now, - runUuid, - true, - namespaceDao, - datasetSymlinkDao, - sourceDao, - datasetDao, - datasetVersionDao, - datasetFieldDao, - runDao, - columnLineageDao); + DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true); datasetInputs.add(record); - - // Facets ... - Optional.ofNullable(dataset.getFacets()) - .ifPresent( - facets -> - datasetFacetsDao.insertDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); - - // InputFacets ... 
- Optional.ofNullable(dataset.getInputFacets()) - .ifPresent( - facets -> - datasetFacetsDao.insertInputDatasetFacetsFor( - record.getDatasetRow().getUuid(), - record.getDatasetVersionRow().getUuid(), - runUuid, - now, - event.getEventType(), - facets)); + insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); + insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); } } bag.setInputs(Optional.ofNullable(datasetInputs)); + // RunInput list uses null as a sentinel value List datasetOutputs = null; if (event.getOutputs() != null) { datasetOutputs = new ArrayList<>(); for (Dataset dataset : event.getOutputs()) { - DatasetRecord record = - upsertLineageDataset( - dataset, - now, - runUuid, - false, - namespaceDao, - datasetSymlinkDao, - sourceDao, - datasetDao, - datasetVersionDao, - datasetFieldDao, - runDao, - columnLineageDao); + DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false); datasetOutputs.add(record); + insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); + insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now); + } + } - // Facets ... 
- Optional.ofNullable(dataset.getFacets()) - .ifPresent( - facets -> - datasetFacetsDao.insertDatasetFacetsFor( + bag.setOutputs(Optional.ofNullable(datasetOutputs)); + return bag; + } + + private static Instant getNominalStartTime(LineageEvent event) { + return Optional.ofNullable(event.getRun().getFacets()) + .flatMap(f -> Optional.ofNullable(f.getNominalTime())) + .map(NominalTimeRunFacet::getNominalStartTime) + .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) + .orElse(null); + } + + private static Instant getNominalEndTime(LineageEvent event) { + return Optional.ofNullable(event.getRun().getFacets()) + .flatMap(f -> Optional.ofNullable(f.getNominalTime())) + .map(NominalTimeRunFacet::getNominalEndTime) + .map(t -> t.withZoneSameInstant(ZoneId.of("UTC")).toInstant()) + .orElse(null); + } + + private void insertRunFacets(ModelDaos daos, LineageEvent event, UUID runUuid, Instant now) { + // Add ... + Optional.ofNullable(event.getRun().getFacets()) + .ifPresent( + runFacet -> + daos.getRunFacetsDao() + .insertRunFacetsFor( + runUuid, now, event.getEventType(), event.getRun().getFacets())); + } + + private void insertJobFacets( + ModelDaos daos, LineageEvent event, UUID jobUuid, UUID runUuid, Instant now) { + // Add ... + Optional.ofNullable(event.getJob().getFacets()) + .ifPresent( + jobFacet -> + daos.getJobFacetsDao() + .insertJobFacetsFor( + jobUuid, runUuid, now, event.getEventType(), event.getJob().getFacets())); + } + + private void insertDatasetFacets( + ModelDaos daos, + Dataset dataset, + DatasetRecord record, + UUID runUuid, + String eventType, + Instant now) { + // Facets ... + Optional.ofNullable(dataset.getFacets()) + .ifPresent( + facets -> + daos.getDatasetFacetsDao() + .insertDatasetFacetsFor( record.getDatasetRow().getUuid(), record.getDatasetVersionRow().getUuid(), runUuid, now, - event.getEventType(), + eventType, facets)); + } - // OutputFacets ... 
- Optional.ofNullable(dataset.getOutputFacets()) - .ifPresent( - facets -> - datasetFacetsDao.insertOutputDatasetFacetsFor( + private void insertInputDatasetFacets( + ModelDaos daos, + Dataset dataset, + DatasetRecord record, + UUID runUuid, + String eventType, + Instant now) { + // InputFacets ... + Optional.ofNullable(dataset.getInputFacets()) + .ifPresent( + facets -> + daos.getDatasetFacetsDao() + .insertInputDatasetFacetsFor( record.getDatasetRow().getUuid(), record.getDatasetVersionRow().getUuid(), runUuid, now, - event.getEventType(), + eventType, facets)); - } - } + } - bag.setOutputs(Optional.ofNullable(datasetOutputs)); - return bag; + private void insertOutputDatasetFacets( + ModelDaos daos, + Dataset dataset, + DatasetRecord record, + UUID runUuid, + String eventType, + Instant now) { + // OutputFacets ... + Optional.ofNullable(dataset.getOutputFacets()) + .ifPresent( + facets -> + daos.getDatasetFacetsDao() + .insertOutputDatasetFacetsFor( + record.getDatasetRow().getUuid(), + record.getDatasetVersionRow().getUuid(), + runUuid, + now, + eventType, + facets)); } private JobRow buildJobFromEvent( @@ -623,35 +657,25 @@ default JobType getJobType(Job job) { } default DatasetRecord upsertLineageDataset( - Dataset ds, - Instant now, - UUID runUuid, - boolean isInput, - NamespaceDao namespaceDao, - DatasetSymlinkDao datasetSymlinkDao, - SourceDao sourceDao, - DatasetDao datasetDao, - DatasetVersionDao datasetVersionDao, - DatasetFieldDao datasetFieldDao, - RunDao runDao, - ColumnLineageDao columnLineageDao) { + ModelDaos daos, Dataset ds, Instant now, UUID runUuid, boolean isInput) { NamespaceRow dsNamespace = - namespaceDao.upsertNamespaceRow( - UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER); + daos.getNamespaceDao() + .upsertNamespaceRow(UUID.randomUUID(), now, ds.getNamespace(), DEFAULT_NAMESPACE_OWNER); SourceRow source; if (ds.getFacets() != null && ds.getFacets().getDataSource() != null) { source = - sourceDao.upsert( - 
UUID.randomUUID(), - getSourceType(ds), - now, - ds.getFacets().getDataSource().getName(), - getUrlOrNull(ds.getFacets().getDataSource().getUri())); + daos.getSourceDao() + .upsert( + UUID.randomUUID(), + getSourceType(ds), + now, + ds.getFacets().getDataSource().getName(), + getUrlOrNull(ds.getFacets().getDataSource().getUri())); } else { source = - sourceDao.upsertOrDefault( - UUID.randomUUID(), getSourceType(ds), now, DEFAULT_SOURCE_NAME, ""); + daos.getSourceDao() + .upsertOrDefault(UUID.randomUUID(), getSourceType(ds), now, DEFAULT_SOURCE_NAME, ""); } String dsDescription = null; @@ -660,20 +684,22 @@ default DatasetRecord upsertLineageDataset( } NamespaceRow datasetNamespace = - namespaceDao.upsertNamespaceRow( - UUID.randomUUID(), - now, - formatNamespaceName(ds.getNamespace()), - DEFAULT_NAMESPACE_OWNER); + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + formatNamespaceName(ds.getNamespace()), + DEFAULT_NAMESPACE_OWNER); DatasetSymlinkRow symlink = - datasetSymlinkDao.upsertDatasetSymlinkRow( - UUID.randomUUID(), - formatDatasetName(ds.getName()), - dsNamespace.getUuid(), - true, - null, - now); + daos.getDatasetSymlinkDao() + .upsertDatasetSymlinkRow( + UUID.randomUUID(), + formatDatasetName(ds.getName()), + dsNamespace.getUuid(), + true, + null, + now); Optional.ofNullable(ds.getFacets()) .map(facets -> facets.getSymlinks()) @@ -682,19 +708,20 @@ default DatasetRecord upsertLineageDataset( el.getIdentifiers().stream() .forEach( id -> - datasetSymlinkDao.doUpsertDatasetSymlinkRow( - symlink.getUuid(), - id.getName(), - namespaceDao - .upsertNamespaceRow( - UUID.randomUUID(), - now, - id.getNamespace(), - DEFAULT_NAMESPACE_OWNER) - .getUuid(), - false, - id.getType(), - now))); + daos.getDatasetSymlinkDao() + .doUpsertDatasetSymlinkRow( + symlink.getUuid(), + id.getName(), + daos.getNamespaceDao() + .upsertNamespaceRow( + UUID.randomUUID(), + now, + id.getNamespace(), + DEFAULT_NAMESPACE_OWNER) + .getUuid(), + false, + 
id.getType(), + now))); String dslifecycleState = Optional.ofNullable(ds.getFacets()) .map(DatasetFacets::getLifecycleStateChange) @@ -702,18 +729,19 @@ default DatasetRecord upsertLineageDataset( .orElse(""); DatasetRow datasetRow = - datasetDao.upsert( - symlink.getUuid(), - getDatasetType(ds), - now, - datasetNamespace.getUuid(), - datasetNamespace.getName(), - source.getUuid(), - source.getName(), - formatDatasetName(ds.getName()), - ds.getName(), - dsDescription, - dslifecycleState.equalsIgnoreCase("DROP")); + daos.getDatasetDao() + .upsert( + symlink.getUuid(), + getDatasetType(ds), + now, + datasetNamespace.getUuid(), + datasetNamespace.getName(), + source.getUuid(), + source.getName(), + formatDatasetName(ds.getName()), + ds.getName(), + dsDescription, + dslifecycleState.equalsIgnoreCase("DROP")); List fields = Optional.ofNullable(ds.getFacets()) @@ -726,7 +754,7 @@ default DatasetRecord upsertLineageDataset( datasetRow .getCurrentVersionUuid() .filter(v -> isInput) // only fetch the current version if this is a read - .flatMap(datasetVersionDao::findRowByUuid) + .flatMap(daos.getDatasetVersionDao()::findRowByUuid) // if this is a write _or_ if the dataset has no current version, // create a new version .orElseGet( @@ -742,16 +770,17 @@ default DatasetRecord upsertLineageDataset( runUuid) .getValue(); DatasetVersionRow row = - datasetVersionDao.upsert( - UUID.randomUUID(), - now, - dsRow.getUuid(), - versionUuid, - isInput ? null : runUuid, - datasetVersionDao.toPgObjectSchemaFields(fields), - dsNamespace.getName(), - ds.getName(), - dslifecycleState); + daos.getDatasetVersionDao() + .upsert( + UUID.randomUUID(), + now, + dsRow.getUuid(), + versionUuid, + isInput ? 
null : runUuid, + daos.getDatasetVersionDao().toPgObjectSchemaFields(fields), + dsNamespace.getName(), + ds.getName(), + dslifecycleState); return row; }); List datasetFieldMappings = new ArrayList<>(); @@ -759,28 +788,29 @@ default DatasetRecord upsertLineageDataset( if (fields != null) { for (SchemaField field : fields) { DatasetFieldRow datasetFieldRow = - datasetFieldDao.upsert( - UUID.randomUUID(), - now, - field.getName(), - field.getType(), - field.getDescription(), - datasetRow.getUuid()); + daos.getDatasetFieldDao() + .upsert( + UUID.randomUUID(), + now, + field.getName(), + field.getType(), + field.getDescription(), + datasetRow.getUuid()); datasetFields.add(datasetFieldRow); datasetFieldMappings.add( new DatasetFieldMapping(datasetVersionRow.getUuid(), datasetFieldRow.getUuid())); } } - datasetFieldDao.updateFieldMapping(datasetFieldMappings); + daos.getDatasetFieldDao().updateFieldMapping(datasetFieldMappings); if (isInput) { - runDao.updateInputMapping(runUuid, datasetVersionRow.getUuid()); + daos.getRunDao().updateInputMapping(runUuid, datasetVersionRow.getUuid()); // TODO - this is a short term fix until // https://github.com/MarquezProject/marquez/issues/1361 // is fully thought out if (datasetRow.getCurrentVersionUuid().isEmpty()) { - datasetDao.updateVersion(dsRow.getUuid(), now, datasetVersionRow.getUuid()); + daos.getDatasetDao().updateVersion(dsRow.getUuid(), now, datasetVersionRow.getUuid()); datasetRow = datasetRow.withCurrentVersionUuid(datasetVersionRow.getUuid()); } } @@ -788,14 +818,7 @@ default DatasetRecord upsertLineageDataset( List columnLineageRows = Collections.emptyList(); if (!isInput) { columnLineageRows = - upsertColumnLineage( - runUuid, - ds, - now, - datasetFields, - columnLineageDao, - datasetFieldDao, - datasetVersionRow); + upsertColumnLineage(runUuid, ds, now, datasetFields, datasetVersionRow, daos); } return new DatasetRecord(datasetRow, datasetVersionRow, datasetNamespace, columnLineageRows); @@ -806,13 +829,13 @@ 
private List upsertColumnLineage( Dataset ds, Instant now, List datasetFields, - ColumnLineageDao columnLineageDao, - DatasetFieldDao datasetFieldDao, - DatasetVersionRow datasetVersionRow) { + DatasetVersionRow datasetVersionRow, + ModelDaos daos) { Logger log = LoggerFactory.getLogger(OpenLineageDao.class); // get all the fields related to this particular run - List runFields = datasetFieldDao.findInputFieldsDataAssociatedWithRun(runUuid); + List runFields = + daos.getDatasetFieldDao().findInputFieldsDataAssociatedWithRun(runUuid); log.debug("Found input datasets fields for run '{}': {}", runUuid, runFields); return Optional.ofNullable(ds.getFacets()) @@ -864,7 +887,8 @@ private List upsertColumnLineage( outputField.get().getName(), datasetVersionRow.getUuid(), inputFields); - return columnLineageDao + return daos + .getColumnLineageDao() .upsertColumnLineageRow( datasetVersionRow.getUuid(), outputField.get().getUuid(), @@ -938,7 +962,7 @@ default UUID runToUuid(String runId) { } } - default PGobject createJsonArray(LineageEvent event, ObjectMapper mapper) { + default PGobject createJsonArray(BaseEvent event, ObjectMapper mapper) { try { PGobject jsonObject = new PGobject(); jsonObject.setType("json"); diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index fa22eba479..32c39cf4a2 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -14,6 +14,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import lombok.Builder; import lombok.NonNull; import marquez.common.Utils; import marquez.common.models.DatasetId; @@ -301,6 +302,40 @@ RunRow upsert( String jobName, String location); + default RunRow upsert(RunUpsert runUpsert) { + if (runUpsert.runStateType == null) { + return upsert( + runUpsert.runUuid(), + runUpsert.parentRunUuid(), + runUpsert.externalId(), + runUpsert.now(), + runUpsert.jobUuid(), + runUpsert.jobVersionUuid(), + 
runUpsert.runArgsUuid(), + runUpsert.nominalStartTime(), + runUpsert.nominalEndTime(), + runUpsert.namespaceName(), + runUpsert.jobName(), + runUpsert.location()); + } else { + return upsert( + runUpsert.runUuid(), + runUpsert.parentRunUuid(), + runUpsert.externalId(), + runUpsert.now(), + runUpsert.jobUuid(), + runUpsert.jobVersionUuid(), + runUpsert.runArgsUuid(), + runUpsert.nominalStartTime(), + runUpsert.nominalEndTime(), + runUpsert.runStateType(), + runUpsert.runStateTime(), + runUpsert.namespaceName(), + runUpsert.jobName(), + runUpsert.location()); + } + } + @SqlUpdate( "INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) " + "VALUES (:runUuid, :datasetVersionUuid) ON CONFLICT DO NOTHING") @@ -452,4 +487,21 @@ default RunRow upsertRunMeta( ) """) Optional findByLatestJob(String namespace, String jobName); + + @Builder + record RunUpsert( + UUID runUuid, + UUID parentRunUuid, + String externalId, + Instant now, + UUID jobUuid, + UUID jobVersionUuid, + UUID runArgsUuid, + Instant nominalStartTime, + Instant nominalEndTime, + RunState runStateType, + Instant runStateTime, + String namespaceName, + String jobName, + String location) {} } diff --git a/api/src/main/java/marquez/db/models/ModelDaos.java b/api/src/main/java/marquez/db/models/ModelDaos.java new file mode 100644 index 0000000000..f123ee9f67 --- /dev/null +++ b/api/src/main/java/marquez/db/models/ModelDaos.java @@ -0,0 +1,146 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.models; + +import marquez.db.BaseDao; +import marquez.db.ColumnLineageDao; +import marquez.db.DatasetDao; +import marquez.db.DatasetFacetsDao; +import marquez.db.DatasetFieldDao; +import marquez.db.DatasetSymlinkDao; +import marquez.db.DatasetVersionDao; +import marquez.db.JobDao; +import marquez.db.JobFacetsDao; +import marquez.db.NamespaceDao; +import marquez.db.RunArgsDao; +import marquez.db.RunDao; +import marquez.db.RunFacetsDao; 
+import marquez.db.RunStateDao; +import marquez.db.SourceDao; + +/** + * Container for storing all the Dao classes which ensures parent interface methods are called + * exactly once. + */ +public final class ModelDaos { + private NamespaceDao namespaceDao = null; + private DatasetSymlinkDao datasetSymlinkDao = null; + private DatasetDao datasetDao = null; + private SourceDao sourceDao = null; + private DatasetVersionDao datasetVersionDao = null; + private DatasetFieldDao datasetFieldDao = null; + private RunDao runDao = null; + private DatasetFacetsDao datasetFacetsDao = null; + private ColumnLineageDao columnLineageDao = null; + private JobDao jobDao = null; + private JobFacetsDao jobFacetsDao = null; + private RunArgsDao runArgsDao = null; + private RunStateDao runStateDao = null; + private RunFacetsDao runFacetsDao = null; + private BaseDao baseDao; + + public void initBaseDao(BaseDao baseDao) { + this.baseDao = baseDao; + } + + public NamespaceDao getNamespaceDao() { + if (namespaceDao == null) { + namespaceDao = baseDao.createNamespaceDao(); + } + return namespaceDao; + } + + public DatasetSymlinkDao getDatasetSymlinkDao() { + if (datasetSymlinkDao == null) { + datasetSymlinkDao = baseDao.createDatasetSymlinkDao(); + } + return datasetSymlinkDao; + } + + public DatasetDao getDatasetDao() { + if (datasetDao == null) { + datasetDao = baseDao.createDatasetDao(); + } + return datasetDao; + } + + public SourceDao getSourceDao() { + if (sourceDao == null) { + sourceDao = baseDao.createSourceDao(); + } + return sourceDao; + } + + public DatasetVersionDao getDatasetVersionDao() { + if (datasetVersionDao == null) { + datasetVersionDao = baseDao.createDatasetVersionDao(); + } + return datasetVersionDao; + } + + public DatasetFieldDao getDatasetFieldDao() { + if (datasetFieldDao == null) { + datasetFieldDao = baseDao.createDatasetFieldDao(); + } + return datasetFieldDao; + } + + public RunDao getRunDao() { + if (runDao == null) { + runDao = baseDao.createRunDao(); + } + 
return runDao; + } + + public ColumnLineageDao getColumnLineageDao() { + if (columnLineageDao == null) { + columnLineageDao = baseDao.createColumnLineageDao(); + } + return columnLineageDao; + } + + public DatasetFacetsDao getDatasetFacetsDao() { + if (datasetFacetsDao == null) { + datasetFacetsDao = baseDao.createDatasetFacetsDao(); + } + return datasetFacetsDao; + } + + public JobDao getJobDao() { + if (jobDao == null) { + jobDao = baseDao.createJobDao(); + } + return jobDao; + } + + public JobFacetsDao getJobFacetsDao() { + if (jobFacetsDao == null) { + jobFacetsDao = baseDao.createJobFacetsDao(); + } + return jobFacetsDao; + } + + public RunArgsDao getRunArgsDao() { + if (runArgsDao == null) { + runArgsDao = baseDao.createRunArgsDao(); + } + return runArgsDao; + } + + public RunStateDao getRunStateDao() { + if (runStateDao == null) { + runStateDao = baseDao.createRunStateDao(); + } + return runStateDao; + } + + public RunFacetsDao getRunFacetsDao() { + if (runFacetsDao == null) { + runFacetsDao = baseDao.createRunFacetsDao(); + } + return runFacetsDao; + } +} diff --git a/api/src/main/java/marquez/service/OpenLineageService.java b/api/src/main/java/marquez/service/OpenLineageService.java index 8c441a0225..ebeb0ff10c 100644 --- a/api/src/main/java/marquez/service/OpenLineageService.java +++ b/api/src/main/java/marquez/service/OpenLineageService.java @@ -45,6 +45,7 @@ import marquez.service.RunTransitionListener.RunInput; import marquez.service.RunTransitionListener.RunOutput; import marquez.service.RunTransitionListener.RunTransition; +import marquez.service.models.DatasetEvent; import marquez.service.models.LineageEvent; import marquez.service.models.RunMeta; @@ -67,6 +68,30 @@ public OpenLineageService(BaseDao baseDao, RunService runService, Executor execu this.executor = executor; } + public CompletableFuture createAsync(DatasetEvent event) { + CompletableFuture openLineage = + CompletableFuture.runAsync( + withSentry( + withMdc( + () -> + createDatasetEvent( 
+ event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(), + createJsonArray(event, mapper), + event.getProducer()))), + executor); + + CompletableFuture marquez = + CompletableFuture.runAsync( + withSentry( + withMdc( + () -> { + updateMarquezModel(event, mapper); + })), + executor); + + return CompletableFuture.allOf(marquez, openLineage); + } + public CompletableFuture createAsync(LineageEvent event) { UUID runUuid = runUuidFromEvent(event.getRun()); CompletableFuture openLineage = diff --git a/api/src/main/resources/marquez/db/migration/V65__dataset_facets_lineage_event_type_nullable.sql b/api/src/main/resources/marquez/db/migration/V65__dataset_facets_lineage_event_type_nullable.sql new file mode 100644 index 0000000000..0d106ce6f0 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V65__dataset_facets_lineage_event_type_nullable.sql @@ -0,0 +1 @@ +ALTER TABLE dataset_facets ALTER COLUMN lineage_event_type DROP NOT NULL; diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index 4729a64971..fec39865c7 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -55,7 +55,6 @@ import marquez.client.models.Run; import marquez.common.Utils; import marquez.db.LineageTestUtils; -import marquez.service.models.DatasetEvent; import marquez.service.models.JobEvent; import org.assertj.core.api.InstanceOfAssertFactories; import org.jdbi.v3.core.Jdbi; @@ -1376,7 +1375,7 @@ public void testSendOpenLineage(String pathToOpenLineageEvent) throws IOExceptio } @Test - public void testSendDatasetEventIsDecoded() throws IOException { + public void testSendDatasetEvent() throws IOException { final String openLineageEventAsString = Resources.toString(Resources.getResource(EVENT_DATASET_EVENT), Charset.defaultCharset()); @@ -1394,16 +1393,31 @@ public void testSendDatasetEventIsDecoded() throws 
IOException { // Ensure the event was received. Map respMap = resp.join(); - assertThat(respMap.containsKey(200)).isTrue(); // Status should be 200 instead of 201 + assertThat(respMap.containsKey(201)).isTrue(); // (3) Convert the OpenLineage event to Json. - DatasetEvent datasetEvent = - marquez.client.Utils.fromJson(respMap.get(200), new TypeReference() {}); - assertThat(datasetEvent.getDataset().getName()).isEqualTo("my-dataset-name"); - assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields()).hasSize(1); - assertThat(datasetEvent.getDataset().getFacets().getSchema().getFields().get(0).getName()) - .isEqualTo("col_a"); - assertThat(datasetEvent.getEventTime().toString()).startsWith("2020-12-28T09:52:00.001"); + final JsonNode openLineageEventAsJson = + Utils.fromJson(openLineageEventAsString, new TypeReference() {}); + + // (4) Verify dataset facet associated with the OpenLineage event. + final JsonNode json = openLineageEventAsJson.path("dataset"); + + final String namespace = json.path("namespace").asText(); + final String output = json.path("name").asText(); + final JsonNode expectedFacets = json.path("facets"); + + final Dataset dataset = client.getDataset(namespace, output); + assertThat(Utils.getMapper().convertValue(dataset.getFacets(), JsonNode.class)) + .isEqualTo(expectedFacets); + + List datasetVersions = client.listDatasetVersions(namespace, output); + assertThat(datasetVersions).isNotEmpty(); + + DatasetVersion latestDatasetVersion = datasetVersions.get(0); + assertThat(latestDatasetVersion.getNamespace()).isEqualTo(namespace); + assertThat(latestDatasetVersion.getName()).isEqualTo(output); + assertThat(Utils.getMapper().convertValue(latestDatasetVersion.getFacets(), JsonNode.class)) + .isEqualTo(expectedFacets); } @Test diff --git a/api/src/test/java/marquez/db/LineageTestUtils.java b/api/src/test/java/marquez/db/LineageTestUtils.java index 84dc6d18f1..c45a1a5bca 100644 --- a/api/src/test/java/marquez/db/LineageTestUtils.java +++ 
b/api/src/test/java/marquez/db/LineageTestUtils.java @@ -24,6 +24,7 @@ import marquez.common.Utils; import marquez.db.models.UpdateLineageRow; import marquez.db.models.UpdateLineageRow.DatasetRecord; +import marquez.service.models.DatasetEvent; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.Dataset; import marquez.service.models.LineageEvent.DatasetFacets; @@ -173,6 +174,44 @@ public static UpdateLineageRow createLineageRow( return updateLineageRow; } + /** + * Create an {@link UpdateLineageRow} from the given dataset by emulating an OpenLineage DatasetEvent. + * + * @param dao the {@link OpenLineageDao} used to persist the event + * @param dataset the dataset to wrap in a {@link DatasetEvent} + * @return the {@link UpdateLineageRow} produced by updating the Marquez model + */ + public static UpdateLineageRow createLineageRow(OpenLineageDao dao, Dataset dataset) { + + DatasetEvent event = + DatasetEvent.builder() + .eventTime(Instant.now().atZone(LOCAL_ZONE)) + .dataset(dataset) + .producer(PRODUCER_URL.toString()) + .build(); + + // emulate an OpenLineage DatasetEvent + event + .getProperties() + .put( + "_schemaURL", + "https://openlineage.io/spec/1-0-1/OpenLineage.json#/definitions/RunEvent"); + UpdateLineageRow updateLineageRow = dao.updateMarquezModel(event, Utils.getMapper()); + PGobject jsonObject = new PGobject(); + jsonObject.setType("json"); + try { + jsonObject.setValue(Utils.toJson(event)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + dao.createDatasetEvent( + event.getEventTime().withZoneSameInstant(ZoneId.of("UTC")).toInstant(), + jsonObject, + event.getProducer()); + + return updateLineageRow; + } + public static DatasetFacets newDatasetFacet(SchemaField...
fields) { return newDatasetFacet(EMPTY_MAP, fields); } diff --git a/api/src/test/java/marquez/db/OpenLineageDaoTest.java b/api/src/test/java/marquez/db/OpenLineageDaoTest.java index d0ddf7253b..2648d253e9 100644 --- a/api/src/test/java/marquez/db/OpenLineageDaoTest.java +++ b/api/src/test/java/marquez/db/OpenLineageDaoTest.java @@ -96,6 +96,22 @@ void testUpdateMarquezModel() { .isEqualTo(writeJob.getOutputs().get().get(0).getDatasetVersionRow()); } + @Test + void testUpdateMarquezModelWithDatasetEvent() { + UpdateLineageRow datasetEventRow = + LineageTestUtils.createLineageRow( + dao, new Dataset(LineageTestUtils.NAMESPACE, DATASET_NAME, datasetFacets)); + + assertThat(datasetEventRow.getOutputs()).isPresent(); + assertThat(datasetEventRow.getOutputs().get()).hasSize(1).first(); + assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetRow()) + .hasFieldOrPropertyWithValue("name", DATASET_NAME) + .hasFieldOrPropertyWithValue("namespaceName", LineageTestUtils.NAMESPACE); + + assertThat(datasetEventRow.getOutputs().get().get(0).getDatasetVersionRow()) + .hasNoNullFieldsOrPropertiesExcept("runUuid"); + } + @Test void testUpdateMarquezModelLifecycleStateChangeFacet() { Dataset dataset = diff --git a/api/src/test/java/marquez/service/LineageServiceTest.java b/api/src/test/java/marquez/service/LineageServiceTest.java index 7f6828dfa0..df6083146e 100644 --- a/api/src/test/java/marquez/service/LineageServiceTest.java +++ b/api/src/test/java/marquez/service/LineageServiceTest.java @@ -56,7 +56,6 @@ public class LineageServiceTest { private static LineageDao lineageDao; private static LineageService lineageService; private static OpenLineageDao openLineageDao; - private static DatasetDao datasetDao; private static JobDao jobDao; diff --git a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java index 0a705ff91d..e1f9cc42ce 100644 --- 
a/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java +++ b/api/src/test/java/marquez/service/OpenLineageServiceIntegrationTest.java @@ -5,6 +5,8 @@ package marquez.service; +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -28,6 +30,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import marquez.common.Utils; +import marquez.common.models.FieldName; import marquez.common.models.JobType; import marquez.common.models.RunState; import marquez.db.DatasetDao; @@ -47,11 +50,14 @@ import marquez.service.RunTransitionListener.JobOutputUpdate; import marquez.service.RunTransitionListener.RunTransition; import marquez.service.models.Dataset; +import marquez.service.models.DatasetEvent; import marquez.service.models.Job; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.DatasetFacets; import marquez.service.models.LineageEvent.DatasourceDatasetFacet; import marquez.service.models.LineageEvent.RunFacet; +import marquez.service.models.LineageEvent.SchemaDatasetFacet; +import marquez.service.models.LineageEvent.SchemaField; import marquez.service.models.Run; import org.jdbi.v3.core.Jdbi; import org.junit.jupiter.api.Assertions; @@ -437,6 +443,45 @@ void testJobIsNotHiddenAfterSubsequentOLEvent() throws ExecutionException, Inter assertThat(jobService.findJobByName(NAMESPACE, name)).isNotEmpty(); } + @Test + void testDatasetEvent() throws ExecutionException, InterruptedException { + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder() + .name(DATASET_NAME) + .namespace(NAMESPACE) + .facets( + DatasetFacets.builder() + .schema( + new SchemaDatasetFacet( + PRODUCER_URL, + SCHEMA_URL, + Arrays.asList(new SchemaField("col", "STRING", "my name")))) + .dataSource( + 
DatasourceDatasetFacet.builder() + .name("theDatasource") + .uri("http://thedatasource") + .build()) + .build()) + .build(); + + lineageService + .createAsync( + DatasetEvent.builder() + .eventTime(Instant.now().atZone(TIMEZONE)) + .dataset(dataset) + .build()) + .get(); + + Optional datasetRow = datasetDao.findDatasetByName(NAMESPACE, DATASET_NAME); + assertThat(datasetRow).isPresent().map(Dataset::getCurrentVersion).isPresent(); + assertThat(datasetRow.get().getSourceName().getValue()).isEqualTo("theDatasource"); + assertThat(datasetRow.get().getFields()) + .hasSize(1) + .first() + .hasFieldOrPropertyWithValue("name", FieldName.of("col")) + .hasFieldOrPropertyWithValue("type", "STRING"); + } + private void checkExists(LineageEvent.Dataset ds) { DatasetService datasetService = new DatasetService(openLineageDao, runService); diff --git a/build.gradle b/build.gradle index 150aee7792..3ad061b0ce 100644 --- a/build.gradle +++ b/build.gradle @@ -22,6 +22,7 @@ buildscript { classpath 'com.adarshr:gradle-test-logger-plugin:3.2.0' classpath 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2' classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.20.0' + classpath "io.freefair.gradle:lombok-plugin:8.4" } } @@ -39,6 +40,7 @@ subprojects { apply plugin: 'com.github.johnrengelman.shadow' apply plugin: "com.diffplug.spotless" apply plugin: "pmd" + apply plugin: "io.freefair.lombok" project(':api') { apply plugin: 'application' @@ -95,7 +97,11 @@ subprojects { archiveClassifier.set("sources") } - task javadocJar(type: Jar, dependsOn: javadoc) { + task delombokJavadocs(type: Javadoc) { + source = delombok + } + + task javadocJar(type: Jar, dependsOn: delombokJavadocs) { from javadoc.destinationDir archiveClassifier.set("javadoc") }