Migration script and DAO updates for job renaming and symlinking (MarquezProject#1947)

* Add migration script to create job symlink target

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update JobDao to find job symlink targets and add tests

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update RunDao to fetch runs for jobs with symlinks

Signed-off-by: Michael Collado <collado.mike@gmail.com>

* Update changelog

Signed-off-by: Michael Collado <collado.mike@gmail.com>
collado-mike authored Apr 20, 2022
1 parent 35e0a45 commit 2c4a4db
Showing 12 changed files with 502 additions and 147 deletions.
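
The changes below resolve job symlinks with a recursive CTE: starting from the requested (namespace, name), the query follows symlink_target_uuid until it reaches a job with no target, and that terminal job is returned. A minimal sketch of the idea, using a throwaway table and made-up UUIDs (illustrative only, not part of this commit):

CREATE TEMP TABLE jobs_demo (
  uuid uuid PRIMARY KEY,
  namespace_name text,
  name text,
  symlink_target_uuid uuid REFERENCES jobs_demo (uuid)
);

-- job_old is symlinked to job_new
INSERT INTO jobs_demo VALUES
  ('00000000-0000-0000-0000-000000000002', 'my-namespace', 'job_new', NULL),
  ('00000000-0000-0000-0000-000000000001', 'my-namespace', 'job_old',
   '00000000-0000-0000-0000-000000000002');

-- Same shape as the CTE added to JobDao.findJobByName: looking up the old name
-- walks symlink_target_uuid until it is NULL, so the row for job_new is returned.
WITH RECURSIVE job_ids AS (
  SELECT uuid, symlink_target_uuid
  FROM jobs_demo j
  WHERE j.namespace_name = 'my-namespace' AND j.name = 'job_old'
  UNION
  SELECT j.uuid, j.symlink_target_uuid
  FROM jobs_demo j
  INNER JOIN job_ids jn ON j.uuid = jn.symlink_target_uuid
)
SELECT j.*
FROM jobs_demo j
INNER JOIN job_ids jn ON jn.uuid = j.uuid AND jn.symlink_target_uuid IS NULL;
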
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

* Add support for `LifecycleStateChangeFacet` with an ability to softly delete datasets [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Enable pod specific annotations in Marquez Helm Chart via `marquez.podAnnotations` [@wslulciuc](https://github.com/wslulciuc)
* Add support for job renaming/redirection via symlink [@collado-mike](https://github.com/collado-mike)

## [0.21.0](https://github.com/MarquezProject/marquez/compare/0.20.0...0.21.0) - 2022-03-03

4 changes: 4 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
@@ -25,6 +25,7 @@

@Slf4j
public final class Columns {

private Columns() {}

private static final ObjectMapper MAPPER = Utils.getMapper();
@@ -78,6 +79,9 @@ private Columns() {}
/* STREAM VERSION ROW COLUMNS */
public static final String SCHEMA_LOCATION = "schema_location";

/* JOB ROW COLUMNS */
public static final String SYMLINK_TARGET_UUID = "symlink_target_uuid";

/* JOB VERSION I/O ROW COLUMNS */
public static final String INPUT_UUIDS = "input_uuids";
public static final String OUTPUT_UUIDS = "output_uuids";
157 changes: 99 additions & 58 deletions api/src/main/java/marquez/db/JobDao.java
@@ -48,23 +48,34 @@ public interface JobDao extends BaseDao {
void updateVersionFor(UUID rowUuid, Instant updatedAt, UUID currentVersionUuid);

@SqlQuery(
"SELECT j.*, jc.context, f.facets\n"
+ " FROM jobs AS j\n"
+ " LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid\n"
+ " LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid\n"
+ " LEFT OUTER JOIN (\n"
+ " SELECT run_uuid, JSON_AGG(e.facets) AS facets\n"
+ " FROM (\n"
+ " SELECT run_uuid, event->'job'->'facets' AS facets\n"
+ " FROM lineage_events AS le\n"
+ " INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid\n"
+ " INNER JOIN jobs j2 ON j2.current_version_uuid=jv2.uuid\n"
+ " WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName\n"
+ " ORDER BY event_time ASC\n"
+ " ) e\n"
+ " GROUP BY e.run_uuid\n"
+ " ) f ON f.run_uuid=jv.latest_run_uuid\n"
+ "WHERE j.namespace_name = :namespaceName AND j.name = :jobName")
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, symlink_target_uuid
FROM jobs j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT j.uuid, j.symlink_target_uuid
FROM jobs j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, jc.context, f.facets
FROM jobs j
INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(e.facets) AS facets
FROM (
SELECT run_uuid, event->'job'->'facets' AS facets
FROM lineage_events AS le
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid
INNER JOIN jobs j2 ON j2.current_version_uuid=jv2.uuid
WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
ORDER BY event_time ASC
) e
GROUP BY e.run_uuid
) f ON f.run_uuid=jv.latest_run_uuid
""")
Optional<Job> findJobByName(String namespaceName, String jobName);

default Optional<Job> findWithRun(String namespaceName, String jobName) {
@@ -78,11 +89,21 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
}

@SqlQuery(
"SELECT j.*, n.name AS namespace_name FROM jobs AS j "
+ "INNER JOIN namespaces AS n "
+ " ON (n.name = :namespaceName AND "
+ " j.namespace_uuid = n.uuid AND "
+ " j.name = :jobName)")
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, symlink_target_uuid
FROM jobs j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT j.uuid, j.symlink_target_uuid
FROM jobs j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs AS j
INNER JOIN job_ids jn ON jn.uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);

@SqlQuery(
@@ -103,14 +124,17 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
+ " GROUP BY e.run_uuid\n"
+ " ) f ON f.run_uuid=jv.latest_run_uuid\n"
+ "WHERE j.namespace_name = :namespaceName\n"
+ "AND j.symlink_target_uuid IS NULL\n"
+ "ORDER BY j.name "
+ "LIMIT :limit OFFSET :offset")
List<Job> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM jobs AS j")
int count(String namespaceName);
@SqlQuery("SELECT count(*) FROM jobs AS j WHERE symlink_target_uuid IS NULL")
int count();

@SqlQuery("SELECT count(*) FROM jobs AS j WHERE j.namespace_name = :namespaceName")
@SqlQuery(
"SELECT count(*) FROM jobs AS j WHERE j.namespace_name = :namespaceName\n"
+ "AND symlink_target_uuid IS NULL")
int countFor(String namespaceName);

default List<Job> findAllWithRun(String namespaceName, int limit, int offset) {
@@ -147,6 +171,15 @@ default void setJobData(Run run, Job j) {

default JobRow upsertJobMeta(
NamespaceName namespaceName, JobName jobName, JobMeta jobMeta, ObjectMapper mapper) {
return upsertJobMeta(namespaceName, jobName, null, jobMeta, mapper);
}

default JobRow upsertJobMeta(
NamespaceName namespaceName,
JobName jobName,
UUID symlinkTargetUuid,
JobMeta jobMeta,
ObjectMapper mapper) {
Instant createdAt = Instant.now();
NamespaceRow namespace =
createNamespaceDao()
Expand All @@ -170,6 +203,7 @@ default JobRow upsertJobMeta(
jobMeta.getDescription().orElse(null),
contextRow.getUuid(),
toUrlString(jobMeta.getLocation().orElse(null)),
symlinkTargetUuid,
toJson(jobMeta.getInputs(), mapper));
}

@@ -192,39 +226,45 @@ default PGobject toJson(Set<DatasetId> dataset, ObjectMapper mapper) {
}

@SqlQuery(
"INSERT INTO jobs ("
+ "uuid, "
+ "type, "
+ "created_at, "
+ "updated_at, "
+ "namespace_uuid, "
+ "namespace_name, "
+ "name, "
+ "description,"
+ "current_job_context_uuid,"
+ "current_location,"
+ "current_inputs"
+ ") VALUES ( "
+ ":uuid, "
+ ":type, "
+ ":now, "
+ ":now, "
+ ":namespaceUuid, "
+ ":namespaceName, "
+ ":name, "
+ ":description, "
+ ":jobContextUuid, "
+ ":location, "
+ ":inputs "
+ ") ON CONFLICT (name, namespace_uuid) DO "
+ "UPDATE SET "
+ "updated_at = EXCLUDED.updated_at, "
+ "type = EXCLUDED.type, "
+ "description = EXCLUDED.description, "
+ "current_job_context_uuid = EXCLUDED.current_job_context_uuid, "
+ "current_location = EXCLUDED.current_location, "
+ "current_inputs = EXCLUDED.current_inputs "
+ "RETURNING *")
"""
INSERT INTO jobs AS j (
uuid,
type,
created_at,
updated_at,
namespace_uuid,
namespace_name,
name,
description,
current_job_context_uuid,
current_location,
current_inputs,
symlink_target_uuid
) VALUES (
:uuid,
:type,
:now,
:now,
:namespaceUuid,
:namespaceName,
:name,
:description,
:jobContextUuid,
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid) DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
current_location = EXCLUDED.current_location,
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING *
""")
JobRow upsertJob(
UUID uuid,
JobType type,
@@ -235,5 +275,6 @@ JobRow upsertJob(
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs);
}
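
A note on the ON CONFLICT clause in upsertJob above: because most OpenLineage-driven upserts pass NULL for the symlink target (see the null argument added in OpenLineageDao below), the COALESCE keeps a previously stored symlink_target_uuid instead of clearing it. A minimal sketch with a hypothetical table and made-up UUID:

CREATE TEMP TABLE jobs_upsert_demo (
  name text PRIMARY KEY,
  symlink_target_uuid uuid
);

-- The job was previously symlinked to another job.
INSERT INTO jobs_upsert_demo VALUES ('job_old', '00000000-0000-0000-0000-000000000002');

-- A later upsert that carries no symlink target (NULL) leaves the stored value intact,
-- because COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid) falls back to it.
INSERT INTO jobs_upsert_demo AS j (name, symlink_target_uuid)
VALUES ('job_old', NULL)
ON CONFLICT (name) DO UPDATE
SET symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING *;  -- still shows the original symlink_target_uuid
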
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
@@ -133,6 +133,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
description,
jobContext.getUuid(),
location,
null,
jobDao.toJson(toDatasetId(event.getInputs()), mapper));
bag.setJob(job);

103 changes: 64 additions & 39 deletions api/src/main/java/marquez/db/RunDao.java
@@ -104,39 +104,50 @@ public interface RunDao extends BaseDao {
Optional<ExtendedRunRow> findRunByUuidAsRow(UUID runUuid);

@SqlQuery(
"SELECT r.*, ra.args, ctx.context, f.facets,\n"
+ "jv.namespace_name, jv.job_name, jv.version AS job_version,\n"
+ "ri.input_versions, ro.output_versions\n"
+ "FROM runs AS r\n"
+ "LEFT OUTER JOIN\n"
+ "(\n"
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
+ " FROM lineage_events le\n"
+ " INNER JOIN runs ON runs.uuid=le.run_uuid\n"
+ " WHERE runs.job_name=:jobName AND runs.namespace_name=:namespace\n"
+ " GROUP BY le.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
+ "LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
+ "LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ "LEFT OUTER JOIN (\n"
+ " SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
+ " 'name', dv.dataset_name,\n"
+ " 'version', dv.version)) AS input_versions\n"
+ " FROM runs_input_mapping im\n"
+ " INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid\n"
+ " GROUP BY im.run_uuid\n"
+ ") ri ON ri.run_uuid=r.uuid\n"
+ "LEFT OUTER JOIN (\n"
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
+ " 'name', dataset_name,\n"
+ " 'version', version)) AS output_versions\n"
+ " FROM dataset_versions\n"
+ " GROUP BY run_uuid\n"
+ ") ro ON ro.run_uuid=r.uuid\n"
+ "WHERE r.namespace_name = :namespace and r.job_name = :jobName\n"
+ "ORDER BY STARTED_AT DESC NULLS LAST\n"
+ "LIMIT :limit OFFSET :offset")
"""
WITH RECURSIVE job_names AS (
SELECT uuid, namespace_name, name, symlink_target_uuid
FROM jobs j
WHERE j.namespace_name=:namespace AND j.name=:jobName
UNION
SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
FROM jobs j
INNER JOIN job_names jn ON j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid
)
SELECT r.*, ra.args, ctx.context, f.facets,
jv.namespace_name, jv.job_name, jv.version AS job_version,
ri.input_versions, ro.output_versions
FROM runs AS r
INNER JOIN job_names j ON r.namespace_name=j.namespace_name AND r.job_name=j.name
LEFT OUTER JOIN
(
SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets
FROM lineage_events le
INNER JOIN runs ON runs.uuid=le.run_uuid
WHERE runs.job_name=:jobName AND runs.namespace_name=:namespace
GROUP BY le.run_uuid
) AS f ON r.uuid=f.run_uuid
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid
LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
LEFT OUTER JOIN (
SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,
'name', dv.dataset_name,
'version', dv.version)) AS input_versions
FROM runs_input_mapping im
INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid
GROUP BY im.run_uuid
) ri ON ri.run_uuid=r.uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
'name', dataset_name,
'version', version)) AS output_versions
FROM dataset_versions
GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
ORDER BY STARTED_AT DESC NULLS LAST
LIMIT :limit OFFSET :offset
""")
List<Run> findAll(String namespace, String jobName, int limit, int offset);

@SqlQuery(
@@ -384,11 +395,25 @@ default RunRow upsertRunMeta(
void updateJobVersion(UUID runUuid, UUID jobVersionUuid);

@SqlQuery(
BASE_FIND_RUN_SQL
+ "WHERE r.uuid=(\n"
+ " SELECT uuid FROM runs WHERE namespace_name = :namespace and job_name = :jobName\n"
+ " ORDER BY transitioned_at DESC\n"
+ " LIMIT 1\n"
+ ")")
"""
WITH RECURSIVE job_names AS (
SELECT uuid, namespace_name, name, symlink_target_uuid
FROM jobs j
WHERE j.namespace_name=:namespace AND j.name=:jobName
UNION
SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
FROM jobs j
INNER JOIN job_names jn ON j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid
)
"""
+ BASE_FIND_RUN_SQL
+ """
WHERE r.uuid=(
SELECT r.uuid FROM runs r
INNER JOIN job_names j ON j.namespace_name=r.namespace_name AND j.name=r.job_name
ORDER BY transitioned_at DESC
LIMIT 1
)
""")
Optional<Run> findByLatestJob(String namespace, String jobName);
}
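
Unlike the JobDao queries, the job_names CTE used by findAll and findByLatestJob above joins in both directions (j.uuid = jn.symlink_target_uuid OR j.symlink_target_uuid = jn.uuid), so runs recorded under either the old or the new job name are returned no matter which name is queried. A minimal sketch with hypothetical data (not part of this commit):

CREATE TEMP TABLE jobs_demo2 (
  uuid uuid PRIMARY KEY,
  namespace_name text,
  name text,
  symlink_target_uuid uuid
);

INSERT INTO jobs_demo2 VALUES
  ('00000000-0000-0000-0000-000000000002', 'my-namespace', 'job_new', NULL),
  ('00000000-0000-0000-0000-000000000001', 'my-namespace', 'job_old',
   '00000000-0000-0000-0000-000000000002');

-- Querying by the new name still pulls in the old name (and vice versa),
-- so runs stored under either name can be joined on (namespace_name, name).
WITH RECURSIVE job_names AS (
  SELECT uuid, namespace_name, name, symlink_target_uuid
  FROM jobs_demo2 j
  WHERE j.namespace_name = 'my-namespace' AND j.name = 'job_new'
  UNION
  SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
  FROM jobs_demo2 j
  INNER JOIN job_names jn
    ON j.uuid = jn.symlink_target_uuid OR j.symlink_target_uuid = jn.uuid
)
SELECT name FROM job_names;  -- job_new and job_old
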
3 changes: 2 additions & 1 deletion api/src/main/java/marquez/db/mappers/JobRowMapper.java
@@ -43,7 +43,8 @@ public JobRow map(@NonNull ResultSet results, @NonNull StatementContext context)
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
uuidOrNull(results, "current_job_context_uuid"),
stringOrNull(results, "current_location"),
getDatasetFromJsonOrNull(results, "current_inputs"));
getDatasetFromJsonOrNull(results, "current_inputs"),
uuidOrNull(results, Columns.SYMLINK_TARGET_UUID));
}

Set<DatasetId> getDatasetFromJsonOrNull(@NonNull ResultSet results, String column)
1 change: 1 addition & 0 deletions api/src/main/java/marquez/db/models/JobRow.java
@@ -24,6 +24,7 @@ public class JobRow {
@Nullable UUID jobContextUuid;
@Nullable String location;
@Nullable Set<DatasetId> inputs;
@Nullable UUID symlinkTargetId;

public Optional<String> getDescription() {
return Optional.ofNullable(description);
4 changes: 4 additions & 0 deletions (new database migration script)
@@ -0,0 +1,4 @@
ALTER TABLE jobs ADD COLUMN symlink_target_uuid uuid REFERENCES jobs (uuid);
CREATE INDEX jobs_symlinks ON jobs (symlink_target_uuid)
INCLUDE (uuid, namespace_name, name)
WHERE symlink_target_uuid IS NOT NULL;
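
The partial covering index above supports the reverse traversal used by the RunDao CTE (j.symlink_target_uuid = jn.uuid): finding every job that symlinks to a given target can typically be answered from the index alone, since uuid, namespace_name, and name are INCLUDEd. Illustrative lookup with a made-up UUID:

SELECT uuid, namespace_name, name
FROM jobs
WHERE symlink_target_uuid = '00000000-0000-0000-0000-000000000001';
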