Skip to content

Commit

Permalink
[FIX] Dataset query to get only the latest facet for each version (#2859
Browse files Browse the repository at this point in the history
)

* fix dataset query to get only the latest facet for each version

Signed-off-by: sophiely <ly.sophie200@gmail.com>

* small fix in query

Signed-off-by: sophiely <ly.sophie200@gmail.com>

---------

Signed-off-by: sophiely <ly.sophie200@gmail.com>
  • Loading branch information
sophiely authored Jul 25, 2024
1 parent 801831c commit f73f115
Showing 1 changed file with 45 additions and 24 deletions.
69 changes: 45 additions & 24 deletions api/src/main/java/marquez/db/DatasetDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,30 +121,51 @@ default void setFields(Dataset ds) {

@SqlQuery(
"""
SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN (
SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid
) t ON t.dataset_uuid = d.uuid
LEFT JOIN (
SELECT
df.dataset_version_uuid,
JSONB_AGG(df.facet ORDER BY df.lineage_event_time ASC) AS facets
FROM dataset_facets AS df
WHERE df.facet IS NOT NULL AND
(df.type ILIKE 'dataset' OR df.type ILIKE 'unknown' OR df.type ILIKE 'input') AND
df.dataset_uuid IN (SELECT uuid FROM datasets_view WHERE namespace_name = :namespaceName ORDER BY name LIMIT :limit OFFSET :offset)
GROUP BY df.dataset_version_uuid
) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit OFFSET :offset
""")
WITH facets_t AS
(SELECT df.dataset_version_uuid,
df.facet,
df."name",
df.created_at,
rank() OVER (PARTITION BY df.dataset_version_uuid, "name"
ORDER BY created_at DESC) AS r
FROM dataset_facets AS df
WHERE df.facet IS NOT NULL
AND (df.type ILIKE 'dataset'
OR df.type ILIKE 'unknown'
OR df.type ILIKE 'input')
AND df.dataset_uuid IN
(SELECT UUID
FROM datasets_view
WHERE namespace_name = :namespaceName
ORDER BY name
LIMIT 10
OFFSET :offset))
SELECT d.*,
dv.fields,
dv.lifecycle_state,
sv.schema_location,
t.tags,
facets
FROM datasets_view d
LEFT JOIN dataset_versions dv ON d.current_version_uuid = dv.uuid
LEFT JOIN stream_versions AS sv ON sv.dataset_version_uuid = dv.uuid
LEFT JOIN
(SELECT ARRAY_AGG(t.name) AS tags,
m.dataset_uuid
FROM tags AS t
INNER JOIN datasets_tag_mapping AS m ON m.tag_uuid = t.uuid
GROUP BY m.dataset_uuid) t ON t.dataset_uuid = d.uuid
LEFT JOIN
(SELECT df.dataset_version_uuid,
JSONB_AGG(df.facet) AS facets
FROM facets_t AS df
WHERE r = 1
GROUP BY df.dataset_version_uuid) f ON f.dataset_version_uuid = d.current_version_uuid
WHERE d.namespace_name = :namespaceName
ORDER BY d.name
LIMIT :limit
OFFSET :offset
""")
List<Dataset> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM datasets_view")
Expand Down

0 comments on commit f73f115

Please sign in to comment.