move container orchestrator acceptance tests to their own class
lmossman committed Jul 8, 2022
1 parent 1b16b90 commit 2309de2
Showing 2 changed files with 219 additions and 131 deletions.
@@ -49,11 +49,7 @@
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -62,12 +58,9 @@
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junitpioneer.jupiter.RetryingTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
* This class tests advanced platform functionality that can be affected by the networking
@@ -305,128 +298,4 @@ public void testBackpressure() throws Exception {
}
}

@RetryingTest(3)
@Order(5)
@EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR",
matches = "true")
public void testDowntimeDuringSync() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = testHarness.createPostgresSource().getSourceId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));

LOGGER.info("Creating connection...");
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId();

LOGGER.info("Run manual sync...");
JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for job to run...");
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

LOGGER.info("Scaling down worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);

LOGGER.info("Scaling up worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());

final long numAttempts = apiClient.getJobsApi()
.getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()))
.getAttempts()
.size();

// it should be able to accomplish the resume without an additional attempt!
assertEquals(1, numAttempts);
}

@RetryingTest(3)
@Order(6)
@EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR",
matches = "true")
public void testCancelSyncWithInterruption() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = testHarness.createPostgresSource().getSourceId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

final var resp = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()));
assertEquals(JobStatus.CANCELLED, resp.getJob().getStatus());
}

@RetryingTest(3)
@Order(7)
@Timeout(value = 5,
unit = TimeUnit.MINUTES)
@EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR",
matches = "true")
public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = testHarness.createPostgresSource().getSourceId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));

LOGGER.info("Creating connection...");
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

LOGGER.info("Waiting for connection to be available in Temporal...");

LOGGER.info("Run manual sync...");
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for job to run...");
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

LOGGER.info("Waiting for job to run a little...");
Thread.sleep(1000);

LOGGER.info("Scale down workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);

LOGGER.info("Starting background cancellation request...");
final var pool = Executors.newSingleThreadExecutor();
final var mdc = MDC.getCopyOfContextMap();
final Future<JobInfoRead> resp =
pool.submit(() -> {
MDC.setContextMap(mdc);
try {
final JobInfoRead jobInfoRead = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()));
LOGGER.info("jobInfoRead = " + jobInfoRead);
return jobInfoRead;
} catch (final ApiException e) {
LOGGER.error("Failed to read from api", e);
throw e;
}
});
Thread.sleep(2000);

LOGGER.info("Scaling up workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

LOGGER.info("Waiting for cancellation to go into effect...");
assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus());
}

}
@@ -0,0 +1,219 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.test.acceptance;

import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitForSuccessfulJob;
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitWhileJobHasStatus;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.AirbyteCatalog;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationDefinitionRead;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.JobIdRequestBody;
import io.airbyte.api.client.model.generated.JobInfoRead;
import io.airbyte.api.client.model.generated.JobStatus;
import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.SourceDefinitionRead;
import io.airbyte.api.client.model.generated.SyncMode;
import io.airbyte.test.utils.AirbyteAcceptanceTestHarness;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
import java.net.URISyntaxException;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/**
* This class tests behavior that is specific to container-orchestrator deployments, such as scaling
* workers down and back up while a sync is running, to ensure the sync is not affected by a deployment.
* <p>
* This test class is only enabled if the CONTAINER_ORCHESTRATOR environment variable is true. At
* the time of writing, this is only the case for Kubernetes deployments, as container orchestrators
* have not yet been ported over to Docker.
*/
@SuppressWarnings({"rawtypes", "ConstantConditions"})
@EnabledIfEnvironmentVariable(named = "CONTAINER_ORCHESTRATOR",
matches = "true")
public class ContainerOrchestratorAcceptanceTests {

private static final Logger LOGGER = LoggerFactory.getLogger(ContainerOrchestratorAcceptanceTests.class);

private static AirbyteAcceptanceTestHarness testHarness;
private static AirbyteApiClient apiClient;
private static UUID workspaceId;
private static KubernetesClient kubernetesClient;

@SuppressWarnings("UnstableApiUsage")
@BeforeAll
public static void init() throws URISyntaxException, IOException, InterruptedException, ApiException {
apiClient = new AirbyteApiClient(
new ApiClient().setScheme("http")
.setHost("localhost")
.setPort(8001)
.setBasePath("/api"));
// work in whatever default workspace is present.
workspaceId = apiClient.getWorkspaceApi().listWorkspaces().getWorkspaces().get(0).getWorkspaceId();
LOGGER.info("workspaceId = " + workspaceId);

// log which connectors are being used.
final SourceDefinitionRead sourceDef = apiClient.getSourceDefinitionApi()
.getSourceDefinition(new SourceDefinitionIdRequestBody()
.sourceDefinitionId(UUID.fromString("decd338e-5647-4c0b-adf4-da0e75f5a750")));
final DestinationDefinitionRead destinationDef = apiClient.getDestinationDefinitionApi()
.getDestinationDefinition(new DestinationDefinitionIdRequestBody()
.destinationDefinitionId(UUID.fromString("25c5221d-dce2-4163-ade9-739ef790f503")));
LOGGER.info("pg source definition: {}", sourceDef.getDockerImageTag());
LOGGER.info("pg destination definition: {}", destinationDef.getDockerImageTag());

testHarness = new AirbyteAcceptanceTestHarness(apiClient, workspaceId);
kubernetesClient = testHarness.getKubernetesClient();
}

@AfterAll
public static void end() {
testHarness.stopDbAndContainers();
}

@BeforeEach
public void setup() throws URISyntaxException, IOException, SQLException {
testHarness.setup();
}

@AfterEach
public void tearDown() {
testHarness.cleanup();
}

@Test
public void testDowntimeDuringSync() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = testHarness.createPostgresSource().getSourceId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));

LOGGER.info("Creating connection...");
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId();

LOGGER.info("Run manual sync...");
JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for job to run...");
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

LOGGER.info("Scaling down worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);

LOGGER.info("Scaling up worker...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());

final long numAttempts = apiClient.getJobsApi()
.getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()))
.getAttempts()
.size();

// it should be able to accomplish the resume without an additional attempt!
assertEquals(1, numAttempts);
}

@Test
public void testCancelSyncWithInterruption() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = testHarness.createPostgresSource().getSourceId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

final var resp = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()));
assertEquals(JobStatus.CANCELLED, resp.getJob().getStatus());
}

@Test
public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = testHarness.createPostgresSource().getSourceId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
final UUID operationId = testHarness.createOperation().getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.FULL_REFRESH;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));

LOGGER.info("Creating connection...");
final UUID connectionId =
testHarness.createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

LOGGER.info("Waiting for connection to be available in Temporal...");

LOGGER.info("Run manual sync...");
final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for job to run...");
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.PENDING));

LOGGER.info("Waiting for job to run a little...");
Thread.sleep(1000);

LOGGER.info("Scale down workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);

LOGGER.info("Starting background cancellation request...");
final var pool = Executors.newSingleThreadExecutor();
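// capture the logging context (MDC) from the test thread so the background cancellation thread logs with it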
final var mdc = MDC.getCopyOfContextMap();
final Future<JobInfoRead> resp =
pool.submit(() -> {
MDC.setContextMap(mdc);
try {
final JobInfoRead jobInfoRead = apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()));
LOGGER.info("jobInfoRead = " + jobInfoRead);
return jobInfoRead;
} catch (final ApiException e) {
LOGGER.error("Failed to read from api", e);
throw e;
}
});
Thread.sleep(2000);

LOGGER.info("Scaling up workers...");
kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);

LOGGER.info("Waiting for cancellation to go into effect...");
assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus());
}

}
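
For reference, the worker bounce that each of these tests performs reduces to two fabric8 calls with different blocking behavior: scale(0, true) waits until the deployment has actually scaled down, while scale(1) returns immediately so the job can resume in the background. Below is a minimal standalone sketch of that pattern; the WorkerBouncer class name and the use of DefaultKubernetesClient against the current kube context are illustrative assumptions, not part of this commit.

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

// Hypothetical helper mirroring the scale-down/scale-up sequence used in these tests.
public class WorkerBouncer {

  public static void bounceWorker(final KubernetesClient client) {
    // blocks until the airbyte-worker deployment has zero replicas
    client.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);
    // returns immediately; the worker comes back up asynchronously
    client.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);
  }

  public static void main(final String[] args) {
    try (final KubernetesClient client = new DefaultKubernetesClient()) {
      bounceWorker(client);
    }
  }
}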
