diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index a0ae45898249..cce982d437c2 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -30,9 +30,12 @@ import io.airbyte.api.client.model.generated.DestinationDefinitionUpdate; import io.airbyte.api.client.model.generated.DestinationIdRequestBody; import io.airbyte.api.client.model.generated.DestinationRead; +import io.airbyte.api.client.model.generated.JobConfigType; import io.airbyte.api.client.model.generated.JobIdRequestBody; +import io.airbyte.api.client.model.generated.JobListRequestBody; import io.airbyte.api.client.model.generated.JobRead; import io.airbyte.api.client.model.generated.JobStatus; +import io.airbyte.api.client.model.generated.JobWithAttemptsRead; import io.airbyte.api.client.model.generated.NamespaceDefinitionType; import io.airbyte.api.client.model.generated.OperationCreate; import io.airbyte.api.client.model.generated.OperationIdRequestBody; @@ -714,6 +717,13 @@ private void deleteOperation(final UUID destinationId) throws ApiException { apiClient.getOperationApi().deleteOperation(new OperationIdRequestBody().operationId(destinationId)); } + public JobRead getMostRecentSyncJobId(UUID connectionId) throws Exception { + return apiClient.getJobsApi() + .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) + .getJobs() + .stream().findFirst().map(JobWithAttemptsRead::getJob).orElseThrow(); + } + public static void waitForSuccessfulJob(final JobsApi jobsApi, final JobRead originalJob) throws InterruptedException, ApiException { final JobRead job = waitWhileJobHasStatus(jobsApi, originalJob, Sets.newHashSet(JobStatus.PENDING, JobStatus.RUNNING)); @@ -769,6 +779,20 @@ public static ConnectionState waitForConnectionState(final AirbyteApiClient apiC return connectionState; } + public JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception { + final JobRead lastJob = getMostRecentSyncJobId(connectionId); + if (lastJob.getStatus() != JobStatus.SUCCEEDED) { + return lastJob; + } + + JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId); + while (mostRecentSyncJob.getId().equals(lastJob.getId())) { + Thread.sleep(Duration.ofSeconds(1).toMillis()); + mostRecentSyncJob = getMostRecentSyncJobId(connectionId); + } + return mostRecentSyncJob; + } + public enum Type { SOURCE, DESTINATION diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 19bacb7a7f02..313a1c15920e 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -977,7 +977,7 @@ public void testResetAllWhenSchemaIsModified() throws Exception { LOGGER.info("ConnectionState after the update request: {}", postResetState.toString()); // Wait until the sync from the UpdateConnection is finished - final JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connection.getConnectionId()); + final JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connection.getConnectionId()); LOGGER.info("Generated SyncJob config: {}", syncFromTheUpdate.toString()); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); @@ -1008,27 +1008,6 @@ private void prettyPrintTables(final DSLContext ctx, final String... tables) { } } - private JobRead getMostRecentSyncJobId(UUID connectionId) throws Exception { - return apiClient.getJobsApi() - .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) - .getJobs() - .stream().findFirst().map(JobWithAttemptsRead::getJob).orElseThrow(); - } - - private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception { - final JobRead lastJob = getMostRecentSyncJobId(connectionId); - if (lastJob.getStatus() != JobStatus.SUCCEEDED) { - return lastJob; - } - - JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId); - while (mostRecentSyncJob.getId().equals(lastJob.getId())) { - Thread.sleep(Duration.ofSeconds(1).toMillis()); - mostRecentSyncJob = getMostRecentSyncJobId(connectionId); - } - return mostRecentSyncJob; - } - @Test public void testIncrementalSyncMultipleStreams() throws Exception { LOGGER.info("Starting testIncrementalSyncMultipleStreams()");