Skip to content

Commit

Permalink
Runless events - consume job event
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Oct 30, 2023
1 parent 3aacc39 commit 5158fc2
Show file tree
Hide file tree
Showing 13 changed files with 464 additions and 92 deletions.
5 changes: 5 additions & 0 deletions api/src/main/java/marquez/api/OpenLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import marquez.service.ServiceFactory;
import marquez.service.models.BaseEvent;
import marquez.service.models.DatasetEvent;
import marquez.service.models.JobEvent;
import marquez.service.models.LineageEvent;
import marquez.service.models.NodeId;

Expand Down Expand Up @@ -73,6 +74,10 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
openLineageService
.createAsync((DatasetEvent) event)
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
} else if (event instanceof JobEvent) {
openLineageService
.createAsync((JobEvent) event)
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
} else {
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());

Expand Down
21 changes: 7 additions & 14 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,14 @@ SELECT EXISTS (

@SqlQuery(
"""
SELECT j.*, f.facets
WITH job_versions_facets AS (
SELECT job_version_uuid, JSON_AGG(facet) as facets
FROM job_facets
GROUP BY job_version_uuid
)
SELECT j.*, facets
FROM jobs_view j
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(e.facet) AS facets
FROM (
SELECT jf.run_uuid, jf.facet
FROM job_facets_view AS jf
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid
INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
ORDER BY lineage_event_time ASC
) e
GROUP BY e.run_uuid
) f ON f.run_uuid=jv.latest_run_uuid
LEFT OUTER JOIN job_versions_facets f ON j.current_version_uuid = f.job_version_uuid
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
""")
Optional<Job> findJobByName(String namespaceName, String jobName);
Expand Down
53 changes: 51 additions & 2 deletions api/src/main/java/marquez/db/JobFacetsDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Spliterators;
import java.util.UUID;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.db.mappers.JobFacetsMapper;
Expand Down Expand Up @@ -55,6 +56,32 @@ void insertJobFacet(
String name,
PGobject facet);

@SqlUpdate(
"""
INSERT INTO job_facets (
created_at,
job_uuid,
job_version_uuid,
lineage_event_time,
name,
facet
) VALUES (
:createdAt,
:jobUuid,
:jobVersionUuid,
:lineageEventTime,
:name,
:facet
)
""")
void insertJobFacet(
Instant createdAt,
UUID jobUuid,
UUID jobVersionUuid,
Instant lineageEventTime,
String name,
PGobject facet);

@SqlQuery(
"""
SELECT
Expand All @@ -72,9 +99,31 @@ void insertJobFacet(
@Transaction
default void insertJobFacetsFor(
@NonNull UUID jobUuid,
@NonNull UUID runUuid,
@NonNull UUID jobVersionUuid,
@NonNull Instant lineageEventTime,
@NonNull LineageEvent.JobFacet jobFacet) {
final Instant now = Instant.now();

JsonNode jsonNode = Utils.getMapper().valueToTree(jobFacet);
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(jsonNode.fieldNames(), Spliterator.DISTINCT), false)
.forEach(
fieldName ->
insertJobFacet(
now,
jobUuid,
jobVersionUuid,
lineageEventTime,
fieldName,
FacetUtils.toPgObject(fieldName, jsonNode.get(fieldName))));
}

@Transaction
default void insertJobFacetsFor(
@NonNull UUID jobUuid,
@Nullable UUID runUuid,
@NonNull Instant lineageEventTime,
@NonNull String lineageEventType,
@Nullable String lineageEventType,
@NonNull LineageEvent.JobFacet jobFacet) {
final Instant now = Instant.now();

Expand Down
106 changes: 98 additions & 8 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.Value;
import marquez.api.models.JobVersion;
Expand All @@ -24,11 +25,13 @@
import marquez.common.models.Version;
import marquez.db.mappers.ExtendedJobVersionRowMapper;
import marquez.db.mappers.JobVersionMapper;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.ExtendedDatasetVersionRow;
import marquez.db.models.ExtendedJobVersionRow;
import marquez.db.models.JobRow;
import marquez.db.models.JobVersionRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.UpdateLineageRow.DatasetRecord;
import marquez.service.models.Run;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.statement.SqlQuery;
Expand Down Expand Up @@ -283,6 +286,82 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
@SqlQuery("SELECT COUNT(*) FROM job_versions")
int count();

/**
* Links facets of the given run to
*
* @param runUuid
* @param jobVersionUuid
*/
@SqlUpdate(
"""
UPDATE job_facets
SET job_version_uuid = :jobVersionUuid
WHERE run_uuid = :runUuid
""")
void linkJobFacetsToJobVersion(UUID runUuid, UUID jobVersionUuid);

/**
* Used to upsert an immutable {@link JobVersionRow}. A {@link Version} is generated using {@link
* Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet, String)} based on
* the jobs inputs and inputs, source code location, and context.
*
* @param jobRow The job.
* @return A {@link BagOfJobVersionInfo} object.
*/
default BagOfJobVersionInfo upsertJobVersion(
@NonNull JobRow jobRow, List<DatasetRecord> inputs, List<DatasetRecord> outputs) {
// Get the job.
final JobDao jobDao = createJobDao();

// Get the namespace for the job.
final NamespaceRow namespaceRow =
createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get();

// Generate the version for the job; the version may already exist.
final Version jobVersion =
Utils.newJobVersionFor(
NamespaceName.of(jobRow.getNamespaceName()),
JobName.of(
Optional.ofNullable(jobRow.getParentJobName())
.map(pn -> pn + "." + jobRow.getSimpleName())
.orElse(jobRow.getName())),
toDatasetIds(
inputs.stream().map(i -> i.getDatasetVersionRow()).collect(Collectors.toList())),
toDatasetIds(
outputs.stream().map(i -> i.getDatasetVersionRow()).collect(Collectors.toList())),
jobRow.getLocation());

// Add the job version.
final JobVersionDao jobVersionDao = createJobVersionDao();
final JobVersionRow jobVersionRow =
jobVersionDao.upsertJobVersion(
UUID.randomUUID(),
jobRow.getCreatedAt(),
jobRow.getUuid(),
jobRow.getLocation(),
jobVersion.getValue(),
jobRow.getName(),
namespaceRow.getUuid(),
jobRow.getNamespaceName());

// Link the input datasets to the job version.
inputs.forEach(
i -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid());
});

// Link the output datasets to the job version.
outputs.forEach(
o -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid());
});

jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid());
return new BagOfJobVersionInfo(jobRow, jobVersionRow, null, null); // TODO: extendedJobVersion can be present here & can be used within tests
}

/**
* Used to upsert an immutable {@link JobVersionRow} object when a {@link Run} has transitioned. A
* {@link Version} is generated using {@link Utils#newJobVersionFor(NamespaceName, JobName,
Expand Down Expand Up @@ -323,8 +402,14 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
Optional.ofNullable(jobRow.getParentJobName())
.map(pn -> pn + "." + jobRow.getSimpleName())
.orElse(jobRow.getName())),
toDatasetIds(jobVersionInputs),
toDatasetIds(jobVersionOutputs),
toDatasetIds(
jobVersionInputs.stream()
.map(i -> (DatasetVersionRow) i)
.collect(Collectors.toList())),
toDatasetIds(
jobVersionOutputs.stream()
.map(o -> (DatasetVersionRow) o)
.collect(Collectors.toList())),
jobRow.getLocation());

// Add the job version.
Expand Down Expand Up @@ -360,6 +445,9 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
// Link the run to the job version; multiple run instances may be linked to a job version.
jobVersionDao.updateLatestRunFor(jobVersionRow.getUuid(), transitionedAt, runUuid);

// Link the job facets to this job version
jobVersionDao.linkJobFacetsToJobVersion(runUuid, jobVersionRow.getUuid());

// Link the job version to the job only if the run is marked done and has transitioned into one
// of the following states: COMPLETED, ABORTED, or FAILED.
if (runState.isDone()) {
Expand All @@ -371,17 +459,19 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(

/** Returns the specified {@link ExtendedDatasetVersionRow}s as {@link DatasetId}s. */
default ImmutableSortedSet<DatasetId> toDatasetIds(
@NonNull final List<ExtendedDatasetVersionRow> datasetVersionRows) {
@NonNull final List<DatasetVersionRow> datasetVersionRows) {
final ImmutableSortedSet.Builder<DatasetId> datasetIds = ImmutableSortedSet.naturalOrder();
for (final ExtendedDatasetVersionRow datasetVersionRow : datasetVersionRows) {
datasetIds.add(
new DatasetId(
NamespaceName.of(datasetVersionRow.getNamespaceName()),
DatasetName.of(datasetVersionRow.getDatasetName())));
for (final DatasetVersionRow datasetVersionRow : datasetVersionRows) {
datasetIds.add(toDatasetId(datasetVersionRow));
}
return datasetIds.build();
}

private DatasetId toDatasetId(DatasetVersionRow dataset) {
return new DatasetId(
NamespaceName.of(dataset.getNamespaceName()), DatasetName.of(dataset.getDatasetName()));
}

/** A container class for job version info. */
@Value
class BagOfJobVersionInfo {
Expand Down
Loading

0 comments on commit 5158fc2

Please sign in to comment.