diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java index 3a5b3628e8e3..5dbc9fafd5c1 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java @@ -205,11 +205,6 @@ public ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClie String.format("ConnectionManagerWorkflow for connection %s is unreachable due to having COMPLETED status.", connectionId)); } - if (workflowState.isQuarantined()) { - throw new UnreachableWorkflowException( - String.format("ConnectionManagerWorkflow for connection %s is unreachable due to being in a quarantined state.", connectionId)); - } - return connectionManagerWorkflow; } diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java index 7dc3a33acda5..435e80c1af28 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java @@ -9,7 +9,6 @@ import io.temporal.workflow.SignalMethod; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; -import java.util.UUID; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -84,22 +83,4 @@ class JobInformation { @QueryMethod JobInformation getJobInformation(); - @Data - @NoArgsConstructor - @AllArgsConstructor - class QuarantinedInformation { - - private UUID connectionId; - private long jobId; - private int attemptId; - private boolean isQuarantined; - - } - - /** - * Return if a job is stuck or not with the job information - */ - @QueryMethod - QuarantinedInformation getQuarantinedInformation(); - } diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java index 5f037ef1ca62..351698d38f44 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java @@ -8,6 +8,7 @@ import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; import java.util.UUID; +import lombok.AccessLevel; import lombok.Getter; import lombok.NoArgsConstructor; @@ -32,6 +33,8 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan private final boolean resetConnection = false; @Deprecated private final boolean continueAsReset = false; + @Deprecated + @Getter(AccessLevel.NONE) private boolean quarantined = false; private boolean success = true; private boolean cancelledForReset = false; @@ -88,14 +91,6 @@ public void setFailed(final boolean failed) { this.failed = failed; } - public void setQuarantined(final boolean quarantined) { - final ChangedStateEvent event = new ChangedStateEvent( - StateField.QUARANTINED, - quarantined); - stateChangedListener.addEvent(id, event); - this.quarantined = quarantined; - } - public void setSuccess(final boolean success) { final ChangedStateEvent event = new ChangedStateEvent( StateField.SUCCESS, @@ -138,7 +133,6 @@ public void reset() { this.setCancelled(false); this.setFailed(false); this.setSuccess(false); - this.setQuarantined(false); this.setDoneWaiting(false); this.setSkipSchedulingNextWorkflow(false); } diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java index 881c73bb8203..bd8189d5fcaf 100644 --- a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java @@ -31,7 +31,6 @@ enum StateField { FAILED, RESET, CONTINUE_AS_RESET, - QUARANTINED, SUCCESS, CANCELLED_FOR_RESET, RESET_WITH_SCHEDULING, diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java index 36b3eda38806..228c632cf8d6 100644 --- a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java @@ -734,54 +734,12 @@ void testResetConnectionDeletedWorkflow() throws IOException { } - @Test - @DisplayName("Test manual operation on quarantined workflow causes a restart") - void testManualOperationOnQuarantinedWorkflow() { - final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - final WorkflowState mWorkflowState = mock(WorkflowState.class); - when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); - when(mWorkflowState.isQuarantined()).thenReturn(true); - - final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - final WorkflowState mNewWorkflowState = mock(WorkflowState.class); - when(mNewConnectionManagerWorkflow.getState()).thenReturn(mNewWorkflowState); - when(mNewWorkflowState.isRunning()).thenReturn(false).thenReturn(true); - when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID)); - when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow); - final BatchRequest mBatchRequest = mock(BatchRequest.class); - when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest); - - when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow, mConnectionManagerWorkflow, - mNewConnectionManagerWorkflow); - - final WorkflowStub mWorkflowStub = mock(WorkflowStub.class); - when(workflowClient.newUntypedWorkflowStub(anyString())).thenReturn(mWorkflowStub); - - final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID); - - assertTrue(result.getJobId().isPresent()); - assertEquals(JOB_ID, result.getJobId().get()); - assertFalse(result.getFailingReason().isPresent()); - verify(workflowClient).signalWithStart(mBatchRequest); - verify(mWorkflowStub).terminate(anyString()); - - // Verify that the submitManualSync signal was passed to the batch request by capturing the - // argument, - // executing the signal, and verifying that the desired signal was executed - final ArgumentCaptor batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class); - verify(mBatchRequest).add(batchRequestAddArgCaptor.capture()); - final Proc signal = batchRequestAddArgCaptor.getValue(); - signal.apply(); - verify(mNewConnectionManagerWorkflow).submitManualSync(); - } - @Test @DisplayName("Test manual operation on completed workflow causes a restart") void testManualOperationOnCompletedWorkflow() { final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); final WorkflowState mWorkflowState = mock(WorkflowState.class); when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState); - when(mWorkflowState.isQuarantined()).thenReturn(false); when(mWorkflowState.isDeleted()).thenReturn(false); when(workflowServiceBlockingStub.describeWorkflowExecution(any())) .thenReturn(DescribeWorkflowExecutionResponse.newBuilder().setWorkflowExecutionInfo( diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 97a999951957..262cd9a73094 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -495,19 +495,6 @@ public JobInformation getJobInformation() { attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber); } - @Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME) - @Override - public QuarantinedInformation getQuarantinedInformation() { - final Long jobId = workflowInternalState.getJobId() != null ? workflowInternalState.getJobId() : NON_RUNNING_JOB_ID; - final Integer attemptNumber = workflowInternalState.getAttemptNumber(); - ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId)); - return new QuarantinedInformation( - connectionId, - jobId, - attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber, - workflowState.isQuarantined()); - } - /** * return true if the workflow is in a state that require it to continue. If the state is to process * an update or delete the workflow, it won't continue with a run of the {@link SyncWorkflow} but it