From 6826787ab6cfd98b1e8488cf4ce9f54cdf9f4bf8 Mon Sep 17 00:00:00 2001 From: lmossman Date: Wed, 7 Sep 2022 17:30:54 -0700 Subject: [PATCH] refactor queries into JOOQ and return empty list if target job cannot be found --- airbyte-api/src/main/openapi/config.yaml | 2 +- .../java/io/airbyte/commons/text/Sqls.java | 6 +++ .../persistence/DefaultJobPersistence.java | 48 ++++++++++--------- .../DefaultJobPersistenceTest.java | 18 ++----- .../components/StatusView.tsx | 14 +++++- .../src/services/job/JobService.tsx | 7 +-- .../api/generated-api-html/index.html | 2 +- 7 files changed, 54 insertions(+), 43 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 926c21742e1b..de8c1f04a313 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3989,7 +3989,7 @@ components: type: object required: - jobs - - totalCount + - totalJobCount properties: jobs: type: array diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/text/Sqls.java b/airbyte-commons/src/main/java/io/airbyte/commons/text/Sqls.java index 3a17a80510a0..4d511f31a28a 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/text/Sqls.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/text/Sqls.java @@ -4,6 +4,8 @@ package io.airbyte.commons.text; +import java.util.Collection; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -13,6 +15,10 @@ public static > String toSqlName(final T value) { return value.name().toLowerCase(); } + public static > Set toSqlNames(final Collection values) { + return values.stream().map(Sqls::toSqlName).collect(Collectors.toSet()); + } + /** * Generate a string fragment that can be put in the IN clause of a SQL statement. eg. column IN * (value1, value2) diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 24e146bcf0d6..9e6c046807c9 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -5,6 +5,7 @@ package io.airbyte.scheduler.persistence; import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS; +import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeType; @@ -342,10 +343,10 @@ private Optional getJobOptional(final DSLContext ctx, final long jobId) { @Override public Long getJobCount(final Set configTypes, final String connectionId) throws IOException { - final Result result = jobDatabase.query(ctx -> ctx.fetch( - "SELECT COUNT(*) FROM jobs WHERE CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " AND scope = '" + connectionId - + "'")); - return result.get(0).get("count", Long.class); + return jobDatabase.query(ctx -> ctx.selectCount().from(JOBS) + .where(JOBS.CONFIG_TYPE.cast(String.class).in(Sqls.toSqlNames(configTypes))) + .and(JOBS.SCOPE.eq(connectionId)) + .fetchOne().into(Long.class)); } @Override @@ -364,33 +365,34 @@ public List listJobs(final Set configTypes, final String config @Override public List listJobsIncludingId(final Set configTypes, final String connectionId, final long targetJobId, final int pagesize) throws IOException { - // fetch creation time of the target job for this connection - final Optional targetJobCreatedAt = jobDatabase.query(ctx -> { - final Optional targetJobRecord = ctx.fetch( - "SELECT created_at FROM jobs WHERE CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " AND scope = '" - + connectionId + "' AND id = " + targetJobId) - .stream().findFirst(); - - return targetJobRecord.map(record -> record.get("created_at", LocalDateTime.class)); - }); + final Optional targetJobCreatedAt = jobDatabase.query(ctx -> ctx.select(JOBS.CREATED_AT).from(JOBS) + .where(JOBS.CONFIG_TYPE.cast(String.class).in(Sqls.toSqlNames(configTypes))) + .and(JOBS.SCOPE.eq(connectionId)) + .and(JOBS.ID.eq(targetJobId)) + .stream() + .findFirst() + .map(record -> record.get(JOBS.CREATED_AT, OffsetDateTime.class))); - // we still want a normal result if the target job cannot be found if (targetJobCreatedAt.isEmpty()) { - return listJobs(configTypes, connectionId, pagesize, 0); + return List.of(); } - // fetch jobs created after and including the target job - final String jobsSubquery = "(SELECT * FROM jobs WHERE CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) - + " AND scope = '" + connectionId + "' AND created_at >= ? ORDER BY created_at DESC, id DESC) AS jobs"; - final List jobs = jobDatabase.query(ctx -> getJobsFromResult(ctx.fetch( - jobSelectAndJoin(jobsSubquery) + ORDER_BY_JOB_TIME_ATTEMPT_TIME, targetJobCreatedAt.get()))); + final int countIncludingTargetJob = jobDatabase.query(ctx -> ctx.selectCount().from(JOBS) + .where(JOBS.CONFIG_TYPE.cast(String.class).in(Sqls.toSqlNames(configTypes))) + .and(JOBS.SCOPE.eq(connectionId)) + .and(JOBS.CREATED_AT.greaterOrEqual(targetJobCreatedAt.get())) + .fetchOne().into(int.class)); - // return a full page of jobs if the above result is smaller than a page - if (jobs.size() < pagesize) { + // always want to return at least `pagesize` number of jobs + if (countIncludingTargetJob < pagesize) { return listJobs(configTypes, connectionId, pagesize, 0); } - return jobs; + // list of jobs up to target job is larger than pagesize, so return that list + final String jobsSubquery = "(SELECT * FROM jobs WHERE CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + + " AND scope = '" + connectionId + "' AND created_at >= ? ORDER BY created_at DESC, id DESC) AS jobs"; + return jobDatabase.query(ctx -> getJobsFromResult(ctx.fetch( + jobSelectAndJoin(jobsSubquery) + ORDER_BY_JOB_TIME_ATTEMPT_TIME, targetJobCreatedAt.get().toLocalDateTime()))); } @Override diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index a8cd9ee1213c..349fcec3ad06 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -1187,25 +1187,17 @@ void testListJobsIncludingIdAtLeastFullPageSize() throws IOException { } @Test - @DisplayName("Should return the normal full page size if there is no job with the target ID for this connection") + @DisplayName("Should return an empty list if there is no job with the target ID for this connection") void testListJobsIncludingIdFromWrongConnection() throws IOException { - final List ids = new ArrayList<>(); - for (int i = 0; i < 50; i++) { - // This makes each enqueued job have an increasingly higher createdAt time - when(timeSupplier.get()).thenReturn(Instant.ofEpochSecond(i)); - final long jobId = jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG).orElseThrow(); - ids.add(jobId); - // also create an attempt for each job to verify that joining with attempts does not cause failures - jobPersistence.createAttempt(jobId, LOG_PATH); + for (int i = 0; i < 10; i++) { + jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG); } final long otherConnectionJobId = jobPersistence.enqueueJob(UUID.randomUUID().toString(), SPEC_JOB_CONFIG).orElseThrow(); - final int pageSize = 25; final List actualList = - jobPersistence.listJobsIncludingId(Set.of(SPEC_JOB_CONFIG.getConfigType()), CONNECTION_ID.toString(), otherConnectionJobId, pageSize); - final List expectedJobIds = Lists.reverse(ids.subList(ids.size() - pageSize, ids.size())); - assertEquals(expectedJobIds, actualList.stream().map(Job::getId).toList()); + jobPersistence.listJobsIncludingId(Set.of(SPEC_JOB_CONFIG.getConfigType()), CONNECTION_ID.toString(), otherConnectionJobId, 25); + assertEquals(List.of(), actualList); } } diff --git a/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/StatusView.tsx b/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/StatusView.tsx index 4b07bfa9fed7..f975407c1300 100644 --- a/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/StatusView.tsx +++ b/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/components/StatusView.tsx @@ -7,6 +7,7 @@ import { Button, ContentCard, LoadingButton } from "components"; import { Tooltip } from "components/base/Tooltip"; import EmptyResource from "components/EmptyResourceBlock"; import { RotateIcon } from "components/icons/RotateIcon"; +import { useAttemptLink } from "components/JobItem/attemptLinkUtils"; import { getFrequencyType } from "config/utils"; import { Action, Namespace } from "core/analytics"; @@ -51,15 +52,21 @@ const StatusView: React.FC = ({ connection }) => { const [activeJob, setActiveJob] = useState(); const [jobPageSize, setJobPageSize] = useState(JOB_PAGE_SIZE_INCREMENT); const analyticsService = useAnalyticsService(); - const { jobs, isPreviousData: isJobPageLoading } = useListJobs({ + const { jobId: linkedJobId } = useAttemptLink(); + const { + jobs, + totalJobCount, + isPreviousData: isJobPageLoading, + } = useListJobs({ configId: connection.connectionId, configTypes: ["sync", "reset_connection"], + includingJobId: linkedJobId ? Number(linkedJobId) : undefined, pagination: { pageSize: jobPageSize, }, }); - const moreJobPagesAvailable = jobs.length === jobPageSize; + const moreJobPagesAvailable = jobPageSize < totalJobCount; useEffect(() => { const jobRunningOrPending = getJobRunningOrPending(jobs); @@ -73,6 +80,9 @@ const StatusView: React.FC = ({ connection }) => { // We need to disable button when job is canceled but the job list still has a running job } as ActiveJob) ); + + // necessary if a specific job ID was linked, causing us to get back more jobs than the current page size + setJobPageSize((prevJobPageSize) => Math.max(prevJobPageSize, jobs.length)); }, [jobs]); const { openConfirmationModal, closeConfirmationModal } = useConfirmationModalService(); diff --git a/airbyte-webapp/src/services/job/JobService.tsx b/airbyte-webapp/src/services/job/JobService.tsx index ac4d58e6d955..94a054caec40 100644 --- a/airbyte-webapp/src/services/job/JobService.tsx +++ b/airbyte-webapp/src/services/job/JobService.tsx @@ -9,7 +9,7 @@ import { JobDebugInfoRead, JobInfoRead, JobListRequestBody, - JobWithAttemptsRead, + JobReadList, Pagination, } from "../../core/request/AirbyteClient"; import { useSuspenseQuery } from "../connector/useSuspenseQuery"; @@ -36,8 +36,9 @@ export const useListJobs = (listParams: JobListRequestBody) => { keepPreviousData: true, suspense: true, }); - // cast to JobWithAttemptsRead[] because (suspense: true) means we will never get undefined - return { jobs: result.data?.jobs as JobWithAttemptsRead[], isPreviousData: result.isPreviousData }; + // cast to JobReadList because (suspense: true) means we will never get undefined + const jobReadList: JobReadList = result.data as JobReadList; + return { jobs: jobReadList.jobs, totalJobCount: jobReadList.totalJobCount, isPreviousData: result.isPreviousData }; }; export const useGetJob = (id: number, enabled = true) => { diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 87444f78971d..704a784b5d34 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -11443,7 +11443,7 @@

JobReadList -
jobs
-
totalJobCount (optional)
Long the total count of jobs for the specified connection format: int64
+
totalJobCount
Long the total count of jobs for the specified connection format: int64