Skip to content

Commit

Permalink
refactor queries into JOOQ and return empty list if target job cannot…
Browse files Browse the repository at this point in the history
… be found
  • Loading branch information
lmossman committed Sep 8, 2022
1 parent ac0fd1c commit 6826787
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 43 deletions.
2 changes: 1 addition & 1 deletion airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3989,7 +3989,7 @@ components:
type: object
required:
- jobs
- totalCount
- totalJobCount
properties:
jobs:
type: array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.commons.text;

import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand All @@ -13,6 +15,10 @@ public static <T extends Enum<T>> String toSqlName(final T value) {
return value.name().toLowerCase();
}

public static <T extends Enum<T>> Set<String> toSqlNames(final Collection<T> values) {
return values.stream().map(Sqls::toSqlName).collect(Collectors.toSet());
}

/**
* Generate a string fragment that can be put in the IN clause of a SQL statement. eg. column IN
* (value1, value2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.scheduler.persistence;

import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
Expand Down Expand Up @@ -342,10 +343,10 @@ private Optional<Job> getJobOptional(final DSLContext ctx, final long jobId) {

@Override
public Long getJobCount(final Set<ConfigType> configTypes, final String connectionId) throws IOException {
final Result<Record> result = jobDatabase.query(ctx -> ctx.fetch(
"SELECT COUNT(*) FROM jobs WHERE CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " AND scope = '" + connectionId
+ "'"));
return result.get(0).get("count", Long.class);
return jobDatabase.query(ctx -> ctx.selectCount().from(JOBS)
.where(JOBS.CONFIG_TYPE.cast(String.class).in(Sqls.toSqlNames(configTypes)))
.and(JOBS.SCOPE.eq(connectionId))
.fetchOne().into(Long.class));
}

@Override
Expand All @@ -364,33 +365,34 @@ public List<Job> listJobs(final Set<ConfigType> configTypes, final String config
@Override
public List<Job> listJobsIncludingId(final Set<ConfigType> configTypes, final String connectionId, final long targetJobId, final int pagesize)
throws IOException {
// fetch creation time of the target job for this connection
final Optional<LocalDateTime> targetJobCreatedAt = jobDatabase.query(ctx -> {
final Optional<Record> targetJobRecord = ctx.fetch(
"SELECT created_at FROM jobs WHERE CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes) + " AND scope = '"
+ connectionId + "' AND id = " + targetJobId)
.stream().findFirst();

return targetJobRecord.map(record -> record.get("created_at", LocalDateTime.class));
});
final Optional<OffsetDateTime> targetJobCreatedAt = jobDatabase.query(ctx -> ctx.select(JOBS.CREATED_AT).from(JOBS)
.where(JOBS.CONFIG_TYPE.cast(String.class).in(Sqls.toSqlNames(configTypes)))
.and(JOBS.SCOPE.eq(connectionId))
.and(JOBS.ID.eq(targetJobId))
.stream()
.findFirst()
.map(record -> record.get(JOBS.CREATED_AT, OffsetDateTime.class)));

// we still want a normal result if the target job cannot be found
if (targetJobCreatedAt.isEmpty()) {
return listJobs(configTypes, connectionId, pagesize, 0);
return List.of();
}

// fetch jobs created after and including the target job
final String jobsSubquery = "(SELECT * FROM jobs WHERE CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes)
+ " AND scope = '" + connectionId + "' AND created_at >= ? ORDER BY created_at DESC, id DESC) AS jobs";
final List<Job> jobs = jobDatabase.query(ctx -> getJobsFromResult(ctx.fetch(
jobSelectAndJoin(jobsSubquery) + ORDER_BY_JOB_TIME_ATTEMPT_TIME, targetJobCreatedAt.get())));
final int countIncludingTargetJob = jobDatabase.query(ctx -> ctx.selectCount().from(JOBS)
.where(JOBS.CONFIG_TYPE.cast(String.class).in(Sqls.toSqlNames(configTypes)))
.and(JOBS.SCOPE.eq(connectionId))
.and(JOBS.CREATED_AT.greaterOrEqual(targetJobCreatedAt.get()))
.fetchOne().into(int.class));

// return a full page of jobs if the above result is smaller than a page
if (jobs.size() < pagesize) {
// always want to return at least `pagesize` number of jobs
if (countIncludingTargetJob < pagesize) {
return listJobs(configTypes, connectionId, pagesize, 0);
}

return jobs;
// list of jobs up to target job is larger than pagesize, so return that list
final String jobsSubquery = "(SELECT * FROM jobs WHERE CAST(config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes)
+ " AND scope = '" + connectionId + "' AND created_at >= ? ORDER BY created_at DESC, id DESC) AS jobs";
return jobDatabase.query(ctx -> getJobsFromResult(ctx.fetch(
jobSelectAndJoin(jobsSubquery) + ORDER_BY_JOB_TIME_ATTEMPT_TIME, targetJobCreatedAt.get().toLocalDateTime())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,25 +1187,17 @@ void testListJobsIncludingIdAtLeastFullPageSize() throws IOException {
}

@Test
@DisplayName("Should return the normal full page size if there is no job with the target ID for this connection")
@DisplayName("Should return an empty list if there is no job with the target ID for this connection")
void testListJobsIncludingIdFromWrongConnection() throws IOException {
final List<Long> ids = new ArrayList<>();
for (int i = 0; i < 50; i++) {
// This makes each enqueued job have an increasingly higher createdAt time
when(timeSupplier.get()).thenReturn(Instant.ofEpochSecond(i));
final long jobId = jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG).orElseThrow();
ids.add(jobId);
// also create an attempt for each job to verify that joining with attempts does not cause failures
jobPersistence.createAttempt(jobId, LOG_PATH);
for (int i = 0; i < 10; i++) {
jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG);
}

final long otherConnectionJobId = jobPersistence.enqueueJob(UUID.randomUUID().toString(), SPEC_JOB_CONFIG).orElseThrow();

final int pageSize = 25;
final List<Job> actualList =
jobPersistence.listJobsIncludingId(Set.of(SPEC_JOB_CONFIG.getConfigType()), CONNECTION_ID.toString(), otherConnectionJobId, pageSize);
final List<Long> expectedJobIds = Lists.reverse(ids.subList(ids.size() - pageSize, ids.size()));
assertEquals(expectedJobIds, actualList.stream().map(Job::getId).toList());
jobPersistence.listJobsIncludingId(Set.of(SPEC_JOB_CONFIG.getConfigType()), CONNECTION_ID.toString(), otherConnectionJobId, 25);
assertEquals(List.of(), actualList);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Button, ContentCard, LoadingButton } from "components";
import { Tooltip } from "components/base/Tooltip";
import EmptyResource from "components/EmptyResourceBlock";
import { RotateIcon } from "components/icons/RotateIcon";
import { useAttemptLink } from "components/JobItem/attemptLinkUtils";

import { getFrequencyType } from "config/utils";
import { Action, Namespace } from "core/analytics";
Expand Down Expand Up @@ -51,15 +52,21 @@ const StatusView: React.FC<StatusViewProps> = ({ connection }) => {
const [activeJob, setActiveJob] = useState<ActiveJob>();
const [jobPageSize, setJobPageSize] = useState(JOB_PAGE_SIZE_INCREMENT);
const analyticsService = useAnalyticsService();
const { jobs, isPreviousData: isJobPageLoading } = useListJobs({
const { jobId: linkedJobId } = useAttemptLink();
const {
jobs,
totalJobCount,
isPreviousData: isJobPageLoading,
} = useListJobs({
configId: connection.connectionId,
configTypes: ["sync", "reset_connection"],
includingJobId: linkedJobId ? Number(linkedJobId) : undefined,
pagination: {
pageSize: jobPageSize,
},
});

const moreJobPagesAvailable = jobs.length === jobPageSize;
const moreJobPagesAvailable = jobPageSize < totalJobCount;

useEffect(() => {
const jobRunningOrPending = getJobRunningOrPending(jobs);
Expand All @@ -73,6 +80,9 @@ const StatusView: React.FC<StatusViewProps> = ({ connection }) => {
// We need to disable button when job is canceled but the job list still has a running job
} as ActiveJob)
);

// necessary if a specific job ID was linked, causing us to get back more jobs than the current page size
setJobPageSize((prevJobPageSize) => Math.max(prevJobPageSize, jobs.length));
}, [jobs]);

const { openConfirmationModal, closeConfirmationModal } = useConfirmationModalService();
Expand Down
7 changes: 4 additions & 3 deletions airbyte-webapp/src/services/job/JobService.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
JobDebugInfoRead,
JobInfoRead,
JobListRequestBody,
JobWithAttemptsRead,
JobReadList,
Pagination,
} from "../../core/request/AirbyteClient";
import { useSuspenseQuery } from "../connector/useSuspenseQuery";
Expand All @@ -36,8 +36,9 @@ export const useListJobs = (listParams: JobListRequestBody) => {
keepPreviousData: true,
suspense: true,
});
// cast to JobWithAttemptsRead[] because (suspense: true) means we will never get undefined
return { jobs: result.data?.jobs as JobWithAttemptsRead[], isPreviousData: result.isPreviousData };
// cast to JobReadList because (suspense: true) means we will never get undefined
const jobReadList: JobReadList = result.data as JobReadList;
return { jobs: jobReadList.jobs, totalJobCount: jobReadList.totalJobCount, isPreviousData: result.isPreviousData };
};

export const useGetJob = (id: number, enabled = true) => {
Expand Down
2 changes: 1 addition & 1 deletion docs/reference/api/generated-api-html/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -11443,7 +11443,7 @@ <h3><a name="JobReadList"><code>JobReadList</code> - </a> <a class="up" href="#_
<div class='model-description'></div>
<div class="field-items">
<div class="param">jobs </div><div class="param-desc"><span class="param-type"><a href="#JobWithAttemptsRead">array[JobWithAttemptsRead]</a></span> </div>
<div class="param">totalJobCount (optional)</div><div class="param-desc"><span class="param-type"><a href="#long">Long</a></span> the total count of jobs for the specified connection format: int64</div>
<div class="param">totalJobCount </div><div class="param-desc"><span class="param-type"><a href="#long">Long</a></span> the total count of jobs for the specified connection format: int64</div>
</div> <!-- field-items -->
</div>
<div class="model">
Expand Down

0 comments on commit 6826787

Please sign in to comment.