Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add acceptance test for deleting connetion #11563

Merged
merged 16 commits into from
Apr 4, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@
import io.airbyte.db.Databases;
import io.airbyte.test.airbyte_test_container.AirbyteTestContainer;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
import java.io.IOException;
import java.net.Inet4Address;
Expand Down Expand Up @@ -784,7 +788,7 @@ public void testCheckpointing() throws Exception {
// now cancel it so that we freeze state!
try {
apiClient.getJobsApi().cancelJob(new JobIdRequestBody().id(connectionSyncRead1.getJob().getId()));
} catch (Exception e) {}
} catch (final Exception e) {}

final ConnectionState connectionState = waitForConnectionState(apiClient, connectionId);

Expand Down Expand Up @@ -1147,6 +1151,78 @@ public void testCancelSyncWhenCancelledWhenWorkerIsNotRunning() throws Exception
assertEquals(JobStatus.CANCELLED, resp.get().getJob().getStatus());
}

@Test
@Order(22)
public void testDeleteConnection() throws Exception {
final String connectionName = "test-connection";
final UUID sourceId = createPostgresSource().getSourceId();
final UUID destinationId = createDestination().getDestinationId();
final UUID operationId = createOperation().getOperationId();
final AirbyteCatalog catalog = discoverSourceSchema(sourceId);
final SyncMode syncMode = SyncMode.INCREMENTAL;
final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND_DEDUP;
catalog.getStreams().forEach(s -> s.getConfig()
.syncMode(syncMode)
.cursorField(List.of(COLUMN_ID))
.destinationSyncMode(destinationSyncMode)
.primaryKey(List.of(List.of(COLUMN_NAME))));

UUID connectionId =
createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitWhileJobHasStatus(apiClient.getJobsApi(), connectionSyncRead.getJob(), Set.of(JobStatus.RUNNING));

// test normal deletion of connection
apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId));

// remove connection to avoid exception during tear down
connectionIds.remove(connectionId);

LOGGER.info("Waiting for connection to be deleted...");
Thread.sleep(500);

ConnectionStatus connectionStatus =
apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus();
assertEquals(ConnectionStatus.DEPRECATED, connectionStatus);

// test deletion of connection when temporal workflow is in a bad state, only when using new
// scheduler
final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
if (featureFlags.usesNewScheduler()) {
LOGGER.info("Testing connection deletion when temporal is in a terminal state");
connectionId = createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId();

terminateTemporalWorkflow(connectionId);

// we should still be able to delete the connection when the temporal workflow is in this state
apiClient.getConnectionApi().deleteConnection(new ConnectionIdRequestBody().connectionId(connectionId));

LOGGER.info("Waiting for connection to be deleted...");
Thread.sleep(500);

connectionStatus = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)).getStatus();
assertEquals(ConnectionStatus.DEPRECATED, connectionStatus);
}
}

private void terminateTemporalWorkflow(final UUID connectionId) {
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService("localhost:7233");
final WorkflowClient workflowCLient = WorkflowClient.newInstance(temporalService);

// check if temporal workflow is reachable
final ConnectionManagerWorkflow connectionManagerWorkflow =
workflowCLient.newWorkflowStub(ConnectionManagerWorkflow.class, "connection_manager_" + connectionId);
connectionManagerWorkflow.getState();

// Terminate workflow
LOGGER.info("Terminating temporal workflow...");
workflowCLient.newUntypedWorkflowStub("connection_manager_" + connectionId).terminate("");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract the termination in its own method? It will be needed for other tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


// remove connection to avoid exception during tear down
connectionIds.remove(connectionId);
}

private AirbyteCatalog discoverSourceSchema(final UUID sourceId) throws ApiException {
return apiClient.getSourceApi().discoverSchemaForSource(new SourceDiscoverSchemaRequestBody().sourceId(sourceId)).getCatalog();
}
Expand Down