Skip to content

Commit

Permalink
Allow for deleting connection in an unexpected state (#11302)
Browse files Browse the repository at this point in the history
* Allow for deleting connection in an unexpected state

* Allow for deleting connection in an unexpected state

* Add unit test

* Use correct method when mocking

* Fix implementation

* Add more unit tests

* cleanup

* Add back test to TemporalClientTest

* Remove tests from ConnectionManagerWorkflowTest
  • Loading branch information
terencecho authored Mar 28, 2022
1 parent e3f36ec commit 577e99a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,32 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) {
}

public void deleteConnection(final UUID connectionId) {
final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionUpdateWorkflow(connectionId);

connectionManagerWorkflow.deleteConnection();
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionUpdateWorkflow(connectionId);
connectionManagerWorkflow.deleteConnection();
} catch (final IllegalStateException e) {
log.info("Connection in an illegal state; Creating new workflow and sending delete signal");

final ConnectionManagerWorkflow connectionManagerWorkflow = getWorkflowOptionsWithWorkflowId(ConnectionManagerWorkflow.class,
TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId));

final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder()
.connectionId(connectionId)
.jobId(null)
.attemptId(null)
.fromFailure(false)
.attemptNumber(1)
.workflowState(null)
.resetConnection(false)
.fromJobResetFailure(false)
.build();

final BatchRequest signalRequest = client.newSignalWithStartRequest();
signalRequest.add(connectionManagerWorkflow::run, input);
signalRequest.add(connectionManagerWorkflow::deleteConnection);
client.signalWithStart(signalRequest);
log.info("New start request and delete signal submitted");
}
}

public void update(final UUID connectionId) {
Expand Down Expand Up @@ -444,7 +467,7 @@ private <T> T getExistingWorkflow(final Class<T> workflowClass, final String nam
return client.newWorkflowStub(workflowClass, name);
}

private ConnectionManagerWorkflow getConnectionUpdateWorkflow(final UUID connectionId) {
ConnectionManagerWorkflow getConnectionUpdateWorkflow(final UUID connectionId) {
final boolean workflowReachable = isWorkflowReachable(getConnectionManagerName(connectionId));

if (!workflowReachable) {
Expand Down Expand Up @@ -505,7 +528,6 @@ boolean isWorkflowStateRunning(final String workflowName) {
}
}

@VisibleForTesting
static String getConnectionManagerName(final UUID connectionId) {
return "connection_manager_" + connectionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -39,7 +40,9 @@
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow.JobInformation;
import io.airbyte.workers.temporal.spec.SpecWorkflow;
import io.airbyte.workers.temporal.sync.SyncWorkflow;
import io.temporal.client.BatchRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -51,6 +54,7 @@
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

class TemporalClientTest {

Expand Down Expand Up @@ -257,4 +261,54 @@ public void migrateCalled() {

}

@Nested
@DisplayName("Test delete connection method.")
class DeleteConnection {

@Test
@SuppressWarnings("unchecked")
@DisplayName("Test delete connection method.")
void testDeleteConnection() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);

doReturn(true).when(temporalClient).isWorkflowReachable(anyString());
when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow);

final JobSyncConfig syncConfig = new JobSyncConfig()
.withSourceDockerImage(IMAGE_NAME1)
.withSourceDockerImage(IMAGE_NAME2)
.withSourceConfiguration(Jsons.emptyObject())
.withDestinationConfiguration(Jsons.emptyObject())
.withOperationSequence(List.of())
.withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog());

temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID);
temporalClient.deleteConnection(CONNECTION_ID);

verify(workflowClient, Mockito.never()).newSignalWithStartRequest();
verify(mConnectionManagerWorkflow).deleteConnection();
}

@Test
@SuppressWarnings("unchecked")
@DisplayName("Test delete connection method when workflow is in an unexpected state")
void testDeleteConnectionInUnexpectedState() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final BatchRequest mBatchRequest = mock(BatchRequest.class);

doThrow(new IllegalStateException("Force illegal state")).when(temporalClient).getConnectionUpdateWorkflow(CONNECTION_ID);
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mConnectionManagerWorkflow);
when(workflowClient.