Skip to content

Commit

Permalink
job-mapping rename job_versions_io_mapping to job_io_mapping
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Nov 8, 2023
1 parent b7e40f8 commit 79acd2d
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 50 deletions.
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/DbRetention.java
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ CREATE OR REPLACE FUNCTION delete_datasets_older_than_x_days()
BEGIN
CREATE TEMPORARY TABLE used_datasets_as_io_in_x_days AS (
SELECT dataset_uuid
FROM job_versions_io_mapping AS jvio INNER JOIN job_versions AS jv
FROM job_io_mapping AS jvio INNER JOIN job_versions AS jv
ON jvio.job_version_uuid = jv.uuid
WHERE jv.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days'
);
Expand Down Expand Up @@ -621,7 +621,7 @@ CREATE OR REPLACE FUNCTION estimate_number_of_rows_older_than_x_days(retention_q
"""
CREATE TEMPORARY TABLE used_datasets_as_input_in_x_days AS (
SELECT dataset_uuid
FROM job_versions_io_mapping AS jvio INNER JOIN job_versions AS jv
FROM job_io_mapping AS jvio INNER JOIN job_versions AS jv
ON jvio.job_version_uuid = jv.uuid
WHERE jv.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days'
AND jvio.io_type = 'INPUT'
Expand Down
93 changes: 64 additions & 29 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ WITH job_version_io AS (
JSON_AGG(json_build_object('namespace', ds.namespace_name,
'name', ds.name))
FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets
FROM job_versions_io_mapping io
FROM job_io_mapping io
INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid
INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
Expand Down Expand Up @@ -192,40 +192,67 @@ ExtendedJobVersionRow upsertJobVersion(
String namespaceName);

/**
* Used to link an input dataset to a given job version.
* Used to upsert an input or output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param datasetUuid The unique ID of the output dataset
* @param ioType The {@link IoType} of the dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT);
}
@SqlUpdate(
"""
INSERT INTO job_io_mapping (
job_version_uuid, dataset_uuid, io_type, job_uuid, is_job_version_current)
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, TRUE)
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_job_version_current = TRUE
""")
void upsertCurrentInputOrOutputDatasetFor(
UUID jobVersionUuid, UUID datasetUuid, UUID jobUuid, IoType ioType);

@SqlUpdate(
"""
UPDATE job_io_mapping
SET is_job_version_current = FALSE
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
AND job_version_uuid != :jobVersionUuid
AND io_type = :ioType
AND is_job_version_current = TRUE;
""")
void markVersionIOMappingNotCurrent(UUID jobVersionUuid, UUID jobUuid, IoType ioType);

@SqlUpdate(
"""
UPDATE job_io_mapping
SET is_job_version_current = FALSE
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
AND io_type = :ioType
AND is_job_version_current = TRUE;
""")
void markVersionIOMappingNotCurrent(UUID jobUuid, IoType ioType);

/**
* Used to link an output dataset to a given job version.
* Used to link an input dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param inputDatasetUuid The unique ID of the input dataset.
* @param jobUuid The unique ID of the job.
*/
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) {
upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT);
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid) {
markVersionIOMappingNotCurrent(jobVersionUuid, jobUuid, IoType.INPUT);
upsertCurrentInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, jobUuid, IoType.INPUT);
// TODO: include this in test -> check if jobUuid is set
}

/**
* Used to upsert an input or output dataset to a given job version.
* Used to link an output dataset to a given job version.
*
* @param jobVersionUuid The unique ID of the job version.
* @param datasetUuid The unique ID of the output dataset
* @param ioType The {@link IoType} of the dataset.
* @param outputDatasetUuid The unique ID of the output dataset.
* @param jobUuid The unique ID of the job.
*/
@SqlUpdate(
"""
INSERT INTO job_versions_io_mapping (
job_version_uuid, dataset_uuid, io_type)
VALUES (:jobVersionUuid, :datasetUuid, :ioType)
ON CONFLICT DO NOTHING
""")
void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType);
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid) {
markVersionIOMappingNotCurrent(jobVersionUuid, jobUuid, IoType.OUTPUT);
upsertCurrentInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, jobUuid, IoType.OUTPUT);
// TODO: include this in test -> check if jobUuid is set
}

/**
* Returns the input datasets to a given job version.
Expand Down Expand Up @@ -256,7 +283,7 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
@SqlQuery(
"""
SELECT dataset_uuid
FROM job_versions_io_mapping
FROM job_io_mapping
WHERE job_version_uuid = :jobVersionUuid
AND io_type = :ioType
""")
Expand All @@ -265,7 +292,7 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
@SqlQuery(
"""
SELECT d.namespace_name, d.name, io.io_type
FROM job_versions_io_mapping io
FROM job_io_mapping io
INNER JOIN jobs_view j ON j.current_version_uuid = io.job_version_uuid
INNER JOIN datasets_view d on d.uuid = io.dataset_uuid
WHERE j.name = :jobName AND j.namespace_name=:jobNamespace
Expand Down Expand Up @@ -366,14 +393,18 @@ default BagOfJobVersionInfo upsertRunlessJobVersion(
inputs.forEach(
i -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid());
jobVersionRow.getUuid(),
i.getDatasetVersionRow().getDatasetUuid(),
jobVersionRow.getJobUuid());
});

