Skip to content

Commit

Permalink
Runs row reduction (#2041)
Browse files Browse the repository at this point in the history
* Changed RunDao to use simple RunRow rather than ExtendedRunRow where possible

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Remove need to query JobRow on run completion

Signed-off-by: Michael Collado <collado.mike@gmail.com>
  • Loading branch information
collado-mike authored Jul 26, 2022
1 parent ccbdb96 commit ee44ae0
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 167 deletions.
7 changes: 2 additions & 5 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,22 +286,19 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
* code location, and context. A version for a given job is created <i>only</i> when a {@link Run}
* transitions into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state.
*
* @param namespaceName The namespace for the job version.
* @param jobName The name of the job.
* @param jobRow The job.
* @param runUuid The unique ID of the run associated with the job version.
* @param runState The current run state.
* @param transitionedAt The timestamp of the run state transition.
* @return A {@link BagOfJobVersionInfo} object.
*/
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
@NonNull String namespaceName,
@NonNull String jobName,
@NonNull JobRow jobRow,
@NonNull UUID runUuid,
@NonNull RunState runState,
@NonNull Instant transitionedAt) {
// Get the job.
final JobDao jobDao = createJobDao();
final JobRow jobRow = jobDao.findJobByNameAsRow(namespaceName, jobName).get();

// Get the job context.
final UUID jobContextUuid = jobRow.getJobContextUuid().get();
Expand Down
61 changes: 30 additions & 31 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
Expand Down Expand Up @@ -161,10 +160,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
Optional<ExtendedRunRow> parentRunRow = runDao.findRunByUuidAsRow(uuid);
JobRow parentJobRow =
parentRunRow
.flatMap(run -> jobDao.findJobByUuidAsRow(run.getJobUuid()))
runDao
.findJobRowByRunUuid(uuid)
.orElseGet(
() -> {
JobRow newParentJobRow =
Expand All @@ -181,34 +179,36 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
null,
inputs);
log.info("Created new parent job record {}", newParentJobRow);

RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(),
now,
"{}",
Utils.checksumFor(ImmutableMap.of()));
RunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
return newParentJobRow;
});
log.debug("Found parent job record {}", parentJobRow);
if (parentRunRow.isEmpty()) {
RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));
ExtendedRunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
parentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
parentJobRow.getName(),
parentJobRow.getLocation(),
parentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
}
return parentJobRow;
} catch (Exception e) {
throw new RuntimeException("Unable to insert parent run", e);
Expand Down Expand Up @@ -390,8 +390,7 @@ default void updateMarquezOnComplete(
BagOfJobVersionInfo bagOfJobVersionInfo =
createJobVersionDao()
.upsertJobVersionOnRunTransition(
updateLineageRow.getRun().getNamespaceName(),
updateLineageRow.getRun().getJobName(),
updateLineageRow.getJob(),
updateLineageRow.getRun().getUuid(),
runState,
event.getEventTime().toInstant());
Expand Down
95 changes: 20 additions & 75 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import marquez.common.models.RunId;
import marquez.common.models.RunState;
import marquez.db.mappers.ExtendedRunRowMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.mappers.RunMapper;
import marquez.db.mappers.RunRowMapper;
import marquez.db.models.DatasetRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobRow;
Expand All @@ -40,7 +42,9 @@
import org.jdbi.v3.sqlobject.transaction.Transaction;

@RegisterRowMapper(ExtendedRunRowMapper.class)
@RegisterRowMapper(RunRowMapper.class)
@RegisterRowMapper(RunMapper.class)
@RegisterRowMapper(JobRowMapper.class)
public interface RunDao extends BaseDao {
@SqlQuery("SELECT EXISTS (SELECT 1 FROM runs WHERE uuid = :rowUuid)")
boolean exists(UUID rowUuid);
Expand Down Expand Up @@ -103,7 +107,18 @@ public interface RunDao extends BaseDao {
Optional<Run> findRunByUuid(UUID runUuid);

@SqlQuery(BASE_FIND_RUN_SQL + "WHERE r.uuid = :runUuid")
Optional<ExtendedRunRow> findRunByUuidAsRow(UUID runUuid);
Optional<ExtendedRunRow> findRunByUuidAsExtendedRow(UUID runUuid);

@SqlQuery("SELECT * FROM runs r WHERE r.uuid = :runUuid")
Optional<RunRow> findRunByUuidAsRow(UUID runUuid);

@SqlQuery(
"""
SELECT j.* FROM jobs_view j
INNER JOIN runs_view r ON r.job_uuid=j.uuid
WHERE r.uuid=:uuid
""")
Optional<JobRow> findJobRowByRunUuid(UUID uuid);

@SqlQuery(
"""
Expand Down Expand Up @@ -195,8 +210,8 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
+ "nominal_start_time = COALESCE(EXCLUDED.nominal_start_time, runs.nominal_start_time), "
+ "nominal_end_time = COALESCE(EXCLUDED.nominal_end_time, runs.nominal_end_time), "
+ "location = EXCLUDED.location "
+ "RETURNING uuid")
UUID upsertWithRunState(
+ "RETURNING *")
RunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Expand All @@ -213,42 +228,6 @@ UUID upsertWithRunState(
String location,
UUID jobContextUuid);

default ExtendedRunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Instant nominalEndTime,
RunState runStateType,
Instant runStateTime,
String namespaceName,
String jobName,
String location,
UUID jobContextUuid) {
UUID rowUuid =
upsertWithRunState(
runUuid,
parentRunUuid,
externalId,
now,
jobUuid,
jobVersionUuid,
runArgsUuid,
nominalStartTime,
nominalEndTime,
runStateType,
runStateTime,
namespaceName,
jobName,
location,
jobContextUuid);
return findRunByUuidAsRow(rowUuid).get();
}

@SqlQuery(
"INSERT INTO runs ( "
+ "uuid, "
Expand Down Expand Up @@ -286,8 +265,8 @@ default ExtendedRunRow upsert(
+ "nominal_start_time = COALESCE(EXCLUDED.nominal_start_time, runs.nominal_start_time), "
+ "nominal_end_time = COALESCE(EXCLUDED.nominal_end_time, runs.nominal_end_time), "
+ "location = EXCLUDED.location "
+ "RETURNING uuid")
UUID upsertWithoutRunState(
+ "RETURNING *")
RunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Expand All @@ -303,40 +282,6 @@ UUID upsertWithoutRunState(
String location,
UUID jobContextUuid);

default ExtendedRunRow upsert(
UUID runUuid,
UUID parentRunUuid,
String externalId,
Instant now,
UUID jobUuid,
UUID jobVersionUuid,
UUID runArgsUuid,
Instant nominalStartTime,
Instant nominalEndTime,
UUID namespaceUuid,
String namespaceName,
String jobName,
String location,
UUID jobContextUuid) {
UUID runRowUuid =
upsertWithoutRunState(
runUuid,
parentRunUuid,
externalId,
now,
jobUuid,
jobVersionUuid,
runArgsUuid,
nominalStartTime,
nominalEndTime,
namespaceUuid,
namespaceName,
jobName,
location,
jobContextUuid);
return findRunByUuidAsRow(runRowUuid).get();
}

@SqlUpdate(
"INSERT INTO runs_input_mapping (run_uuid, dataset_version_uuid) "
+ "VALUES (:runUuid, :datasetVersionUuid) ON CONFLICT DO NOTHING")
Expand Down
60 changes: 60 additions & 0 deletions api/src/main/java/marquez/db/mappers/RunRowMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.db.mappers;

import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.timestampOrNull;
import static marquez.db.Columns.timestampOrThrow;
import static marquez.db.Columns.uuidOrNull;
import static marquez.db.Columns.uuidOrThrow;

import com.fasterxml.jackson.core.type.TypeReference;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import lombok.NonNull;
import marquez.common.Utils;
import marquez.common.models.DatasetVersionId;
import marquez.db.Columns;
import marquez.db.models.RunRow;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

public final class RunRowMapper implements RowMapper<RunRow> {
@Override
public RunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
throws SQLException {
Set<String> columnNames = MapperUtils.getColumnNames(results.getMetaData());

return new RunRow(
uuidOrThrow(results, Columns.ROW_UUID),
timestampOrThrow(results, Columns.CREATED_AT),
timestampOrThrow(results, Columns.UPDATED_AT),
uuidOrNull(results, Columns.JOB_UUID),
uuidOrNull(results, Columns.JOB_VERSION_UUID),
uuidOrNull(results, Columns.PARENT_RUN_UUID),
uuidOrThrow(results, Columns.RUN_ARGS_UUID),
timestampOrNull(results, Columns.NOMINAL_START_TIME),
timestampOrNull(results, Columns.NOMINAL_END_TIME),
stringOrNull(results, Columns.CURRENT_RUN_STATE),
columnNames.contains(Columns.STARTED_AT)
? timestampOrNull(results, Columns.STARTED_AT)
: null,
uuidOrNull(results, Columns.START_RUN_STATE_UUID),
columnNames.contains(Columns.ENDED_AT) ? timestampOrNull(results, Columns.ENDED_AT) : null,
uuidOrNull(results, Columns.END_RUN_STATE_UUID));
}

private List<DatasetVersionId> toDatasetVersion(ResultSet rs, String column) throws SQLException {
String dsString = rs.getString(column);
if (dsString == null) {
return Collections.emptyList();
}
return Utils.fromJson(dsString, new TypeReference<List<DatasetVersionId>>() {});
}
}
18 changes: 13 additions & 5 deletions api/src/main/java/marquez/db/models/ExtendedRunRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class ExtendedRunRow extends RunRow {
@Getter @NonNull private final List<DatasetVersionId> inputVersions;
@Getter @NonNull private final List<DatasetVersionId> outputVersions;
@Getter private final String namespaceName;
@Getter private final String jobName;
@Getter private final String args;

public ExtendedRunRow(
Expand Down Expand Up @@ -48,17 +52,21 @@ public ExtendedRunRow(
jobVersionUuid,
parentRunUuid,
runArgsUuid,
inputVersions,
outputVersions,
nominalStartTime,
nominalEndTime,
currentRunState,
startedAt,
startRunStateUuid,
endedAt,
endRunStateUuid,
namespaceName,
jobName);
endRunStateUuid);
this.inputVersions = inputVersions;
this.outputVersions = outputVersions;
this.args = args;
this.jobName = jobName;
this.namespaceName = namespaceName;
}

public boolean hasInputVersionUuids() {
return !inputVersions.isEmpty();
}
}
Loading

0 comments on commit ee44ae0

Please sign in to comment.