Skip to content

Commit

Permalink
fix the other orchestrator tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lmossman committed Jul 8, 2022
1 parent 9407892 commit 1b16b90
Showing 1 changed file with 12 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.airbyte.api.client.model.generated.SyncMode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.container_orchestrator.ContainerOrchestratorApp;
import io.airbyte.test.utils.AirbyteAcceptanceTestHarness;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
Expand Down Expand Up @@ -319,26 +318,20 @@ public void testDowntimeDuringSync() throws Exception {
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));

LOGGER.info("Creating connection...");
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId();

JobInfoRead connectionSyncRead = null;

while (connectionSyncRead == null) {

try {
connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
} catch (final Exception e) {
LOGGER.error("retrying after error", e);
}
}
LOGGER.info("Run manual sync...");
JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

Thread.sleep(10000);
LOGGER.info("Waiting for job to run...");
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

LOGGER.info("Scaling down only non-sync worker...");
LOGGER.info("Scaling down worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);

LOGGER.info("Scaling up non-sync worker...");
LOGGER.info("Scaling up worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
Expand Down Expand Up @@ -369,12 +362,9 @@ public void testCancelSyncWithInterruption() throws Exception {
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

Thread.sleep(5000);

kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0);
Thread.sleep(1000);
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

final var resp = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()));
Expand All @@ -383,50 +373,6 @@ public void testCancelSyncWithInterruption() throws Exception {

@RetryingTest(3)
@Order(7)
@Timeout(value = 5,
unit = TimeUnit.MINUTES)
@EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR",
matches = "true")
public void testCuttingOffPodBeforeFilesTransfer() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = testHarness.createPostgresSource().getSourceId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));

LOGGER.info("Creating connection...");
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

LOGGER.info("Waiting for connection to be available in Temporal...");

LOGGER.info("Run manual sync...");
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for job to run...");
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));

LOGGER.info("Scale down workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0);

LOGGER.info("Wait for worker scale down...");
Thread.sleep(1000);

LOGGER.info("Scale up workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

LOGGER.info("Waiting for worker timeout...");
Thread.sleep(ContainerOrchestratorApp.MAX_SECONDS_TO_WAIT_FOR_FILE_COPY * 1000 + 1000);

LOGGER.info("Waiting for job to retry and succeed...");
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
}

@RetryingTest(3)
@Order(8)
@Timeout(value = 5,
unit = TimeUnit.MINUTES)
@EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR",
Expand All @@ -451,16 +397,13 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for job to run...");
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

LOGGER.info("Waiting for job to run a little...");
Thread.sleep(5000);
Thread.sleep(1000);

LOGGER.info("Scale down workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0);

LOGGER.info("Waiting for worker shutdown...");
Thread.sleep(2000);
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);

LOGGER.info("Starting background cancellation request...");
final var pool = Executors.newSingleThreadExecutor();
Expand Down

0 comments on commit 1b16b90

Please sign in to comment.