
fix downtime test
lmossman committed Jul 8, 2022
1 parent 3192a4a commit 9407892
Showing 1 changed file with 23 additions and 47 deletions.
@@ -319,61 +319,37 @@ public void testDowntimeDuringSync() throws Exception {
     final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE;
     catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode));
 
-    for (final var input : List.of("KILL_BOTH_NON_SYNC_SLIGHTLY_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC")) {
-      LOGGER.info("Checking " + input);
-
-      final UUID connectionId =
-          testHarness.createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId();
+    final UUID connectionId =
+        testHarness.createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId();
 
-      JobInfoRead connectionSyncRead = null;
+    JobInfoRead connectionSyncRead = null;
 
-      while (connectionSyncRead == null) {
+    while (connectionSyncRead == null) {
 
-        try {
-          connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
-        } catch (final Exception e) {
-          LOGGER.error("retrying after error", e);
-        }
-      }
+      try {
+        connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+      } catch (final Exception e) {
+        LOGGER.error("retrying after error", e);
+      }
+    }
 
-      Thread.sleep(10000);
+    Thread.sleep(10000);
 
-      switch (input) {
-        case "KILL_BOTH_NON_SYNC_SLIGHTLY_FIRST" -> {
-          LOGGER.info("Scaling down both workers at roughly the same time...");
-          kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0);
-          kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true);
-
-          LOGGER.info("Scaling up both workers...");
-          kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);
-          kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1);
-        }
-        case "KILL_ONLY_SYNC" -> {
-          LOGGER.info("Scaling down only sync worker...");
-          kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true);
-
-          LOGGER.info("Scaling up sync worker...");
-          kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1);
-        }
-        case "KILL_ONLY_NON_SYNC" -> {
-          LOGGER.info("Scaling down only non-sync worker...");
-          kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);
-
-          LOGGER.info("Scaling up non-sync worker...");
-          kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);
-        }
-      }
+    LOGGER.info("Scaling down only non-sync worker...");
+    kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);
+
+    LOGGER.info("Scaling up non-sync worker...");
+    kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);
 
-      waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
+    waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob());
 
-      final long numAttempts = apiClient.getJobsApi()
-          .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()))
-          .getAttempts()
-          .size();
+    final long numAttempts = apiClient.getJobsApi()
+        .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId()))
+        .getAttempts()
+        .size();
 
-      // it should be able to accomplish the resume without an additional attempt!
-      assertEquals(1, numAttempts);
-    }
+    // it should be able to accomplish the resume without an additional attempt!
+    assertEquals(1, numAttempts);
   }
 
   @RetryingTest(3)
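For reference, the scaling pattern this test exercises, taking a worker Deployment down to zero replicas and back up with the Fabric8 Kubernetes client, can be sketched as below. The diff confirms the fluent call chain and that scale(replicas, true) blocks until the operation completes, while the single-argument overload returns immediately; everything else here (the class name, the use of DefaultKubernetesClient with auto-detected kubeconfig, the sleep duration) is an illustrative assumption, not part of the commit.

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;

// Hypothetical standalone sketch of the downtime window the test creates.
public class WorkerDowntimeSketch {

  public static void main(String[] args) throws InterruptedException {
    // Assumes cluster credentials are picked up from the environment/kubeconfig.
    try (KubernetesClient client = new DefaultKubernetesClient()) {
      // Scale the non-sync worker Deployment to zero; the boolean makes the
      // call wait until the scale-down has actually completed.
      client.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true);

      // Hold the downtime window open briefly, mirroring the test's sleep.
      Thread.sleep(10_000);

      // Scale back up; this overload returns without waiting for readiness.
      client.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1);
    }
  }
}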
