diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index c843954f4a23..a0d6a0c57e5e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -252,9 +252,32 @@ public void submitConnectionUpdaterAsync(final UUID connectionId) { } public void deleteConnection(final UUID connectionId) { - final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionUpdateWorkflow(connectionId); - - connectionManagerWorkflow.deleteConnection(); + try { + final ConnectionManagerWorkflow connectionManagerWorkflow = getConnectionUpdateWorkflow(connectionId); + connectionManagerWorkflow.deleteConnection(); + } catch (final IllegalStateException e) { + log.info("Connection in an illegal state; Creating new workflow and sending delete signal"); + + final ConnectionManagerWorkflow connectionManagerWorkflow = getWorkflowOptionsWithWorkflowId(ConnectionManagerWorkflow.class, + TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId)); + + final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() + .connectionId(connectionId) + .jobId(null) + .attemptId(null) + .fromFailure(false) + .attemptNumber(1) + .workflowState(null) + .resetConnection(false) + .fromJobResetFailure(false) + .build(); + + final BatchRequest signalRequest = client.newSignalWithStartRequest(); + signalRequest.add(connectionManagerWorkflow::run, input); + signalRequest.add(connectionManagerWorkflow::deleteConnection); + client.signalWithStart(signalRequest); + log.info("New start request and delete signal submitted"); + } } public void update(final UUID connectionId) { @@ -444,7 +467,7 @@ private T getExistingWorkflow(final Class workflowClass, final String nam return client.newWorkflowStub(workflowClass, name); } - private ConnectionManagerWorkflow getConnectionUpdateWorkflow(final UUID connectionId) { + ConnectionManagerWorkflow getConnectionUpdateWorkflow(final UUID connectionId) { final boolean workflowReachable = isWorkflowReachable(getConnectionManagerName(connectionId)); if (!workflowReachable) { @@ -505,7 +528,6 @@ boolean isWorkflowStateRunning(final String workflowName) { } } - @VisibleForTesting static String getConnectionManagerName(final UUID connectionId) { return "connection_manager_" + connectionId; } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index fa9c9e7e3763..d64fe77d965b 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -12,6 +12,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -39,7 +40,9 @@ import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow.JobInformation; import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.airbyte.workers.temporal.sync.SyncWorkflow; +import io.temporal.client.BatchRequest; import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.nio.file.Files; @@ -51,6 +54,7 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; class TemporalClientTest { @@ -257,4 +261,54 @@ public void migrateCalled() { } + @Nested + @DisplayName("Test delete connection method.") + class DeleteConnection { + + @Test + @SuppressWarnings("unchecked") + @DisplayName("Test delete connection method.") + void testDeleteConnection() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + + doReturn(true).when(temporalClient).isWorkflowReachable(anyString()); + when(workflowClient.newWorkflowStub(any(Class.class), anyString())).thenReturn(mConnectionManagerWorkflow); + + final JobSyncConfig syncConfig = new JobSyncConfig() + .withSourceDockerImage(IMAGE_NAME1) + .withSourceDockerImage(IMAGE_NAME2) + .withSourceConfiguration(Jsons.emptyObject()) + .withDestinationConfiguration(Jsons.emptyObject()) + .withOperationSequence(List.of()) + .withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog()); + + temporalClient.submitSync(JOB_ID, ATTEMPT_ID, syncConfig, CONNECTION_ID); + temporalClient.deleteConnection(CONNECTION_ID); + + verify(workflowClient, Mockito.never()).newSignalWithStartRequest(); + verify(mConnectionManagerWorkflow).deleteConnection(); + } + + @Test + @SuppressWarnings("unchecked") + @DisplayName("Test delete connection method when workflow is in an unexpected state") + void testDeleteConnectionInUnexpectedState() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + final BatchRequest mBatchRequest = mock(BatchRequest.class); + + doThrow(new IllegalStateException("Force illegal state")).when(temporalClient).getConnectionUpdateWorkflow(CONNECTION_ID); + when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mConnectionManagerWorkflow); + when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest); + + temporalClient.deleteConnection(CONNECTION_ID); + + // this is only called when getting existing workflow + verify(workflowClient, Mockito.never()).newWorkflowStub(any(), anyString()); + + verify(workflowClient).newSignalWithStartRequest(); + verify(workflowClient).signalWithStart(mBatchRequest); + } + + } + }