Fix/symlinked jobs in queries #2053

Merged · 5 commits · Aug 1, 2022
13 changes: 8 additions & 5 deletions api/src/main/java/marquez/db/LineageDao.java
@@ -43,8 +43,9 @@ public interface LineageDao {
+ " SELECT j.uuid AS job_uuid,\n"
+ " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,\n"
+ " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs\n"
+ " FROM jobs j\n"
+ " LEFT JOIN job_versions v on j.current_version_uuid = v.uuid\n"
+ " FROM jobs_view j\n"
+ " LEFT JOIN jobs_view s ON s.symlink_target_uuid=j.uuid\n"
+ " LEFT JOIN job_versions v on COALESCE(j.current_version_uuid, s.current_version_uuid) = v.uuid\n"
+ " LEFT JOIN job_versions_io_mapping io on v.uuid = io.job_version_uuid\n"
+ " GROUP BY j.uuid\n"
+ " ),\n"
@@ -60,9 +61,10 @@ public interface LineageDao {
+ " array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)\n"
+ " AND depth < :depth"
+ " )\n"
+ "SELECT DISTINCT ON (l2.job_uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
+ "SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
+ "FROM lineage l2\n"
+ "INNER JOIN jobs_view j ON j.uuid=l2.job_uuid\n"
+ "INNER JOIN jobs_view s ON s.uuid=l2.job_uuid\n"
+ "INNER JOIN jobs_view j ON j.uuid=COALESCE(s.symlink_target_uuid, s.uuid)\n"
+ "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

@@ -88,7 +90,8 @@ public interface LineageDao {
+ " SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version\n"
+ " FROM runs_view r\n"
+ " INNER JOIN job_versions jv ON jv.uuid=r.job_version_uuid\n"
+ " WHERE jv.job_uuid in (<jobUuid>)\n"
+ " INNER JOIN jobs_view j ON j.uuid=jv.job_uuid\n"
+ " WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)\n"
+ " ORDER BY r.job_name, r.namespace_name, created_at DESC\n"
+ ")\n"
+ "SELECT r.*, ra.args, ctx.context, f.facets,\n"
8 changes: 6 additions & 2 deletions api/src/main/java/marquez/db/SearchDao.java
@@ -33,9 +33,13 @@ public interface SearchDao {
+ " FROM datasets AS d\n"
+ " WHERE d.name ilike '%' || :query || '%'\n"
+ " UNION\n"
+ " SELECT 'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
+ " FROM jobs_view AS j\n"
+ " SELECT DISTINCT ON (j.namespace_name, j.name) \n"
+ " 'JOB' AS type, j.name, j.updated_at, j.namespace_name\n"
+ " FROM (SELECT namespace_name, name, unnest(aliases) AS alias, updated_at \n"
+ " FROM jobs_view WHERE symlink_target_uuid IS NULL\n"
+ " ORDER BY updated_at DESC) AS j\n"
+ " WHERE j.name ilike '%' || :query || '%'\n"
+ " OR j.alias ilike '%' || :query || '%'\n"
+ ") AS results\n"
+ "WHERE type = :filter OR CAST(:filter AS TEXT) IS NULL\n"
+ "ORDER BY :sort\n"
@@ -111,7 +111,7 @@ BEGIN
INNER JOIN fqn jf ON jf.uuid = COALESCE(js.link_target_uuid, j.uuid)
ON CONFLICT (uuid) DO UPDATE
SET job_fqn=EXCLUDED.job_fqn,
- aliases = jobs_fqn.aliases || EXCLUDED.aliases;
+ aliases = (SELECT array_agg(DISTINCT a) FROM (SELECT unnest(jobs_fqn.aliases) AS a UNION SELECT unnest(EXCLUDED.aliases) AS a) al);
END IF;
SELECT * INTO inserted_job FROM jobs_view WHERE uuid=job_uuid;
return inserted_job;
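
The motivation for the aliases change, as a sketch runnable in plain PostgreSQL: concatenation with || appends duplicates on every upsert, while the unnest/UNION form keeps each alias exactly once.

    SELECT ARRAY['a','b'] || ARRAY['b','c'];
    -- {a,b,b,c}   (old expression: duplicates accumulate across upserts)

    SELECT array_agg(DISTINCT a)
    FROM (SELECT unnest(ARRAY['a','b']) AS a
          UNION
          SELECT unnest(ARRAY['b','c']) AS a) al;
    -- {a,b,c}   (new expression: each alias appears once; ordering is not guaranteed)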
101 changes: 101 additions & 0 deletions api/src/test/java/marquez/db/LineageDaoTest.java
@@ -14,6 +14,7 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.base.Functions;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -25,10 +26,13 @@
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import marquez.common.models.JobType;
import marquez.db.LineageTestUtils.DatasetConsumerJob;
import marquez.db.LineageTestUtils.JobLineage;
import marquez.db.models.DatasetData;
import marquez.db.models.JobData;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.db.models.UpdateLineageRow;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.LineageEvent;
@@ -44,6 +48,7 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.postgresql.util.PGobject;

@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class LineageDaoTest {
@@ -177,6 +182,102 @@ public void testGetLineage() {
}
}

@Test
public void testGetLineageForSymlinkedJob() throws SQLException {

UpdateLineageRow writeJob =
LineageTestUtils.createLineageRow(
openLineageDao,
"writeJob",
"COMPLETE",
jobFacet,
Arrays.asList(),
Arrays.asList(dataset));
List<JobLineage> jobRows =
writeDownstreamLineage(
openLineageDao,
new LinkedList<>(
Arrays.asList(
new DatasetConsumerJob("readJob", 20, Optional.of("outputData")),
new DatasetConsumerJob("downstreamJob", 1, Optional.empty()))),
jobFacet,
dataset);

NamespaceRow namespaceRow =
jdbi.onDemand(NamespaceDao.class)
.findNamespaceByName(writeJob.getJob().getNamespaceName())
.get();

PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");

String symlinkTargetJobName = "A_new_write_job";
JobRow targetJob =
jdbi.onDemand(JobDao.class)
.upsertJob(
UUID.randomUUID(),
JobType.valueOf(writeJob.getJob().getType()),
writeJob.getJob().getCreatedAt(),
namespaceRow.getUuid(),
writeJob.getJob().getNamespaceName(),
symlinkTargetJobName,
writeJob.getJob().getDescription().orElse(null),
writeJob.getJob().getJobContextUuid().orElse(null),
writeJob.getJob().getLocation(),
null,
inputs);
jdbi.onDemand(JobDao.class)
.upsertJob(
writeJob.getJob().getUuid(),
JobType.valueOf(writeJob.getJob().getType()),
writeJob.getJob().getCreatedAt(),
namespaceRow.getUuid(),
writeJob.getJob().getNamespaceName(),
writeJob.getJob().getName(),
writeJob.getJob().getDescription().orElse(null),
writeJob.getJob().getJobContextUuid().orElse(null),
writeJob.getJob().getLocation(),
targetJob.getUuid(),
inputs);

// fetch lineage starting from the symlink target job's uuid.
Set<JobData> connectedJobs =
lineageDao.getLineage(new HashSet<>(Arrays.asList(targetJob.getUuid())), 2);

// 20 readJobs + 1 downstreamJob for each (20) + 1 write job = 41
assertThat(connectedJobs).size().isEqualTo(41);

Set<UUID> jobIds = connectedJobs.stream().map(JobData::getUuid).collect(Collectors.toSet());
// expect the symlink target of the job that wrote "commonDataset", which is readJob0's input
assertThat(jobIds).contains(targetJob.getUuid());

// expect all downstream jobs
Set<UUID> readJobUUIDs =
jobRows.stream()
.flatMap(row -> Stream.concat(Stream.of(row), row.getDownstreamJobs().stream()))
.map(JobLineage::getId)
.collect(Collectors.toSet());
assertThat(jobIds).containsAll(readJobUUIDs);

Map<UUID, JobData> actualJobRows =
connectedJobs.stream().collect(Collectors.toMap(JobData::getUuid, Functions.identity()));
for (JobLineage expected : jobRows) {
JobData job = actualJobRows.get(expected.getId());
assertThat(job.getInputUuids())
.containsAll(
expected.getInput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator);
assertThat(job.getOutputUuids())
.containsAll(
expected.getOutput().map(ds -> ds.getDatasetRow().getUuid()).stream()::iterator);
}
Set<UUID> lineageForOriginalJob =
lineageDao.getLineage(new HashSet<>(Arrays.asList(writeJob.getJob().getUuid())), 2).stream()
.map(JobData::getUuid)
.collect(Collectors.toSet());
assertThat(lineageForOriginalJob).isEqualTo(jobIds);
}

@Test
public void testGetLineageWithJobThatHasNoDownstreamConsumers() {

67 changes: 65 additions & 2 deletions api/src/test/java/marquez/db/SearchDaoTest.java
@@ -7,6 +7,10 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.net.URL;
import java.sql.SQLException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
@@ -15,17 +19,24 @@
import marquez.api.models.SearchFilter;
import marquez.api.models.SearchResult;
import marquez.api.models.SearchSort;
import marquez.common.Utils;
import marquez.common.models.JobType;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.jdbi.MarquezJdbiExternalPostgresExtension;
import marquez.service.models.JobMeta;
import org.jdbi.v3.core.Jdbi;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.postgresql.util.PGobject;

/** The test suite for {@link SearchDao}. */
@Tag("DataAccessTests")
@ExtendWith(MarquezJdbiExternalPostgresExtension.class)
public class SearchDaoTest {

static final int LIMIT = 25;
static final int NUM_OF_JOBS = 2;
/**
@@ -34,10 +45,12 @@ public class SearchDaoTest {
*/
static final int NUM_OF_DATASETS = 12;

public static final String NEW_SYMLINK_TARGET_JOB = "a_new_symlink_target_job";

static SearchDao searchDao;

@BeforeAll
- public static void setUpOnce(final Jdbi jdbi) {
+ public static void setUpOnce(final Jdbi jdbi) throws SQLException {
searchDao = jdbi.onDemand(SearchDao.class);

DbTestUtils.newDataset(jdbi, "name_ordering_0");
@@ -48,7 +61,51 @@ public static void setUpOnce(final Jdbi jdbi) {
DbTestUtils.newDataset(jdbi, "time_ordering_1");
DbTestUtils.newDataset(jdbi, "time_ordering_2");

- DbTestUtils.newJobs(jdbi, NUM_OF_JOBS);
+ ImmutableSet<JobRow> jobRows = DbTestUtils.newJobs(jdbi, NUM_OF_JOBS);

// add a symlinked job; the search test below validates that the number of results stays the same
jobRows.stream()
.findAny()
.ifPresent(
j -> {
try {
NamespaceRow namespaceRow =
jdbi.onDemand(NamespaceDao.class)
.findNamespaceByName(j.getNamespaceName())
.get();
JobRow symlinkTargetJob =
DbTestUtils.newJobWith(
jdbi,
namespaceRow.getName(),
NEW_SYMLINK_TARGET_JOB,
new JobMeta(
JobType.valueOf(j.getType()),
ImmutableSet.copyOf(j.getInputs()),
ImmutableSet.of(),
new URL(j.getLocation()),
ImmutableMap.of(),
j.getDescription().orElse(null),
null));
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue(Utils.getMapper().writeValueAsString(j.getInputs()));
jdbi.onDemand(JobDao.class)
.upsertJob(
j.getUuid(),
JobType.valueOf(j.getType()),
j.getCreatedAt(),
namespaceRow.getUuid(),
namespaceRow.getName(),
j.getName(),
j.getDescription().orElse(null),
j.getJobContextUuid().orElse(null),
j.getLocation(),
symlinkTargetJob.getUuid(),
inputs);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

@Test
@@ -72,6 +129,12 @@ public void testSearch() {
final List<SearchResult> resultsWithOnlyJobs =
resultsGroupedByType.get(SearchResult.ResultType.JOB);
assertThat(resultsWithOnlyJobs).hasSize(NUM_OF_JOBS);

// Even though we searched for "test" and the symlink target doesn't have "test" in its name,
// it is part of the search results because the original job had "test" in its name.
assertThat(resultsWithOnlyJobs)
.filteredOn(j -> j.getName().equals(NEW_SYMLINK_TARGET_JOB))
.isNotEmpty();
}

@Test