Skip to content

Commit

Permalink
Move helper test functions to AirbyteAcceptanceTestHarness (#14656)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp authored Jul 14, 2022
1 parent 21fbf22 commit 5a7b3a5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import io.airbyte.api.client.model.generated.DestinationDefinitionUpdate;
import io.airbyte.api.client.model.generated.DestinationIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationRead;
import io.airbyte.api.client.model.generated.JobConfigType;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.api.client.model.generated.JobListRequestBody;
import io.airbyte.api.client.model.generated.JobRead;
import io.airbyte.api.client.model.generated.JobStatus;
import io.airbyte.api.client.model.generated.JobWithAttemptsRead;
import io.airbyte.api.client.model.generated.NamespaceDefinitionType;
import io.airbyte.api.client.model.generated.OperationCreate;
import io.airbyte.api.client.model.generated.OperationIdRequestBody;
Expand Down Expand Up @@ -714,6 +717,13 @@ private void deleteOperation(final UUID destinationId) throws ApiException {
apiClient.getOperationApi().deleteOperation(new OperationIdRequestBody().operationId(destinationId));
}

public JobRead getMostRecentSyncJobId(UUID connectionId) throws Exception {
return apiClient.getJobsApi()
.listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC)))
.getJobs()
.stream().findFirst().map(JobWithAttemptsRead::getJob).orElseThrow();
}

public static void waitForSuccessfulJob(final JobsApi jobsApi, final JobRead originalJob) throws InterruptedException, ApiException {
final JobRead job = waitWhileJobHasStatus(jobsApi, originalJob, Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING));

Expand Down Expand Up @@ -769,6 +779,20 @@ public static ConnectionState waitForConnectionState(final AirbyteApiClient apiC
return connectionState;
}

public JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception {
final JobRead lastJob = getMostRecentSyncJobId(connectionId);
if (lastJob.getStatus() != JobStatus.SUCCEEDED) {
return lastJob;
}

JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId);
while (mostRecentSyncJob.getId().equals(lastJob.getId())) {
Thread.sleep(Duration.ofSeconds(1).toMillis());
mostRecentSyncJob = getMostRecentSyncJobId(connectionId);
}
return mostRecentSyncJob;
}

public enum Type {
SOURCE,
DESTINATION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ public void testResetAllWhenSchemaIsModified() throws Exception {
LOGGER.info("ConnectionState after the update request: {}", postResetState.toString());

// Wait until the sync from the UpdateConnection is finished
final JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId());
final JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connection.getConnectionId());
LOGGER.info("Generated SyncJob config: {}", syncFromTheUpdate.toString());
waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);

Expand Down Expand Up @@ -1008,27 +1008,6 @@ private void prettyPrintTables(final DSLContext ctx, final String... tables) {
}
}

private JobRead getMostRecentSyncJobId(UUID connectionId) throws Exception {
return apiClient.getJobsApi()
.listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC)))
.getJobs()
.stream().findFirst().map(JobWithAttemptsRead::getJob).orElseThrow();
}

private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception {
final JobRead lastJob = getMostRecentSyncJobId(connectionId);
if (lastJob.getStatus() != JobStatus.SUCCEEDED) {
return lastJob;
}

JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId);
while (mostRecentSyncJob.getId().equals(lastJob.getId())) {
Thread.sleep(Duration.ofSeconds(1).toMillis());
mostRecentSyncJob = getMostRecentSyncJobId(connectionId);
}
return mostRecentSyncJob;
}

@Test
public void testIncrementalSyncMultipleStreams() throws Exception {
LOGGER.info("Starting testIncrementalSyncMultipleStreams()");
Expand Down

0 comments on commit 5a7b3a5

Please sign in to comment.