Skip to content

Commit

Permalink
Remove job context.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

Add call to API in frontend.
Add tests.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

Update javadocs.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>

Apply spotless fixes.

Signed-off-by: Jakub Dardzinski <kuba0221@gmail.com>
  • Loading branch information
JDarDagran committed Jan 20, 2023
1 parent 3492df2 commit d9daa4e
Show file tree
Hide file tree
Showing 76 changed files with 499 additions and 481 deletions.
7 changes: 4 additions & 3 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import lombok.NonNull;
import marquez.api.ColumnLineageResource;
import marquez.api.DatasetResource;
import marquez.api.FacetResource;
import marquez.api.JobResource;
import marquez.api.NamespaceResource;
import marquez.api.OpenLineageResource;
Expand All @@ -28,7 +29,6 @@
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
import marquez.db.JobContextDao;
import marquez.db.JobDao;
import marquez.db.JobVersionDao;
import marquez.db.LineageDao;
Expand Down Expand Up @@ -67,7 +67,6 @@ public final class MarquezContext {
@Getter private final DatasetVersionDao datasetVersionDao;
@Getter private final JobDao jobDao;
@Getter private final JobVersionDao jobVersionDao;
@Getter private final JobContextDao jobContextDao;
@Getter private final RunDao runDao;
@Getter private final RunArgsDao runArgsDao;
@Getter private final RunStateDao runStateDao;
Expand All @@ -91,6 +90,7 @@ public final class MarquezContext {
@Getter private final SourceResource sourceResource;
@Getter private final DatasetResource datasetResource;
@Getter private final ColumnLineageResource columnLineageResource;
@Getter private final FacetResource facetResource;
@Getter private final JobResource jobResource;
@Getter private final TagResource tagResource;
@Getter private final OpenLineageResource openLineageResource;
Expand All @@ -116,7 +116,6 @@ private MarquezContext(
this.datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class);
this.jobDao = jdbi.onDemand(JobDao.class);
this.jobVersionDao = jdbi.onDemand(JobVersionDao.class);
this.jobContextDao = jdbi.onDemand(JobContextDao.class);
this.runDao = jdbi.onDemand(RunDao.class);
this.runArgsDao = jdbi.onDemand(RunArgsDao.class);
this.runStateDao = jdbi.onDemand(RunStateDao.class);
Expand Down Expand Up @@ -158,6 +157,7 @@ private MarquezContext(
this.sourceResource = new SourceResource(serviceFactory);
this.datasetResource = new DatasetResource(serviceFactory);
this.columnLineageResource = new ColumnLineageResource(serviceFactory);
this.facetResource = new FacetResource(serviceFactory, openLineageDao);
this.jobResource = new JobResource(serviceFactory, jobVersionDao);
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
Expand All @@ -168,6 +168,7 @@ private MarquezContext(
namespaceResource,
sourceResource,
datasetResource,
facetResource,
columnLineageResource,
jobResource,
tagResource,
Expand Down
63 changes: 63 additions & 0 deletions api/src/main/java/marquez/api/FacetResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.api;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import marquez.common.models.RunId;
import marquez.db.OpenLineageDao;
import marquez.service.ServiceFactory;
import marquez.service.models.Facets;


@Slf4j
@Path("/api/v1/facets")
public class FacetResource extends BaseResource {

private final OpenLineageDao openLineageDao;

public FacetResource(
@NonNull final ServiceFactory serviceFactory, @NonNull final OpenLineageDao openLineageDao) {
super(serviceFactory);
this.openLineageDao = openLineageDao;
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
@Path("/run/{id}")
public Response getRunFacets(
@PathParam("id") RunId runId) {
throwIfNotExists(runId);
Facets runFacets = openLineageDao.findRunFacetsByRunUuid(runId.getValue());
return Response.ok(runFacets).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
@Path("job/run/{id}")
public Response getJobFacets(
@PathParam("id") RunId runId) {
throwIfNotExists(runId);
Facets jobFacets = openLineageDao.findJobFacetsByRunUuid(runId.getValue());
return Response.ok(jobFacets).build();
}
}
4 changes: 0 additions & 4 deletions api/src/main/java/marquez/api/models/JobVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package marquez.api.models;

import com.google.common.collect.ImmutableMap;
import java.net.URL;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -35,7 +34,6 @@ public final class JobVersion {
@Getter private final Version version;
@Getter private final NamespaceName namespace;
@Nullable private final URL location;
@Getter private final ImmutableMap<String, String> context;
@Getter private final List<DatasetId> inputs;
@Getter private final List<DatasetId> outputs;
@Getter @Nullable private final Run latestRun;
Expand All @@ -46,7 +44,6 @@ public JobVersion(
@NonNull final Instant createdAt,
@NonNull final Version version,
@Nullable final URL location,
@Nullable final ImmutableMap<String, String> context,
List<DatasetId> inputs,
List<DatasetId> outputs,
@Nullable Run latestRun) {
Expand All @@ -56,7 +53,6 @@ public JobVersion(
this.version = version;
this.namespace = id.getNamespace();
this.location = location;
this.context = (context == null) ? ImmutableMap.of() : context;
this.inputs = inputs;
this.outputs = outputs;
this.latestRun = latestRun;
Expand Down
6 changes: 1 addition & 5 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.Hashing;
import io.dropwizard.jackson.Jackson;
Expand Down Expand Up @@ -236,7 +235,6 @@ public static Instant toInstant(@Nullable final String asIso) {
* @param jobName The name of the job.
* @param jobInputIds The input dataset IDs for the job.
* @param jobOutputIds The output dataset IDs for the job.
* @param jobContext The context of the job.
* @param jobLocation The source code location for the job.
* @return A {@link Version} object based on the specified job meta.
*/
Expand All @@ -245,7 +243,6 @@ public static Version newJobVersionFor(
@NonNull final JobName jobName,
@NonNull final ImmutableSet<DatasetId> jobInputIds,
@NonNull final ImmutableSet<DatasetId> jobOutputIds,
@NonNull final ImmutableMap<String, String> jobContext,
@Nullable final String jobLocation) {
final byte[] bytes =
VERSION_JOINER
Expand All @@ -268,8 +265,7 @@ public static Version newJobVersionFor(
jobOutputId.getNamespace().getValue(),
jobOutputId.getName().getValue()))
.collect(joining(VERSION_DELIM)),
jobLocation,
KV_JOINER.join(jobContext))
jobLocation)
.getBytes(UTF_8);
return Version.of(UUID.nameUUIDFromBytes(bytes));
}
Expand Down
5 changes: 2 additions & 3 deletions api/src/main/java/marquez/common/models/DatasetId.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
Expand All @@ -16,8 +15,8 @@

/**
* ID for {@code Dataset}. The class implements {@link Comparable} to ensure job versions generated
* with {@link Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet,
* ImmutableMap, String)} are consistent as jobs may contain inputs and outputs out of order.
* with {@link Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet, String)}
* are consistent as jobs may contain inputs and outputs out of order.
*/
@EqualsAndHashCode
@ToString
Expand Down
3 changes: 0 additions & 3 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ public interface BaseDao extends SqlObject {
@CreateSqlObject
DatasetVersionDao createDatasetVersionDao();

@CreateSqlObject
JobContextDao createJobContextDao();

@CreateSqlObject
JobDao createJobDao();

Expand Down
4 changes: 0 additions & 4 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,9 @@ private Columns() {}

/* JOB VERSION ROW COLUMNS */
public static final String JOB_UUID = "job_uuid";
public static final String JOB_CONTEXT_UUID = "job_context_uuid";
public static final String LOCATION = "location";
public static final String LATEST_RUN_UUID = "latest_run_uuid";

/* JOB CONTEXT ROW COLUMNS */
public static final String CONTEXT = "context";

/* RUN ROW COLUMNS */
public static final String EXTERNAL_ID = "external_id";
public static final String RUN_ARGS_UUID = "run_args_uuid";
Expand Down
39 changes: 0 additions & 39 deletions api/src/main/java/marquez/db/JobContextDao.java

This file was deleted.

22 changes: 4 additions & 18 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.common.Utils;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.JobName;
import marquez.common.models.JobType;
import marquez.common.models.NamespaceName;
import marquez.db.mappers.JobMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.service.models.Job;
Expand Down Expand Up @@ -58,10 +56,9 @@ SELECT EXISTS (

@SqlQuery(
"""
SELECT j.*, jc.context, f.facets
SELECT j.*, f.facets
FROM jobs_view j
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(e.facets) AS facets
FROM (
Expand Down Expand Up @@ -130,10 +127,9 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {

@SqlQuery(
"""
SELECT j.*, jc.context, f.facets
SELECT j.*, f.facets
FROM jobs_view AS j
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(e.facets) AS facets
FROM (
Expand Down Expand Up @@ -208,13 +204,6 @@ default JobRow upsertJobMeta(
createNamespaceDao()
.upsertNamespaceRow(
UUID.randomUUID(), createdAt, namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER);
JobContextRow contextRow =
createJobContextDao()
.upsert(
UUID.randomUUID(),
createdAt,
Utils.toJson(jobMeta.getContext()),
Utils.checksumFor(jobMeta.getContext()));
return upsertJob(
UUID.randomUUID(),
jobMeta.getType(),
Expand All @@ -223,7 +212,6 @@ default JobRow upsertJobMeta(
namespace.getName(),
jobName.getValue(),
jobMeta.getDescription().orElse(null),
contextRow.getUuid(),
toUrlString(jobMeta.getLocation().orElse(null)),
symlinkTargetUuid,
toJson(jobMeta.getInputs(), mapper));
Expand Down Expand Up @@ -276,7 +264,7 @@ INSERT INTO jobs_view AS j (
:namespaceName,
:name,
:description,
:jobContextUuid,
null,
:location,
:inputs,
:symlinkTargetId,
Expand All @@ -291,7 +279,6 @@ JobRow upsertJob(
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs);
Expand Down Expand Up @@ -328,7 +315,7 @@ INSERT INTO jobs_view AS j (
:namespaceName,
:name,
:description,
:jobContextUuid,
null,
:location,
:inputs,
:symlinkTargetId
Expand All @@ -344,7 +331,6 @@ JobRow upsertJob(
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs);
Expand Down
Loading

0 comments on commit d9daa4e

Please sign in to comment.