Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update insert job function to avoid joining on symlinks for jobs that have no symlinks #2144

Merged
merged 2 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ SELECT f.uuid,
FROM jobs_fqn f,
jobs j
WHERE j.uuid = f.uuid
AND j.is_hidden IS FALSE;
AND j.is_hidden IS FALSE;


CREATE OR REPLACE FUNCTION rewrite_jobs_fqn_table() RETURNS TRIGGER AS
$$
DECLARE
job_uuid uuid;
job_updated_at timestamp with time zone;
new_symlink_target_uuid uuid;
old_symlink_target_uuid uuid;
inserted_job jobs_view%rowtype;
Expand All @@ -53,7 +54,7 @@ BEGIN
COALESCE(NEW.parent_job_uuid::char(36), ''),
false
ON CONFLICT (name, namespace_uuid, parent_job_uuid_string)
DO UPDATE SET updated_at = EXCLUDED.updated_at,
DO UPDATE SET updated_at = now(),
collado-mike marked this conversation as resolved.
Show resolved Hide resolved
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
Expand All @@ -64,15 +65,48 @@ BEGIN
EXCLUDED.symlink_target_uuid),
is_hidden = false
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
-- version in case of insert
RETURNING uuid, symlink_target_uuid, (SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
INTO job_uuid, new_symlink_target_uuid, old_symlink_target_uuid;
-- version in case of insert
RETURNING uuid,
updated_at,
symlink_target_uuid,
(SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
INTO job_uuid, job_updated_at, new_symlink_target_uuid, old_symlink_target_uuid;

-- update the jobs_fqn table only when inserting a new record (NEW.uuid will equal the job_uuid
-- when inserting a new record) or when the symlink_target_uuid is being updated.
IF NEW.uuid = job_uuid OR
(new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
RAISE LOG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid;

-- update the jobs_fqn table when inserting a new record
-- (NEW.uuid will equal the job_uuid when inserting a new record)
-- AND if the symlink target is null
-- Avoid constructing the symlinks and aliases, as that is expensive
IF TG_OP='INSERT'
AND NEW.uuid = job_uuid
AND NEW.symlink_target_uuid IS NULL
AND NEW.updated_at=job_updated_at THEN
RAISE DEBUG 'Inserting into jobs_fqn for new job % (%)', NEW.name, job_uuid;
WITH fqn AS (SELECT j.uuid,
CASE
WHEN j.parent_job_uuid IS NULL THEN j.name
ELSE jf.job_fqn || '.' || j.name
END AS name,
j.namespace_uuid,
j.namespace_name,
jf.job_fqn AS parent_job_name,
j.parent_job_uuid
FROM jobs j
LEFT JOIN jobs_fqn jf ON jf.uuid=j.parent_job_uuid
WHERE j.uuid=job_uuid)
INSERT
INTO jobs_fqn
SELECT j.uuid,
jf.namespace_uuid,
jf.namespace_name,
jf.parent_job_name,
ARRAY[jf.name]::text[],
jf.name AS job_fqn
FROM jobs j
INNER JOIN fqn jf ON jf.uuid = j.uuid;
-- or when the symlink_target_uuid is being updated.
ELSIF (new_symlink_target_uuid IS NOT NULL AND new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
RAISE DEBUG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid;
WITH RECURSIVE
jobs_symlink AS (SELECT j.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs j
Expand Down
5 changes: 3 additions & 2 deletions api/src/test/java/marquez/db/LineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.base.Functions;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -220,7 +221,7 @@ public void testGetLineageForSymlinkedJob() throws SQLException {
.upsertJob(
UUID.randomUUID(),
JobType.valueOf(writeJob.getJob().getType()),
writeJob.getJob().getCreatedAt(),
Instant.now(),
namespaceRow.getUuid(),
writeJob.getJob().getNamespaceName(),
symlinkTargetJobName,
Expand All @@ -233,7 +234,7 @@ public void testGetLineageForSymlinkedJob() throws SQLException {
.upsertJob(
writeJob.getJob().getUuid(),
JobType.valueOf(writeJob.getJob().getType()),
writeJob.getJob().getCreatedAt(),
Instant.now(),
namespaceRow.getUuid(),
writeJob.getJob().getNamespaceName(),
writeJob.getJob().getName(),
Expand Down