diff --git a/.circleci/config.yml b/.circleci/config.yml index 87ba3324ee..e9cce77793 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -166,6 +166,7 @@ jobs: working_directory: ~/marquez machine: image: ubuntu-2004:current + resource_class: large steps: - checkout - run: ./.circleci/get-docker-compose.sh diff --git a/api/src/main/java/marquez/db/DatasetDao.java b/api/src/main/java/marquez/db/DatasetDao.java index dda4373f62..7439813d63 100644 --- a/api/src/main/java/marquez/db/DatasetDao.java +++ b/api/src/main/java/marquez/db/DatasetDao.java @@ -76,22 +76,22 @@ WITH selected_datasets AS ( FROM datasets_view d WHERE CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks) ), dataset_runs AS ( - SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event + SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet FROM selected_datasets d - INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid + INNER JOIN dataset_versions AS dv ON dv.uuid = d.current_version_uuid LEFT JOIN LATERAL ( - SELECT run_uuid, event_time, event FROM lineage_events - WHERE run_uuid = dv.run_uuid - ) e ON e.run_uuid = dv.run_uuid + SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view + WHERE dataset_uuid = dv.dataset_uuid + ) df ON df.run_uuid = dv.run_uuid UNION - SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event + SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet FROM selected_datasets d INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid LEFT JOIN LATERAL ( - SELECT run_uuid, event_time, event FROM lineage_events - WHERE run_uuid = rim.run_uuid - ) e ON e.run_uuid = rim.run_uuid + SELECT dataset_uuid, run_uuid, lineage_event_time, facet FROM dataset_facets_view + WHERE dataset_uuid = dv.dataset_uuid AND run_uuid = rim.run_uuid + ) df ON df.run_uuid = rim.run_uuid ) SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets FROM selected_datasets d @@ -104,13 +104,9 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = d.uuid LEFT JOIN ( - SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets - FROM dataset_runs d2, - jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds - WHERE d2.run_uuid = d2.run_uuid - AND ds -> 'facets' IS NOT NULL - AND ds ->> 'name' = d2.name - AND ds ->> 'namespace' = d2.namespace_name + SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets + FROM dataset_runs AS d2 + WHERE d2.run_uuid = d2.run_uuid AND d2.facet IS NOT NULL GROUP BY d2.uuid ) f ON f.dataset_uuid = d.uuid""") Optional findDatasetByName(String namespaceName, String datasetName); @@ -148,22 +144,22 @@ WITH selected_datasets AS ( ORDER BY d.name LIMIT :limit OFFSET :offset ), dataset_runs AS ( - SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, event_time, event + SELECT d.uuid, d.name, d.namespace_name, dv.run_uuid, dv.lifecycle_state, lineage_event_time, facet FROM selected_datasets d INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid LEFT JOIN LATERAL ( - SELECT run_uuid, event_time, event FROM lineage_events - WHERE run_uuid = dv.run_uuid - ) e ON e.run_uuid = dv.run_uuid + SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view + WHERE dataset_uuid = dv.dataset_uuid + ) df ON df.run_uuid = dv.run_uuid UNION - SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, event_time, event + SELECT d.uuid, d.name, d.namespace_name, rim.run_uuid, lifecycle_state, lineage_event_time, facet FROM selected_datasets d INNER JOIN dataset_versions dv ON dv.uuid = d.current_version_uuid LEFT JOIN runs_input_mapping rim ON dv.uuid = rim.dataset_version_uuid LEFT JOIN LATERAL ( - SELECT run_uuid, event_time, event FROM lineage_events - WHERE run_uuid = rim.run_uuid - ) e ON e.run_uuid = rim.run_uuid + SELECT run_uuid, lineage_event_time, facet FROM dataset_facets_view + WHERE dataset_uuid = dv.dataset_uuid + ) df ON df.run_uuid = rim.run_uuid ) SELECT d.*, dv.fields, dv.lifecycle_state, sv.schema_location, t.tags, facets FROM selected_datasets d @@ -176,13 +172,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = d.uuid LEFT JOIN ( - SELECT d2.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time) AS facets - FROM dataset_runs d2, - jsonb_array_elements(coalesce(d2.event -> 'inputs', '[]'::jsonb) || coalesce(d2.event -> 'outputs', '[]'::jsonb)) AS ds + SELECT d2.uuid AS dataset_uuid, JSONB_AGG(d2.facet ORDER BY d2.lineage_event_time ASC) AS facets + FROM dataset_runs AS d2 WHERE d2.run_uuid = d2.run_uuid - AND ds -> 'facets' IS NOT NULL - AND ds ->> 'name' = d2.name - AND ds ->> 'namespace' = d2.namespace_name + AND d2.facet IS NOT NULL GROUP BY d2.uuid ) f ON f.dataset_uuid = d.uuid ORDER BY d.name""") diff --git a/api/src/main/java/marquez/db/DatasetFacetsDao.java b/api/src/main/java/marquez/db/DatasetFacetsDao.java index 2d18370f34..0d5bbd88bf 100644 --- a/api/src/main/java/marquez/db/DatasetFacetsDao.java +++ b/api/src/main/java/marquez/db/DatasetFacetsDao.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 contributors to the Marquez project + * Copyright 2018-2023 contributors to the Marquez project * SPDX-License-Identifier: Apache-2.0 */ diff --git a/api/src/main/java/marquez/db/DatasetVersionDao.java b/api/src/main/java/marquez/db/DatasetVersionDao.java index 1f67daaa7f..c679f0e760 100644 --- a/api/src/main/java/marquez/db/DatasetVersionDao.java +++ b/api/src/main/java/marquez/db/DatasetVersionDao.java @@ -168,10 +168,10 @@ WITH selected_dataset_versions AS ( FROM selected_dataset_versions dv LEFT JOIN runs_input_mapping rim ON rim.dataset_version_uuid = dv.uuid - ), selected_dataset_version_events AS ( - SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event + ), selected_dataset_version_facets AS ( + SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet FROM selected_dataset_version_runs dv - LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid + LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, @@ -186,14 +186,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid LEFT JOIN ( - SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets - FROM selected_dataset_version_events dve, - jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds - WHERE dve.run_uuid = dve.run_uuid - AND ds -> 'facets' IS NOT NULL - AND ds ->> 'name' = dve.dataset_name - AND ds ->> 'namespace' = dve.namespace_name - GROUP BY dve.uuid + SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM selected_dataset_version_facets dvf + WHERE dvf.run_uuid = dvf.run_uuid + GROUP BY dvf.uuid ) f ON f.dataset_uuid = dv.uuid""") Optional findBy(UUID version); @@ -211,10 +207,10 @@ WITH selected_dataset_versions AS ( FROM selected_dataset_versions dv LEFT JOIN runs_input_mapping rim ON rim.dataset_version_uuid = dv.uuid - ), selected_dataset_version_events AS ( - SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event + ), selected_dataset_version_facets AS ( + SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet FROM selected_dataset_version_runs dv - LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid + LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state,\s dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, @@ -229,14 +225,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid LEFT JOIN ( - SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets - FROM selected_dataset_version_events dve, - jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds - WHERE dve.run_uuid = dve.run_uuid - AND ds -> 'facets' IS NOT NULL - AND ds ->> 'name' = dve.dataset_name - AND ds ->> 'namespace' = dve.namespace_name - GROUP BY dve.uuid + SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM selected_dataset_version_facets dvf + WHERE dvf.run_uuid = dvf.run_uuid + GROUP BY dvf.uuid ) f ON f.dataset_uuid = dv.uuid""") Optional findByUuid(UUID uuid); @@ -283,10 +275,10 @@ WITH selected_dataset_versions AS ( FROM selected_dataset_versions dv LEFT JOIN runs_input_mapping rim ON rim.dataset_version_uuid = dv.uuid - ), selected_dataset_version_events AS ( - SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, le.event_time, le.event + ), selected_dataset_version_facets AS ( + SELECT dv.uuid, dv.dataset_name, dv.namespace_name, dv.run_uuid, df.lineage_event_time, df.facet FROM selected_dataset_version_runs dv - LEFT JOIN lineage_events le ON le.run_uuid = dv.run_uuid + LEFT JOIN dataset_facets_view df ON df.dataset_uuid = dv.dataset_uuid ) SELECT d.type, d.name, d.physical_name, d.namespace_name, d.source_name, d.description, dv.lifecycle_state, dv.created_at, dv.version, dv.fields, dv.run_uuid AS createdByRunUuid, sv.schema_location, @@ -301,14 +293,10 @@ SELECT ARRAY_AGG(t.name) AS tags, m.dataset_uuid GROUP BY m.dataset_uuid ) t ON t.dataset_uuid = dv.dataset_uuid LEFT JOIN ( - SELECT dve.uuid AS dataset_uuid, JSONB_AGG(ds->'facets' ORDER BY event_time ASC) AS facets - FROM selected_dataset_version_events dve, - jsonb_array_elements(coalesce(dve.event -> 'inputs', '[]'::jsonb) || coalesce(dve.event -> 'outputs', '[]'::jsonb)) AS ds - WHERE dve.run_uuid = dve.run_uuid - AND ds -> 'facets' IS NOT NULL - AND ds ->> 'name' = dve.dataset_name - AND ds ->> 'namespace' = dve.namespace_name - GROUP BY dve.uuid + SELECT dvf.uuid AS dataset_uuid, JSONB_AGG(dvf.facet ORDER BY dvf.lineage_event_time ASC) AS facets + FROM selected_dataset_version_facets dvf + WHERE dvf.run_uuid = dvf.run_uuid + GROUP BY dvf.uuid ) f ON f.dataset_uuid = dv.uuid ORDER BY dv.created_at DESC""") List findAll(String namespaceName, String datasetName, int limit, int offset); diff --git a/api/src/main/java/marquez/db/FacetUtils.java b/api/src/main/java/marquez/db/FacetUtils.java index 2fdac63fc3..b756339ede 100644 --- a/api/src/main/java/marquez/db/FacetUtils.java +++ b/api/src/main/java/marquez/db/FacetUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 contributors to the Marquez project + * Copyright 2018-2023 contributors to the Marquez project * SPDX-License-Identifier: Apache-2.0 */ diff --git a/api/src/main/java/marquez/db/JobDao.java b/api/src/main/java/marquez/db/JobDao.java index 7761c1bca9..aa5ca44832 100644 --- a/api/src/main/java/marquez/db/JobDao.java +++ b/api/src/main/java/marquez/db/JobDao.java @@ -63,14 +63,14 @@ SELECT EXISTS ( LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid LEFT OUTER JOIN ( - SELECT run_uuid, JSON_AGG(e.facets) AS facets + SELECT run_uuid, JSON_AGG(e.facet) AS facets FROM ( - SELECT run_uuid, event->'job'->'facets' AS facets - FROM lineage_events AS le - INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid + SELECT jf.run_uuid, jf.facet + FROM job_facets_view AS jf + INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid WHERE j2.name=:jobName AND j2.namespace_name=:namespaceName - ORDER BY event_time ASC + ORDER BY lineage_event_time ASC ) e GROUP BY e.run_uuid ) f ON f.run_uuid=jv.latest_run_uuid @@ -135,14 +135,14 @@ default Optional findWithRun(String namespaceName, String jobName) { LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid LEFT OUTER JOIN ( - SELECT run_uuid, JSON_AGG(e.facets) AS facets + SELECT run_uuid, JSON_AGG(e.facet) AS facets FROM ( - SELECT run_uuid, event->'job'->'facets' AS facets - FROM lineage_events AS le - INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=le.run_uuid + SELECT jf.run_uuid, jf.facet + FROM job_facets_view AS jf + INNER JOIN job_versions jv2 ON jv2.latest_run_uuid=jf.run_uuid INNER JOIN jobs_view j2 ON j2.current_version_uuid=jv2.uuid WHERE j2.namespace_name=:namespaceName - ORDER BY event_time ASC + ORDER BY lineage_event_time ASC ) e GROUP BY e.run_uuid ) f ON f.run_uuid=jv.latest_run_uuid diff --git a/api/src/main/java/marquez/db/JobFacetsDao.java b/api/src/main/java/marquez/db/JobFacetsDao.java index db0f9853d8..ddf29dd5ec 100644 --- a/api/src/main/java/marquez/db/JobFacetsDao.java +++ b/api/src/main/java/marquez/db/JobFacetsDao.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 contributors to the Marquez project + * Copyright 2018-2023 contributors to the Marquez project * SPDX-License-Identifier: Apache-2.0 */ diff --git a/api/src/main/java/marquez/db/JobVersionDao.java b/api/src/main/java/marquez/db/JobVersionDao.java index 6eeaffb929..33bbaa725a 100644 --- a/api/src/main/java/marquez/db/JobVersionDao.java +++ b/api/src/main/java/marquez/db/JobVersionDao.java @@ -102,10 +102,10 @@ WITH job_version_io AS ( LEFT JOIN job_version_io dsio ON dsio.job_version_uuid = jv.uuid LEFT OUTER JOIN runs r ON r.uuid = jv.latest_run_uuid LEFT JOIN LATERAL ( - SELECT le.run_uuid, JSON_AGG(event -> 'run' -> 'facets') AS facets - FROM lineage_events le - WHERE le.run_uuid=jv.latest_run_uuid - GROUP BY le.run_uuid + SELECT jf.run_uuid, JSON_AGG(jf.facet ORDER BY jf.lineage_event_time ASC) AS facets + FROM job_facets_view AS jf + WHERE jf.run_uuid=jv.latest_run_uuid AND jf.job_uuid = jv.job_uuid + GROUP BY jf.run_uuid ) AS f ON r.uuid = f.run_uuid LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid LEFT JOIN LATERAL ( diff --git a/api/src/main/java/marquez/db/LineageDao.java b/api/src/main/java/marquez/db/LineageDao.java index bba1c5a786..ce03a685ae 100644 --- a/api/src/main/java/marquez/db/LineageDao.java +++ b/api/src/main/java/marquez/db/LineageDao.java @@ -114,12 +114,6 @@ WHERE ds.uuid IN ()""") + "LEFT JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n" + "LEFT JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n" + "LEFT JOIN LATERAL (\n" - + " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n" - + " FROM lineage_events le\n" - + " WHERE le.run_uuid=r.uuid\n" - + " GROUP BY le.run_uuid\n" - + ") AS f ON r.uuid=f.run_uuid\n" - + "LEFT JOIN LATERAL (\n" + " SELECT im.run_uuid,\n" + " JSON_AGG(json_build_object('namespace', dv.namespace_name,\n" + " 'name', dv.dataset_name,\n" @@ -130,6 +124,12 @@ WHERE ds.uuid IN ()""") + " GROUP BY im.run_uuid\n" + ") ri ON ri.run_uuid=r.uuid\n" + "LEFT JOIN LATERAL (\n" + + " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n" + + " FROM run_facets_view AS rf\n" + + " WHERE rf.run_uuid=r.uuid\n" + + " GROUP BY rf.run_uuid\n" + + ") AS f ON r.uuid=f.run_uuid\n" + + "LEFT JOIN LATERAL (\n" + " SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,\n" + " 'name', dataset_name,\n" + " 'version', version)) AS output_versions\n" diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 5eb7dae353..13527f93be 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -80,9 +80,9 @@ public interface RunDao extends BaseDao { + "FROM runs_view AS r\n" + "LEFT OUTER JOIN\n" + "(\n" - + " SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets\n" - + " FROM lineage_events le\n" - + " GROUP BY le.run_uuid\n" + + " SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets\n" + + " FROM run_facets_view rf\n" + + " GROUP BY rf.run_uuid\n" + ") AS f ON r.uuid=f.run_uuid\n" + "LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid\n" + "LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid\n" @@ -129,10 +129,10 @@ public interface RunDao extends BaseDao { INNER JOIN jobs_view j ON r.job_uuid=j.uuid LEFT JOIN LATERAL ( - SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets - FROM lineage_events le - WHERE le.run_uuid=r.uuid - GROUP BY le.run_uuid + SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets + FROM run_facets_view rf + WHERE rf.run_uuid=r.uuid + GROUP BY rf.run_uuid ) AS f ON r.uuid=f.run_uuid LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid LEFT OUTER JOIN job_contexts AS ctx ON r.job_context_uuid = ctx.uuid diff --git a/api/src/main/java/marquez/db/RunFacetsDao.java b/api/src/main/java/marquez/db/RunFacetsDao.java index e8600fe5be..f357a8b27f 100644 --- a/api/src/main/java/marquez/db/RunFacetsDao.java +++ b/api/src/main/java/marquez/db/RunFacetsDao.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 contributors to the Marquez project + * Copyright 2018-2023 contributors to the Marquez project * SPDX-License-Identifier: Apache-2.0 */ diff --git a/api/src/main/java/marquez/db/migrations/V56_1__FacetViews.java b/api/src/main/java/marquez/db/migrations/V56_1__FacetViews.java new file mode 100644 index 0000000000..9820a7996c --- /dev/null +++ b/api/src/main/java/marquez/db/migrations/V56_1__FacetViews.java @@ -0,0 +1,183 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ +package marquez.db.migrations; + +import lombok.extern.slf4j.Slf4j; +import org.flywaydb.core.api.MigrationVersion; +import org.flywaydb.core.api.migration.Context; +import org.flywaydb.core.api.migration.JavaMigration; +import org.jdbi.v3.core.Jdbi; +import org.jdbi.v3.core.statement.UnableToExecuteStatementException; + +/** + * Dataset migration written in Java because SQLs used here will be reused in next migration when + * processing content of `lineage_events` table to facets' tables. + */ +@Slf4j +public class V56_1__FacetViews implements JavaMigration { + + public static String getDatasetFacetsDefinitionSQL(Jdbi jdbi, String sourceTable) { + return String.format( + """ + WITH lineage_datasets AS ( + SELECT + jsonb_array_elements(coalesce(le.event -> 'inputs', '[]'::jsonb) || coalesce(le.event -> 'outputs', '[]'::jsonb)) AS dataset, + le.run_uuid, + le.event_time, + le.event_type, + le.created_at + FROM %s le + ), + dataset_facets AS ( + SELECT + jsonb_object_keys as facet_name, + json_build_object(jsonb_object_keys, dataset -> 'facets' -> jsonb_object_keys)::jsonb as facet, + dataset ->> 'name' as dataset_name, + dataset ->> 'namespace' as dataset_namespace, + ld.* + FROM lineage_datasets ld, jsonb_object_keys(coalesce(dataset -> 'facets', '{}'::jsonb)) + ) + SELECT + %s AS uuid, + COALESCE(df.created_at, df.event_time) AS created_at, + dataset_symlinks.dataset_uuid AS dataset_uuid, + df.run_uuid AS run_uuid, + df.event_time AS lineage_event_time, + df.event_type::VARCHAR(64) AS lineage_event_type, + ( + CASE + WHEN lower(facet_name) IN ('documentation', 'schema', 'datasource', 'description', 'lifecyclestatechange', 'version', 'columnlineage', 'ownership') then 'DATASET' + WHEN lower(facet_name) IN ('dataqualitymetrics', 'dataqualityassertions') then 'INPUT' + WHEN lower(facet_name) = 'outputstatistics' then 'OUTPUT' + ELSE 'UNKNOWN' + END + )::VARCHAR(64) AS type, + df.facet_name::VARCHAR(255) AS name, + df.facet AS facet + FROM dataset_facets df + JOIN dataset_symlinks ON dataset_symlinks.name = dataset_name + INNER JOIN namespaces ON dataset_symlinks.namespace_uuid = namespaces.uuid + WHERE namespaces.name = dataset_namespace + """, + sourceTable, getGenerateUuidFunction(jdbi)); + } + + public static String getRunFacetsDefinitionSQL(Jdbi jdbi, String sourceTable) { + return String.format( + """ + SELECT + %s AS uuid, + COALESCE(le.created_at, le.event_time) AS created_at, + le.run_uuid AS run_uuid, + le.event_time AS lineage_event_time, + le.event_type::VARCHAR(64) AS lineage_event_type, + jsonb_object_keys::VARCHAR(255) as name, + json_build_object(jsonb_object_keys, event -> 'run' -> 'facets' -> jsonb_object_keys)::jsonb as facet + FROM %s le, jsonb_object_keys(coalesce(event -> 'run' -> 'facets', '{}'::jsonb)) + WHERE lower(jsonb_object_keys) != 'spark_unknown' + """, + getGenerateUuidFunction(jdbi), sourceTable); + } + + public static String getJobFacetsDefinitionSQL(Jdbi jdbi, String sourceTable) { + return String.format( + """ + SELECT + %s AS uuid, + COALESCE(le.created_at, le.event_time) AS created_at, + r.job_uuid AS job_uuid, + le.run_uuid AS run_uuid, + le.event_time AS lineage_event_time, + le.event_type::VARCHAR(64) AS lineage_event_type, + jsonb_object_keys::VARCHAR(255) as name, + json_build_object(jsonb_object_keys, event -> 'job' -> 'facets' -> jsonb_object_keys)::jsonb as facet + FROM %s le, runs r, jsonb_object_keys(coalesce(event -> 'job' -> 'facets', '{}'::jsonb)) + WHERE r.uuid = le.run_uuid + """, + getGenerateUuidFunction(jdbi), sourceTable); + } + + @Override + public MigrationVersion getVersion() { + return MigrationVersion.fromVersion("56.1"); + } + + @Override + public String getDescription() { + return "CreateFacetViews"; + } + + @Override + public Integer getChecksum() { + return null; + } + + @Override + public boolean isUndo() { + return false; + } + + @Override + public boolean isBaselineMigration() { + return false; + } + + @Override + public boolean canExecuteInTransaction() { + return false; + } + + @Override + public void migrate(Context context) { + Jdbi jdbi = Jdbi.create(context.getConnection()); + + final String datasetFacetQuery = + "CREATE OR REPLACE VIEW dataset_facets_view AS " + + getDatasetFacetsDefinitionSQL(jdbi, "lineage_events"); + + final String runFacetQuery = + "CREATE OR REPLACE VIEW run_facets_view AS " + + getRunFacetsDefinitionSQL(jdbi, "lineage_events"); + + final String jobFacetQuery = + "CREATE OR REPLACE VIEW job_facets_view AS " + + getJobFacetsDefinitionSQL(jdbi, "lineage_events"); + + jdbi.inTransaction(handle -> handle.execute(datasetFacetQuery)); + jdbi.inTransaction(handle -> handle.execute(runFacetQuery)); + jdbi.inTransaction(handle -> handle.execute(jobFacetQuery)); + } + + /** + * Since version 13 Postgresql contains built-in gen_random_uuid function. Up to version 12, + * uuid-ossp extension needs to be registered. Creating extension requires superuser privileges. + * + *

In case of older version, this method registers extension and returns name of the older + * function `uuid_generate_v4`. If function `gen_random_uuid` exists, its name is returned. + * + * @param jdbi + * @return + */ + private static String getGenerateUuidFunction(Jdbi jdbi) { + try { + jdbi.withHandle(h -> h.createCall("SELECT gen_random_uuid()").invoke()); + return "gen_random_uuid()"; + } catch (UnableToExecuteStatementException e1) { + // need to try to install uuid-ossp + } + + try { + jdbi.withHandle(h -> h.createCall("SELECT uuid_generate_v4()").invoke()); + + // uuid-ossp already installed + return "uuid_generate_v4()"; + } catch (UnableToExecuteStatementException e1) { + // need to try to install uuid-ossp + log.info("creating uuid-ossp extension which requires superuser privileges"); + jdbi.withHandle(h -> h.createCall("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\"").invoke()); + return "uuid_generate_v4()"; + } + } +} diff --git a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java index bbb002f0ac..17a6449499 100644 --- a/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/DatasetFacetsDaoTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 contributors to the Marquez project + * Copyright 2018-2023 contributors to the Marquez project * SPDX-License-Identifier: Apache-2.0 */ diff --git a/api/src/test/java/marquez/db/FacetTestUtils.java b/api/src/test/java/marquez/db/FacetTestUtils.java index 4b4822e50b..5e78e9b689 100644 --- a/api/src/test/java/marquez/db/FacetTestUtils.java +++ b/api/src/test/java/marquez/db/FacetTestUtils.java @@ -13,7 +13,7 @@ import org.testcontainers.shaded.com.google.common.collect.ImmutableMap; /* - * Copyright 2018-2022 contributors to the Marquez project + * Copyright 2018-2023 contributors to the Marquez project * SPDX-License-Identifier: Apache-2.0 */ public class FacetTestUtils { diff --git a/api/src/test/java/marquez/db/JobFacetsDaoTest.java b/api/src/test/java/marquez/db/JobFacetsDaoTest.java index 1dee55700e..0431b931e0 100644 --- a/api/src/test/java/marquez/db/JobFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/JobFacetsDaoTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 contributors to the Marquez project + * Copyright 2018-2023 contributors to the Marquez project * SPDX-License-Identifier: Apache-2.0 */ diff --git a/api/src/test/java/marquez/db/RunFacetsDaoTest.java b/api/src/test/java/marquez/db/RunFacetsDaoTest.java index 7c6ac8378c..c3af8a08d7 100644 --- a/api/src/test/java/marquez/db/RunFacetsDaoTest.java +++ b/api/src/test/java/marquez/db/RunFacetsDaoTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2022 contributors to the Marquez project + * Copyright 2018-2023 contributors to the Marquez project * SPDX-License-Identifier: Apache-2.0 */ diff --git a/docker/init-db.sh b/docker/init-db.sh index c5dceb991d..521042bc89 100755 --- a/docker/init-db.sh +++ b/docker/init-db.sh @@ -8,7 +8,7 @@ set -eu psql -v ON_ERROR_STOP=1 --username "${POSTGRES_USER}" > /dev/null <<-EOSQL - CREATE USER ${MARQUEZ_USER}; + CREATE USER ${MARQUEZ_USER} SUPERUSER; ALTER USER ${MARQUEZ_USER} WITH PASSWORD '${MARQUEZ_PASSWORD}'; CREATE DATABASE ${MARQUEZ_DB}; GRANT ALL PRIVILEGES ON DATABASE ${MARQUEZ_DB} TO ${MARQUEZ_USER};