
Restart workflow after activity failure instead of quarantining #13779

Merged
merged 9 commits on Jun 15, 2022
.env: 5 changes (5 additions, 0 deletions)
@@ -83,6 +83,11 @@ MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
MAX_CHECK_WORKERS=5
MAX_DISCOVER_WORKERS=5
# Temporal Activity configuration
ACTIVITY_MAX_ATTEMPT=
ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=
ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=
WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=


### FEATURE FLAGS ###
@@ -508,9 +508,19 @@ public interface Configs {
int getMaxActivityTimeoutSecond();

/**
* Get the duration in second between 2 activity attempts
* Get initial delay in seconds between two activity attempts
*/
int getDelayBetweenActivityAttempts();
int getInitialDelayBetweenActivityAttemptsSeconds();

/**
* Get maximum delay in seconds between two activity attempts
*/
int getMaxDelayBetweenActivityAttemptsSeconds();

/**
* Get the delay in seconds between an activity failing and the workflow being restarted
*/
int getWorkflowFailureRestartDelaySeconds();

/**
* Get number of attempts of the non long running activities
@@ -111,7 +111,9 @@ public class EnvConfigs implements Configs {

public static final String ACTIVITY_MAX_TIMEOUT_SECOND = "ACTIVITY_MAX_TIMEOUT_SECOND";
public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT";
public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS";
public static final String ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS";
public static final String ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS";
public static final String WORKFLOW_FAILURE_RESTART_DELAY_SECONDS = "WORKFLOW_FAILURE_RESTART_DELAY_SECONDS";

private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS";
private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS";
@@ -843,13 +845,23 @@ public int getMaxActivityTimeoutSecond() {
}

@Override
public int getDelayBetweenActivityAttempts() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30"));
public int getInitialDelayBetweenActivityAttemptsSeconds() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS, "30"));
}

@Override
public int getMaxDelayBetweenActivityAttemptsSeconds() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS, String.valueOf(10 * 60)));
}

@Override
public int getWorkflowFailureRestartDelaySeconds() {
return Integer.parseInt(getEnvOrDefault(WORKFLOW_FAILURE_RESTART_DELAY_SECONDS, String.valueOf(10 * 60)));
}

@Override
public int getActivityNumberOfAttempt() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "10"));
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5"));
}

// Helpers
@@ -234,7 +234,7 @@ static String getConnectionManagerName(final UUID connectionId) {
return "connection_manager_" + connectionId;
}

static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) {
public static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) {
return ConnectionUpdaterInput.builder()
.connectionId(connectionId)
.jobId(null)
@@ -66,7 +66,8 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo
private static final Configs configs = new EnvConfigs();
public static final RetryOptions RETRY = RetryOptions.newBuilder()
.setMaximumAttempts(configs.getActivityNumberOfAttempt())
.setInitialInterval(Duration.ofSeconds(configs.getDelayBetweenActivityAttempts()))
.setInitialInterval(Duration.ofSeconds(configs.getInitialDelayBetweenActivityAttemptsSeconds()))
.setMaximumInterval(Duration.ofSeconds(configs.getMaxDelayBetweenActivityAttemptsSeconds()))
@lmossman (Contributor, Author) commented on Jun 15, 2022:

This means that the maximum interval between retry attempts will default to 10 minutes, which prevents the exponential backoff from putting hours of delay between attempts and slowing down automatic system recovery.

These new settings mean that activities will be retried with the following logic:

Attempt 1 fails
(wait 30s)
Attempt 2 fails
(wait 1m)
Attempt 3 fails
(wait 2m)
Attempt 4 fails
(wait 4m)
Attempt 5 fails
(wait 8m)
Attempt 6 fails
(wait 10m)
Attempt 7 fails
(wait 10m)
Attempt 8 fails
(wait 10m)
Attempt 9 fails
(wait 10m)
Attempt 10 fails
(execute workflow restart logic)

Adding up all of the above wait times gives a total span of 55.5 minutes over which an activity is retried 10 times before the workflow is restarted (the standalone sketch below reproduces this schedule).

Open to feedback on whether any of these numbers should be adjusted (e.g. the initial or maximum wait time).
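
For concreteness, here is a small standalone sketch (not part of this PR) that reproduces the schedule above. It assumes Temporal's default backoffCoefficient of 2.0 and the delay values introduced by this change; the class and variable names are made up for illustration.

```java
import java.time.Duration;

public class RetryScheduleSketch {

  public static void main(final String[] args) {
    final int maxAttempts = 10;                               // ACTIVITY_MAX_ATTEMPT (10 at the time of this comment)
    final Duration initialInterval = Duration.ofSeconds(30);  // ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
    final Duration maximumInterval = Duration.ofMinutes(10);  // ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
    final long backoffCoefficient = 2;                        // Temporal's default

    Duration delay = initialInterval;
    Duration total = Duration.ZERO;
    for (int attempt = 1; attempt < maxAttempts; attempt++) {
      // Delay applied after attempt `attempt` fails, before attempt `attempt + 1` starts.
      System.out.printf("Attempt %d fails -> wait %ds%n", attempt, delay.getSeconds());
      total = total.plus(delay);
      delay = delay.multipliedBy(backoffCoefficient);
      if (delay.compareTo(maximumInterval) > 0) {
        delay = maximumInterval;
      }
    }
    // Prints 3330 seconds, i.e. the 55.5 minutes of waiting across 10 attempts described above.
    System.out.printf("Total backoff: %d seconds%n", total.getSeconds());
  }

}
```

Rerunning the loop with maxAttempts = 5 gives 30 s + 1 m + 2 m + 4 m = 7.5 minutes of backoff, which is roughly consistent with the ~15 minute figure discussed below once activity execution time and the workflow restart delay are included.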

@davinchia (Contributor) commented on Jun 15, 2022:

Where is the logic that increases the wait time between failed activity attempts? Is that part of the Temporal retry options?

@davinchia (Contributor) commented on Jun 15, 2022:

I'm thinking we should lower the retries to 5, which gives us about 15 mins until a workflow is restarted.

55 minutes seems like a long restart cycle, since this should only cover transient errors.

@lmossman (Contributor, Author):

> Where is the logic that increases the wait time between failed activity attempts? Is that part of the Temporal retry options?

Yes it is part of the RetryOptions in the Temporal SDK. We define these options here, use those in our SHORT_ACTIVITY_OPTIONS here, and then assign those SHORT_ACTIVITY_OPTIONS to each of the activity definitions in the workflow here.

And as described in the Temporal documentation, the backoffCoefficient is what Temporal uses to determine how much to multiply the wait time by on each retry. The default is 2. https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/RetryOptions.Builder.html#setBackoffCoefficient(double)
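
To make the wiring concrete, here is a simplified sketch of the chain described above (a shared RetryOptions attached to the activity options that every short activity stub receives). The timeout value and class names are assumptions for illustration, not the repository's exact code:

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

public class ActivityRetryWiringSketch {

  // Roughly what TemporalUtils.RETRY builds; backoffCoefficient is left at Temporal's default of 2.0.
  static final RetryOptions RETRY = RetryOptions.newBuilder()
      .setMaximumAttempts(5)                       // ACTIVITY_MAX_ATTEMPT default after this PR
      .setInitialInterval(Duration.ofSeconds(30))  // initial delay between attempts
      .setMaximumInterval(Duration.ofMinutes(10))  // cap on the exponential backoff
      .build();

  // Roughly what ActivityConfiguration.SHORT_ACTIVITY_OPTIONS does: attach RETRY to the options
  // shared by the short-running activities. The timeout below is illustrative only.
  static final ActivityOptions SHORT_ACTIVITY_OPTIONS = ActivityOptions.newBuilder()
      .setStartToCloseTimeout(Duration.ofSeconds(120))
      .setRetryOptions(RETRY)
      .build();

}
```

Each activity stub in the workflow then picks these options up via Workflow.newActivityStub(SomeActivity.class, SHORT_ACTIVITY_OPTIONS), as shown in the diff further down.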

@lmossman (Contributor, Author):

I think 5 retries makes sense - I will make that change

@lmossman (Contributor, Author):

I have pushed a change to update the default value for this ACTIVITY_MAX_ATTEMPT env var from 10 to 5

.build();

public static final String DEFAULT_NAMESPACE = "default";
@@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.scheduling;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.StandardCheckConnectionInput;
@@ -17,6 +18,7 @@
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.temporal.ConnectionManagerUtils;
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput;
@@ -86,6 +88,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync";
private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1;

static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds());

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

private final WorkflowInternalState workflowInternalState = new WorkflowInternalState();
@@ -101,7 +105,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private final AutoDisableConnectionActivity autoDisableConnectionActivity =
Workflow.newActivityStub(AutoDisableConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);
private final CheckConnectionActivity checkActivity =
Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.CHECK_ACTIVITY_OPTIONS);
Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS);
@lmossman (Contributor, Author) commented on Jun 15, 2022:

The CheckConnection activity that is launched before a sync workflow had been using a different set of activity options than the other activities launched by the connection manager workflow. Specifically, it was configured without any retry logic, since it reused the options from the "check connection" that is launched from the UI.

This change just makes the options used for this activity consistent with the other activities in this workflow, so that they all have the same retry behavior. Without this change, a failed "check connection" results in immediately quarantining/restarting the workflow without any retries, which is not desired.
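
As a rough before/after illustration of that difference (the timeout values and the exact shape of CHECK_ACTIVITY_OPTIONS are assumptions, not the repository's actual definitions):

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

public class CheckActivityOptionsSketch {

  // Before: options reused from the UI-triggered check connection, i.e. effectively a single
  // attempt with no retry on failure.
  static final ActivityOptions NO_RETRY_CHECK_OPTIONS = ActivityOptions.newBuilder()
      .setStartToCloseTimeout(Duration.ofMinutes(5))
      .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
      .build();

  // After: the shared short-activity options, so the pre-sync check retries exactly like the
  // other activities in the connection manager workflow.
  static final ActivityOptions SHARED_SHORT_ACTIVITY_OPTIONS = ActivityOptions.newBuilder()
      .setStartToCloseTimeout(Duration.ofSeconds(120))
      .setRetryOptions(RetryOptions.newBuilder()
          .setMaximumAttempts(5)
          .setInitialInterval(Duration.ofSeconds(30))
          .setMaximumInterval(Duration.ofMinutes(10))
          .build())
      .build();

}
```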

Contributor:

Nice! To clarify my understanding:

  • If the activity fails, it will be retried, up to 10 times.
  • If the check fails (and the activity that ran the check succeeded), it will not be retried and the sync will be stopped, as it is today.

@lmossman (Contributor, Author):

That is correct
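
A minimal, purely illustrative activity sketch of that distinction; all names here are hypothetical and are not Airbyte's actual classes:

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

@ActivityInterface
interface CheckConnectionSketch {

  @ActivityMethod
  boolean checkConnection(String connectionId);

}

class CheckConnectionSketchImpl implements CheckConnectionSketch {

  @Override
  public boolean checkConnection(final String connectionId) {
    // Any exception that escapes this method fails the *activity* itself, so Temporal retries it
    // according to the configured RetryOptions (transient infrastructure errors, worker restarts, ...).
    final boolean reachable = probe(connectionId);

    // A check that completes but reports failure is a normal activity *result*: Temporal does not
    // retry it, and the workflow stops the sync, exactly as described in the thread above.
    return reachable;
  }

  private boolean probe(final String connectionId) {
    // Connector-specific logic would go here; hard-coded for the sketch.
    return false;
  }

}
```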


private CancellationScope cancellableSyncWorkflow;

@@ -151,11 +155,6 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn
return Workflow.newCancellationScope(() -> {
connectionId = connectionUpdaterInput.getConnectionId();

// Clean the job state by failing any jobs for this connection that are currently non-terminal.
// This catches cases where the temporal workflow was terminated and restarted while a job was
// actively running, leaving that job in an orphaned and non-terminal state.
ensureCleanJobState(connectionUpdaterInput);

// workflow state is only ever set in test cases. for production cases, it will always be null.
if (connectionUpdaterInput.getWorkflowState() != null) {
workflowState = connectionUpdaterInput.getWorkflowState();
@@ -171,6 +170,11 @@
workflowState.setResetWithScheduling(true);
}

// Clean the job state by failing any jobs for this connection that are currently non-terminal.
// This catches cases where the temporal workflow was terminated and restarted while a job was
// actively running, leaving that job in an orphaned and non-terminal state.
ensureCleanJobState(connectionUpdaterInput);
@lmossman (Contributor, Author):

I had to move this down below the statements that transfer values from the connectionUpdaterInput into the workflowState, so that if the ensureCleanJobState activity fails, the resetConnection flag is still set as desired on the workflowState, since it is used in the workflow restart logic below.


final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId());

Workflow.await(timeToWait,
@@ -476,23 +480,52 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn

/**
* This is running a lambda function that takes {@param input} as an input. If the run of the lambda
* is thowing an exception, the workflow will be in a quarantined state and can then be manual
* un-quarantined or a retry of the failed lambda can be trigger through a signal method.
* throws an exception, the workflow will be restarted after a short delay.
*
* Note that if the lambda activity is configured to have retries, the exception will only be caught
* after the activity has been retried the maximum number of times.
*
* We aimed to use this method for call of the temporal activity.
* This method is meant to be used for calling temporal activities.
*/
private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INPUT, OUTPUT> mapper, final INPUT input) {
@lmossman (Contributor, Author):

The changes to this method are the main changes of this PR

try {
return mapper.apply(input);
} catch (final Exception e) {
log.error("Failed to run an activity for the connection " + connectionId, e);
workflowState.setQuarantined(true);
workflowState.setRetryFailedActivity(false);
Workflow.await(() -> workflowState.isRetryFailedActivity());
log.error("Retrying an activity for the connection " + connectionId, e);
workflowState.setQuarantined(false);
workflowState.setRetryFailedActivity(false);
return runMandatoryActivityWithOutput(mapper, input);
log.error("[ACTIVITY-FAILURE] Connection " + connectionId +
" failed to run an activity. Connection manager workflow will be restarted after a delay of " +
WORKFLOW_FAILURE_RESTART_DELAY.getSeconds() + " seconds.", e);
// TODO (https://github.com/airbytehq/airbyte/issues/13773) add tracking/notification

// Wait a short delay before restarting workflow. This is important if, for example, the failing
// activity was configured to not have retries.
// Without this delay, that activity could cause the workflow to loop extremely quickly,
// overwhelming temporal.
log.info("Waiting {} seconds before restarting the workflow for connection {}, to prevent spamming temporal with restarts.",
WORKFLOW_FAILURE_RESTART_DELAY.getSeconds(),
connectionId);
Workflow.await(WORKFLOW_FAILURE_RESTART_DELAY, () -> workflowState.isRetryFailedActivity());
Contributor:

Do we want to keep this? Given the delay, I think it will be complicated to reach this state.

@lmossman (Contributor, Author):

I don't feel too strongly about this - I thought it may be useful in debugging situations if someone wants to force the activity to be retried instead of waiting for the workflow to restart.

Contributor:

Why is it complicated to reach this state?

Contributor:

It was about the long retries, which could potentially take more than 5 hours; the extra 10 minutes doesn't sound necessary.

@lmossman (Contributor, Author):

Yeah, I think that is not as applicable now, because we have adjusted the retries to take ~15 mins max. This 10 minute wait is more of a failsafe in case someone adds an activity in the future that does not have any retry configuration; in that case, this wait helps protect us from spamming temporal with very rapid restarts of the connection.


// Accept a manual signal to retry the failed activity during this window
if (workflowState.isRetryFailedActivity()) {
log.info("Received RetryFailedActivity signal for connection {}. Retrying activity.", connectionId);
workflowState.setRetryFailedActivity(false);
return runMandatoryActivityWithOutput(mapper, input);
}

log.info("Finished wait for connection {}, restarting connection manager workflow", connectionId);

final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId);
// this ensures that the new workflow will still perform a reset if an activity failed while
// attempting to reset the connection
if (workflowState.isResetConnection()) {
newWorkflowInput.setResetConnection(true);
newWorkflowInput.setFromJobResetFailure(true);
}

Workflow.continueAsNew(newWorkflowInput);

throw new IllegalStateException("This statement should never be reached, as the ConnectionManagerWorkflow for connection "
+ connectionId + " was continued as new.");
}
}

@@ -285,8 +285,6 @@ private void failNonTerminalJobs(final UUID connectionId) {
io.airbyte.scheduler.models.JobStatus.NON_TERMINAL_STATUSES);
for (final Job job : jobs) {
final long jobId = job.getId();
log.info("Failing non-terminal job {}", jobId);
jobPersistence.failJob(jobId);

// fail all non-terminal attempts
for (final Attempt attempt : job.getAttempts()) {
@@ -296,11 +294,15 @@

// the Attempt object 'id' is actually the value of the attempt_number column in the db
final int attemptNumber = (int) attempt.getId();
log.info("Failing non-terminal attempt {} for non-terminal job {}", attemptNumber, jobId);
jobPersistence.failAttempt(jobId, attemptNumber);
jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber,
FailureHelper.failureSummaryForTemporalCleaningJobState(jobId, attemptNumber));
}

log.info("Failing non-terminal job {}", jobId);
jobPersistence.failJob(jobId);

final Job failedJob = jobPersistence.getJob(jobId);
jobNotifier.failJob("Failing job in order to start from clean job state for new temporal workflow run.", failedJob);
trackCompletion(failedJob, JobStatus.FAILED);