From 4aa361b4092d98614784722e4ac48eb4f05b6092 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 14 Jun 2022 13:03:33 -0700 Subject: [PATCH 1/9] use SHORT_ACTIVITY_OPTIONS on check connection activity so that it has retries --- .../temporal/scheduling/ConnectionManagerWorkflowImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 09ff356c9c2a..97581746d0d2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -101,7 +101,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private final AutoDisableConnectionActivity autoDisableConnectionActivity = Workflow.newActivityStub(AutoDisableConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private final CheckConnectionActivity checkActivity = - Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.CHECK_ACTIVITY_OPTIONS); + Workflow.newActivityStub(CheckConnectionActivity.class, ActivityConfiguration.SHORT_ACTIVITY_OPTIONS); private CancellationScope cancellableSyncWorkflow; From 7bbc7b3335405622ab895261df7dc10a902acb37 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 14 Jun 2022 16:36:42 -0700 Subject: [PATCH 2/9] retry workflow after delay instead of quarantining --- .../main/java/io/airbyte/config/Configs.java | 7 ++- .../java/io/airbyte/config/EnvConfigs.java | 12 +++- .../temporal/ConnectionManagerUtils.java | 2 +- .../workers/temporal/TemporalUtils.java | 3 +- .../ConnectionManagerWorkflowImpl.java | 55 +++++++++++++------ 5 files changed, 57 insertions(+), 22 deletions(-) diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index f1dfda304d72..74cd4461b4cf 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -510,7 +510,12 @@ public interface Configs { /** * Get the duration in second between 2 activity attempts */ - int getDelayBetweenActivityAttempts(); + int getInitialDelayBetweenActivityAttemptsSeconds(); + + /** + * Get the maximum interval between retries of an activity + */ + int getMaxDelayBetweenActivityAttemptsSeconds(); /** * Get number of attempts of the non long running activities diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 4152df6bbdee..63fb50d8f0de 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -111,7 +111,8 @@ public class EnvConfigs implements Configs { public static final String ACTIVITY_MAX_TIMEOUT_SECOND = "ACTIVITY_MAX_TIMEOUT_SECOND"; public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT"; - public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS"; + public static final String ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS"; + private static final String ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS"; private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS"; private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS"; @@ -843,8 +844,13 @@ public int getMaxActivityTimeoutSecond() { } @Override - public int getDelayBetweenActivityAttempts() { - return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_TIMEOUT_SECOND, "30")); + public int getInitialDelayBetweenActivityAttemptsSeconds() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS, "30")); + } + + @Override + public int getMaxDelayBetweenActivityAttemptsSeconds() { + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS, String.valueOf(10 * 60))); } @Override diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java index df8d8662583c..ee4d9bf7d38f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java @@ -234,7 +234,7 @@ static String getConnectionManagerName(final UUID connectionId) { return "connection_manager_" + connectionId; } - static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) { + public static ConnectionUpdaterInput buildStartWorkflowInput(final UUID connectionId) { return ConnectionUpdaterInput.builder() .connectionId(connectionId) .jobId(null) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index fcae590544d0..bf6849096d23 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -66,7 +66,8 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo private static final Configs configs = new EnvConfigs(); public static final RetryOptions RETRY = RetryOptions.newBuilder() .setMaximumAttempts(configs.getActivityNumberOfAttempt()) - .setInitialInterval(Duration.ofSeconds(configs.getDelayBetweenActivityAttempts())) + .setInitialInterval(Duration.ofSeconds(configs.getInitialDelayBetweenActivityAttemptsSeconds())) + .setMaximumInterval(Duration.ofMinutes(configs.getMaxDelayBetweenActivityAttemptsSeconds())) .build(); public static final String DEFAULT_NAMESPACE = "default"; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 97581746d0d2..3b9daeff1b3a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -17,6 +17,7 @@ import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.helper.FailureHelper; +import io.airbyte.workers.temporal.ConnectionManagerUtils; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput; @@ -86,6 +87,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync"; private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1; + private static final Duration ACTIVITY_FAILURE_RESTART_DELAY = Duration.ofMinutes(10); + private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener()); private final WorkflowInternalState workflowInternalState = new WorkflowInternalState(); @@ -151,11 +154,6 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn return Workflow.newCancellationScope(() -> { connectionId = connectionUpdaterInput.getConnectionId(); - // Clean the job state by failing any jobs for this connection that are currently non-terminal. - // This catches cases where the temporal workflow was terminated and restarted while a job was - // actively running, leaving that job in an orphaned and non-terminal state. - ensureCleanJobState(connectionUpdaterInput); - // workflow state is only ever set in test cases. for production cases, it will always be null. if (connectionUpdaterInput.getWorkflowState() != null) { workflowState = connectionUpdaterInput.getWorkflowState(); @@ -171,6 +169,11 @@ private CancellationScope generateSyncWorkflowRunnable(final ConnectionUpdaterIn workflowState.setResetWithScheduling(true); } + // Clean the job state by failing any jobs for this connection that are currently non-terminal. + // This catches cases where the temporal workflow was terminated and restarted while a job was + // actively running, leaving that job in an orphaned and non-terminal state. + ensureCleanJobState(connectionUpdaterInput); + final Duration timeToWait = getTimeToWait(connectionUpdaterInput.getConnectionId()); Workflow.await(timeToWait, @@ -476,23 +479,43 @@ private void prepareForNextRunAndContinueAsNew(final ConnectionUpdaterInput conn /** * This is running a lambda function that takes {@param input} as an input. If the run of the lambda - * is thowing an exception, the workflow will be in a quarantined state and can then be manual - * un-quarantined or a retry of the failed lambda can be trigger through a signal method. + * throws an exception, the workflow will retried after a short delay. + * + * Note that if the lambda activity is configured to have retries, the exception will only be caught + * after the activity has been retried the maximum number of times. * - * We aimed to use this method for call of the temporal activity. + * This method is meant to be used for calling temporal activities. */ private OUTPUT runMandatoryActivityWithOutput(final Function mapper, final INPUT input) { try { return mapper.apply(input); } catch (final Exception e) { - log.error("Failed to run an activity for the connection " + connectionId, e); - workflowState.setQuarantined(true); - workflowState.setRetryFailedActivity(false); - Workflow.await(() -> workflowState.isRetryFailedActivity()); - log.error("Retrying an activity for the connection " + connectionId, e); - workflowState.setQuarantined(false); - workflowState.setRetryFailedActivity(false); - return runMandatoryActivityWithOutput(mapper, input); + log.error("[ACTIVITY-RETRY-FAILURE] Connection " + connectionId + + " failed to run an activity. Connection manager workflow will be restarted after a short delay.", e); + // TODO (https://github.com/airbytehq/airbyte/issues/13773) add tracking/notification + + final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId); + // this ensures that the new workflow will still perform a reset if an activity failed while attempting to reset the connection + if (workflowState.isResetConnection()) { + newWorkflowInput.setResetConnection(true); + newWorkflowInput.setFromJobResetFailure(true); + } + + // Wait a short delay before restarting workflow. This is important if, for example, the failing activity was configured to not have retries. + // Without this delay, that activity could cause the workflow to loop extremely quickly, overwhelming temporal. + log.info("Waiting {} before restarting the workflow, to prevent spamming temporal with restarts.", ACTIVITY_FAILURE_RESTART_DELAY.toString()); + Workflow.await(ACTIVITY_FAILURE_RESTART_DELAY, () -> workflowState.isRetryFailedActivity()); + + // Accept a manual signal to retry the failed activity during this window + if (workflowState.isRetryFailedActivity()) { + workflowState.setRetryFailedActivity(false); + return runMandatoryActivityWithOutput(mapper, input); + } + + Workflow.continueAsNew(newWorkflowInput); + + throw new IllegalStateException("This statement should never be reached, as the ConnectionManagerWorkflow for connection " + + connectionId + " was continued as new."); } } From c5a9d6b7fa590499eb45b7e864fee293d1651da5 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 14 Jun 2022 17:22:32 -0700 Subject: [PATCH 3/9] allow activity env vars to be configured in docker and kube --- charts/airbyte/templates/env-configmap.yaml | 3 +++ charts/airbyte/templates/worker/deployment.yaml | 15 +++++++++++++++ docker-compose.yaml | 3 +++ kube/resources/worker.yaml | 15 +++++++++++++++ 4 files changed, 36 insertions(+) diff --git a/charts/airbyte/templates/env-configmap.yaml b/charts/airbyte/templates/env-configmap.yaml index 384c2f1ba2da..3d3ef3e9dcb9 100644 --- a/charts/airbyte/templates/env-configmap.yaml +++ b/charts/airbyte/templates/env-configmap.yaml @@ -55,3 +55,6 @@ data: WORKSPACE_ROOT: /workspace METRIC_CLIENT: "" OTEL_COLLECTOR_ENDPOINT: "" + ACTIVITY_MAX_ATTEMPT: "" + ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS: "" + ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS: "" diff --git a/charts/airbyte/templates/worker/deployment.yaml b/charts/airbyte/templates/worker/deployment.yaml index 86d6d1726a5f..e0cbe5c063be 100644 --- a/charts/airbyte/templates/worker/deployment.yaml +++ b/charts/airbyte/templates/worker/deployment.yaml @@ -261,6 +261,21 @@ spec: configMapKeyRef: name: {{ include "common.names.fullname" . }}-env key: OTEL_COLLECTOR_ENDPOINT + - name: ACTIVITY_MAX_ATTEMPT + valueFrom: + configMapKeyRef: + name: { { include "common.names.fullname" . } }-env + key: ACTIVITY_MAX_ATTEMPT + - name: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: { { include "common.names.fullname" . } }-env + key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + - name: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: { { include "common.names.fullname" . } }-env + key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS {{- if .Values.worker.extraEnv }} {{ .Values.worker.extraEnv | toYaml | nindent 8 }} {{- end }} diff --git a/docker-compose.yaml b/docker-compose.yaml index cd5f24849c2e..9fc2a092799f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -88,6 +88,9 @@ services: - WORKSPACE_ROOT=${WORKSPACE_ROOT} - METRIC_CLIENT=${METRIC_CLIENT} - OTEL_COLLECTOR_ENDPOINT=${OTEL_COLLECTOR_ENDPOINT} + - ACTIVITY_MAX_ATTEMPT=${ACTIVITY_MAX_ATTEMPT} + - ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS} + - ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS} volumes: - /var/run/docker.sock:/var/run/docker.sock - workspace:${WORKSPACE_ROOT} diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index 3591812f147f..7240177cc14b 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -230,6 +230,21 @@ spec: configMapKeyRef: name: airbyte-env key: OTEL_COLLECTOR_ENDPOINT + - name: ACTIVITY_MAX_ATTEMPT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_MAX_ATTEMPT + - name: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + - name: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS ports: - containerPort: 9000 # for heartbeat server - containerPort: 9001 # start temporal worker port pool From 5f8613b44d0a3d27a3e0e475860574beeeedf45a Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 14 Jun 2022 17:47:20 -0700 Subject: [PATCH 4/9] add env var for workflow restart delay and refactor slightly --- .../main/java/io/airbyte/config/Configs.java | 9 ++++- .../java/io/airbyte/config/EnvConfigs.java | 8 +++- .../workers/temporal/TemporalUtils.java | 2 +- .../ConnectionManagerWorkflowImpl.java | 37 ++++++++++++------- charts/airbyte/templates/env-configmap.yaml | 1 + .../airbyte/templates/worker/deployment.yaml | 29 +++++++++------ docker-compose.yaml | 1 + kube/resources/worker.yaml | 29 +++++++++------ 8 files changed, 75 insertions(+), 41 deletions(-) diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 74cd4461b4cf..8381f15262da 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -508,15 +508,20 @@ public interface Configs { int getMaxActivityTimeoutSecond(); /** - * Get the duration in second between 2 activity attempts + * Get initial delay in seconds between two activity attempts */ int getInitialDelayBetweenActivityAttemptsSeconds(); /** - * Get the maximum interval between retries of an activity + * Get maximum delay in seconds between two activity attempts */ int getMaxDelayBetweenActivityAttemptsSeconds(); + /** + * Get the delay in seconds between an activity failing and the workflow being restarted + */ + int getWorkflowFailureRestartDelaySeconds(); + /** * Get number of attempts of the non long running activities */ diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 63fb50d8f0de..48de628c4dc2 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -112,7 +112,8 @@ public class EnvConfigs implements Configs { public static final String ACTIVITY_MAX_TIMEOUT_SECOND = "ACTIVITY_MAX_TIMEOUT_SECOND"; public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT"; public static final String ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS"; - private static final String ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS"; + public static final String ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS = "ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS"; + public static final String WORKFLOW_FAILURE_RESTART_DELAY_SECONDS = "WORKFLOW_FAILURE_RESTART_DELAY_SECONDS"; private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS"; private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS"; @@ -853,6 +854,11 @@ public int getMaxDelayBetweenActivityAttemptsSeconds() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS, String.valueOf(10 * 60))); } + @Override + public int getWorkflowFailureRestartDelaySeconds() { + return Integer.parseInt(getEnvOrDefault(WORKFLOW_FAILURE_RESTART_DELAY_SECONDS, String.valueOf(10 * 60))); + } + @Override public int getActivityNumberOfAttempt() { return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "10")); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java index bf6849096d23..33e1dcba1dc1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java @@ -67,7 +67,7 @@ public static WorkflowServiceStubs createTemporalService(final String temporalHo public static final RetryOptions RETRY = RetryOptions.newBuilder() .setMaximumAttempts(configs.getActivityNumberOfAttempt()) .setInitialInterval(Duration.ofSeconds(configs.getInitialDelayBetweenActivityAttemptsSeconds())) - .setMaximumInterval(Duration.ofMinutes(configs.getMaxDelayBetweenActivityAttemptsSeconds())) + .setMaximumInterval(Duration.ofSeconds(configs.getMaxDelayBetweenActivityAttemptsSeconds())) .build(); public static final String DEFAULT_NAMESPACE = "default"; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 3b9daeff1b3a..bd424d3712a2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.scheduling; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureType; import io.airbyte.config.StandardCheckConnectionInput; @@ -87,7 +88,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync"; private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1; - private static final Duration ACTIVITY_FAILURE_RESTART_DELAY = Duration.ofMinutes(10); + private static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds()); private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener()); @@ -491,27 +492,37 @@ private OUTPUT runMandatoryActivityWithOutput(final Function workflowState.isRetryFailedActivity()); + // Wait a short delay before restarting workflow. This is important if, for example, the failing + // activity was configured to not have retries. + // Without this delay, that activity could cause the workflow to loop extremely quickly, + // overwhelming temporal. + log.info("Waiting {} seconds before restarting the workflow for connection {}, to prevent spamming temporal with restarts.", + WORKFLOW_FAILURE_RESTART_DELAY.getSeconds(), + connectionId); + Workflow.await(WORKFLOW_FAILURE_RESTART_DELAY, () -> workflowState.isRetryFailedActivity()); // Accept a manual signal to retry the failed activity during this window if (workflowState.isRetryFailedActivity()) { + log.info("Received RetryFailedActivity signal for connection {}. Retrying activity.", connectionId); workflowState.setRetryFailedActivity(false); return runMandatoryActivityWithOutput(mapper, input); } + log.info("Finished wait for connection {}, restarting connection manager workflow", + WORKFLOW_FAILURE_RESTART_DELAY.getSeconds()); + + final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId); + // this ensures that the new workflow will still perform a reset if an activity failed while + // attempting to reset the connection + if (workflowState.isResetConnection()) { + newWorkflowInput.setResetConnection(true); + newWorkflowInput.setFromJobResetFailure(true); + } + Workflow.continueAsNew(newWorkflowInput); throw new IllegalStateException("This statement should never be reached, as the ConnectionManagerWorkflow for connection " diff --git a/charts/airbyte/templates/env-configmap.yaml b/charts/airbyte/templates/env-configmap.yaml index 3d3ef3e9dcb9..d626cf2e93c4 100644 --- a/charts/airbyte/templates/env-configmap.yaml +++ b/charts/airbyte/templates/env-configmap.yaml @@ -58,3 +58,4 @@ data: ACTIVITY_MAX_ATTEMPT: "" ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS: "" ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS: "" + WORKFLOW_FAILURE_RESTART_DELAY_SECONDS: "" diff --git a/charts/airbyte/templates/worker/deployment.yaml b/charts/airbyte/templates/worker/deployment.yaml index e0cbe5c063be..f505592f2ca1 100644 --- a/charts/airbyte/templates/worker/deployment.yaml +++ b/charts/airbyte/templates/worker/deployment.yaml @@ -262,20 +262,25 @@ spec: name: {{ include "common.names.fullname" . }}-env key: OTEL_COLLECTOR_ENDPOINT - name: ACTIVITY_MAX_ATTEMPT - valueFrom: - configMapKeyRef: - name: { { include "common.names.fullname" . } }-env - key: ACTIVITY_MAX_ATTEMPT + valueFrom: + configMapKeyRef: + name: { { include "common.names.fullname" . } }-env + key: ACTIVITY_MAX_ATTEMPT - name: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS - valueFrom: - configMapKeyRef: - name: { { include "common.names.fullname" . } }-env - key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: { { include "common.names.fullname" . } }-env + key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS - name: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS - valueFrom: - configMapKeyRef: - name: { { include "common.names.fullname" . } }-env - key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: { { include "common.names.fullname" . } }-env + key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + - name: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS + valueFrom: + configMapKeyRef: + name: { { include "common.names.fullname" . } }-env + key: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS {{- if .Values.worker.extraEnv }} {{ .Values.worker.extraEnv | toYaml | nindent 8 }} {{- end }} diff --git a/docker-compose.yaml b/docker-compose.yaml index 9fc2a092799f..79a53b4d1d26 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -91,6 +91,7 @@ services: - ACTIVITY_MAX_ATTEMPT=${ACTIVITY_MAX_ATTEMPT} - ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS} - ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS} + - WORKFLOW_FAILURE_RESTART_DELAY_SECONDS=${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS} volumes: - /var/run/docker.sock:/var/run/docker.sock - workspace:${WORKSPACE_ROOT} diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index 7240177cc14b..13ff6ce7ff8d 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -231,20 +231,25 @@ spec: name: airbyte-env key: OTEL_COLLECTOR_ENDPOINT - name: ACTIVITY_MAX_ATTEMPT - valueFrom: - configMapKeyRef: - name: airbyte-env - key: ACTIVITY_MAX_ATTEMPT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_MAX_ATTEMPT - name: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS - valueFrom: - configMapKeyRef: - name: airbyte-env - key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS - name: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS - valueFrom: - configMapKeyRef: - name: airbyte-env - key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS + - name: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WORKFLOW_FAILURE_RESTART_DELAY_SECONDS ports: - containerPort: 9000 # for heartbeat server - containerPort: 9001 # start temporal worker port pool From 802fda9d4f13480de1b66f986b46197c1897e065 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 14 Jun 2022 19:11:34 -0700 Subject: [PATCH 5/9] update tests to handle new restart behavior --- .../ConnectionManagerWorkflowImpl.java | 7 +- .../ConnectionManagerWorkflowTest.java | 151 ++++-------------- 2 files changed, 36 insertions(+), 122 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index bd424d3712a2..e02a3ecbf27a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -88,7 +88,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync"; private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1; - private static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds()); + static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds()); private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener()); @@ -491,7 +491,7 @@ private OUTPUT runMandatoryActivityWithOutput(final Function OUTPUT runMandatoryActivityWithOutput(final Function getSetupFailingFailingActivityBeforeRun() { + public static Stream getSetupFailingActivity() { return Stream.of( Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), @@ -1243,8 +1247,8 @@ public static Stream getSetupFailingFailingActivityBeforeRun() { } @ParameterizedTest - @MethodSource("getSetupFailingFailingActivityBeforeRun") - void testGetStuckBeforeRun(final Thread mockSetup) throws InterruptedException { + @MethodSource("getSetupFailingActivity") + void testWorkflowRestarted(final Thread mockSetup) throws InterruptedException { mockSetup.run(); Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( Duration.ZERO)); @@ -1266,7 +1270,10 @@ void testGetStuckBeforeRun(final Thread mockSetup) throws InterruptedException { .build(); startWorkflowAndWaitUntilReady(workflow, input); - testEnv.sleep(Duration.ofMinutes(2L)); + + // Sleep test env for restart delay, plus a small buffer to ensure that the workflow executed the + // logic after the delay + testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.plus(Duration.ofSeconds(10))); final Queue events = testStateListener.events(testId); @@ -1274,13 +1281,11 @@ void testGetStuckBeforeRun(final Thread mockSetup) throws InterruptedException { .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue()) .isEmpty(); - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue()) - .hasSize(1); + assertWorkflowWasContinuedAsNew(); } @Test - void testCanGetUnstuck() throws InterruptedException { + void testCanRetryFailedActivity() throws InterruptedException { Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")) .thenReturn(new JobCreationOutput(1l)); @@ -1305,76 +1310,18 @@ void testCanGetUnstuck() throws InterruptedException { startWorkflowAndWaitUntilReady(workflow, input); - testEnv.sleep(Duration.ofSeconds(80L)); + // Sleep test env for half of restart delay, so that we know we are in the middle of the delay + testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.dividedBy(2)); workflow.retryFailedActivity(); testEnv.sleep(Duration.ofSeconds(30L)); final Queue events = testStateListener.events(testId); - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - Assertions.assertThat(events) .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue()) .hasSize(1); } - public static Stream getSetupFailingFailingActivityAfterRun() { - return Stream.of( - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).jobSuccessWithAttemptNumber(Mockito.any(JobSuccessInputWithAttemptNumber.class)))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.cancelJob()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).jobCancelledWithAttemptNumber(Mockito.any(JobCancelledInputWithAttemptNumber.class)))), - Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> workflow.deleteConnection()), - new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mConnectionDeletionActivity).deleteConnection(Mockito.any())))); - } - - @ParameterizedTest - @MethodSource("getSetupFailingFailingActivityAfterRun") - void testGetStuckAfterRun(final Consumer signalSender, final Thread mockSetup) throws InterruptedException { - mockSetup.run(); - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() - .connectionId(UUID.randomUUID()) - .jobId(null) - .attemptId(null) - .fromFailure(false) - .attemptNumber(1) - .workflowState(workflowState) - .resetConnection(false) - .fromJobResetFailure(false) - .build(); - - startWorkflowAndWaitUntilReady(workflow, input); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofSeconds(5)); - workflow.submitManualSync(); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofSeconds(5)); - signalSender.accept(workflow); - - // TODO - // For some reason this transiently fails if it is below the runtime. - // However, this should be reported almost immediately. I think this is a bug. - testEnv.sleep(Duration.ofSeconds(SleepingSyncWorkflow.RUN_TIME.toSeconds() + 2)); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue()) - .hasSizeGreaterThanOrEqualTo(1); - } - } @Nested @@ -1449,53 +1396,6 @@ public void failedResetContinueAsReset() throws InterruptedException { } - @RepeatedTest(10) - @Timeout(value = 2, - unit = TimeUnit.SECONDS) - @DisplayName("Test that we are getting stuck if the report of a failure happen") - void testGetStuckAfterRun() throws InterruptedException { - Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) - .when(mJobCreationAndStatusUpdateActivity).attemptFailureWithAttemptNumber(Mockito.any()); - - Mockito.when(mConfigFetchActivity.getMaxAttempt()) - .thenReturn(new GetMaxAttemptOutput(3)); - - final UUID testId = UUID.randomUUID(); - final TestStateListener testStateListener = new TestStateListener(); - final WorkflowState workflowState = new WorkflowState(testId, testStateListener); - - final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder() - .connectionId(UUID.randomUUID()) - .jobId(null) - .attemptId(null) - .fromFailure(false) - .attemptNumber(1) - .workflowState(workflowState) - .resetConnection(false) - .fromJobResetFailure(false) - .build(); - - startWorkflowAndWaitUntilReady(workflow, input); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofSeconds(5)); - workflow.submitManualSync(); - - // wait for workflow to initialize - testEnv.sleep(Duration.ofSeconds(5)); - - // TODO - // For some reason this transiently fails if it is below the runtime. - // However, this should be reported almost immediately. I think this is a bug. - testEnv.sleep(Duration.ofSeconds(SleepingSyncWorkflow.RUN_TIME.toSeconds() + 2)); - - final Queue events = testStateListener.events(testId); - - Assertions.assertThat(events) - .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue()) - .hasSize(1); - } - @ParameterizedTest @MethodSource("io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowTest#getMaxAttemptForResetRetry") public void failedResetContinueAttemptAsReset(final int maxAttempt) throws InterruptedException { @@ -1623,6 +1523,7 @@ private void setupSpecificChildWorkflow(final Class ConnectionManagerWorkflow.class, WorkflowOptions.newBuilder() .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + .setWorkflowId(WORKFLOW_ID) .build()); } @@ -1687,4 +1588,18 @@ private void runRetryResetWaitsAfterJobFailureTest() throws InterruptedException .isFalse(); } + private void assertWorkflowWasContinuedAsNew() { + final ListClosedWorkflowExecutionsRequest request = ListClosedWorkflowExecutionsRequest.newBuilder() + .setNamespace(testEnv.getNamespace()) + .setExecutionFilter(WorkflowExecutionFilter.newBuilder().setWorkflowId(WORKFLOW_ID)) + .build(); + final ListClosedWorkflowExecutionsResponse listResponse = testEnv + .getWorkflowService() + .blockingStub() + .listClosedWorkflowExecutions(request); + Assertions.assertThat(listResponse.getExecutionsCount()).isGreaterThanOrEqualTo(1); + Assertions.assertThat(listResponse.getExecutionsList().get(0).getStatus()) + .isEqualTo(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW); + } + } From 1dbfecea7ef503a3711872a1e3b88e763672a879 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 14 Jun 2022 19:37:24 -0700 Subject: [PATCH 6/9] update test name --- .../temporal/scheduling/ConnectionManagerWorkflowTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 5503ce59d5da..83884db872c9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -1248,7 +1248,7 @@ public static Stream getSetupFailingActivity() { @ParameterizedTest @MethodSource("getSetupFailingActivity") - void testWorkflowRestarted(final Thread mockSetup) throws InterruptedException { + void testWorkflowRestartedAfterFailedActivity(final Thread mockSetup) throws InterruptedException { mockSetup.run(); Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput( Duration.ZERO)); From 1af4f7635bbc52c77101758102d16c32eb7f49ff Mon Sep 17 00:00:00 2001 From: lmossman Date: Wed, 15 Jun 2022 11:44:12 -0700 Subject: [PATCH 7/9] add empty env var values to .env files --- .env | 5 +++++ kube/overlays/dev-integration-test/.env | 6 ++++++ kube/overlays/dev/.env | 6 ++++++ kube/overlays/stable-with-resource-limits/.env | 6 ++++++ kube/overlays/stable/.env | 6 ++++++ 5 files changed, 29 insertions(+) diff --git a/.env b/.env index d91fd0211204..eb4197539fd0 100644 --- a/.env +++ b/.env @@ -83,6 +83,11 @@ MAX_SYNC_WORKERS=5 MAX_SPEC_WORKERS=5 MAX_CHECK_WORKERS=5 MAX_DISCOVER_WORKERS=5 +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= ### FEATURE FLAGS ### diff --git a/kube/overlays/dev-integration-test/.env b/kube/overlays/dev-integration-test/.env index 77257b428be8..03d3f5cca74d 100644 --- a/kube/overlays/dev-integration-test/.env +++ b/kube/overlays/dev-integration-test/.env @@ -68,3 +68,9 @@ CONTAINER_ORCHESTRATOR_ENABLED=true # Open Telemetry Configuration METRIC_CLIENT= OTEL_COLLECTOR_ENDPOINT= + +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index ba31e7322d98..70419187554b 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -70,3 +70,9 @@ CONTAINER_ORCHESTRATOR_ENABLED=true # Open Telemetry Configuration METRIC_CLIENT= OTEL_COLLECTOR_ENDPOINT= + +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= diff --git a/kube/overlays/stable-with-resource-limits/.env b/kube/overlays/stable-with-resource-limits/.env index fb8f43f2b245..86995915d316 100644 --- a/kube/overlays/stable-with-resource-limits/.env +++ b/kube/overlays/stable-with-resource-limits/.env @@ -72,3 +72,9 @@ CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED=true # Open Telemetry Configuration METRIC_CLIENT= OTEL_COLLECTOR_ENDPOINT= + +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index 2e23f846ff9b..a3e5a8814825 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -70,3 +70,9 @@ CONTAINER_ORCHESTRATOR_ENABLED=true # Open Telemetry Configuration METRIC_CLIENT= OTEL_COLLECTOR_ENDPOINT= + +# Temporal Activity configuration +ACTIVITY_MAX_ATTEMPT= +ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS= +ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS= +WORKFLOW_FAILURE_RESTART_DELAY_SECONDS= From a08a4657c2abae31f33906e2a31d482ef139eddd Mon Sep 17 00:00:00 2001 From: lmossman Date: Wed, 15 Jun 2022 15:32:25 -0700 Subject: [PATCH 8/9] fail attempt before job in cleanJobState to prevent state machine failure --- .../activities/JobCreationAndStatusUpdateActivityImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 3b077d88a87f..dc37eb4a731f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -285,8 +285,6 @@ private void failNonTerminalJobs(final UUID connectionId) { io.airbyte.scheduler.models.JobStatus.NON_TERMINAL_STATUSES); for (final Job job : jobs) { final long jobId = job.getId(); - log.info("Failing non-terminal job {}", jobId); - jobPersistence.failJob(jobId); // fail all non-terminal attempts for (final Attempt attempt : job.getAttempts()) { @@ -296,11 +294,15 @@ private void failNonTerminalJobs(final UUID connectionId) { // the Attempt object 'id' is actually the value of the attempt_number column in the db final int attemptNumber = (int) attempt.getId(); + log.info("Failing non-terminal attempt {} for non-terminal job {}", attemptNumber, jobId); jobPersistence.failAttempt(jobId, attemptNumber); jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, FailureHelper.failureSummaryForTemporalCleaningJobState(jobId, attemptNumber)); } + log.info("Failing non-terminal job {}", jobId); + jobPersistence.failJob(jobId); + final Job failedJob = jobPersistence.getJob(jobId); jobNotifier.failJob("Failing job in order to start from clean job state for new temporal workflow run.", failedJob); trackCompletion(failedJob, JobStatus.FAILED); From be706f9a6c66dcb3a82848c25d8de2323786474f Mon Sep 17 00:00:00 2001 From: lmossman Date: Wed, 15 Jun 2022 15:33:31 -0700 Subject: [PATCH 9/9] change default value of max activity attempt retries from 10 to 5 --- .../src/main/java/io/airbyte/config/EnvConfigs.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 48de628c4dc2..486575bc242e 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -861,7 +861,7 @@ public int getWorkflowFailureRestartDelaySeconds() { @Override public int getActivityNumberOfAttempt() { - return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "10")); + return Integer.parseInt(getEnvOrDefault(ACTIVITY_MAX_ATTEMPT, "5")); } // Helpers