Skip to content

Commit

Permalink
improve dataset facets access (#2407)
Browse files Browse the repository at this point in the history
Signed-off-by: Pawel Leszczynski <leszczynski.pawel@gmail.com>
Co-authored-by: Willy Lulciuc <willy@datakin.com>
  • Loading branch information
pawel-big-lebowski and wslulciuc authored Feb 22, 2023
1 parent 89bb97a commit dbdbcc3
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 110 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
*Makes the button visible only if the link is not empty.*


* Improve dataset facets access [`#2407`](https://github.com/MarquezProject/marquez/pull/2407) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
* Improves database query performance for accessing datasets and datasets' versions.*

## [0.30.0](https://github.com/MarquezProject/marquez/compare/0.29.0...0.30.0) - 2023-01-31

### Added
Expand Down
82 changes: 22 additions & 60 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,30 +71,8 @@ void updateLastModifiedAt(

@SqlQuery(
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -104,11 +82,15 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid""")
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)
""")
Optional<Dataset> findDatasetByName(String namespaceName, String datasetName);

default Optional<Dataset> findWithTags(String namespaceName, String datasetName) {
Expand Down Expand Up @@ -137,32 +119,8 @@ default void setFields(Dataset ds) {

@SqlQuery(
"""
WITH selected_datasets AS (
SELECT d.*
FROM datasets_view d
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit OFFSET :offset
), dataset_runs AS (
SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = dv.run_uuid
UNION
SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet
FROM selected_datasets d
INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid
LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid
LEFT JOIN LATERAL (
SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view
WHERE dataset_uuid = dv.dataset_uuid
) df ON df.run_uuid = rim.run_uuid
)
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM selected_datasets d
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -172,13 +130,17 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets
FROM dataset_runs AS d2
WHERE d2.run_uuid = d2.run_uuid
AND d2.facet IS NOT NULL
GROUP BY d2.uuid
) f ON f.dataset_uuid = d.uuid
ORDER BY d.name""")
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets_view AS df
WHERE df.facet IS NOT NULL
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit OFFSET :offset
""")
List<Dataset> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM datasets_view")
Expand Down
78 changes: 28 additions & 50 deletions api/src/main/java/marquez/db/DatasetVersionDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,10 @@ default void updateDatasetVersionMetric(

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.version = :version
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM selected_dataset_versions dv
FROM dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -178,28 +169,21 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.version = :version
""")
Optional<DatasetVersion> findBy(UUID version);

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.uuid = :uuid
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM selected_dataset_versions dv
FROM dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -208,12 +192,14 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid""")
LEFT JOIN (
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.uuid = :uuid
""")
Optional<DatasetVersion> findByUuid(UUID uuid);

default Optional<DatasetVersion> findByWithRun(UUID version) {
Expand Down Expand Up @@ -244,22 +230,10 @@ default Optional<DatasetVersion> findByWithRun(UUID version) {

@SqlQuery(
"""
WITH selected_dataset_versions AS (
SELECT dv.*
FROM dataset_versions dv
WHERE dv.namespace_name = :namespaceName
AND dv.dataset_name = :datasetName
ORDER BY dv.created_at DESC
LIMIT :limit OFFSET :offset
), selected_dataset_version_facets AS (
SELECT dv.uuid, dv.dataset_name, dv.namespace_name, df.run_uuid, df.lineage_event_time, df.facet
FROM selected_dataset_versions dv
LEFT JOIN dataset_facets_view df ON df.dataset_version_uuid = dv.uuid
)
SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,
dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location,
t.tags, f.facets
FROM selected_dataset_versions dv
FROM dataset_versions dv
LEFT JOIN datasets_view d ON d.uuid = dv.dataset_uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
Expand All @@ -269,12 +243,16 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = dv.dataset_uuid
LEFT JOIN (
SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM selected_dataset_version_facets dvf
WHERE dvf.run_uuid = dvf.run_uuid
GROUP BY dvf.uuid
) f ON f.dataset_uuid = dv.uuid
ORDER BY dv.created_at DESC""")
SELECT dvf.dataset_version_uuid,
JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets
FROM dataset_facets_view dvf
GROUP BY dataset_version_uuid
) f ON f.dataset_version_uuid = dv.uuid
WHERE dv.namespace_name = :namespaceName
AND dv.dataset_name = :datasetName
ORDER BY dv.created_at DESC
LIMIT :limit OFFSET :offset
""")
List<DatasetVersion> findAll(String namespaceName, String datasetName, int limit, int offset);

default List<DatasetVersion> findAllWithRun(
Expand Down

0 comments on commit dbdbcc3

Please sign in to comment.