Skip to content

Commit

Permalink
OL facets - PR2 - read facets from views pointing to lineage_events t…
Browse files Browse the repository at this point in the history
…able

Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
  • Loading branch information
pawel-big-lebowski committed Jan 18, 2023
1 parent 6436ddc commit 2df36cf
Show file tree
Hide file tree
Showing 17 changed files with 264 additions and 99 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ jobs:
working_directory: ~/marquez
machine:
image: ubuntu-2004:current
resource_class: large
steps:
- checkout
- run: ./.circleci/get-docker-compose.sh
Expand Down
53 changes: 23 additions & 30 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,22 @@ WITH selected_datasets AS (
FROM datasets_view d
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
Expand All @@ -104,13 +104,9 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid""")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);
Expand Down Expand Up @@ -148,22 +144,22 @@ WITH selected_datasets AS (
ORDER BY d.name
LIMIT :limit OFFSET :offset
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = dv.run_uuid
) e ON e.run_uuid = dv.run_uuid
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, event_time, event FROM lineage_events
WHERE run_uuid = rim.run_uuid
) e ON e.run_uuid = rim.run_uuid
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
Expand All @@ -176,13 +172,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets
FROM dataset_runs d2,
jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = d2.name
AND ds ->> 'namespace' = d2.namespace_name
AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid
ORDER BY d.name""")
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/DatasetFacetsDao.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down
54 changes: 21 additions & 33 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,10 @@ WITH selected_dataset_versions AS (
FROM selected_dataset_versions dv
LEFT JOIN runs_input_mapping rim
ON rim.dataset_version_uuid = dv.uuid
), selected_dataset_version_events AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_version_runs dv
LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid
LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
Expand All @@ -186,14 +186,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
FROM selected_dataset_version_events dve,
jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE dve.run_uuid = dve.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = dve.dataset_name
AND ds ->> 'namespace' = dve.namespace_name
GROUP BY dve.uuid
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
Optional<DatasetVersion> findBy(UUID version);

Expand All @@ -211,10 +207,10 @@ WITH selected_dataset_versions AS (
FROM selected_dataset_versions dv
LEFT JOIN runs_input_mapping rim
ON rim.dataset_version_uuid = dv.uuid
), selected_dataset_version_events AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_version_runs dv
LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid
LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
Expand All @@ -229,14 +225,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
FROM selected_dataset_version_events dve,
jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE dve.run_uuid = dve.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = dve.dataset_name
AND ds ->> 'namespace' = dve.namespace_name
GROUP BY dve.uuid
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
Optional<DatasetVersion> findByUuid(UUID uuid);

Expand Down Expand Up @@ -283,10 +275,10 @@ WITH selected_dataset_versions AS (
FROM selected_dataset_versions dv
LEFT JOIN runs_input_mapping rim
ON rim.dataset_version_uuid = dv.uuid
), selected_dataset_version_events AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_version_runs dv
LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid
LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
Expand All @@ -301,14 +293,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets
FROM selected_dataset_version_events dve,
jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE dve.run_uuid = dve.run_uuid
AND ds -> 'facets' IS NOT NULL
AND ds ->> 'name' = dve.dataset_name
AND ds ->> 'namespace' = dve.namespace_name
GROUP BY dve.uuid
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid
ORDER BY dv.created_at DESC""")
List<DatasetVersion> findAll(String namespaceName, String datasetName, int limit, int offset);
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/FacetUtils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down
20 changes: 10 additions & 10 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,14 @@ SELECT EXISTS (
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(e.facets) AS facets
SELECT run_uuid, JSON_AGG(e.facet) AS facets
FROM (
SELECT run_uuid, event->'job'->'facets' AS facets
FROM lineage_events AS le
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid
SELECT jf.run_uuid, jf.facet
FROM job_facets_view AS jf
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid
INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName
ORDER BY event_time ASC
ORDER BY lineage_event_time ASC
) e
GROUP BY e.run_uuid
) f ON f.run_uuid=jv.latest_run_uuid
Expand Down Expand Up @@ -135,14 +135,14 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(e.facets) AS facets
SELECT run_uuid, JSON_AGG(e.facet) AS facets
FROM (
SELECT run_uuid, event->'job'->'facets' AS facets
FROM lineage_events AS le
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid
SELECT jf.run_uuid, jf.facet
FROM job_facets_view AS jf
INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid
INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid
WHERE j2.namespace_name=:namespaceName
ORDER BY event_time ASC
ORDER BY lineage_event_time ASC
) e
GROUP BY e.run_uuid
) f ON f.run_uuid=jv.latest_run_uuid
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/JobFacetsDao.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down
8 changes: 4 additions & 4 deletions api/src/main/java/marquez/db/JobVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ WITH job_version_io AS (
LEFT JOIN job_version_io dsio ON dsio.job_version_uuid = jv.uuid
LEFT OUTER JOIN runs r ON r.uuid = jv.latest_run_uuid
LEFT JOIN LATERAL (
SELECT le.run_uuid, JSON_AGG(event -> 'run' -> 'facets') AS facets
FROM lineage_events le
WHERE le.run_uuid=jv.latest_run_uuid
GROUP BY le.run_uuid
SELECT jf.run_uuid, JSON_AGG(jf.facet ORDER BY jf.lineage_event_time ASC) AS facets
FROM job_facets_view AS jf
WHERE jf.run_uuid=jv.latest_run_uuid AND jf.job_uuid = jv.job_uuid
GROUP BY jf.run_uuid
) AS f ON r.uuid = f.run_uuid
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
LEFT JOIN LATERAL (
Expand Down
12 changes: 6 additions & 6 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ WHERE ds.uuid IN (<dsUuids>)""")
+ "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
+ "LEFT JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
+ " FROM lineage_events le\n"
+ " WHERE le.run_uuid=r.uuid\n"
+ " GROUP BY le.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT im.run_uuid,\n"
+ " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n"
+ " 'name', dv.dataset_name,\n"
Expand All @@ -130,6 +124,12 @@ WHERE ds.uuid IN (<dsUuids>)""")
+ " GROUP BY im.run_uuid\n"
+ ") ri ON ri.run_uuid=r.uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
+ " FROM run_facets_view AS rf\n"
+ " WHERE rf.run_uuid=r.uuid\n"
+ " GROUP BY rf.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT JOIN LATERAL (\n"
+ " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n"
+ " 'name', dataset_name,\n"
+ " 'version', version)) AS output_versions\n"
Expand Down
14 changes: 7 additions & 7 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public interface RunDao extends BaseDao {
+ "FROM runs_view AS r\n"
+ "LEFT OUTER JOIN\n"
+ "(\n"
+ " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n"
+ " FROM lineage_events le\n"
+ " GROUP BY le.run_uuid\n"
+ " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n"
+ " FROM run_facets_view rf\n"
+ " GROUP BY rf.run_uuid\n"
+ ") AS f ON r.uuid=f.run_uuid\n"
+ "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n"
+ "LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n"
Expand Down Expand Up @@ -129,10 +129,10 @@ public interface RunDao extends BaseDao {
INNER JOIN jobs_view j ON r.job_uuid=j.uuid
LEFT JOIN LATERAL
(
SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets
FROM lineage_events le
WHERE le.run_uuid=r.uuid
GROUP BY le.run_uuid
SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets
FROM run_facets_view rf
WHERE rf.run_uuid=r.uuid
GROUP BY rf.run_uuid
) AS f ON r.uuid=f.run_uuid
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid
Expand Down
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/db/RunFacetsDao.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 contributors to the Marquez project
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

Expand Down
Loading

0 comments on commit 2df36cf

Please sign in to comment.