Restart workflow after activity failure instead of quarantining #13779
Changes from all commits
4aa361b
7bbc7b3
c5a9d6b
5f8613b
802fda9
1dbfece
1af4f76
a08a465
be706f9
@@ -5,6 +5,7 @@
 package io.airbyte.workers.temporal.scheduling;

 import com.fasterxml.jackson.databind.JsonNode;
 import io.airbyte.config.EnvConfigs;
 import io.airbyte.config.FailureReason;
 import io.airbyte.config.FailureReason.FailureType;
 import io.airbyte.config.StandardCheckConnectionInput;
@@ -17,6 +18,7 @@
 import io.airbyte.scheduler.models.IntegrationLauncherConfig;
 import io.airbyte.scheduler.models.JobRunConfig;
 import io.airbyte.workers.helper.FailureHelper;
 import io.airbyte.workers.temporal.ConnectionManagerUtils;
 import io.airbyte.workers.temporal.TemporalJobType;
 import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
 import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput;
@@ -86,6 +88,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
   private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync";
   private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1;

+  static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds());
+
   private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

   private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
@@ -101,7 +105,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
   private final AutoDisableConnectionActivity autoDisableConnectionActivity =
       Workflow.newActivityStub(AutoDisableConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);
   private final CheckConnectionActivity checkActivity =
-      Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.CHECK_ACTIVITY_OPTIONS);
+      Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);
Comment: The CheckConnection activity that is launched before a sync workflow was previously using a different set of activity options than the other activities launched by the connection manager workflow. Specifically, it was configured to not have any retry logic, since it was reusing the options from the "check connection" that is launched by the UI. This change just makes the options used for this activity consistent with the other activities in this workflow, so that they all have the same retry behavior. Without this change, a failed "check connection" results in immediately quarantining/restarting the workflow without any retries, which is not desired.

Comment: FYI @evantahler

Comment: Nice! To clarify my understanding:

Reply: That is correct.
   private CancellationScope cancellableSyncWorkflow;
@@ -151,11 +155,6 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
     return Workflow.newCancellationScope(() -> {
       connectionId = connectionUpdaterInput.getConnectionId();

-      // Clean the job state by failing any jobs for this connection that are currently non-terminal.
-      // This catches cases where the temporal workflow was terminated and restarted while a job was
-      // actively running, leaving that job in an orphaned and non-terminal state.
-      ensureCleanJobState(connectionUpdaterInput);
-
       // workflow state is only ever set in test cases. for production cases, it will always be null.
       if (connectionUpdaterInput.getWorkflowState() != null) {
         workflowState = connectionUpdaterInput.getWorkflowState();
@@ -171,6 +170,11 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
         workflowState.setResetWithScheduling(true);
       }

+      // Clean the job state by failing any jobs for this connection that are currently non-terminal.
+      // This catches cases where the temporal workflow was terminated and restarted while a job was
+      // actively running, leaving that job in an orphaned and non-terminal state.
+      ensureCleanJobState(connectionUpdaterInput);
+
Comment: Had to move this down below the statements that transfer values from the …
       final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());

       Workflow.await(timeToWait,
@@ -476,23 +480,52 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn

   /**
    * This is running a lambda function that takes {@param input} as an input. If the run of the lambda
-   * is thowing an exception, the workflow will be in a quarantined state and can then be manual
-   * un-quarantined or a retry of the failed lambda can be trigger through a signal method.
+   * throws an exception, the workflow will be retried after a short delay.
    *
    * Note that if the lambda activity is configured to have retries, the exception will only be caught
    * after the activity has been retried the maximum number of times.
    *
-   * We aimed to use this method for call of the temporal activity.
+   * This method is meant to be used for calling temporal activities.
    */
   private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INPUT, OUTPUT> mapper, final INPUT input) {

Comment: The changes to this method are the main changes of this PR.
     try {
       return mapper.apply(input);
     } catch (final Exception e) {
-      log.error("Failed to run an activity for the connection " + connectionId, e);
-      workflowState.setQuarantined(true);
-      workflowState.setRetryFailedActivity(false);
-      Workflow.await(() -> workflowState.isRetryFailedActivity());
-      log.error("Retrying an activity for the connection " + connectionId, e);
-      workflowState.setQuarantined(false);
-      workflowState.setRetryFailedActivity(false);
-      return runMandatoryActivityWithOutput(mapper, input);
+      log.error("[ACTIVITY-FAILURE] Connection " + connectionId +
+          " failed to run an activity. Connection manager workflow will be restarted after a delay of " +
+          WORKFLOW_FAILURE_RESTART_DELAY.getSeconds() + " seconds.", e);
+      // TODO (https://github.com/airbytehq/airbyte/issues/13773) add tracking/notification
+
+      // Wait a short delay before restarting workflow. This is important if, for example, the failing
+      // activity was configured to not have retries.
+      // Without this delay, that activity could cause the workflow to loop extremely quickly,
+      // overwhelming temporal.
+      log.info("Waiting {} seconds before restarting the workflow for connection {}, to prevent spamming temporal with restarts.",
+          WORKFLOW_FAILURE_RESTART_DELAY.getSeconds(),
+          connectionId);
+      Workflow.await(WORKFLOW_FAILURE_RESTART_DELAY, () -> workflowState.isRetryFailedActivity());
Comment: Do we want to keep this? Given the delay, I think it will be complicated to reach this state.

Reply: I don't feel too strongly about this - I thought it may be useful in debugging situations if someone wants to force the activity to be retried instead of waiting for the workflow to restart.

Reply: Why is it complicated to reach this state?

Reply: It was about the long retries that were potentially taking more than 5 hours. The extra 10 minutes sounds unneeded.

Reply: Yeah, I think that is not as applicable now, because we have adjusted the retries to take ~15 mins max. This 10 minute wait is more of a failsafe in case someone adds an activity in the future that does not have any retry configuration - in that case, this wait will help protect us from spamming temporal with restarts of the connection very quickly.
// Accept a manual signal to retry the failed activity during this window | ||
if (workflowState.isRetryFailedActivity()) { | ||
log.info("Received RetryFailedActivity signal for connection {}. Retrying activity.", connectionId); | ||
workflowState.setRetryFailedActivity(false); | ||
return runMandatoryActivityWithOutput(mapper, input); | ||
} | ||
|
||
log.info("Finished wait for connection {}, restarting connection manager workflow", connectionId); | ||
|
||
final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId); | ||
// this ensures that the new workflow will still perform a reset if an activity failed while | ||
// attempting to reset the connection | ||
if (workflowState.isResetConnection()) { | ||
newWorkflowInput.setResetConnection(true); | ||
newWorkflowInput.setFromJobResetFailure(true); | ||
} | ||
|
||
Workflow.continueAsNew(newWorkflowInput); | ||
|
||
throw new IllegalStateException("This statement should never be reached, as the ConnectionManagerWorkflow for connection " | ||
+ connectionId + " was continued as new."); | ||
} | ||
} | ||
|
||
|
Comment: This means that the maximum interval between attempt retries will default to 10 minutes, which prevents the exponential backoff from causing attempts to have hours of delay between retries, which would slow down automatic system recovery. These new settings mean that activities will be retried with the following logic: adding up all of those times gives a total timespan of 55.5 minutes, over which an activity will be retried 10 times before the workflow is restarted. Open to feedback on whether any numbers here should be adjusted (e.g. initial wait time or maximum wait time).
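As a sanity check on the 55.5-minute figure, the schedule can be reproduced with a short sketch. The concrete values here (a 30-second initial retry interval, Temporal's default backoff coefficient of 2, and the 10-minute maximum interval mentioned above) are assumptions chosen to match the numbers in this thread, not values read from the diff:

```java
import java.time.Duration;

public class RetryScheduleSketch {

  // Sum the waits between activity attempts under exponential backoff with a cap,
  // mirroring how Temporal grows the retry interval. Assumed parameters, not SDK code.
  static Duration totalWait(Duration initial, double coefficient, Duration max, int maxAttempts) {
    Duration total = Duration.ZERO;
    double waitSeconds = initial.getSeconds();
    // maxAttempts attempts means (maxAttempts - 1) waits between them.
    for (int i = 0; i < maxAttempts - 1; i++) {
      total = total.plusSeconds(Math.min((long) waitSeconds, max.getSeconds()));
      waitSeconds *= coefficient;
    }
    return total;
  }

  public static void main(String[] args) {
    // Waits: 30s, 60s, 120s, 240s, 480s, then capped at 600s for the last four.
    Duration total = totalWait(Duration.ofSeconds(30), 2.0, Duration.ofMinutes(10), 10);
    System.out.println(total.getSeconds() / 60.0 + " minutes"); // prints "55.5 minutes"
  }
}
```

The sum is 30 + 60 + 120 + 240 + 480 = 930 seconds before the cap kicks in, plus 4 × 600 seconds at the cap, for 3330 seconds total.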
Comment: Where is the logic that increases the wait time between failed activity attempts? Is that part of the Temporal retry options?

Comment: I'm thinking we should lower the retries down to 5, which gives us about 15 mins until a workflow is restarted. 55 mins seems like a long restart cycle, since this should only cover transient errors.

Reply: Yes, it is part of the RetryOptions in the Temporal SDK. We define these options here, use them in our SHORT_ACTIVITY_OPTIONS here, and then assign those SHORT_ACTIVITY_OPTIONS to each of the activity definitions in the workflow here. And as described in the Temporal documentation, the backoffCoefficient is what Temporal uses to determine how much to multiply the wait time each time. The default is 2. https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/RetryOptions.Builder.html#setBackoffCoefficient(double)

Reply: I think 5 retries makes sense - I will make that change.

Reply: I have pushed a change to update the default value for the ACTIVITY_MAX_ATTEMPT env var from 10 to 5.
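With the final setting of 5 attempts, the same backoff arithmetic gives roughly 7.5 minutes of pure wait time between attempts; the ~15-minute estimate in the thread presumably also accounts for activity execution and timeout time, which this ignores. As before, the 30-second initial interval and coefficient of 2 are assumptions, not values read from the diff:

```java
public class FinalRetrySchedule {

  // Total wait between attempts under capped exponential backoff. Assumed parameters.
  static long totalWaitSeconds(long initialSeconds, double coefficient, long capSeconds, int maxAttempts) {
    double wait = initialSeconds;
    long total = 0;
    // 5 attempts means 4 waits: 30s + 60s + 120s + 240s (the 600s cap is never reached).
    for (int i = 0; i < maxAttempts - 1; i++) {
      total += Math.min((long) wait, capSeconds);
      wait *= coefficient;
    }
    return total;
  }

  public static void main(String[] args) {
    // Assumed: 30s initial interval, coefficient 2, 10-minute cap, ACTIVITY_MAX_ATTEMPT = 5.
    System.out.println(totalWaitSeconds(30, 2.0, 600, 5) + " seconds"); // prints "450 seconds" (7.5 minutes)
  }
}
```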