// Link the output datasets to the job version.
outputs.forEach(
o -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid());
jobVersionRow.getUuid(),
o.getDatasetVersionRow().getDatasetUuid(),
jobVersionRow.getJobUuid());
});

jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid());
Expand Down Expand Up @@ -468,14 +499,18 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
jobVersionInputs.forEach(
jobVersionInput -> {
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid());
jobVersionRow.getUuid(),
jobVersionInput.getDatasetUuid(),
jobVersionRow.getJobUuid());
});

// Link the output datasets to the job version.
jobVersionOutputs.forEach(
jobVersionOutput -> {
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid());
jobVersionRow.getUuid(),
jobVersionOutput.getDatasetUuid(),
jobVersionRow.getJobUuid());
});

// Link the job version to the run.
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ job_io AS (
SELECT j.job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
FROM job_io_mapping io
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
GROUP BY j.job_uuid
),
Expand Down Expand Up @@ -99,7 +99,7 @@ WHERE ds.uuid IN (<dsUuids>)""")
"""
SELECT j.uuid FROM jobs j
INNER JOIN job_versions jv ON jv.job_uuid = j.uuid
INNER JOIN job_versions_io_mapping io ON io.job_version_uuid = jv.uuid
INNER JOIN job_io_mapping io ON io.job_version_uuid = jv.uuid
INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName
ORDER BY io_type DESC, jv.created_at DESC
Expand Down
7 changes: 7 additions & 0 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import marquez.common.models.SourceType;
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
import marquez.db.JobVersionDao.IoType;
import marquez.db.RunDao.RunUpsert;
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
import marquez.db.mappers.LineageEventMapper;
Expand Down Expand Up @@ -370,6 +371,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
// mark job_io_mapping as non-current
daos.getJobVersionDao().markVersionIOMappingNotCurrent(job.getUuid(), IoType.INPUT);
}
bag.setInputs(Optional.ofNullable(datasetInputs));

Expand All @@ -383,6 +387,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
}
} else {
// mark job_io_mapping as non-current
daos.getJobVersionDao().markVersionIOMappingNotCurrent(job.getUuid(), IoType.OUTPUT);
}

bag.setOutputs(Optional.ofNullable(datasetOutputs));
Expand Down
24 changes: 12 additions & 12 deletions api/src/main/java/marquez/graphql/GraphqlDaos.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ List<RowMap<String, Object>> getDistinctJobVersionsByDatasetVersionOutput(
List<RowMap<String, Object>> getDatasetsByNamespace(UUID namespaceUuid);

@SqlQuery(
"SELECT d.* from datasets_view d inner join job_versions_io_mapping m on m.dataset_uuid = d.uuid where m.job_version_uuid = :jobVersionUuid and io_type = :ioType")
"SELECT d.* from datasets_view d inner join job_io_mapping m on m.dataset_uuid = d.uuid where m.job_version_uuid = :jobVersionUuid and io_type = :ioType")
List<RowMap<String, Object>> getIOMappingByJobVersion(UUID jobVersionUuid, IoType ioType);

