Improve job list API with more fetching capabilities (airbytehq#16415)
* start implementation of new persistence method

* add includingJobId and totalJobCount to job list request

* format

* update local openapi as well

* refactor queries into JOOQ and return empty list if target job cannot be found

* fix descriptions and undo changes from other branch

* switch including job to starting job

* fix job history handler tests

* rewrite jobs subqueries in jooq

* fix multiple config type querying

* remove unnecessary casts

* switch back to 'including' and return multiple of page size necessary to include job

* undo webapp changes

* fix test description

* format
lmossman authored and robbinhan committed Sep 29, 2022
1 parent f2eca5e commit 36d2573
Showing 9 changed files with 264 additions and 13 deletions.
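Before the file-by-file diff, a minimal sketch of how a caller might exercise the two additions described above: the includingJobId request field and the totalJobCount response field. The model classes and fluent setters are the generated config API models used in JobHistoryHandlerTest further down in this diff; the getter names, the handler instance, and the literal IDs are assumptions for illustration, not part of this commit.

// Illustrative sketch only; JobListRequestBody, Pagination and JobReadList are the generated
// config API models, and jobHistoryHandler is an already-constructed JobHistoryHandler.
final JobListRequestBody request = new JobListRequestBody()
    .configTypes(List.of(JobConfigType.SYNC))
    .configId(connectionId.toString())          // the connection whose jobs are being listed
    .includingJobId(12345L)                     // hypothetical job that must appear in the result
    .pagination(new Pagination().pageSize(25).rowOffset(0));

final JobReadList response = jobHistoryHandler.listJobsFor(request);
// response.getJobs() contains as many 25-job pages as are needed to reach job 12345 (or an empty
// list if that job does not belong to the connection), and response.getTotalJobCount() carries
// the total number of jobs for the connection. Getter names are assumed from the generated models.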
8 changes: 8 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
@@ -3775,6 +3775,9 @@ components:
$ref: "#/components/schemas/JobConfigType"
configId:
type: string
includingJobId:
description: If a job with this ID exists for the specified connection, the response will contain as many pages of jobs as are needed to include it. If this field is set and the job cannot be found for this connection, an empty list is returned.
$ref: "#/components/schemas/JobId"
pagination:
$ref: "#/components/schemas/Pagination"
JobIdRequestBody:
@@ -3992,11 +3995,16 @@
type: object
required:
- jobs
- totalJobCount
properties:
jobs:
type: array
items:
$ref: "#/components/schemas/JobWithAttemptsRead"
totalJobCount:
description: the total count of jobs for the specified connection
type: integer
format: int64
JobInfoRead:
type: object
required:
@@ -4,6 +4,8 @@

package io.airbyte.commons.text;

import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@@ -13,6 +15,10 @@ public static <T extends Enum<T>> String toSqlName(final T value) {
return value.name().toLowerCase();
}

public static <T extends Enum<T>> Set<String> toSqlNames(final Collection<T> values) {
return values.stream().map(Sqls::toSqlName).collect(Collectors.toSet());
}
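As a quick illustration (a sketch, not part of the diff): toSqlName lower-cases a single enum name, and the new toSqlNames simply applies it to a whole collection, for example with Airbyte's JobConfig.ConfigType values:

// Expected behavior, assuming io.airbyte.config.JobConfig.ConfigType (set order not guaranteed):
Sqls.toSqlName(ConfigType.SYNC);                               // "sync"
Sqls.toSqlNames(Set.of(ConfigType.SYNC, ConfigType.GET_SPEC)); // {"sync", "get_spec"}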

/**
* Generate a string fragment that can be put in the IN clause of a SQL statement. eg. column IN
* (value1, value2)
@@ -5,6 +5,7 @@
package io.airbyte.scheduler.persistence;

import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;

import com.fasterxml.jackson.databind.JsonNode;
@@ -67,6 +68,7 @@
import org.jooq.Result;
import org.jooq.Sequence;
import org.jooq.Table;
import org.jooq.conf.ParamType;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -386,17 +388,58 @@ private Optional<Job> getJobOptional(final DSLContext ctx, final long jobId) {
return getJobFromResult(ctx.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE jobs.id = ?", jobId));
}

@Override
public Long getJobCount(final Set<ConfigType> configTypes, final String connectionId) throws IOException {
return jobDatabase.query(ctx -> ctx.selectCount().from(JOBS)
.where(JOBS.CONFIG_TYPE.in(Sqls.toSqlNames(configTypes)))
.and(JOBS.SCOPE.eq(connectionId))
.fetchOne().into(Long.class));
}

@Override
public List<Job> listJobs(final ConfigType configType, final String configId, final int pagesize, final int offset) throws IOException {
return listJobs(Set.of(configType), configId, pagesize, offset);
}

@Override
public List<Job> listJobs(final Set<ConfigType> configTypes, final String configId, final int pagesize, final int offset) throws IOException {
final String jobsSubquery = "(SELECT * FROM jobs WHERE CAST(jobs.config_type AS VARCHAR) in " + Sqls.toSqlInFragment(configTypes)
+ " AND jobs.scope = '" + configId + "' ORDER BY jobs.created_at DESC, jobs.id DESC LIMIT " + pagesize + " OFFSET " + offset + ") AS jobs";
return jobDatabase.query(ctx -> getJobsFromResult(ctx.fetch(
jobSelectAndJoin(jobsSubquery) + ORDER_BY_JOB_TIME_ATTEMPT_TIME)));
return jobDatabase.query(ctx -> {
final String jobsSubquery = "(" + ctx.select(DSL.asterisk()).from(JOBS)
.where(JOBS.CONFIG_TYPE.in(Sqls.toSqlNames(configTypes)))
.and(JOBS.SCOPE.eq(configId))
.orderBy(JOBS.CREATED_AT.desc(), JOBS.ID.desc())
.limit(pagesize)
.offset(offset)
.getSQL(ParamType.INLINED) + ") AS jobs";

return getJobsFromResult(ctx.fetch(jobSelectAndJoin(jobsSubquery) + ORDER_BY_JOB_TIME_ATTEMPT_TIME));
});
}
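A note on the design choice above, plus a sketch of what the inlined subquery roughly renders to. getSQL(ParamType.INLINED) makes jOOQ render bind values as literals, which is what allows the generated SELECT to be embedded as a plain string fragment inside jobSelectAndJoin and fetched as raw SQL; the default rendering would instead produce '?' placeholders with no values attached. The exact quoting and casts depend on the jOOQ dialect, so the fragment below is only approximate:

// Approximate shape of jobsSubquery for pagesize = 25, offset = 0 (illustrative, not exact output):
// (select * from jobs
//    where config_type in ('sync', 'check_connection_source') and scope = '<connection id>'
//    order by created_at desc, id desc limit 25 offset 0) AS jobs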

@Override
public List<Job> listJobsIncludingId(final Set<ConfigType> configTypes, final String connectionId, final long includingJobId, final int pagesize)
throws IOException {
final Optional<OffsetDateTime> includingJobCreatedAt = jobDatabase.query(ctx -> ctx.select(JOBS.CREATED_AT).from(JOBS)
.where(JOBS.CONFIG_TYPE.in(Sqls.toSqlNames(configTypes)))
.and(JOBS.SCOPE.eq(connectionId))
.and(JOBS.ID.eq(includingJobId))
.stream()
.findFirst()
.map(record -> record.get(JOBS.CREATED_AT, OffsetDateTime.class)));

if (includingJobCreatedAt.isEmpty()) {
return List.of();
}

final int countIncludingJob = jobDatabase.query(ctx -> ctx.selectCount().from(JOBS)
.where(JOBS.CONFIG_TYPE.in(Sqls.toSqlNames(configTypes)))
.and(JOBS.SCOPE.eq(connectionId))
.and(JOBS.CREATED_AT.greaterOrEqual(includingJobCreatedAt.get()))
.fetchOne().into(int.class));

// calculate the multiple of `pagesize` that includes the target job
int pageSizeThatIncludesJob = (countIncludingJob / pagesize + 1) * pagesize;
return listJobs(configTypes, connectionId, pageSizeThatIncludesJob, 0);
}
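To make the paging arithmetic concrete, a worked example using the same numbers as testListJobsIncludingIdMultiplePages further down in this diff:

// 100 jobs exist for the connection, pagesize = 25, and the target job is the 40th newest,
// so countIncludingJob = 40 (jobs created at or after the target).
final int pagesize = 25;
final int countIncludingJob = 40;
final int pageSizeThatIncludesJob = (countIncludingJob / pagesize + 1) * pagesize; // (1 + 1) * 25 = 50
// listJobs(configTypes, connectionId, 50, 0) then returns the 50 newest jobs, i.e. two pages,
// which is exactly what that test expects. When countIncludingJob is an exact multiple of
// pagesize, the formula rounds up to one extra page, but the target job is still included.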

@Override
@@ -140,6 +140,14 @@ public interface JobPersistence {
*/
void writeAttemptFailureSummary(long jobId, int attemptNumber, AttemptFailureSummary failureSummary) throws IOException;

/**
* @param configTypes - the types of config to count, e.g. sync
* @param connectionId - ID of the connection for which the job count should be retrieved
* @return count of jobs belonging to the specified connection
* @throws IOException
*/
Long getJobCount(final Set<ConfigType> configTypes, final String connectionId) throws IOException;

/**
* @param configTypes - type of config, e.g. sync
* @param configId - id of that config
@@ -158,6 +166,20 @@ public interface JobPersistence {

List<Job> listJobs(JobConfig.ConfigType configType, String configId, int limit, int offset) throws IOException;

/**
* @param configTypes - type of config, e.g. sync
* @param connectionId - id of the connection for which jobs should be retrieved
* @param includingJobId - id of the job that should be included in the list, if it exists in
* the connection
* @param pagesize - the pagesize that should be used when building the list (response may include
* multiple pages)
* @return List of jobs in descending created_at order including the specified job. Will include
* multiple pages of jobs if required to include the specified job. If the specified job
* does not exist in the connection, the returned list will be empty.
* @throws IOException
*/
List<Job> listJobsIncludingId(Set<JobConfig.ConfigType> configTypes, String connectionId, long includingJobId, int pagesize) throws IOException;
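A call against this method might look like the following sketch (the connection id and job id 123 are hypothetical):

// Fetch sync jobs for a connection while guaranteeing that job 123 appears in the returned pages,
// or receiving an empty list if job 123 does not belong to this connection.
final List<Job> jobs = jobPersistence.listJobsIncludingId(
    Set.of(JobConfig.ConfigType.SYNC), connectionId.toString(), 123L, 25);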

List<Job> listJobsWithStatus(JobStatus status) throws IOException;

List<Job> listJobsWithStatus(Set<JobConfig.ConfigType> configTypes, JobStatus status) throws IOException;
@@ -1029,6 +1029,39 @@ void testGetOldestPendingJobWithOtherJobWithSameScopeIncomplete() throws IOException {

}

@Nested
@DisplayName("When getting the count of jobs")
class GetJobCount {

@Test
@DisplayName("Should return the total job count for the connection")
void testGetJobCount() throws IOException {
int numJobsToCreate = 10;
for (int i = 0; i < numJobsToCreate; i++) {
jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG);
}

final Long actualJobCount = jobPersistence.getJobCount(Set.of(SPEC_JOB_CONFIG.getConfigType()), CONNECTION_ID.toString());

assertEquals(numJobsToCreate, actualJobCount);
}

@Test
@DisplayName("Should return 0 if there are no jobs for this connection")
void testGetJobCountNoneForConnection() throws IOException {
final UUID otherConnectionId1 = UUID.randomUUID();
final UUID otherConnectionId2 = UUID.randomUUID();

jobPersistence.enqueueJob(otherConnectionId1.toString(), SPEC_JOB_CONFIG);
jobPersistence.enqueueJob(otherConnectionId2.toString(), SPEC_JOB_CONFIG);

final Long actualJobCount = jobPersistence.getJobCount(Set.of(SPEC_JOB_CONFIG.getConfigType()), CONNECTION_ID.toString());

assertEquals(0, actualJobCount);
}

}

@Nested
@DisplayName("When listing jobs, use paged results")
class ListJobs {
@@ -1090,6 +1123,25 @@ void testListJobs() throws IOException {
assertEquals(expected, actual);
}

@Test
@DisplayName("Should list all jobs matching multiple config types")
void testListJobsMultipleConfigTypes() throws IOException {
final long specJobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final long checkJobId = jobPersistence.enqueueJob(SCOPE, CHECK_JOB_CONFIG).orElseThrow();
// add a third config type that is not added in the listJobs request, to verify that it is not
// included in the results
jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();

final List<Job> actualList =
jobPersistence.listJobs(Set.of(SPEC_JOB_CONFIG.getConfigType(), CHECK_JOB_CONFIG.getConfigType()), CONNECTION_ID.toString(), 9999, 0);

final List<Job> expectedList =
List.of(createJob(checkJobId, CHECK_JOB_CONFIG, JobStatus.PENDING, Collections.emptyList(), NOW.getEpochSecond()),
createJob(specJobId, SPEC_JOB_CONFIG, JobStatus.PENDING, Collections.emptyList(), NOW.getEpochSecond()));

assertEquals(expectedList, actualList);
}

@Test
@DisplayName("Should list all jobs with all attempts")
void testListJobsWithMultipleAttempts() throws IOException {
@@ -1144,6 +1196,69 @@ void testListJobsWithMultipleAttemptsInDescOrder() throws IOException {
assertEquals(jobId2, actualList.get(0).getId());
}

@Test
@DisplayName("Should list jobs including the specified job")
void testListJobsIncludingId() throws IOException {
final List<Long> ids = new ArrayList<>();
for (int i = 0; i < 100; i++) {
// This gives each enqueued job a strictly increasing createdAt time
when(timeSupplier.get()).thenReturn(Instant.ofEpochSecond(i));
// Alternate between spec and check job config types to verify that both config types are fetched
// properly
final JobConfig jobConfig = i % 2 == 0 ? SPEC_JOB_CONFIG : CHECK_JOB_CONFIG;
final long jobId = jobPersistence.enqueueJob(CONNECTION_ID.toString(), jobConfig).orElseThrow();
ids.add(jobId);
// also create an attempt for each job to verify that joining with attempts does not cause failures
jobPersistence.createAttempt(jobId, LOG_PATH);
}

final int includingIdIndex = 90;
final int pageSize = 25;
final List<Job> actualList = jobPersistence.listJobsIncludingId(Set.of(SPEC_JOB_CONFIG.getConfigType(), CHECK_JOB_CONFIG.getConfigType()),
CONNECTION_ID.toString(), ids.get(includingIdIndex), pageSize);
final List<Long> expectedJobIds = Lists.reverse(ids.subList(ids.size() - pageSize, ids.size()));
assertEquals(expectedJobIds, actualList.stream().map(Job::getId).toList());
}

@Test
@DisplayName("Should list jobs including the specified job, including multiple pages if necessary")
void testListJobsIncludingIdMultiplePages() throws IOException {
final List<Long> ids = new ArrayList<>();
for (int i = 0; i < 100; i++) {
// This gives each enqueued job a strictly increasing createdAt time
when(timeSupplier.get()).thenReturn(Instant.ofEpochSecond(i));
// Alternate between spec and check job config types to verify that both config types are fetched
// properly
final JobConfig jobConfig = i % 2 == 0 ? SPEC_JOB_CONFIG : CHECK_JOB_CONFIG;
final long jobId = jobPersistence.enqueueJob(CONNECTION_ID.toString(), jobConfig).orElseThrow();
ids.add(jobId);
// also create an attempt for each job to verify that joining with attempts does not cause failures
jobPersistence.createAttempt(jobId, LOG_PATH);
}

// including id is on the second page, so response should contain two pages of jobs
final int includingIdIndex = 60;
final int pageSize = 25;
final List<Job> actualList = jobPersistence.listJobsIncludingId(Set.of(SPEC_JOB_CONFIG.getConfigType(), CHECK_JOB_CONFIG.getConfigType()),
CONNECTION_ID.toString(), ids.get(includingIdIndex), pageSize);
final List<Long> expectedJobIds = Lists.reverse(ids.subList(ids.size() - (pageSize * 2), ids.size()));
assertEquals(expectedJobIds, actualList.stream().map(Job::getId).toList());
}

@Test
@DisplayName("Should return an empty list if there is no job with the includingJob ID for this connection")
void testListJobsIncludingIdFromWrongConnection() throws IOException {
for (int i = 0; i < 10; i++) {
jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG);
}

final long otherConnectionJobId = jobPersistence.enqueueJob(UUID.randomUUID().toString(), SPEC_JOB_CONFIG).orElseThrow();

final List<Job> actualList =
jobPersistence.listJobsIncludingId(Set.of(SPEC_JOB_CONFIG.getConfigType()), CONNECTION_ID.toString(), otherConnectionJobId, 25);
assertEquals(List.of(), actualList);
}

}

@Nested
@@ -80,15 +80,25 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOException {
.collect(Collectors.toSet());
final String configId = request.getConfigId();

final List<JobWithAttemptsRead> jobReads = jobPersistence.listJobs(configTypes,
configId,
(request.getPagination() != null && request.getPagination().getPageSize() != null) ? request.getPagination().getPageSize()
: DEFAULT_PAGE_SIZE,
(request.getPagination() != null && request.getPagination().getRowOffset() != null) ? request.getPagination().getRowOffset() : 0)
final int pageSize = (request.getPagination() != null && request.getPagination().getPageSize() != null) ? request.getPagination().getPageSize()
: DEFAULT_PAGE_SIZE;
final List<Job> jobs;

if (request.getIncludingJobId() != null) {
jobs = jobPersistence.listJobsIncludingId(configTypes, configId, request.getIncludingJobId(), pageSize);
} else {
jobs = jobPersistence.listJobs(configTypes, configId, pageSize,
(request.getPagination() != null && request.getPagination().getRowOffset() != null) ? request.getPagination().getRowOffset() : 0);
}

final Long totalJobCount = jobPersistence.getJobCount(configTypes, configId);

final List<JobWithAttemptsRead> jobReads = jobs
.stream()
.map(attempt -> jobConverter.getJobWithAttemptsRead(attempt))
.map(JobConverter::getJobWithAttemptsRead)
.collect(Collectors.toList());
return new JobReadList().jobs(jobReads);

return new JobReadList().jobs(jobReads).totalJobCount(totalJobCount);
}

public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) throws IOException {
@@ -148,6 +148,7 @@ void testListJobs() throws IOException {

when(jobPersistence.listJobs(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, pagesize, rowOffset))
.thenReturn(List.of(latestJobNoAttempt, successfulJob));
when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L);

final var requestBody = new JobListRequestBody()
.configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API))
@@ -158,7 +159,8 @@
final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead(
testJobAttempt)));
final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList());
final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead));
final JobReadList expectedJobReadList =
new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead)).totalJobCount(2L);

assertEquals(expectedJobReadList, jobReadList);
}
@@ -187,6 +189,7 @@ void testListJobsFor() throws IOException {
new Job(latestJobId, ConfigType.SYNC, JOB_CONFIG_ID, JOB_CONFIG, Collections.emptyList(), JobStatus.PENDING, null, createdAt3, createdAt3);

when(jobPersistence.listJobs(configTypes, JOB_CONFIG_ID, pagesize, rowOffset)).thenReturn(List.of(latestJob, secondJob, firstJob));
when(jobPersistence.getJobCount(configTypes, JOB_CONFIG_ID)).thenReturn(3L);

final JobListRequestBody requestBody = new JobListRequestBody()
.configTypes(List.of(CONFIG_TYPE_FOR_API, JobConfigType.SYNC, JobConfigType.DISCOVER_SCHEMA))
@@ -200,7 +203,40 @@
new JobWithAttemptsRead().job(toJobInfo(secondJob)).attempts(ImmutableList.of(toAttemptRead(secondJobAttempt)));
final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJob)).attempts(Collections.emptyList());
final JobReadList expectedJobReadList =
new JobReadList().jobs(List.of(latestJobWithAttemptRead, secondJobWithAttemptRead, firstJobWithAttemptRead));
new JobReadList().jobs(List.of(latestJobWithAttemptRead, secondJobWithAttemptRead, firstJobWithAttemptRead)).totalJobCount(3L);

assertEquals(expectedJobReadList, jobReadList);
}

@Test
@DisplayName("Should return jobs including specified job id")
void testListJobsIncludingJobId() throws IOException {
final var successfulJob = testJob;
final int pagesize = 25;
final int rowOffset = 0;

final var jobId2 = JOB_ID + 100;
final var createdAt2 = CREATED_AT + 1000;
final var latestJobNoAttempt =
new Job(jobId2, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, Collections.emptyList(), JobStatus.PENDING,
null, createdAt2, createdAt2);

when(jobPersistence.listJobsIncludingId(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, jobId2, pagesize))
.thenReturn(List.of(latestJobNoAttempt, successfulJob));
when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L);

final var requestBody = new JobListRequestBody()
.configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API))
.configId(JOB_CONFIG_ID)
.includingJobId(jobId2)
.pagination(new Pagination().pageSize(pagesize).rowOffset(rowOffset));
final var jobReadList = jobHistoryHandler.listJobsFor(requestBody);

final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead(
testJobAttempt)));
final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList());
final JobReadList expectedJobReadList =
new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead)).totalJobCount(2L);

assertEquals(expectedJobReadList, jobReadList);
}