From 7e0f47acef51bfffee57977f1c090d672f6856cf Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Mon, 18 Jul 2022 16:07:26 -0700 Subject: [PATCH 1/2] Changed RunDao to use simple RunRow rather than ExtendedRunRow where possible Signed-off-by: Michael Collado --- .../main/java/marquez/db/OpenLineageDao.java | 58 +++++------ api/src/main/java/marquez/db/RunDao.java | 95 ++++--------------- .../java/marquez/db/mappers/RunRowMapper.java | 60 ++++++++++++ .../marquez/db/models/ExtendedRunRow.java | 18 +++- .../main/java/marquez/db/models/RunRow.java | 12 +-- .../java/marquez/service/DatasetService.java | 2 +- .../main/java/marquez/service/JobService.java | 2 +- .../marquez/service/OpenLineageService.java | 17 +++- .../main/java/marquez/service/RunService.java | 2 +- .../java/marquez/db/BackfillTestUtils.java | 6 +- api/src/test/java/marquez/db/DbTestUtils.java | 3 +- .../java/marquez/db/JobVersionDaoTest.java | 3 +- api/src/test/java/marquez/db/RunDaoTest.java | 2 +- .../V44_3_BackfillJobsWithParentsTest.java | 4 +- 14 files changed, 146 insertions(+), 138 deletions(-) create mode 100644 api/src/main/java/marquez/db/mappers/RunRowMapper.java diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index f2b69735bb..2bde7afbd0 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -32,7 +32,6 @@ import marquez.db.models.DatasetFieldRow; import marquez.db.models.DatasetRow; import marquez.db.models.DatasetVersionRow; -import marquez.db.models.ExtendedRunRow; import marquez.db.models.JobContextRow; import marquez.db.models.JobRow; import marquez.db.models.NamespaceRow; @@ -161,10 +160,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper PGobject inputs = new PGobject(); inputs.setType("json"); inputs.setValue("[]"); - Optional parentRunRow = runDao.findRunByUuidAsRow(uuid); JobRow parentJobRow = - parentRunRow - .flatMap(run -> jobDao.findJobByUuidAsRow(run.getJobUuid())) + runDao + .findJobRowByRunUuid(uuid) .orElseGet( () -> { JobRow newParentJobRow = @@ -181,34 +179,36 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper null, inputs); log.info("Created new parent job record {}", newParentJobRow); + + RunArgsRow argsRow = + runArgsDao.upsertRunArgs( + UUID.randomUUID(), + now, + "{}", + Utils.checksumFor(ImmutableMap.of())); + RunRow newRow = + runDao.upsert( + uuid, + null, + facet.getRun().getRunId(), + now, + newParentJobRow.getUuid(), + null, + argsRow.getUuid(), + nominalStartTime, + nominalEndTime, + Optional.ofNullable(event.getEventType()) + .map(this::getRunState) + .orElse(null), + now, + namespace.getName(), + newParentJobRow.getName(), + newParentJobRow.getLocation(), + newParentJobRow.getJobContextUuid().orElse(null)); + log.info("Created new parent run record {}", newRow); return newParentJobRow; }); log.debug("Found parent job record {}", parentJobRow); - if (parentRunRow.isEmpty()) { - RunArgsRow argsRow = - runArgsDao.upsertRunArgs( - UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of())); - ExtendedRunRow newRow = - runDao.upsert( - uuid, - null, - facet.getRun().getRunId(), - now, - parentJobRow.getUuid(), - null, - argsRow.getUuid(), - nominalStartTime, - nominalEndTime, - Optional.ofNullable(event.getEventType()) - .map(this::getRunState) - .orElse(null), - now, - namespace.getName(), - parentJobRow.getName(), - parentJobRow.getLocation(), - parentJobRow.getJobContextUuid().orElse(null)); - log.info("Created new parent run record {}", newRow); - } return parentJobRow; } catch (Exception e) { throw new RuntimeException("Unable to insert parent run", e); diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 9926519fa3..836a0c8b7d 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -22,7 +22,9 @@ import marquez.common.models.RunId; import marquez.common.models.RunState; import marquez.db.mappers.ExtendedRunRowMapper; +import marquez.db.mappers.JobRowMapper; import marquez.db.mappers.RunMapper; +import marquez.db.mappers.RunRowMapper; import marquez.db.models.DatasetRow; import marquez.db.models.ExtendedRunRow; import marquez.db.models.JobRow; @@ -40,7 +42,9 @@ import org.jdbi.v3.sqlobject.transaction.Transaction; @RegisterRowMapper(ExtendedRunRowMapper.class) +@RegisterRowMapper(RunRowMapper.class) @RegisterRowMapper(RunMapper.class) +@RegisterRowMapper(JobRowMapper.class) public interface RunDao extends BaseDao { @SqlQuery("SELECT EXISTS (SELECT 1 FROM runs WHERE uuid = :rowUuid)") boolean exists(UUID rowUuid); @@ -103,7 +107,18 @@ public interface RunDao extends BaseDao { Optional findRunByUuid(UUID runUuid); @SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid") - Optional findRunByUuidAsRow(UUID runUuid); + Optional findRunByUuidAsExtendedRow(UUID runUuid); + + @SqlQuery("SELECT * FROM runs r WHERE r.uuid = :runUuid") + Optional findRunByUuidAsRow(UUID runUuid); + + @SqlQuery( + """ + SELECT j.* FROM jobs_view j + INNER JOIN runs_view r ON r.job_uuid=j.uuid + WHERE r.uuid=:uuid +""") + Optional findJobRowByRunUuid(UUID uuid); @SqlQuery( """ @@ -195,8 +210,8 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, + "nominal_start_time = COALESCE(EXCLUDED.nominal_start_time, runs.nominal_start_time), " + "nominal_end_time = COALESCE(EXCLUDED.nominal_end_time, runs.nominal_end_time), " + "location = EXCLUDED.location " - + "RETURNING uuid") - UUID upsertWithRunState( + + "RETURNING *") + RunRow upsert( UUID runUuid, UUID parentRunUuid, String externalId, @@ -213,42 +228,6 @@ UUID upsertWithRunState( String location, UUID jobContextUuid); - default ExtendedRunRow upsert( - UUID runUuid, - UUID parentRunUuid, - String externalId, - Instant now, - UUID jobUuid, - UUID jobVersionUuid, - UUID runArgsUuid, - Instant nominalStartTime, - Instant nominalEndTime, - RunState runStateType, - Instant runStateTime, - String namespaceName, - String jobName, - String location, - UUID jobContextUuid) { - UUID rowUuid = - upsertWithRunState( - runUuid, - parentRunUuid, - externalId, - now, - jobUuid, - jobVersionUuid, - runArgsUuid, - nominalStartTime, - nominalEndTime, - runStateType, - runStateTime, - namespaceName, - jobName, - location, - jobContextUuid); - return findRunByUuidAsRow(rowUuid).get(); - } - @SqlQuery( "INSERT INTO runs ( " + "uuid, " @@ -286,8 +265,8 @@ default ExtendedRunRow upsert( + "nominal_start_time = COALESCE(EXCLUDED.nominal_start_time, runs.nominal_start_time), " + "nominal_end_time = COALESCE(EXCLUDED.nominal_end_time, runs.nominal_end_time), " + "location = EXCLUDED.location " - + "RETURNING uuid") - UUID upsertWithoutRunState( + + "RETURNING *") + RunRow upsert( UUID runUuid, UUID parentRunUuid, String externalId, @@ -303,40 +282,6 @@ UUID upsertWithoutRunState( String location, UUID jobContextUuid); - default ExtendedRunRow upsert( - UUID runUuid, - UUID parentRunUuid, - String externalId, - Instant now, - UUID jobUuid, - UUID jobVersionUuid, - UUID runArgsUuid, - Instant nominalStartTime, - Instant nominalEndTime, - UUID namespaceUuid, - String namespaceName, - String jobName, - String location, - UUID jobContextUuid) { - UUID runRowUuid = - upsertWithoutRunState( - runUuid, - parentRunUuid, - externalId, - now, - jobUuid, - jobVersionUuid, - runArgsUuid, - nominalStartTime, - nominalEndTime, - namespaceUuid, - namespaceName, - jobName, - location, - jobContextUuid); - return findRunByUuidAsRow(runRowUuid).get(); - } - @SqlUpdate( "INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) " + "VALUES (:runUuid, :datasetVersionUuid) ON CONFLICT DO NOTHING") diff --git a/api/src/main/java/marquez/db/mappers/RunRowMapper.java b/api/src/main/java/marquez/db/mappers/RunRowMapper.java new file mode 100644 index 0000000000..44c1adbe4f --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/RunRowMapper.java @@ -0,0 +1,60 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.db.mappers; + +import static marquez.db.Columns.stringOrNull; +import static marquez.db.Columns.timestampOrNull; +import static marquez.db.Columns.timestampOrThrow; +import static marquez.db.Columns.uuidOrNull; +import static marquez.db.Columns.uuidOrThrow; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import lombok.NonNull; +import marquez.common.Utils; +import marquez.common.models.DatasetVersionId; +import marquez.db.Columns; +import marquez.db.models.RunRow; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +public final class RunRowMapper implements RowMapper { + @Override + public RunRow map(@NonNull ResultSet results, @NonNull StatementContext context) + throws SQLException { + Set columnNames = MapperUtils.getColumnNames(results.getMetaData()); + + return new RunRow( + uuidOrThrow(results, Columns.ROW_UUID), + timestampOrThrow(results, Columns.CREATED_AT), + timestampOrThrow(results, Columns.UPDATED_AT), + uuidOrNull(results, Columns.JOB_UUID), + uuidOrNull(results, Columns.JOB_VERSION_UUID), + uuidOrNull(results, Columns.PARENT_RUN_UUID), + uuidOrThrow(results, Columns.RUN_ARGS_UUID), + timestampOrNull(results, Columns.NOMINAL_START_TIME), + timestampOrNull(results, Columns.NOMINAL_END_TIME), + stringOrNull(results, Columns.CURRENT_RUN_STATE), + columnNames.contains(Columns.STARTED_AT) + ? timestampOrNull(results, Columns.STARTED_AT) + : null, + uuidOrNull(results, Columns.START_RUN_STATE_UUID), + columnNames.contains(Columns.ENDED_AT) ? timestampOrNull(results, Columns.ENDED_AT) : null, + uuidOrNull(results, Columns.END_RUN_STATE_UUID)); + } + + private List toDatasetVersion(ResultSet rs, String column) throws SQLException { + String dsString = rs.getString(column); + if (dsString == null) { + return Collections.emptyList(); + } + return Utils.fromJson(dsString, new TypeReference>() {}); + } +} diff --git a/api/src/main/java/marquez/db/models/ExtendedRunRow.java b/api/src/main/java/marquez/db/models/ExtendedRunRow.java index 23054c99db..6bd3a622bf 100644 --- a/api/src/main/java/marquez/db/models/ExtendedRunRow.java +++ b/api/src/main/java/marquez/db/models/ExtendedRunRow.java @@ -18,6 +18,10 @@ @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) public class ExtendedRunRow extends RunRow { + @Getter @NonNull private final List inputVersions; + @Getter @NonNull private final List outputVersions; + @Getter private final String namespaceName; + @Getter private final String jobName; @Getter private final String args; public ExtendedRunRow( @@ -48,17 +52,21 @@ public ExtendedRunRow( jobVersionUuid, parentRunUuid, runArgsUuid, - inputVersions, - outputVersions, nominalStartTime, nominalEndTime, currentRunState, startedAt, startRunStateUuid, endedAt, - endRunStateUuid, - namespaceName, - jobName); + endRunStateUuid); + this.inputVersions = inputVersions; + this.outputVersions = outputVersions; this.args = args; + this.jobName = jobName; + this.namespaceName = namespaceName; + } + + public boolean hasInputVersionUuids() { + return !inputVersions.isEmpty(); } } diff --git a/api/src/main/java/marquez/db/models/RunRow.java b/api/src/main/java/marquez/db/models/RunRow.java index 32a86a8053..7aebd14ae6 100644 --- a/api/src/main/java/marquez/db/models/RunRow.java +++ b/api/src/main/java/marquez/db/models/RunRow.java @@ -6,7 +6,6 @@ package marquez.db.models; import java.time.Instant; -import java.util.List; import java.util.Optional; import java.util.UUID; import javax.annotation.Nullable; @@ -15,7 +14,6 @@ import lombok.Getter; import lombok.NonNull; import lombok.ToString; -import marquez.common.models.DatasetVersionId; @AllArgsConstructor @EqualsAndHashCode @@ -24,12 +22,10 @@ public class RunRow { @Getter @NonNull private final UUID uuid; @Getter @NonNull private final Instant createdAt; @Getter @NonNull private final Instant updatedAt; - @Getter @NonNull private final UUID jobUuid; + @Getter private final UUID jobUuid; @Nullable private final UUID jobVersionUuid; @Nullable private final UUID parentRunUuid; @Getter @NonNull private final UUID runArgsUuid; - @Getter @NonNull private final List inputVersions; - @Getter @NonNull private final List outputVersions; @Nullable private final Instant nominalStartTime; @Nullable private final Instant nominalEndTime; @Nullable private final String currentRunState; @@ -37,12 +33,6 @@ public class RunRow { @Nullable private final UUID startRunStateUuid; @Nullable private final Instant endedAt; @Nullable private final UUID endRunStateUuid; - @Getter private final String namespaceName; - @Getter private final String jobName; - - public boolean hasInputVersionUuids() { - return !inputVersions.isEmpty(); - } public Optional getParentRunUuid() { return Optional.ofNullable(parentRunUuid); diff --git a/api/src/main/java/marquez/service/DatasetService.java b/api/src/main/java/marquez/service/DatasetService.java index 18a1f205d0..266f8d5154 100644 --- a/api/src/main/java/marquez/service/DatasetService.java +++ b/api/src/main/java/marquez/service/DatasetService.java @@ -62,7 +62,7 @@ public Dataset createOrUpdate( @NonNull DatasetMeta datasetMeta) { if (datasetMeta.getRunId().isPresent()) { UUID runUuid = datasetMeta.getRunId().get().getValue(); - ExtendedRunRow runRow = runDao.findRunByUuidAsRow(runUuid).get(); + ExtendedRunRow runRow = runDao.findRunByUuidAsExtendedRow(runUuid).get(); List outputs = datasetVersionDao.findOutputDatasetVersionsFor(runUuid); diff --git a/api/src/main/java/marquez/service/JobService.java b/api/src/main/java/marquez/service/JobService.java index 12ed0c57f1..e4057c07b1 100644 --- a/api/src/main/java/marquez/service/JobService.java +++ b/api/src/main/java/marquez/service/JobService.java @@ -52,7 +52,7 @@ public Job createOrUpdate( if (jobMeta.getRunId().isPresent()) { UUID runUuid = jobMeta.getRunId().get().getValue(); runDao.notifyJobChange(runUuid, jobRow, jobMeta); - ExtendedRunRow runRow = runDao.findRunByUuidAsRow(runUuid).get(); + ExtendedRunRow runRow = runDao.findRunByUuidAsExtendedRow(runUuid).get(); List inputs = datasetVersionDao.findInputDatasetVersionsFor(runUuid); diff --git a/api/src/main/java/marquez/service/OpenLineageService.java b/api/src/main/java/marquez/service/OpenLineageService.java index 7d593c308e..adfc6d2e10 100644 --- a/api/src/main/java/marquez/service/OpenLineageService.java +++ b/api/src/main/java/marquez/service/OpenLineageService.java @@ -34,6 +34,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetVersionDao; import marquez.db.models.ExtendedDatasetVersionRow; +import marquez.db.models.JobRow; import marquez.db.models.RunArgsRow; import marquez.db.models.RunRow; import marquez.db.models.UpdateLineageRow; @@ -121,7 +122,12 @@ private Optional buildJobOutputUpdate(UpdateLineageRow record) private Optional buildJobInputUpdate(UpdateLineageRow record) { RunId runId = RunId.of(record.getRun().getUuid()); return buildJobInput( - record.getRun(), record.getRunArgs(), buildJobVersionId(record), runId, record); + record.getRun(), + record.getRunArgs(), + record.getJob(), + buildJobVersionId(record), + runId, + record); } public JobVersionId buildJobVersionId(UpdateLineageRow record) { @@ -161,14 +167,15 @@ Optional buildJobOutput( new JobOutputUpdate( runId, jobVersionId, - JobName.of(record.getRun().getJobName()), - NamespaceName.of(record.getRun().getNamespaceName()), + JobName.of(record.getJob().getName()), + NamespaceName.of(record.getJob().getNamespaceName()), runOutputs)); } Optional buildJobInput( RunRow run, RunArgsRow runArgsRow, + JobRow jobRow, JobVersionId jobVersionId, RunId runId, UpdateLineageRow record) { @@ -203,8 +210,8 @@ Optional buildJobInput( .args(runArgs) .build(), jobVersionId, - JobName.of(run.getJobName()), - NamespaceName.of(run.getNamespaceName()), + JobName.of(jobRow.getName()), + NamespaceName.of(jobRow.getNamespaceName()), runInputs)); } diff --git a/api/src/main/java/marquez/service/RunService.java b/api/src/main/java/marquez/service/RunService.java index ef2ad9b3e9..db8d46aa99 100644 --- a/api/src/main/java/marquez/service/RunService.java +++ b/api/src/main/java/marquez/service/RunService.java @@ -77,7 +77,7 @@ public void markRunAs( if (transitionedAt == null) { transitionedAt = Instant.now(); } - ExtendedRunRow runRow = findRunByUuidAsRow(runId.getValue()).get(); + ExtendedRunRow runRow = findRunByUuidAsExtendedRow(runId.getValue()).get(); runStateDao.updateRunStateFor(runId.getValue(), runState, transitionedAt); if (runState.isDone()) { diff --git a/api/src/test/java/marquez/db/BackfillTestUtils.java b/api/src/test/java/marquez/db/BackfillTestUtils.java index c51c50beab..5258d201f3 100644 --- a/api/src/test/java/marquez/db/BackfillTestUtils.java +++ b/api/src/test/java/marquez/db/BackfillTestUtils.java @@ -17,9 +17,9 @@ import java.util.Optional; import java.util.UUID; import marquez.common.Utils; -import marquez.db.models.ExtendedRunRow; import marquez.db.models.NamespaceRow; import marquez.db.models.RunArgsRow; +import marquez.db.models.RunRow; import marquez.service.models.LineageEvent; import marquez.service.models.LineageEvent.JobFacet; import marquez.service.models.LineageEvent.JobLink; @@ -35,7 +35,7 @@ public class BackfillTestUtils { public static final String COMPLETE = "COMPLETE"; - public static ExtendedRunRow writeNewEvent( + public static RunRow writeNewEvent( Jdbi jdbi, String jobName, Instant now, @@ -52,7 +52,7 @@ public static ExtendedRunRow writeNewEvent( runArgsDao.upsertRunArgs( UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of())); UUID runUuid = UUID.randomUUID(); - ExtendedRunRow runRow = + RunRow runRow = runDao.upsert( runUuid, null, diff --git a/api/src/test/java/marquez/db/DbTestUtils.java b/api/src/test/java/marquez/db/DbTestUtils.java index c6645e36c5..9fb7531d57 100644 --- a/api/src/test/java/marquez/db/DbTestUtils.java +++ b/api/src/test/java/marquez/db/DbTestUtils.java @@ -41,7 +41,6 @@ import marquez.common.models.RunState; import marquez.db.models.DatasetRow; import marquez.db.models.ExtendedJobVersionRow; -import marquez.db.models.ExtendedRunRow; import marquez.db.models.JobContextRow; import marquez.db.models.JobRow; import marquez.db.models.JobVersionRow; @@ -256,7 +255,7 @@ static RunRow newRun(final Jdbi jdbi, JobRow jobRow) { } /** Adds a new {@link RunRow} object to the {@code runs} table. */ - static ExtendedRunRow newRun( + static RunRow newRun( final Jdbi jdbi, final UUID jobUuid, final UUID jobVersionUuid, diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java index 983859d7de..626b5b6919 100644 --- a/api/src/test/java/marquez/db/JobVersionDaoTest.java +++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java @@ -27,7 +27,6 @@ import marquez.db.models.DatasetRow; import marquez.db.models.ExtendedDatasetVersionRow; import marquez.db.models.ExtendedJobVersionRow; -import marquez.db.models.ExtendedRunRow; import marquez.db.models.JobRow; import marquez.db.models.NamespaceRow; import marquez.db.models.RunArgsRow; @@ -128,7 +127,7 @@ public void testUpdateLatestRunFor() { // (2) Add a new run. final RunArgsRow runArgsRow = DbTestUtils.newRunArgs(jdbiForTesting); - final ExtendedRunRow runRow = + final RunRow runRow = DbTestUtils.newRun( jdbiForTesting, jobVersionRow.getJobUuid(), diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index 248406ce2e..42f1f9a64c 100644 --- a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -243,7 +243,7 @@ public void updateRowWithNullNominalTimeDoesNotUpdateNominalTime() { null, namespaceRow.getUuid(), namespaceRow.getName(), - row.getJobName(), + jobRow.getName(), null, null); diff --git a/api/src/test/java/marquez/db/migrations/V44_3_BackfillJobsWithParentsTest.java b/api/src/test/java/marquez/db/migrations/V44_3_BackfillJobsWithParentsTest.java index b754ac8e30..c959555110 100644 --- a/api/src/test/java/marquez/db/migrations/V44_3_BackfillJobsWithParentsTest.java +++ b/api/src/test/java/marquez/db/migrations/V44_3_BackfillJobsWithParentsTest.java @@ -17,8 +17,8 @@ import java.util.UUID; import marquez.db.NamespaceDao; import marquez.db.OpenLineageDao; -import marquez.db.models.ExtendedRunRow; import marquez.db.models.NamespaceRow; +import marquez.db.models.RunRow; import marquez.jdbi.JdbiExternalPostgresExtension.FlywayTarget; import marquez.jdbi.MarquezJdbiExternalPostgresExtension; import org.flywaydb.core.api.configuration.Configuration; @@ -50,7 +50,7 @@ public void testBackfill() throws SQLException, JsonProcessingException { NamespaceRow namespace = namespaceDao.upsertNamespaceRow(UUID.randomUUID(), now, NAMESPACE, "me"); String parentName = "parentJob"; - ExtendedRunRow parentRun = writeNewEvent(jdbi, parentName, now, namespace, null, null); + RunRow parentRun = writeNewEvent(jdbi, parentName, now, namespace, null, null); String task1Name = "task1"; writeNewEvent(jdbi, task1Name, now, namespace, parentRun.getUuid().toString(), parentName); From 5c88d5b92fc335d9ea9d0272f825fad65c4fb0df Mon Sep 17 00:00:00 2001 From: Michael Collado Date: Mon, 18 Jul 2022 16:09:43 -0700 Subject: [PATCH 2/2] Remove need to query JobRow on run completion Signed-off-by: Michael Collado --- api/src/main/java/marquez/db/JobVersionDao.java | 7 ++----- api/src/main/java/marquez/db/OpenLineageDao.java | 3 +-- api/src/main/java/marquez/service/RunService.java | 8 ++++++-- api/src/test/java/marquez/db/JobVersionDaoTest.java | 12 ++---------- api/src/test/java/marquez/db/RunDaoTest.java | 12 ++---------- 5 files changed, 13 insertions(+), 29 deletions(-) diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 28fcc5eb18..ffe25708f6 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -286,22 +286,19 @@ default List findOutputDatasetsFor(UUID jobVersionUuid) { * code location, and context. A version for a given job is created only when a {@link Run} * transitions into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state. * - * @param namespaceName The namespace for the job version. - * @param jobName The name of the job. + * @param jobRow The job. * @param runUuid The unique ID of the run associated with the job version. * @param runState The current run state. * @param transitionedAt The timestamp of the run state transition. * @return A {@link BagOfJobVersionInfo} object. */ default BagOfJobVersionInfo upsertJobVersionOnRunTransition( - @NonNull String namespaceName, - @NonNull String jobName, + @NonNull JobRow jobRow, @NonNull UUID runUuid, @NonNull RunState runState, @NonNull Instant transitionedAt) { // Get the job. final JobDao jobDao = createJobDao(); - final JobRow jobRow = jobDao.findJobByNameAsRow(namespaceName, jobName).get(); // Get the job context. final UUID jobContextUuid = jobRow.getJobContextUuid().get(); diff --git a/api/src/main/java/marquez/db/OpenLineageDao.java b/api/src/main/java/marquez/db/OpenLineageDao.java index 2bde7afbd0..265dcf992a 100644 --- a/api/src/main/java/marquez/db/OpenLineageDao.java +++ b/api/src/main/java/marquez/db/OpenLineageDao.java @@ -390,8 +390,7 @@ default void updateMarquezOnComplete( BagOfJobVersionInfo bagOfJobVersionInfo = createJobVersionDao() .upsertJobVersionOnRunTransition( - updateLineageRow.getRun().getNamespaceName(), - updateLineageRow.getRun().getJobName(), + updateLineageRow.getJob(), updateLineageRow.getRun().getUuid(), runState, event.getEventTime().toInstant()); diff --git a/api/src/main/java/marquez/service/RunService.java b/api/src/main/java/marquez/service/RunService.java index db8d46aa99..efe1b3056b 100644 --- a/api/src/main/java/marquez/service/RunService.java +++ b/api/src/main/java/marquez/service/RunService.java @@ -27,6 +27,7 @@ import marquez.common.models.RunId; import marquez.common.models.RunState; import marquez.db.BaseDao; +import marquez.db.JobDao; import marquez.db.JobVersionDao; import marquez.db.JobVersionDao.BagOfJobVersionInfo; import marquez.db.RunStateDao; @@ -48,6 +49,7 @@ public class RunService extends DelegatingDaos.DelegatingRunDao { private final JobVersionDao jobVersionDao; private final RunStateDao runStateDao; private final Collection runTransitionListeners; + private final JobDao jobDao; public RunService( @NonNull BaseDao baseDao, Collection runTransitionListeners) { @@ -55,6 +57,7 @@ public RunService( this.jobVersionDao = baseDao.createJobVersionDao(); this.runStateDao = baseDao.createRunStateDao(); this.runTransitionListeners = runTransitionListeners; + this.jobDao = baseDao.createJobDao(); } /** @@ -83,8 +86,9 @@ public void markRunAs( if (runState.isDone()) { BagOfJobVersionInfo bagOfJobVersionInfo = jobVersionDao.upsertJobVersionOnRunTransition( - runRow.getNamespaceName(), - runRow.getJobName(), + jobDao + .findJobByNameAsRow(runRow.getNamespaceName(), runRow.getJobName()) + .orElseThrow(), runRow.getUuid(), runState, transitionedAt); diff --git a/api/src/test/java/marquez/db/JobVersionDaoTest.java b/api/src/test/java/marquez/db/JobVersionDaoTest.java index 626b5b6919..c36bc94235 100644 --- a/api/src/test/java/marquez/db/JobVersionDaoTest.java +++ b/api/src/test/java/marquez/db/JobVersionDaoTest.java @@ -215,11 +215,7 @@ public void testGetJobVersions() { jdbiForTesting, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs()); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow.getNamespaceName(), - jobRow.getName(), - runRow.getUuid(), - RunState.COMPLETED, - Instant.now()); + jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); List jobVersions = jobVersionDao.findAllJobVersions(namespaceRow.getName(), jobRow.getName(), 10, 0); @@ -288,11 +284,7 @@ public void testUpsertJobVersionOnRunTransition() { // (6) Add a new job version on the run state transition to COMPLETED. final BagOfJobVersionInfo bagOfJobVersionInfo = jobVersionDao.upsertJobVersionOnRunTransition( - jobRow.getNamespaceName(), - jobRow.getName(), - runRow.getUuid(), - RunState.COMPLETED, - newTimestamp()); + jobRow, runRow.getUuid(), RunState.COMPLETED, newTimestamp()); // Ensure the job version is associated with the latest run. final RunRow latestRunRowForJobVersion = runDao.findRunByUuidAsRow(runRow.getUuid()).get(); diff --git a/api/src/test/java/marquez/db/RunDaoTest.java b/api/src/test/java/marquez/db/RunDaoTest.java index 42f1f9a64c..87b0d79925 100644 --- a/api/src/test/java/marquez/db/RunDaoTest.java +++ b/api/src/test/java/marquez/db/RunDaoTest.java @@ -97,11 +97,7 @@ public void getRun() { jdbi, runRow.getUuid(), RunState.COMPLETED, jobMeta.getOutputs()); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow.getNamespaceName(), - jobRow.getName(), - runRow.getUuid(), - RunState.COMPLETED, - Instant.now()); + jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); Optional run = runDao.findRunByUuid(runRow.getUuid()); assertThat(run) @@ -211,11 +207,7 @@ private Stream createRunsForJob( jdbi, runRow.getUuid(), RunState.COMPLETED, outputs); jobVersionDao.upsertJobVersionOnRunTransition( - jobRow.getNamespaceName(), - jobRow.getName(), - runRow.getUuid(), - RunState.COMPLETED, - Instant.now()); + jobRow, runRow.getUuid(), RunState.COMPLETED, Instant.now()); return runRow; }); }