@SqlQuery(
"SELECT jv.uuid, jv.created_at, jv.updated_at, jv.job_uuid, jv.version, jv.location, "
+ " jv.latest_run_uuid, j.namespace_uuid, j.namespace_name, "
+ " j.name AS job_name "
+ " FROM job_versions_io_mapping m "
+ " FROM job_io_mapping m "
+ " inner join job_versions jv "
+ " on m.dataset_uuid = jv.uuid"
+ " inner join jobs_view j ON j.uuid=jv.job_uuid "
Expand Down Expand Up @@ -196,14 +196,14 @@ WITH RECURSIVE search_graph(job_name, namespace_name, depth, path, cycle) AS (
(
select j.name AS job_name, j.namespace_name, j.name as jx
from jobs_view j
inner join job_versions_io_mapping io_in on io_in.job_version_uuid = j.current_version_uuid and io_in.io_type = 'INPUT'
inner join job_versions_io_mapping io_out on io_out.dataset_uuid = io_in.dataset_uuid and io_out.io_type = 'OUTPUT'
inner join job_io_mapping io_in on io_in.job_version_uuid = j.current_version_uuid and io_in.io_type = 'INPUT'
inner join job_io_mapping io_out on io_out.dataset_uuid = io_in.dataset_uuid and io_out.io_type = 'OUTPUT'
inner join job_versions jv on jv.uuid = io_out.job_version_uuid
UNION ALL
select j.name AS job_name, jv.namespace_name, j.name as jx
from jobs_view j
inner join job_versions_io_mapping io_out on io_out.job_version_uuid = j.current_version_uuid and io_out.io_type = 'OUTPUT'
inner join job_versions_io_mapping io_in on io_in.dataset_uuid = io_out.dataset_uuid and io_in.io_type = 'INPUT'
inner join job_io_mapping io_out on io_out.job_version_uuid = j.current_version_uuid and io_out.io_type = 'OUTPUT'
inner join job_io_mapping io_in on io_in.dataset_uuid = io_out.dataset_uuid and io_in.io_type = 'INPUT'
inner join job_versions jv on jv.uuid = io_in.job_version_uuid
) l where l.jx = sg.job_name and NOT cycle
)
Expand All @@ -214,21 +214,21 @@ WITH RECURSIVE search_graph(job_name, namespace_name, depth, path, cycle) AS (
-- input datasets
left outer join (
select io_out.job_version_uuid, jsonb_agg((SELECT x FROM (SELECT ds_in.name, ds_in.namespace_name as namespace, o.out_agg as "inEdges", i.in_agg as "outEdges") AS x)) as agg
from job_versions_io_mapping io_out
from job_io_mapping io_out
inner join datasets_view ds_in on ds_in.uuid = io_out.dataset_uuid
-- output jobs for each input dataset
left outer join (
select io_of_in.dataset_uuid, jsonb_agg((select x from (select j_of_in.name, j_of_in.namespace_name as namespace) as x)) as in_agg
from jobs_view j_of_in
left outer join job_versions_io_mapping io_of_in on io_of_in.job_version_uuid = j_of_in.current_version_uuid
left outer join job_io_mapping io_of_in on io_of_in.job_version_uuid = j_of_in.current_version_uuid
and io_of_in.io_type = 'INPUT'
group by io_of_in.dataset_uuid
) i on i.dataset_uuid = io_out.dataset_uuid
-- input jobs for each input dataset
left outer join (
select io_of_out.dataset_uuid, jsonb_agg((select x from (select j_of_out.name, j_of_out.namespace_name as namespace) as x)) as out_agg
from jobs_view j_of_out
left outer join job_versions_io_mapping io_of_out
left outer join job_io_mapping io_of_out
on io_of_out.job_version_uuid = j_of_out.current_version_uuid
and io_of_out.io_type = 'OUTPUT'
group by io_of_out.dataset_uuid
Expand All @@ -240,21 +240,21 @@ select io_of_out.dataset_uuid, jsonb_agg((select x from (select j_of_out.name, j
--output datasets
left outer join(
select io_out.job_version_uuid, jsonb_agg((SELECT x FROM (SELECT ds_in.name, ds_in.namespace_name as namespace, o.out_agg as "inEdges", i.in_agg as "outEdges") AS x)) as agg
from job_versions_io_mapping io_out
from job_io_mapping io_out
inner join datasets_view ds_in on ds_in.uuid = io_out.dataset_uuid
-- output jobs for each output dataset
left outer join (
select io_of_in.dataset_uuid, jsonb_agg((select x from (select j_of_in.name, j_of_in.namespace_name as namespace) as x)) as in_agg
from jobs_view j_of_in
left outer join job_versions_io_mapping io_of_in on io_of_in.job_version_uuid = j_of_in.current_version_uuid
left outer join job_io_mapping io_of_in on io_of_in.job_version_uuid = j_of_in.current_version_uuid
and io_of_in.io_type = 'INPUT'
group by io_of_in.dataset_uuid
) i on i.dataset_uuid = io_out.dataset_uuid
-- input jobs for each output dataset
left outer join (
select io_of_out.dataset_uuid, jsonb_agg((select x from (select j_of_out.name, j_of_out.namespace_name as namespace) as x)) as out_agg
from jobs_view j_of_out
left outer join job_versions_io_mapping io_of_out
left outer join job_io_mapping io_of_out
on io_of_out.job_version_uuid = j_of_out.current_version_uuid
and io_of_out.io_type = 'OUTPUT'
group by io_of_out.dataset_uuid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ BEGIN
LEFT JOIN aliases a ON a.link_target_uuid = j.uuid
) j
WHERE jobs.uuid=j.uuid;
UPDATE job_io_mapping
SET symlink_target_job_uuid=j.symlink_target_uuid
FROM jobs j
WHERE job_io_mapping.job_uuid=j.uuid AND j.uuid = NEW.uuid;
END IF;
SELECT * INTO inserted_job FROM jobs_view
WHERE uuid=job_uuid OR (new_symlink_target_uuid IS NOT NULL AND uuid=new_symlink_target_uuid);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
ALTER TABLE job_versions_io_mapping ADD COLUMN job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN symlink_target_job_uuid uuid REFERENCES jobs(uuid) ON DELETE CASCADE;
ALTER TABLE job_versions_io_mapping ADD COLUMN is_job_version_current boolean DEFAULT FALSE;

-- To add job_uuid to the unique constraint, we first drop the primary key, then recreate it; note given that job_version_uuid can be NULL, we need to check that job_version_uuid != NULL before inserting (duplicate columns otherwise)
ALTER TABLE job_versions_io_mapping DROP CONSTRAINT job_versions_io_mapping_pkey;

ALTER TABLE job_versions_io_mapping RENAME TO job_io_mapping;
ALTER TABLE job_io_mapping ALTER COLUMN job_version_uuid DROP NOT NULL;

-- TODO: create index for lineage query and update
CREATE INDEX job_io_mapping_job_uuid ON job_io_mapping (job_uuid);

ALTER TABLE job_io_mapping ADD CONSTRAINT job_io_mapping_mapping_pkey UNIQUE (job_version_uuid, dataset_uuid, io_type, job_uuid);


-- TODO: add a test which verifies correctness for UNIQUE <- adds multiple rows to job_io_mapping

-- TODO: take care of is_current
-- TODO: take care of symlink_job_uuid
2 changes: 1 addition & 1 deletion api/src/test/java/marquez/api/JdbiUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static void cleanDatabase(Jdbi jdbi) {
handle.execute("DELETE FROM run_facets");
handle.execute("DELETE FROM runs");
handle.execute("DELETE FROM run_args");
handle.execute("DELETE FROM job_versions_io_mapping");
handle.execute("DELETE FROM job_io_mapping");
handle.execute("DELETE FROM job_versions");
handle.execute("DELETE FROM jobs_fqn");
handle.execute("DELETE FROM jobs");
Expand Down
6 changes: 4 additions & 2 deletions api/src/test/java/marquez/db/JobVersionDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public void testGetJobVersion() {
.orElseThrow(
() -> new IllegalStateException("Can't find test dataset " + ds.getName()));

jobVersionDao.upsertInputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid());
jobVersionDao.upsertInputDatasetFor(
jobVersionRow.getUuid(), dataset.getUuid(), jobVersionRow.getJobUuid());
}
for (DatasetId ds : jobMeta.getOutputs()) {
DatasetRow dataset =
Expand All @@ -199,7 +200,8 @@ public void testGetJobVersion() {
.orElseThrow(
() -> new IllegalStateException("Can't find test dataset " + ds.getName()));

jobVersionDao.upsertOutputDatasetFor(jobVersionRow.getUuid(), dataset.getUuid());
jobVersionDao.upsertOutputDatasetFor(
jobVersionRow.getUuid(), dataset.getUuid(), jobVersionRow.getJobUuid());
}
Optional<JobVersion> jobVersion =
jobVersionDao.findJobVersion(namespaceRow.getName(), jobRow.getName(), version.getValue());
Expand Down
Loading

0 comments on commit 79acd2d

Please sign in to comment.