Skip to content

Commit

Permalink
add env var for workflow restart delay and refactor slightly
Browse files Browse the repository at this point in the history
  • Loading branch information
lmossman committed Jun 15, 2022
1 parent c5a9d6b commit 5f8613b
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -508,15 +508,20 @@ public interface Configs {
int getMaxActivityTimeoutSecond();

/**
* Get the duration in second between 2 activity attempts
* Get initial delay in seconds between two activity attempts
*/
int getInitialDelayBetweenActivityAttemptsSeconds();

/**
* Get the maximum interval between retries of an activity
* Get maximum delay in seconds between two activity attempts
*/
int getMaxDelayBetweenActivityAttemptsSeconds();

/**
* Get the delay in seconds between an activity failing and the workflow being restarted
*/
int getWorkflowFailureRestartDelaySeconds();

/**
* Get the number of attempts for non-long-running activities
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ public class EnvConfigs implements Configs {
public static final String ACTIVITY_MAX_TIMEOUT_SECOND = "ACTIVITY_MAX_TIMEOUT_SECOND";
public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT";
public static final String ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS";
private static final String ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS";
public static final String ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS";
public static final String WORKFLOW_FAILURE_RESTART_DELAY_SECONDS = "WORKFLOW_FAILURE_RESTART_DELAY_SECONDS";

private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS";
private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS";
Expand Down Expand Up @@ -853,6 +854,11 @@ public int getMaxDelayBetweenActivityAttemptsSeconds() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS, String.valueOf(10 * 60)));
}

@Override
public int getWorkflowFailureRestartDelaySeconds() {
  // 600 seconds (10 minutes) is the default value handed to getEnvOrDefault.
  final String fallbackSeconds = String.valueOf(10 * 60);
  final String rawSeconds = getEnvOrDefault(WORKFLOW_FAILURE_RESTART_DELAY_SECONDS, fallbackSeconds);
  return Integer.parseInt(rawSeconds);
}

@Override
public int getActivityNumberOfAttempt() {
return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "10"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo
public static final RetryOptions RETRY = RetryOptions.newBuilder()
.setMaximumAttempts(configs.getActivityNumberOfAttempt())
.setInitialInterval(Duration.ofSeconds(configs.getInitialDelayBetweenActivityAttemptsSeconds()))
.setMaximumInterval(Duration.ofMinutes(configs.getMaxDelayBetweenActivityAttemptsSeconds()))
.setMaximumInterval(Duration.ofSeconds(configs.getMaxDelayBetweenActivityAttemptsSeconds()))
.build();

public static final String DEFAULT_NAMESPACE = "default";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.scheduling;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.StandardCheckConnectionInput;
Expand Down Expand Up @@ -87,7 +88,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync";
private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1;

private static final Duration ACTIVITY_FAILURE_RESTART_DELAY = Duration.ofMinutes(10);
private static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds());

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

Expand Down Expand Up @@ -491,27 +492,37 @@ private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INP
return mapper.apply(input);
} catch (final Exception e) {
log.error("[ACTIVITY-RETRY-FAILURE] Connection " + connectionId +
" failed to run an activity. Connection manager workflow will be restarted after a short delay.", e);
" failed to run an activity. Connection manager workflow will be restarted after a delay of " +
WORKFLOW_FAILURE_RESTART_DELAY.getSeconds() + " seconds.", e);
// TODO (https://github.com/airbytehq/airbyte/issues/13773) add tracking/notification

final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId);
// this ensures that the new workflow will still perform a reset if an activity failed while attempting to reset the connection
if (workflowState.isResetConnection()) {
newWorkflowInput.setResetConnection(true);
newWorkflowInput.setFromJobResetFailure(true);
}

// Wait a short delay before restarting workflow. This is important if, for example, the failing activity was configured to not have retries.
// Without this delay, that activity could cause the workflow to loop extremely quickly, overwhelming temporal.
log.info("Waiting {} before restarting the workflow, to prevent spamming temporal with restarts.", ACTIVITY_FAILURE_RESTART_DELAY.toString());
Workflow.await(ACTIVITY_FAILURE_RESTART_DELAY, () -> workflowState.isRetryFailedActivity());
// Wait a short delay before restarting workflow. This is important if, for example, the failing
// activity was configured to not have retries.
// Without this delay, that activity could cause the workflow to loop extremely quickly,
// overwhelming temporal.
log.info("Waiting {} seconds before restarting the workflow for connection {}, to prevent spamming temporal with restarts.",
WORKFLOW_FAILURE_RESTART_DELAY.getSeconds(),
connectionId);
Workflow.await(WORKFLOW_FAILURE_RESTART_DELAY, () -> workflowState.isRetryFailedActivity());

// Accept a manual signal to retry the failed activity during this window
if (workflowState.isRetryFailedActivity()) {
log.info("Received RetryFailedActivity signal for connection {}. Retrying activity.", connectionId);
workflowState.setRetryFailedActivity(false);
return runMandatoryActivityWithOutput(mapper, input);
}

// The placeholder in this message names the connection, so it is filled with connectionId
// (not the restart delay length, which was already logged before the wait started).
log.info("Finished wait for connection {}, restarting connection manager workflow", connectionId);

final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId);
// this ensures that the new workflow will still perform a reset if an activity failed while
// attempting to reset the connection
if (workflowState.isResetConnection()) {
newWorkflowInput.setResetConnection(true);
newWorkflowInput.setFromJobResetFailure(true);
}

Workflow.continueAsNew(newWorkflowInput);

throw new IllegalStateException("This statement should never be reached, as the ConnectionManagerWorkflow for connection "
Expand Down
1 change: 1 addition & 0 deletions charts/airbyte/templates/env-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ data:
ACTIVITY_MAX_ATTEMPT: ""
ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS: ""
ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS: ""
WORKFLOW_FAILURE_RESTART_DELAY_SECONDS: ""
29 changes: 17 additions & 12 deletions charts/airbyte/templates/worker/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -262,20 +262,25 @@ spec:
name: {{ include "common.names.fullname" . }}-env
key: OTEL_COLLECTOR_ENDPOINT
- name: ACTIVITY_MAX_ATTEMPT
valueFrom:
configMapKeyRef:
name: { { include "common.names.fullname" . } }-env
key: ACTIVITY_MAX_ATTEMPT
# Helm template actions must use unbroken "{{" / "}}" delimiters to be rendered.
valueFrom:
  configMapKeyRef:
    name: {{ include "common.names.fullname" . }}-env
    key: ACTIVITY_MAX_ATTEMPT
- name: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
valueFrom:
configMapKeyRef:
name: { { include "common.names.fullname" . } }-env
key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
# Helm template actions must use unbroken "{{" / "}}" delimiters to be rendered.
valueFrom:
  configMapKeyRef:
    name: {{ include "common.names.fullname" . }}-env
    key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
- name: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
valueFrom:
configMapKeyRef:
name: { { include "common.names.fullname" . } }-env
key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
# Helm template actions must use unbroken "{{" / "}}" delimiters to be rendered.
valueFrom:
  configMapKeyRef:
    name: {{ include "common.names.fullname" . }}-env
    key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
# Delay in seconds between an activity failing and the workflow being restarted,
# sourced from the chart's "-env" ConfigMap.
- name: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS
  valueFrom:
    configMapKeyRef:
      name: {{ include "common.names.fullname" . }}-env
      key: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS
{{- if .Values.worker.extraEnv }}
{{ .Values.worker.extraEnv | toYaml | nindent 8 }}
{{- end }}
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ services:
- ACTIVITY_MAX_ATTEMPT=${ACTIVITY_MAX_ATTEMPT}
- ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS}
- ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS}
- WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- workspace:${WORKSPACE_ROOT}
Expand Down
29 changes: 17 additions & 12 deletions kube/resources/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,20 +231,25 @@ spec:
name: airbyte-env
key: OTEL_COLLECTOR_ENDPOINT
- name: ACTIVITY_MAX_ATTEMPT
valueFrom:
configMapKeyRef:
name: airbyte-env
key: ACTIVITY_MAX_ATTEMPT
valueFrom:
configMapKeyRef:
name: airbyte-env
key: ACTIVITY_MAX_ATTEMPT
- name: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
valueFrom:
configMapKeyRef:
name: airbyte-env
key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
valueFrom:
configMapKeyRef:
name: airbyte-env
key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS
- name: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
valueFrom:
configMapKeyRef:
name: airbyte-env
key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
valueFrom:
configMapKeyRef:
name: airbyte-env
key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS
- name: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS
valueFrom:
configMapKeyRef:
name: airbyte-env
key: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS
ports:
- containerPort: 9000 # for heartbeat server
- containerPort: 9001 # start temporal worker port pool
Expand Down

0 comments on commit 5f8613b

Please sign in to comment.