Skip to content

Commit

Permalink
Add logic to recover from quarantined and completed states (#13071)
Browse files Browse the repository at this point in the history
* add logic to recover from quarantined and completed states

* move status retrieval into try-catch

* fix typo in log

* add one more tests

* mvoe isWorkflowStateRunning into ConnectionManagerUtils to be more direct

* format
  • Loading branch information
lmossman authored May 24, 2022
1 parent 8b9fa33 commit e7c26d6
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl;
import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput;
import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.client.BatchRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.workflow.Functions.Proc;
Expand Down Expand Up @@ -99,6 +103,10 @@ private static <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(
connectionId),
e);

// in case there is an existing workflow in a bad state, attempt to terminate it first before
// starting a new workflow
safeTerminateWorkflow(client, connectionId, "Terminating workflow in unreachable state before starting a new workflow for this connection");

final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput startWorkflowInput = buildStartWorkflowInput(connectionId);

Expand All @@ -121,6 +129,18 @@ private static <T> ConnectionManagerWorkflow signalWorkflowAndRepairIfNecessary(
}
}

static void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) {
log.info("Attempting to terminate existing workflow for connection {}.", connectionId);
try {
client.newUntypedWorkflowStub(getConnectionManagerName(connectionId)).terminate(reason);
} catch (final Exception e) {
log.warn(
"Could not terminate temporal workflow due to the following error; "
+ "this may be because there is currently no running workflow for this connection.",
e);
}
}

static ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) {
final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId);
final ConnectionUpdaterInput input = buildStartWorkflowInput(connectionId);
Expand All @@ -136,30 +156,66 @@ static ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowCl
* @param connectionId the ID of the connection whose workflow should be retrieved
* @return the healthy ConnectionManagerWorkflow
* @throws DeletedWorkflowException if the workflow was deleted, according to the workflow state
* @throws UnreachableWorkflowException if the workflow is unreachable
* @throws UnreachableWorkflowException if the workflow is in an unreachable state
*/
static ConnectionManagerWorkflow getConnectionManagerWorkflow(final WorkflowClient client, final UUID connectionId)
throws DeletedWorkflowException, UnreachableWorkflowException {

final ConnectionManagerWorkflow connectionManagerWorkflow;
final WorkflowState workflowState;
final WorkflowExecutionStatus workflowExecutionStatus;
try {
connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId));
workflowState = connectionManagerWorkflow.getState();
workflowExecutionStatus = getConnectionManagerWorkflowStatus(client, connectionId);
} catch (final Exception e) {
throw new UnreachableWorkflowException(
String.format("Failed to retrieve ConnectionManagerWorkflow for connection %s due to the following error:", connectionId),
e);
}

if (workflowState.isDeleted()) {
throw new DeletedWorkflowException(String.format(
"The connection manager workflow for connection %s is deleted, so no further operations cannot be performed on it.",
connectionId));
if (WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED.equals(workflowExecutionStatus)) {
if (workflowState.isDeleted()) {
throw new DeletedWorkflowException(String.format(
"The connection manager workflow for connection %s is deleted, so no further operations cannot be performed on it.",
connectionId));
}

// A non-deleted workflow being in a COMPLETED state is unexpected, and should be corrected
throw new UnreachableWorkflowException(
String.format("ConnectionManagerWorkflow for connection %s is unreachable due to having COMPLETED status.", connectionId));
}

if (workflowState.isQuarantined()) {
throw new UnreachableWorkflowException(
String.format("ConnectionManagerWorkflow for connection %s is unreachable due to being in a quarantined state.", connectionId));
}

return connectionManagerWorkflow;
}

static boolean isWorkflowStateRunning(final WorkflowClient client, final UUID connectionId) {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = client.newWorkflowStub(ConnectionManagerWorkflow.class,
getConnectionManagerName(connectionId));
return connectionManagerWorkflow.getState().isRunning();
} catch (final Exception e) {
return false;
}
}

static WorkflowExecutionStatus getConnectionManagerWorkflowStatus(final WorkflowClient workflowClient, final UUID connectionId) {
final DescribeWorkflowExecutionRequest describeWorkflowExecutionRequest = DescribeWorkflowExecutionRequest.newBuilder()
.setExecution(WorkflowExecution.newBuilder().setWorkflowId(getConnectionManagerName(connectionId)).build())
.setNamespace(workflowClient.getOptions().getNamespace())
.build();

final DescribeWorkflowExecutionResponse describeWorkflowExecutionResponse = workflowClient.getWorkflowServiceStubs().blockingStub()
.describeWorkflowExecution(describeWorkflowExecutionRequest);

return describeWorkflowExecutionResponse.getWorkflowExecutionInfo().getStatus();
}

