Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the quarantine status #21088

Merged
merged 8 commits into from
Jan 6, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,6 @@ public ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClie
String.format("ConnectionManagerWorkflow for connection %s is unreachable due to having COMPLETED status.", connectionId));
}

if (workflowState.isQuarantined()) {
throw new UnreachableWorkflowException(
String.format("ConnectionManagerWorkflow for connection %s is unreachable due to being in a quarantined state.", connectionId));
}

return connectionManagerWorkflow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.UUID;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -84,22 +83,4 @@ class JobInformation {
@QueryMethod
JobInformation getJobInformation();

@Data
@NoArgsConstructor
@AllArgsConstructor
class QuarantinedInformation {

private UUID connectionId;
private long jobId;
private int attemptId;
private boolean isQuarantined;

}

/**
* Return if a job is stuck or not with the job information
*/
@QueryMethod
QuarantinedInformation getQuarantinedInformation();

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent;
import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField;
import java.util.UUID;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;

Expand All @@ -32,6 +33,8 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan
private final boolean resetConnection = false;
@Deprecated
private final boolean continueAsReset = false;
@Deprecated
@Getter(AccessLevel.NONE)
private boolean quarantined = false;
private boolean success = true;
private boolean cancelledForReset = false;
Expand Down Expand Up @@ -88,14 +91,6 @@ public void setFailed(final boolean failed) {
this.failed = failed;
}

public void setQuarantined(final boolean quarantined) {
final ChangedStateEvent event = new ChangedStateEvent(
StateField.QUARANTINED,
quarantined);
stateChangedListener.addEvent(id, event);
this.quarantined = quarantined;
}

public void setSuccess(final boolean success) {
final ChangedStateEvent event = new ChangedStateEvent(
StateField.SUCCESS,
Expand Down Expand Up @@ -138,7 +133,6 @@ public void reset() {
this.setCancelled(false);
this.setFailed(false);
this.setSuccess(false);
this.setQuarantined(false);
this.setDoneWaiting(false);
this.setSkipSchedulingNextWorkflow(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ enum StateField {
FAILED,
RESET,
CONTINUE_AS_RESET,
QUARANTINED,
SUCCESS,
CANCELLED_FOR_RESET,
RESET_WITH_SCHEDULING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,54 +734,12 @@ void testResetConnectionDeletedWorkflow() throws IOException {

}

@Test
@DisplayName("Test manual operation on quarantined workflow causes a restart")
void testManualOperationOnQuarantinedWorkflow() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isQuarantined()).thenReturn(true);

final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mNewWorkflowState = mock(WorkflowState.class);
when(mNewConnectionManagerWorkflow.getState()).thenReturn(mNewWorkflowState);
when(mNewWorkflowState.isRunning()).thenReturn(false).thenReturn(true);
when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID));
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
final BatchRequest mBatchRequest = mock(BatchRequest.class);
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);

when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow, mConnectionManagerWorkflow,
mNewConnectionManagerWorkflow);

final WorkflowStub mWorkflowStub = mock(WorkflowStub.class);
when(workflowClient.newUntypedWorkflowStub(anyString())).thenReturn(mWorkflowStub);

final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID);

assertTrue(result.getJobId().isPresent());
assertEquals(JOB_ID, result.getJobId().get());
assertFalse(result.getFailingReason().isPresent());
verify(workflowClient).signalWithStart(mBatchRequest);
verify(mWorkflowStub).terminate(anyString());

// Verify that the submitManualSync signal was passed to the batch request by capturing the
// argument,
// executing the signal, and verifying that the desired signal was executed
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
final Proc signal = batchRequestAddArgCaptor.getValue();
signal.apply();
verify(mNewConnectionManagerWorkflow).submitManualSync();
}

@Test
@DisplayName("Test manual operation on completed workflow causes a restart")
void testManualOperationOnCompletedWorkflow() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isQuarantined()).thenReturn(false);
when(mWorkflowState.isDeleted()).thenReturn(false);
when(workflowServiceBlockingStub.describeWorkflowExecution(any()))
.thenReturn(DescribeWorkflowExecutionResponse.newBuilder().setWorkflowExecutionInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,19 +495,6 @@ public JobInformation getJobInformation() {
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber);
}

@Trace(operationName = WORKFLOW_TRACE_OPERATION_NAME)
@Override
public QuarantinedInformation getQuarantinedInformation() {
final Long jobId = workflowInternalState.getJobId() != null ? workflowInternalState.getJobId() : NON_RUNNING_JOB_ID;
final Integer attemptNumber = workflowInternalState.getAttemptNumber();
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, JOB_ID_KEY, jobId));
return new QuarantinedInformation(
connectionId,
jobId,
attemptNumber == null ? NON_RUNNING_ATTEMPT_ID : attemptNumber,
workflowState.isQuarantined());
}

/**
* return true if the workflow is in a state that require it to continue. If the state is to process
* an update or delete the workflow, it won't continue with a run of the {@link SyncWorkflow} but it
Expand Down