static long getCurrentJobId(final WorkflowClient client, final UUID connectionId) {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.airbyte.workers.temporal.exception.DeletedWorkflowException;
import io.airbyte.workers.temporal.exception.UnreachableWorkflowException;
import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow;
import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.airbyte.workers.temporal.spec.SpecWorkflow;
import io.airbyte.workers.temporal.sync.SyncWorkflow;
import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest;
Expand Down Expand Up @@ -276,7 +275,7 @@ public static class ManualOperationResult {
public ManualOperationResult startNewManualSync(final UUID connectionId) {
log.info("Manual sync request");

if (isWorkflowStateRunning(connectionId)) {
if (ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId)) {
// TODO Bmoric: Error is running
return new ManualOperationResult(
Optional.of("A sync is already running for: " + connectionId),
Expand Down Expand Up @@ -335,7 +334,7 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
Optional.of("Didn't manage to cancel a sync for: " + connectionId),
Optional.empty());
}
} while (isWorkflowStateRunning(connectionId));
} while (ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId));

log.info("end of manual cancellation");

Expand Down Expand Up @@ -463,18 +462,4 @@ boolean isWorkflowReachable(final UUID connectionId) {
}
}

/**
* Check if a workflow is reachable and has state {@link WorkflowState#isRunning()}
*/
@VisibleForTesting
boolean isWorkflowStateRunning(final UUID connectionId) {
try {
final ConnectionManagerWorkflow connectionManagerWorkflow = ConnectionManagerUtils.getConnectionManagerWorkflow(client, connectionId);

return connectionManagerWorkflow.getState().isRunning();
} catch (final Exception e) {
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

public class UnreachableWorkflowException extends Exception {

public UnreachableWorkflowException(final String message) {
super(message);
}

public UnreachableWorkflowException(final String message, final Throwable t) {
super(message, t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,14 @@ private void reportFailure(final ConnectionUpdaterInput connectionUpdaterInput,
}
}

private SyncCheckConnectionFailure checkConnections(GenerateInputActivity.GeneratedJobInput jobInputs) {
private SyncCheckConnectionFailure checkConnections(final GenerateInputActivity.GeneratedJobInput jobInputs) {
final JobRunConfig jobRunConfig = jobInputs.getJobRunConfig();
final StandardSyncInput syncInput = jobInputs.getSyncInput();
final JsonNode sourceConfig = syncInput.getSourceConfiguration();
final JsonNode destinationConfig = syncInput.getDestinationConfiguration();
final IntegrationLauncherConfig sourceLauncherConfig = jobInputs.getSourceLauncherConfig();
final IntegrationLauncherConfig destinationLauncherConfig = jobInputs.getDestinationLauncherConfig();
SyncCheckConnectionFailure checkFailure = new SyncCheckConnectionFailure(jobRunConfig);
final SyncCheckConnectionFailure checkFailure = new SyncCheckConnectionFailure(jobRunConfig);

final int attemptCreationVersion =
Workflow.getVersion(CHECK_BEFORE_SYNC_TAG, Workflow.DEFAULT_VERSION, CHECK_BEFORE_SYNC_CURRENT_VERSION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@
import io.airbyte.workers.temporal.scheduling.state.WorkflowState;
import io.airbyte.workers.temporal.spec.SpecWorkflow;
import io.airbyte.workers.temporal.sync.SyncWorkflow;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflow.v1.WorkflowExecutionInfo;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc.WorkflowServiceBlockingStub;
import io.temporal.client.BatchRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowClientOptions;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.workflow.Functions.Proc;
import java.io.IOException;
Expand Down Expand Up @@ -79,19 +85,26 @@ class TemporalClientTest {
.withJobId(String.valueOf(JOB_ID))
.withAttemptId((long) ATTEMPT_ID)
.withDockerImage(IMAGE_NAME1);
private static final String NAMESPACE = "namespace";

private WorkflowClient workflowClient;
private TemporalClient temporalClient;
private Path logPath;
private WorkflowServiceStubs workflowServiceStubs;
private WorkflowServiceBlockingStub workflowServiceBlockingStub;
private Configs configs;

@BeforeEach
void setup() throws IOException {
final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "temporal_client_test");
logPath = workspaceRoot.resolve(String.valueOf(JOB_ID)).resolve(String.valueOf(ATTEMPT_ID)).resolve(LogClientSingleton.LOG_FILENAME);
workflowClient = mock(WorkflowClient.class);
when(workflowClient.getOptions()).thenReturn(WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build());
workflowServiceStubs = mock(WorkflowServiceStubs.class);
when(workflowClient.getWorkflowServiceStubs()).thenReturn(workflowServiceStubs);
workflowServiceBlockingStub = mock(WorkflowServiceBlockingStub.class);
when(workflowServiceStubs.blockingStub()).thenReturn(workflowServiceBlockingStub);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_RUNNING);
temporalClient = spy(new TemporalClient(workflowClient, workspaceRoot, workflowServiceStubs, configs));
}

Expand Down Expand Up @@ -336,6 +349,7 @@ void testDeleteConnectionOnDeletedWorkflow() {
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isDeleted()).thenReturn(true);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);

temporalClient.deleteConnection(CONNECTION_ID);

Expand Down Expand Up @@ -393,6 +407,7 @@ void testUpdateConnectionDeletedWorkflow() {
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isDeleted()).thenReturn(true);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);

temporalClient.update(CONNECTION_ID);

Expand Down Expand Up @@ -490,6 +505,7 @@ void testStartNewManualSyncDeletedWorkflow() {
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isDeleted()).thenReturn(true);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);

final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID);

Expand Down Expand Up @@ -568,6 +584,7 @@ void testStartNewCancellationDeletedWorkflow() {
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isDeleted()).thenReturn(true);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);

final ManualOperationResult result = temporalClient.startNewCancellation(CONNECTION_ID);

Expand Down Expand Up @@ -656,6 +673,7 @@ void testResetConnectionDeletedWorkflow() {
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isDeleted()).thenReturn(true);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);

final ManualOperationResult result = temporalClient.resetConnection(CONNECTION_ID);

Expand All @@ -667,4 +685,90 @@ void testResetConnectionDeletedWorkflow() {

}

@Test
@DisplayName("Test manual operation on quarantined workflow causes a restart")
void testManualOperationOnQuarantinedWorkflow() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isQuarantined()).thenReturn(true);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);

final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mNewWorkflowState = mock(WorkflowState.class);
when(mNewConnectionManagerWorkflow.getState()).thenReturn(mNewWorkflowState);
when(mNewWorkflowState.isRunning()).thenReturn(false).thenReturn(true);
when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID));
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
final BatchRequest mBatchRequest = mock(BatchRequest.class);
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);

final WorkflowStub mWorkflowStub = mock(WorkflowStub.class);
when(workflowClient.newUntypedWorkflowStub(anyString())).thenReturn(mWorkflowStub);

final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID);

assertTrue(result.getJobId().isPresent());
assertEquals(JOB_ID, result.getJobId().get());
assertFalse(result.getFailingReason().isPresent());
verify(workflowClient).signalWithStart(mBatchRequest);
verify(mWorkflowStub).terminate(anyString());

// Verify that the submitManualSync signal was passed to the batch request by capturing the
// argument,
// executing the signal, and verifying that the desired signal was executed
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
final Proc signal = batchRequestAddArgCaptor.getValue();
signal.apply();
verify(mNewConnectionManagerWorkflow).submitManualSync();
}

@Test
@DisplayName("Test manual operation on completed workflow causes a restart")
void testManualOperationOnCompletedWorkflow() {
final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mWorkflowState = mock(WorkflowState.class);
when(mConnectionManagerWorkflow.getState()).thenReturn(mWorkflowState);
when(mWorkflowState.isQuarantined()).thenReturn(false);
when(mWorkflowState.isDeleted()).thenReturn(false);
when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow);
mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_COMPLETED);

final ConnectionManagerWorkflow mNewConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class);
final WorkflowState mNewWorkflowState = mock(WorkflowState.class);
when(mNewConnectionManagerWorkflow.getState()).thenReturn(mNewWorkflowState);
when(mNewWorkflowState.isRunning()).thenReturn(false).thenReturn(true);
when(mNewConnectionManagerWorkflow.getJobInformation()).thenReturn(new JobInformation(JOB_ID, ATTEMPT_ID));
when(workflowClient.newWorkflowStub(any(Class.class), any(WorkflowOptions.class))).thenReturn(mNewConnectionManagerWorkflow);
final BatchRequest mBatchRequest = mock(BatchRequest.class);
when(workflowClient.newSignalWithStartRequest()).thenReturn(mBatchRequest);

final WorkflowStub mWorkflowStub = mock(WorkflowStub.class);
when(workflowClient.newUntypedWorkflowStub(anyString())).thenReturn(mWorkflowStub);

final ManualOperationResult result = temporalClient.startNewManualSync(CONNECTION_ID);

assertTrue(result.getJobId().isPresent());
assertEquals(JOB_ID, result.getJobId().get());
assertFalse(result.getFailingReason().isPresent());
verify(workflowClient).signalWithStart(mBatchRequest);
verify(mWorkflowStub).terminate(anyString());

// Verify that the submitManualSync signal was passed to the batch request by capturing the
// argument,
// executing the signal, and verifying that the desired signal was executed
final ArgumentCaptor<Proc> batchRequestAddArgCaptor = ArgumentCaptor.forClass(Proc.class);
verify(mBatchRequest).add(batchRequestAddArgCaptor.capture());
final Proc signal = batchRequestAddArgCaptor.getValue();
signal.apply();
verify(mNewConnectionManagerWorkflow).submitManualSync();
}

private void mockWorkflowStatus(final WorkflowExecutionStatus status) {
when(workflowServiceBlockingStub.describeWorkflowExecution(any())).thenReturn(
DescribeWorkflowExecutionResponse.newBuilder().setWorkflowExecutionInfo(
WorkflowExecutionInfo.newBuilder().setStatus(status).buildPartial()).build());
}

}

0 comments on commit e7c26d6

Please sign in to comment.