diff --git a/airbyte-commons-temporal/build.gradle b/airbyte-commons-temporal/build.gradle new file mode 100644 index 000000000000..3b4e25641aa3 --- /dev/null +++ b/airbyte-commons-temporal/build.gradle @@ -0,0 +1,42 @@ +import org.jsonschema2pojo.SourceType + +plugins { + id "java-library" + id 'com.github.eirnym.js2p' version '1.0' +} + +dependencies { + annotationProcessor platform(libs.micronaut.bom) + annotationProcessor libs.bundles.micronaut.annotation.processor + + implementation platform(libs.micronaut.bom) + implementation libs.bundles.micronaut + + implementation 'io.temporal:temporal-sdk:1.8.1' + implementation 'io.temporal:temporal-serviceclient:1.8.1' + + testAnnotationProcessor platform(libs.micronaut.bom) + testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor + + + implementation project(':airbyte-config:config-models') + implementation project(':airbyte-metrics:metrics-lib') + + testImplementation 'io.temporal:temporal-testing:1.8.1' + // Needed to be able to mock final class + testImplementation 'org.mockito:mockito-inline:4.7.0' +} + +jsonSchema2Pojo { + sourceType = SourceType.YAMLSCHEMA + source = files("${sourceSets.main.output.resourcesDir}/workers_models") + targetDirectory = new File(project.buildDir, 'generated/src/gen/java/') + removeOldOutput = true + + targetPackage = 'io.airbyte.persistence.job.models' + + useLongIntegers = true + generateBuilders = true + includeConstructors = false + includeSetters = true +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CancellationHandler.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/CancellationHandler.java similarity index 98% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/CancellationHandler.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/CancellationHandler.java index aad52655f85b..9745497b6ae3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CancellationHandler.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/CancellationHandler.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal; +package io.airbyte.commons.temporal; import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.ActivityCompletionException; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java new file mode 100644 index 000000000000..435f7bac4141 --- /dev/null +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/ConnectionManagerUtils.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.temporal; + +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; +import io.temporal.client.WorkflowClient; +import java.util.UUID; +import javax.inject.Singleton; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +@NoArgsConstructor +@Singleton +@Slf4j +public class ConnectionManagerUtils { + + void safeTerminateWorkflow(final WorkflowClient client, final String workflowId, final String reason) { + log.info("Attempting to terminate existing workflow for workflowId {}.", workflowId); + try { + client.newUntypedWorkflowStub(workflowId).terminate(reason); + } catch (final Exception e) { + log.warn( + "Could not terminate temporal workflow due to the following error; " + + "this may be because there is currently no running workflow for this connection.", + e); + } + } + + public void safeTerminateWorkflow(final WorkflowClient client, final UUID connectionId, final String reason) { + safeTerminateWorkflow(client, getConnectionManagerName(connectionId), reason); + } + + public String getConnectionManagerName(final UUID connectionId) { + return "connection_manager_" + connectionId; + } + + public ConnectionManagerWorkflow startConnectionManagerNoSignal(final WorkflowClient client, final UUID connectionId) { + final ConnectionManagerWorkflow connectionManagerWorkflow = newConnectionManagerWorkflowStub(client, connectionId); + final ConnectionUpdaterInput input = TemporalWorkflowUtils.buildStartWorkflowInput(connectionId); + WorkflowClient.start(connectionManagerWorkflow::run, input); + + return connectionManagerWorkflow; + } + + public ConnectionManagerWorkflow newConnectionManagerWorkflowStub(final WorkflowClient client, final UUID connectionId) { + return client.newWorkflowStub(ConnectionManagerWorkflow.class, + TemporalWorkflowUtils.buildWorkflowOptions(TemporalJobType.CONNECTION_UPDATER, getConnectionManagerName(connectionId))); + } + +} diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java new file mode 100644 index 000000000000..21242ce9bbe6 --- /dev/null +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalClient.java @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.temporal; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.ByteString; +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; +import io.micronaut.context.annotation.Requires; +import io.temporal.api.common.v1.WorkflowType; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse; +import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import javax.inject.Inject; +import javax.inject.Singleton; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; + +@AllArgsConstructor +@NoArgsConstructor +@Slf4j +@Singleton +@Requires(property = "airbyte.worker.plane", + notEquals = "DATA_PLANE") +public class TemporalClient { + + @Inject + private WorkflowClient client; + @Inject + private WorkflowServiceStubs service; + @Inject + private ConnectionManagerUtils connectionManagerUtils; + + private final Set workflowNames = new HashSet<>(); + + public void restartClosedWorkflowByStatus(final WorkflowExecutionStatus executionStatus) { + final Set workflowExecutionInfos = fetchClosedWorkflowsByStatus(executionStatus); + + final Set nonRunningWorkflow = filterOutRunningWorkspaceId(workflowExecutionInfos); + + nonRunningWorkflow.forEach(connectionId -> { + connectionManagerUtils.safeTerminateWorkflow(client, connectionId, "Terminating workflow in " + + "unreachable state before starting a new workflow for this connection"); + connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId); + }); + } + + Set fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) { + ByteString token; + ListClosedWorkflowExecutionsRequest workflowExecutionsRequest = + ListClosedWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .build(); + + final Set workflowExecutionInfos = new HashSet<>(); + do { + final ListClosedWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest = + service.blockingStub().listClosedWorkflowExecutions(workflowExecutionsRequest); + final WorkflowType connectionManagerWorkflowType = WorkflowType.newBuilder().setName(ConnectionManagerWorkflow.class.getSimpleName()).build(); + workflowExecutionInfos.addAll(listOpenWorkflowExecutionsRequest.getExecutionsList().stream() + .filter(workflowExecutionInfo -> workflowExecutionInfo.getType() == connectionManagerWorkflowType || + workflowExecutionInfo.getStatus() == executionStatus) + .flatMap((workflowExecutionInfo -> extractConnectionIdFromWorkflowId(workflowExecutionInfo.getExecution().getWorkflowId()).stream())) + .collect(Collectors.toSet())); + token = listOpenWorkflowExecutionsRequest.getNextPageToken(); + + workflowExecutionsRequest = + ListClosedWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .setNextPageToken(token) + .build(); + + } while (token != null && token.size() > 0); + + return workflowExecutionInfos; + } + + @VisibleForTesting + Set filterOutRunningWorkspaceId(final Set workflowIds) { + refreshRunningWorkflow(); + + final Set runningWorkflowByUUID = + workflowNames.stream().flatMap(name -> extractConnectionIdFromWorkflowId(name).stream()).collect(Collectors.toSet()); + + return workflowIds.stream().filter(workflowId -> !runningWorkflowByUUID.contains(workflowId)).collect(Collectors.toSet()); + } + + @VisibleForTesting + void refreshRunningWorkflow() { + workflowNames.clear(); + ByteString token; + ListOpenWorkflowExecutionsRequest openWorkflowExecutionsRequest = + ListOpenWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .build(); + do { + final ListOpenWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest = + service.blockingStub().listOpenWorkflowExecutions(openWorkflowExecutionsRequest); + final Set workflowExecutionInfos = listOpenWorkflowExecutionsRequest.getExecutionsList().stream() + .map((workflowExecutionInfo -> workflowExecutionInfo.getExecution().getWorkflowId())) + .collect(Collectors.toSet()); + workflowNames.addAll(workflowExecutionInfos); + token = listOpenWorkflowExecutionsRequest.getNextPageToken(); + + openWorkflowExecutionsRequest = + ListOpenWorkflowExecutionsRequest.newBuilder() + .setNamespace(client.getOptions().getNamespace()) + .setNextPageToken(token) + .build(); + + } while (token != null && token.size() > 0); + } + + Optional extractConnectionIdFromWorkflowId(final String workflowId) { + if (!workflowId.startsWith("connection_manager_")) { + return Optional.empty(); + } + return Optional.ofNullable(StringUtils.removeStart(workflowId, "connection_manager_")) + .map( + stringUUID -> UUID.fromString(stringUUID)); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalJobType.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalJobType.java similarity index 85% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalJobType.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalJobType.java index d098a1049f03..cb8f66f3f630 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalJobType.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalJobType.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal; +package io.airbyte.commons.temporal; public enum TemporalJobType { GET_SPEC, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalUtils.java similarity index 99% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalUtils.java index 2bf6e9004936..68a5e369c7b8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalUtils.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalUtils.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal; +package io.airbyte.commons.temporal; import com.uber.m3.tally.RootScopeBuilder; import com.uber.m3.tally.Scope; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalWorkflowUtils.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java similarity index 96% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalWorkflowUtils.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java index f92120c45bae..aeeb018d94a1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalWorkflowUtils.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/TemporalWorkflowUtils.java @@ -2,11 +2,11 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal; +package io.airbyte.commons.temporal; import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.persistence.job.models.JobRunConfig; -import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.client.WorkflowOptions; diff --git a/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/config/TemporalBeanFactory.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/config/TemporalBeanFactory.java new file mode 100644 index 000000000000..a82774bdb120 --- /dev/null +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/config/TemporalBeanFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.temporal.config; + +import io.airbyte.commons.temporal.TemporalUtils; +import io.airbyte.commons.temporal.TemporalWorkflowUtils; +import io.micronaut.context.annotation.Factory; +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import javax.inject.Singleton; + +/** + * Micronaut bean factory for Temporal-related singletons. + */ +@Factory +public class TemporalBeanFactory { + + @Singleton + public WorkflowServiceStubs temporalService(final TemporalUtils temporalUtils) { + return temporalUtils.createTemporalService(); + } + + @Singleton + public WorkflowClient workflowClient( + final TemporalUtils temporalUtils, + final WorkflowServiceStubs temporalService) { + return TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace()); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java similarity index 95% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java index 8c732e318328..dcd7fc6637ec 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflow.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionManagerWorkflow.java @@ -2,9 +2,9 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling; +package io.airbyte.commons.temporal.scheduling; -import io.airbyte.workers.temporal.scheduling.state.WorkflowState; +import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.temporal.workflow.QueryMethod; import io.temporal.workflow.SignalMethod; import io.temporal.workflow.WorkflowInterface; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionUpdaterInput.java similarity index 86% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionUpdaterInput.java index 40fc04781687..e6bf75f12c54 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionUpdaterInput.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/ConnectionUpdaterInput.java @@ -2,9 +2,9 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling; +package io.airbyte.commons.temporal.scheduling; -import io.airbyte.workers.temporal.scheduling.state.WorkflowState; +import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import java.util.UUID; import javax.annotation.Nullable; import lombok.AllArgsConstructor; @@ -39,7 +39,7 @@ public class ConnectionUpdaterInput { private WorkflowState workflowState; private boolean resetConnection; @Builder.Default - private boolean fromJobResetFailure = false; + private final boolean fromJobResetFailure = false; @Builder.Default private boolean skipScheduling = false; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowInternalState.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowInternalState.java similarity index 80% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowInternalState.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowInternalState.java index 6a672f54ee0d..e3d25f7fee48 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowInternalState.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowInternalState.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling.state; +package io.airbyte.commons.temporal.scheduling.state; import io.airbyte.config.FailureReason; import java.util.HashSet; @@ -20,7 +20,7 @@ public class WorkflowInternalState { private Integer attemptNumber = null; // StandardSyncOutput standardSyncOutput = null; - private final Set failures = new HashSet<>(); + private Set failures = new HashSet<>(); private Boolean partialSuccess = null; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java similarity index 94% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java index edb49645fcf6..b345700fcfa0 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/WorkflowState.java @@ -2,11 +2,11 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling.state; +package io.airbyte.commons.temporal.scheduling.state; -import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener; -import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; -import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; +import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener; +import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; +import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; import java.util.UUID; import lombok.Getter; import lombok.NoArgsConstructor; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/NoopStateListener.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/NoopStateListener.java similarity index 86% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/NoopStateListener.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/NoopStateListener.java index 69cf8583c9bf..efb63c09782c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/NoopStateListener.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/NoopStateListener.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling.state.listener; +package io.airbyte.commons.temporal.scheduling.state.listener; import java.util.LinkedList; import java.util.Queue; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/TestStateListener.java similarity index 93% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/TestStateListener.java index 214ad92dac94..9abeeb3115d3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/TestStateListener.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/TestStateListener.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling.state.listener; +package io.airbyte.commons.temporal.scheduling.state.listener; import java.util.LinkedList; import java.util.Optional; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java similarity index 94% rename from airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java rename to airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java index ade23612909c..86866bf62da1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java +++ b/airbyte-commons-temporal/src/main/java/io/airbyte/commons/temporal/scheduling/state/listener/WorkflowStateChangedListener.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.scheduling.state.listener; +package io.airbyte.commons.temporal.scheduling.state.listener; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; diff --git a/airbyte-workers/src/main/resources/workers_models/JobRunConfig.yaml b/airbyte-commons-temporal/src/main/resources/workers_models/JobRunConfig.yaml similarity index 100% rename from airbyte-workers/src/main/resources/workers_models/JobRunConfig.yaml rename to airbyte-commons-temporal/src/main/resources/workers_models/JobRunConfig.yaml diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/CancellationHandlerTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/CancellationHandlerTest.java similarity index 85% rename from airbyte-workers/src/test/java/io/airbyte/workers/temporal/CancellationHandlerTest.java rename to airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/CancellationHandlerTest.java index 56a37adcb3d8..8f157760c76a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/CancellationHandlerTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/CancellationHandlerTest.java @@ -2,17 +2,16 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal; +package io.airbyte.commons.temporal; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; - -import io.airbyte.workers.temporal.stubs.HeartbeatWorkflow; +import io.airbyte.commons.temporal.stubs.HeartbeatWorkflow; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowOptions; import io.temporal.testing.TestWorkflowEnvironment; import io.temporal.worker.Worker; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; class CancellationHandlerTest { @@ -40,7 +39,7 @@ void testCancellationHandler() { .setTaskQueue("task-queue") .build()); - assertDoesNotThrow(heartbeatWorkflow::execute); + Assertions.assertDoesNotThrow(heartbeatWorkflow::execute); } diff --git a/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java new file mode 100644 index 000000000000..0ddff18dc49b --- /dev/null +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalClientTest.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.temporal; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflow.v1.WorkflowExecutionInfo; +import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse; +import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.util.Set; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +public class TemporalClientTest { + + private static final String NAMESPACE = "namespace"; + + private WorkflowClient workflowClient; + private TemporalClient temporalClient; + private WorkflowServiceStubs workflowServiceStubs; + private WorkflowServiceGrpc.WorkflowServiceBlockingStub workflowServiceBlockingStub; + + @BeforeEach + void setup() { + workflowClient = mock(WorkflowClient.class); + when(workflowClient.getOptions()).thenReturn(WorkflowClientOptions.newBuilder().setNamespace(NAMESPACE).build()); + + workflowServiceStubs = mock(WorkflowServiceStubs.class); + workflowServiceBlockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(workflowServiceStubs.blockingStub()).thenReturn(workflowServiceBlockingStub); + } + + @Nested + class RestartPerStatus { + + private ConnectionManagerUtils mConnectionManagerUtils; + + @BeforeEach + public void init() { + mConnectionManagerUtils = mock(ConnectionManagerUtils.class); + + temporalClient = spy( + new TemporalClient(workflowClient, workflowServiceStubs, mConnectionManagerUtils)); + } + + @Test + void testRestartFailed() { + final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); + + when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); + final UUID connectionId = UUID.fromString("ebbfdc4c-295b-48a0-844f-88551dfad3db"); + final Set workflowIds = Set.of(connectionId); + + doReturn(workflowIds) + .when(temporalClient).fetchClosedWorkflowsByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + doReturn(workflowIds) + .when(temporalClient).filterOutRunningWorkspaceId(workflowIds); + mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + temporalClient.restartClosedWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + verify(mConnectionManagerUtils).safeTerminateWorkflow(eq(workflowClient), eq(connectionId), + anyString()); + verify(mConnectionManagerUtils).startConnectionManagerNoSignal(eq(workflowClient), eq(connectionId)); + } + + } + + private void mockWorkflowStatus(final WorkflowExecutionStatus status) { + when(workflowServiceBlockingStub.describeWorkflowExecution(any())).thenReturn( + DescribeWorkflowExecutionResponse.newBuilder().setWorkflowExecutionInfo( + WorkflowExecutionInfo.newBuilder().setStatus(status).buildPartial()).build()); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalUtilsTest.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalUtilsTest.java similarity index 98% rename from airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalUtilsTest.java rename to airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalUtilsTest.java index b4fd33ce429f..6d99f5629900 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalUtilsTest.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/TemporalUtilsTest.java @@ -2,7 +2,7 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal; +package io.airbyte.commons.temporal; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -13,8 +13,7 @@ import static org.mockito.Mockito.when; import io.airbyte.commons.concurrency.VoidCallable; -import io.airbyte.workers.exception.WorkerException; -import io.airbyte.workers.temporal.stubs.HeartbeatWorkflow; +import io.airbyte.commons.temporal.stubs.HeartbeatWorkflow; import io.temporal.activity.Activity; import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityExecutionContext; @@ -307,7 +306,7 @@ interface Activity1 { class Activity1Impl implements Activity1 { - private static final Logger LOGGER = LoggerFactory.getLogger(TestWorkflow.Activity1Impl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(Activity1Impl.class); private static final String ACTIVITY1 = "activity1"; private final VoidCallable callable; @@ -399,7 +398,7 @@ public void activity(final String arg) { Thread.sleep(10000); return null; } else { - throw new WorkerException("failed"); + throw new Exception("failed"); } } else { return null; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/stubs/HeartbeatWorkflow.java b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/stubs/HeartbeatWorkflow.java similarity index 93% rename from airbyte-workers/src/test/java/io/airbyte/workers/temporal/stubs/HeartbeatWorkflow.java rename to airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/stubs/HeartbeatWorkflow.java index 306b6b9612aa..3152300ef247 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/stubs/HeartbeatWorkflow.java +++ b/airbyte-commons-temporal/src/test/java/io/airbyte/commons/temporal/stubs/HeartbeatWorkflow.java @@ -2,9 +2,9 @@ * Copyright (c) 2022 Airbyte, Inc., all rights reserved. */ -package io.airbyte.workers.temporal.stubs; +package io.airbyte.commons.temporal.stubs; -import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.commons.temporal.TemporalUtils; import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; diff --git a/airbyte-container-orchestrator/build.gradle b/airbyte-container-orchestrator/build.gradle index 4695dc4b4d92..d088356a5047 100644 --- a/airbyte-container-orchestrator/build.gradle +++ b/airbyte-container-orchestrator/build.gradle @@ -13,6 +13,7 @@ dependencies { implementation project(':airbyte-api') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') + implementation project(':airbyte-commons-temporal') implementation project(':airbyte-db:db-lib') implementation project(':airbyte-json-validation') implementation project(':airbyte-protocol:protocol-models') diff --git a/airbyte-cron/build.gradle b/airbyte-cron/build.gradle index 0388fbd6b028..ca20952dc270 100644 --- a/airbyte-cron/build.gradle +++ b/airbyte-cron/build.gradle @@ -3,9 +3,19 @@ plugins { } dependencies { - implementation project(':airbyte-config:config-models') + implementation 'com.auth0:java-jwt:3.19.2' + implementation 'io.fabric8:kubernetes-client:5.12.2' + implementation 'io.sentry:sentry:6.3.1' + implementation 'io.temporal:temporal-sdk:1.8.1' + implementation 'io.temporal:temporal-serviceclient:1.8.1' - runtimeOnly 'io.micronaut:micronaut-http-server-netty:3.6.0' + implementation project(':airbyte-api') + implementation project(':airbyte-analytics') + implementation project(':airbyte-commons-temporal') + implementation project(':airbyte-config:config-models') + implementation project(':airbyte-config:config-persistence') + implementation project(':airbyte-db:db-lib') + implementation project(':airbyte-metrics:metrics-lib') annotationProcessor platform(libs.micronaut.bom) annotationProcessor libs.bundles.micronaut.annotation.processor diff --git a/airbyte-cron/src/main/java/io/airbyte/cron/selfhealing/Temporal.java b/airbyte-cron/src/main/java/io/airbyte/cron/selfhealing/Temporal.java index 38787b128a16..69b0e029b26a 100644 --- a/airbyte-cron/src/main/java/io/airbyte/cron/selfhealing/Temporal.java +++ b/airbyte-cron/src/main/java/io/airbyte/cron/selfhealing/Temporal.java @@ -4,7 +4,10 @@ package io.airbyte.cron.selfhealing; +import io.airbyte.commons.temporal.TemporalClient; import io.micronaut.scheduling.annotation.Scheduled; +import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import javax.inject.Named; import javax.inject.Singleton; import lombok.extern.slf4j.Slf4j; @@ -12,11 +15,16 @@ @Slf4j public class Temporal { - public Temporal() { - log.info("Creating temporal self-healing"); + private final TemporalClient temporalClient; + + public Temporal(@Named("temporalClient") final TemporalClient temporalClient) { + log.debug("Creating temporal self-healing"); + this.temporalClient = temporalClient; } @Scheduled(fixedRate = "10s") - void cleanTemporal() {} + void cleanTemporal() { + temporalClient.restartClosedWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); + } } diff --git a/airbyte-cron/src/main/resources/application-control.yml b/airbyte-cron/src/main/resources/application-control.yml new file mode 100644 index 000000000000..752bc85a5f11 --- /dev/null +++ b/airbyte-cron/src/main/resources/application-control.yml @@ -0,0 +1,40 @@ +datasources: + config: + connection-test-query: SELECT 1 + connection-timeout: 30000 + idle-timeout: 600000 + maximum-pool-size: 10 + url: ${DATABASE_URL} + driverClassName: org.postgresql.Driver + username: ${DATABASE_USER} + password: ${DATABASE_PASSWORD} + jobs: + connection-test-query: SELECT 1 + connection-timeout: 30000 + idle-timeout: 600000 + maximum-pool-size: 10 + url: ${DATABASE_URL} + driverClassName: org.postgresql.Driver + username: ${DATABASE_USER} + password: ${DATABASE_PASSWORD} + +flyway: + enabled: true + datasources: + config: + enabled: false + locations: + - 'classpath:io/airbyte/db/instance/configs/migrations' + jobs: + enabled: false + locations: + - 'classpath:io/airbyte/db/instance/jobs/migrations' + +jooq: + datasources: + config: + jackson-converter-enabled: true + sql-dialect: POSTGRES + jobs: + jackson-converter-enabled: true + sql-dialect: POSTGRES \ No newline at end of file diff --git a/airbyte-cron/src/main/resources/application.yml b/airbyte-cron/src/main/resources/application.yml new file mode 100644 index 000000000000..6961c8a3801c --- /dev/null +++ b/airbyte-cron/src/main/resources/application.yml @@ -0,0 +1,219 @@ +micronaut: + application: + name: airbyte-workers + security: + intercept-url-map: + - pattern: /** + httpMethod: GET + access: + - isAnonymous() + server: + port: 9000 + +airbyte: + activity: + initial-delay: ${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS:30} + max-attempts: ${ACTIVITY_MAX_ATTEMPT:5} + max-delay: ${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS:600} + max-timeout: ${ACTIVITY_MAX_TIMEOUT_SECOND:120} + cloud: + storage: + logs: + type: ${WORKER_LOGS_STORAGE_TYPE:} + gcs: + application-credentials: ${GOOGLE_APPLICATION_CREDENTIALS:} + bucket: ${GCS_LOG_BUCKET:} + minio: + access-key: ${AWS_ACCESS_KEY_ID:} + bucket: ${S3_LOG_BUCKET:} + endpoint: ${S3_MINIO_ENDPOINT:} + secret-access-key: ${AWS_SECRET_ACCESS_KEY:} + s3: + access-key: ${AWS_ACCESS_KEY_ID:} + bucket: ${S3_LOG_BUCKET:} + region: ${S3_LOG_BUCKET_REGION:} + secret-access-key: ${AWS_SECRET_ACCESS_KEY:} + state: + type: ${WORKER_STATE_STORAGE_TYPE:} + gcs: + application-credentials: ${STATE_STORAGE_GCS_APPLICATION_CREDENTIALS:} + bucket: ${STATE_STORAGE_GCS_BUCKET_NAME:} + minio: + access-key: ${STATE_STORAGE_MINIO_ACCESS_KEY:} + bucket: ${STATE_STORAGE_MINIO_BUCKET_NAME:} + endpoint: ${STATE_STORAGE_MINIO_ENDPOINT:} + secret-access-key: ${STATE_STORAGE_MINIO_SECRET_ACCESS_KEY:} + s3: + access-key: ${STATE_STORAGE_S3_ACCESS_KEY:} + bucket: ${STATE_STORAGE_S3_BUCKET_NAME:} + region: ${STATE_STORAGE_S3_BUCKET_REGION:} + secret-access-key: ${STATE_STORAGE_S3_SECRET_ACCESS_KEY:} + connector: + specific-resource-defaults-enabled: ${CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED:false} + container: + orchestrator: + enabled: ${CONTAINER_ORCHESTRATOR_ENABLED:false} + image: ${CONTAINER_ORCHESTRATOR_IMAGE:} + secret-mount-path: ${CONTAINER_ORCHESTRATOR_SECRET_MOUNT_PATH:} + secret-name: ${CONTAINER_ORCHESTRATOR_SECRET_NAME:} + control: + plane: + auth-endpoint: ${CONTROL_PLANE_AUTH_ENDPOINT:} + data: + sync: + task-queue: ${DATA_SYNC_TASK_QUEUES:SYNC} + plane: + connection-ids-mvp: ${CONNECTION_IDS_FOR_MVP_DATA_PLANE:} + service-account: + credentials-path: ${DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH:} + email: ${DATA_PLANE_SERVICE_ACCOUNT_EMAIL:} + deployment-mode: ${DEPLOYMENT_MODE:OSS} + flyway: + configs: + initialization-timeout-ms: ${CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS:60000} + minimum-migration-version: ${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION} + jobs: + initialization-timeout-ms: ${JOBS_DATABASE_INITIALIZATION_TIMEOUT_MS:60000} + minimum-migration-version: ${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION} + internal: + api: + auth-header: + name: ${AIRBYTE_API_AUTH_HEADER_NAME:} + value: ${AIRBYTE_API_AUTH_HEADER_VALUE:} + host: ${INTERNAL_API_HOST} + local: + docker-mount: ${LOCAL_DOCKER_MOUNT} + root: ${LOCAL_ROOT} + worker: + env: ${WORKER_ENVIRONMENT:DOCKER} + check: + enabled: ${SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS:true} + kube: + annotations: ${CHECK_JOB_KUBE_ANNOTATION:} + node-selectors: ${CHECK_JOB_KUBE_NODE_SELECTORS:} + max-workers: ${MAX_CHECK_WORKERS:5} + main: + container: + cpu: + limit: ${CHECK_JOB_MAIN_CONTAINER_CPU_LIMIT:} + request: ${CHECK_JOB_MAIN_CONTAINER_CPU_REQUEST:} + memory: + limit: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT:} + request: ${CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST:} + connection: + enabled: ${SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS:true} + discover: + enabled: ${SHOULD_RUN_DISCOVER_WORKFLOWS:true} + kube: + annotations: ${DISCOVER_JOB_KUBE_ANNOTATIONS:} + node-selectors: ${DISCOVER_JOB_KUBE_NODE_SELECTORS:} + max-workers: ${MAX_DISCOVER_WORKERS:5} + job: + error-reporting: + sentry: + dsn: ${JOB_ERROR_REPORTING_SENTRY_DSN} + strategy: ${JOB_ERROR_REPORTING_STRATEGY:LOGGING} + failed: + max-days: ${MAX_DAYS_OF_ONLY_FAILED_JOBS_BEFORE_CONNECTION_DISABLE:14} + max-jobs: ${MAX_FAILED_JOBS_IN_A_ROW_BEFORE_CONNECTION_DISABLE:100} + kube: + annotations: ${JOB_KUBE_ANNOTATIONS:} + images: + busybox: ${JOB_KUBE_BUSYBOX_IMAGE:`busybox:1.28`} + curl: ${JOB_KUBE_CURL_IMAGE:`curlimages/curl:7.83.1`} + socat: ${JOB_KUBE_SOCAT_IMAGE:`alpine/socat:1.7.4.3-r0`} + main: + container: + image-pull-policy: ${JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY:IfNotPresent} + image-pull-secret: ${JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET:} + namespace: ${JOB_KUBE_NAMESPACE:default} + node-selectors: ${JOB_KUBE_NODE_SELECTORS:} + sidecar: + container: + image-pull-policy: ${JOB_KUBE_SIDECAR_CONTAINER_IMAGE_PULL_POLICY:IfNotPresent} + tolerations: ${JOB_KUBE_TOLERATIONS:} + main: + container: + cpu: + limit: ${JOB_MAIN_CONTAINER_CPU_LIMIT:} + request: ${JOB_MAIN_CONTAINER_CPU_REQUEST:} + memory: + limit: ${JOB_MAIN_CONTAINER_MEMORY_LIMIT:} + request: ${JOB_MAIN_CONTAINER_MEMORY_REQUEST:} + normalization: + main: + container: + cpu: + limit: ${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT:} + request: ${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST:} + memory: + limit: ${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT:} + request: ${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST:} + plane: ${WORKER_PLANE:CONTROL_PLANE} + replication: + orchestrator: + cpu: + limit: ${REPLICATION_ORCHESTRATOR_CPU_LIMIT:} + request: ${REPLICATION_ORCHESTRATOR_CPU_REQUEST:} + memory: + limit: ${REPLICATION_ORCHESTRATOR_MEMORY_LIMIT:} + request: ${REPLICATION_ORCHESTRATOR_MEMORY_REQUEST:} + spec: + enabled: ${SHOULD_RUN_GET_SPEC_WORKFLOWS:true} + kube: + annotations: ${SPEC_JOB_KUBE_ANNOTATIONS:} + node-selectors: ${SPEC_JOB_KUBE_NODE_SELECTORS:} + max-workers: ${MAX_SPEC_WORKERS:5} + sync: + enabled: ${SHOULD_RUN_SYNC_WORKFLOWS:true} + max-workers: ${MAX_SYNC_WORKERS:5} + max-attempts: ${SYNC_JOB_MAX_ATTEMPTS:3} + max-timeout: ${SYNC_JOB_MAX_TIMEOUT_DAYS:3} + role: ${AIRBYTE_ROLE:} + secret: + persistence: ${SECRET_PERSISTENCE:TESTING_CONFIG_DB_TABLE} + store: + gcp: + credentials: ${SECRET_STORE_GCP_CREDENTIALS:} + project-id: ${SECRET_STORE_GCP_PROJECT_ID:} + vault: + address: ${VAULT_ADDRESS:} + prefix: ${VAULT_PREFIX:} + token: ${VAULT_AUTH_TOKEN:} + temporal: + worker: + ports: ${TEMPORAL_WORKER_PORTS:} + tracking-strategy: ${TRACKING_STRATEGY:LOGGING} + version: ${AIRBYTE_VERSION} + web-app: + url: ${WEBAPP_URL:} + workflow: + failure: + restart-delay: ${WORKFLOW_FAILURE_RESTART_DELAY_SECONDS:600} + workspace: + docker-mount: ${WORKSPACE_DOCKER_MOUNT:} + root: ${WORKSPACE_ROOT} + +docker: + network: ${DOCKER_NETWORK:host} + +endpoints: + all: + enabled: true + +temporal: + cloud: + client: + cert: ${TEMPORAL_CLOUD_CLIENT_CERT:} + key: ${TEMPORAL_CLOUD_CLIENT_KEY:} + enabled: ${TEMPORAL_CLOUD_ENABLED:false} + host: ${TEMPORAL_CLOUD_HOST:} + namespace: ${TEMPORAL_CLOUD_NAMESPACE:} + host: ${TEMPORAL_HOST:`airbyte-temporal:7233`} + retention: ${TEMPORAL_HISTORY_RETENTION_IN_DAYS:30} + +logger: + levels: + io.airbyte.bootloader: DEBUG +# Uncomment to help resolve issues with conditional beans +# io.micronaut.context.condition: DEBUG diff --git a/airbyte-cron/src/main/resources/micronaut-banner.txt b/airbyte-cron/src/main/resources/micronaut-banner.txt new file mode 100644 index 000000000000..713ab3df590b --- /dev/null +++ b/airbyte-cron/src/main/resources/micronaut-banner.txt @@ -0,0 +1 @@ + : airbyte-cron : diff --git a/airbyte-server/build.gradle b/airbyte-server/build.gradle index af56de876b02..e3dddbe3ca50 100644 --- a/airbyte-server/build.gradle +++ b/airbyte-server/build.gradle @@ -2,10 +2,15 @@ plugins { id 'application' } +configurations.all { + exclude group: 'io.micronaut.jaxrs' +} + dependencies { implementation project(':airbyte-analytics') implementation project(':airbyte-api') implementation project(':airbyte-commons-docker') + implementation project(':airbyte-commons-temporal') implementation project(':airbyte-config:init') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index bbb5e4c45bf8..b304e2c008bb 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -12,6 +12,8 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.lang.CloseableShutdownHook; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.temporal.TemporalUtils; +import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; @@ -59,8 +61,6 @@ import io.airbyte.workers.temporal.ConnectionManagerUtils; import io.airbyte.workers.temporal.StreamResetRecordsHelper; import io.airbyte.workers.temporal.TemporalClient; -import io.airbyte.workers.temporal.TemporalUtils; -import io.airbyte.workers.temporal.TemporalWorkflowUtils; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; import java.net.http.HttpClient; diff --git a/airbyte-test-utils/build.gradle b/airbyte-test-utils/build.gradle index 8518cb7a5ade..f6c3dfa412d0 100644 --- a/airbyte-test-utils/build.gradle +++ b/airbyte-test-utils/build.gradle @@ -5,6 +5,7 @@ plugins { dependencies { api project(':airbyte-db:db-lib') implementation project(':airbyte-api') + implementation project(':airbyte-commons-temporal') implementation project(':airbyte-workers') implementation 'io.fabric8:kubernetes-client:5.12.2' diff --git a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java index 271a7d611a64..bbb6c9ddecb5 100644 --- a/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java +++ b/airbyte-test-utils/src/main/java/io/airbyte/test/utils/AirbyteAcceptanceTestHarness.java @@ -57,14 +57,14 @@ import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.temporal.TemporalUtils; +import io.airbyte.commons.temporal.TemporalWorkflowUtils; +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.airbyte.commons.util.MoreProperties; import io.airbyte.db.Database; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.test.airbyte_test_container.AirbyteTestContainer; -import io.airbyte.workers.temporal.TemporalUtils; -import io.airbyte.workers.temporal.TemporalWorkflowUtils; -import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.temporal.client.WorkflowClient; diff --git a/airbyte-tests/build.gradle b/airbyte-tests/build.gradle index 5931cac20e38..8600ff04a42b 100644 --- a/airbyte-tests/build.gradle +++ b/airbyte-tests/build.gradle @@ -43,6 +43,7 @@ dependencies { acceptanceTestsImplementation project(':airbyte-api') acceptanceTestsImplementation project(':airbyte-commons') + acceptanceTestsImplementation project(':airbyte-commons-temporal') acceptanceTestsImplementation project(':airbyte-config:config-models') acceptanceTestsImplementation project(':airbyte-config:config-persistence') acceptanceTestsImplementation project(':airbyte-db:db-lib') diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 14c14233583a..50717ff53ada 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -73,12 +73,12 @@ import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.airbyte.db.Database; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; import io.airbyte.test.utils.PostgreSQLContainerHelper; import io.airbyte.test.utils.SchemaTableNamePair; -import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import java.io.IOException; import java.net.URISyntaxException; import java.sql.SQLException; diff --git a/airbyte-workers/build.gradle b/airbyte-workers/build.gradle index 1e93f058e9b1..4f3303c94df7 100644 --- a/airbyte-workers/build.gradle +++ b/airbyte-workers/build.gradle @@ -42,6 +42,7 @@ dependencies { implementation project(':airbyte-analytics') implementation project(':airbyte-api') implementation project(':airbyte-commons-docker') + implementation project(':airbyte-commons-temporal') implementation project(':airbyte-config:config-models') implementation project(':airbyte-config:config-persistence') implementation project(':airbyte-db:jooq') diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java index ef2cea7d4ddc..e0de9ccc832a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/ApplicationInitializer.java @@ -6,6 +6,8 @@ import io.airbyte.analytics.Deployment; import io.airbyte.analytics.TrackingClientSingleton; +import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs.DeploymentMode; import io.airbyte.config.Configs.TrackingStrategy; @@ -22,8 +24,6 @@ import io.airbyte.metrics.lib.MetricEmittingApps; import io.airbyte.persistence.job.JobPersistence; import io.airbyte.workers.process.KubePortManagerSingleton; -import io.airbyte.workers.temporal.TemporalJobType; -import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl; import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java index a4fbbf66ef50..67093ddd82b7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ActivityBeanFactory.java @@ -4,8 +4,8 @@ package io.airbyte.workers.config; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.workers.exception.WorkerException; -import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivity; import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java index b4c332373f8e..5ed8168cc8b8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/TemporalBeanFactory.java @@ -7,6 +7,7 @@ import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.DefaultJobCreator; import io.airbyte.persistence.job.factory.DefaultSyncJobFactory; @@ -14,8 +15,6 @@ import io.airbyte.persistence.job.factory.SyncJobFactory; import io.airbyte.workers.run.TemporalWorkerRunFactory; import io.airbyte.workers.temporal.TemporalClient; -import io.airbyte.workers.temporal.TemporalUtils; -import io.airbyte.workers.temporal.TemporalWorkflowUtils; import io.micronaut.context.annotation.Factory; import io.micronaut.context.annotation.Property; import io.micronaut.context.annotation.Requires; @@ -55,18 +54,6 @@ public SyncJobFactory jobFactory( new OAuthConfigSupplier(configRepository, trackingClient)); } - @Singleton - public WorkflowServiceStubs temporalService(final TemporalUtils temporalUtils) { - return temporalUtils.createTemporalService(); - } - - @Singleton - public WorkflowClient workflowClient( - final TemporalUtils temporalUtils, - final WorkflowServiceStubs temporalService) { - return TemporalWorkflowUtils.createWorkflowClient(temporalService, temporalUtils.getNamespace()); - } - @Singleton @Requires(property = "airbyte.worker.plane", pattern = "(?i)^(?!data_plane).*") diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java index 4c33b816c650..44bcb8a1a400 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/run/TemporalWorkerRunFactory.java @@ -7,6 +7,7 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.TemporalJobType; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.JobResetConnectionConfig; @@ -18,7 +19,6 @@ import io.airbyte.workers.OutputAndStatus; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.temporal.TemporalClient; -import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalResponse; import java.nio.file.Path; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java index f468ea5275fe..e8190f1e2a79 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/ConnectionManagerUtils.java @@ -4,12 +4,14 @@ package io.airbyte.workers.temporal; +import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.commons.temporal.TemporalWorkflowUtils; +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; +import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.airbyte.workers.temporal.exception.DeletedWorkflowException; import io.airbyte.workers.temporal.exception.UnreachableWorkflowException; -import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowImpl; -import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; -import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.WorkflowExecutionStatus; import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 5f920830afb3..a15b109bbb3c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -9,6 +9,8 @@ import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.SetWorkflowInAttemptRequestBody; import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.commons.temporal.CancellationHandler; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.helpers.LogConfigs; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index feb70822d83a..c35f98700d8a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -8,6 +8,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.ByteString; +import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.commons.temporal.TemporalWorkflowUtils; +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobDiscoverCatalogConfig; @@ -26,16 +29,15 @@ import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; import io.airbyte.workers.temporal.exception.DeletedWorkflowException; import io.airbyte.workers.temporal.exception.UnreachableWorkflowException; -import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.airbyte.workers.temporal.sync.SyncWorkflow; import io.micronaut.context.annotation.Requires; import io.temporal.api.common.v1.WorkflowType; import io.temporal.api.enums.v1.WorkflowExecutionStatus; +import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest; +import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse; import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsRequest; import io.temporal.api.workflowservice.v1.ListOpenWorkflowExecutionsResponse; -import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsRequest; -import io.temporal.api.workflowservice.v1.ListWorkflowExecutionsResponse; import io.temporal.client.WorkflowClient; import io.temporal.serviceclient.WorkflowServiceStubs; import java.io.IOException; @@ -464,18 +466,6 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId, Optional.of(resetJobId), Optional.empty()); } - public void restartWorkflowByStatus(final WorkflowExecutionStatus executionStatus) { - final Set workflowExecutionInfos = fetchWorkflowsByStatus(executionStatus); - - final Set nonRunningWorkflow = filterOutRunningWorkspaceId(workflowExecutionInfos); - - nonRunningWorkflow.forEach(connectionId -> { - connectionManagerUtils.safeTerminateWorkflow(client, connectionId, "Terminating workflow in " - + "unreachable state before starting a new workflow for this connection"); - connectionManagerUtils.startConnectionManagerNoSignal(client, connectionId); - }); - } - /** * This should be in the class {@li} * @@ -491,17 +481,17 @@ Optional extractConnectionIdFromWorkflowId(final String workflowId) { stringUUID -> UUID.fromString(stringUUID)); } - Set fetchWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) { + Set fetchClosedWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) { ByteString token; - ListWorkflowExecutionsRequest workflowExecutionsRequest = - ListWorkflowExecutionsRequest.newBuilder() + ListClosedWorkflowExecutionsRequest workflowExecutionsRequest = + ListClosedWorkflowExecutionsRequest.newBuilder() .setNamespace(client.getOptions().getNamespace()) .build(); final Set workflowExecutionInfos = new HashSet<>(); do { - final ListWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest = - service.blockingStub().listWorkflowExecutions(workflowExecutionsRequest); + final ListClosedWorkflowExecutionsResponse listOpenWorkflowExecutionsRequest = + service.blockingStub().listClosedWorkflowExecutions(workflowExecutionsRequest); final WorkflowType connectionManagerWorkflowType = WorkflowType.newBuilder().setName(ConnectionManagerWorkflow.class.getSimpleName()).build(); workflowExecutionInfos.addAll(listOpenWorkflowExecutionsRequest.getExecutionsList().stream() .filter(workflowExecutionInfo -> workflowExecutionInfo.getType() == connectionManagerWorkflowType || @@ -511,7 +501,7 @@ Set fetchWorkflowsByStatus(final WorkflowExecutionStatus executionStatus) token = listOpenWorkflowExecutionsRequest.getNextPageToken(); workflowExecutionsRequest = - ListWorkflowExecutionsRequest.newBuilder() + ListClosedWorkflowExecutionsRequest.newBuilder() .setNamespace(client.getOptions().getNamespace()) .setNextPageToken(token) .build(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 0053aa0d2fe2..fff3ddb5f6eb 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardCheckConnectionInput; @@ -21,7 +22,6 @@ import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java index 5d201d95d44b..d10c7b94723f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.StandardDiscoverCatalogInput; @@ -22,7 +23,6 @@ import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index fd0bf6998d68..3e263bc9472e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -5,6 +5,13 @@ package io.airbyte.workers.temporal.scheduling; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.commons.temporal.TemporalWorkflowUtils; +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; +import io.airbyte.commons.temporal.scheduling.state.WorkflowInternalState; +import io.airbyte.commons.temporal.scheduling.state.WorkflowState; +import io.airbyte.commons.temporal.scheduling.state.listener.NoopStateListener; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; @@ -21,8 +28,6 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.helper.FailureHelper; -import io.airbyte.workers.temporal.TemporalJobType; -import io.airbyte.workers.temporal.TemporalWorkflowUtils; import io.airbyte.workers.temporal.annotations.TemporalActivityStub; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput; @@ -63,9 +68,6 @@ import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity; import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity.DeleteStreamResetRecordsForJobInput; import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity; -import io.airbyte.workers.temporal.scheduling.state.WorkflowInternalState; -import io.airbyte.workers.temporal.scheduling.state.WorkflowState; -import io.airbyte.workers.temporal.scheduling.state.listener.NoopStateListener; import io.airbyte.workers.temporal.sync.SyncWorkflow; import io.temporal.api.enums.v1.ParentClosePolicy; import io.temporal.failure.ActivityFailure; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 5b41c150828a..5ed4748ec057 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -5,6 +5,7 @@ package io.airbyte.workers.temporal.scheduling.activities; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.TemporalWorkflowUtils; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; @@ -15,7 +16,6 @@ import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; -import io.airbyte.workers.temporal.TemporalWorkflowUtils; import io.airbyte.workers.temporal.exception.RetryableException; import io.micronaut.context.annotation.Requires; import java.util.List; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivity.java index e8eac26d0725..7de9bdd9857d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivity.java @@ -4,9 +4,9 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.OssMetricsRegistry; -import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import java.util.Optional; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java index 1ec253ecacd5..b185a4a85aa8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImpl.java @@ -4,10 +4,10 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricTags; -import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.micronaut.context.annotation.Requires; import java.util.ArrayList; import java.util.List; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java index c85be7287610..e57dfd82c88f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -6,6 +6,7 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.JobGetSpecConfig; @@ -18,7 +19,6 @@ import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java index c0624496cd81..52ebaa9ce3a9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtLauncherWorker.java @@ -5,12 +5,12 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.OperatorDbtInput; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityExecutionContext; import java.util.Map; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java index f56c5fb566af..b10232c5c73a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/DbtTransformationActivityImpl.java @@ -9,6 +9,8 @@ import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.CancellationHandler; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.AirbyteConfigValidator; import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs.WorkerEnvironment; @@ -25,9 +27,7 @@ import io.airbyte.workers.general.DbtTransformationWorker; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; -import io.airbyte.workers.temporal.TemporalUtils; import io.micronaut.context.annotation.Value; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java index b9be375f8879..eb07bfbc452d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/LauncherWorker.java @@ -8,6 +8,7 @@ import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.ResourceRequirements; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; @@ -19,7 +20,6 @@ import io.airbyte.workers.process.KubePodInfo; import io.airbyte.workers.process.KubePodResourceHelper; import io.airbyte.workers.process.KubeProcessFactory; -import io.airbyte.workers.temporal.TemporalUtils; import io.fabric8.kubernetes.api.model.DeletionPropagation; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.client.KubernetesClientException; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index a233089b30e0..c42d394dbb57 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -9,6 +9,8 @@ import io.airbyte.api.client.model.generated.JobIdRequestBody; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.CancellationHandler; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.AirbyteConfigValidator; import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs.WorkerEnvironment; @@ -27,9 +29,7 @@ import io.airbyte.workers.general.DefaultNormalizationWorker; import io.airbyte.workers.normalization.NormalizationRunnerFactory; import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; -import io.airbyte.workers.temporal.TemporalUtils; import io.micronaut.context.annotation.Value; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java index 11daed6de277..62bbaadbdb36 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationLauncherWorker.java @@ -5,13 +5,13 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; import io.airbyte.workers.WorkerConfigs; -import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityExecutionContext; import java.util.Map; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 20e05c14f662..7fec50021204 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -10,6 +10,8 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.functional.CheckedSupplier; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.CancellationHandler; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.AirbyteConfigValidator; import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs.WorkerEnvironment; @@ -42,9 +44,7 @@ import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.IntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.temporal.CancellationHandler; import io.airbyte.workers.temporal.TemporalAttemptExecution; -import io.airbyte.workers.temporal.TemporalUtils; import io.micronaut.context.annotation.Value; import io.temporal.activity.Activity; import io.temporal.activity.ActivityExecutionContext; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java index ec2a1f530d23..c8da41af2bf3 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java @@ -5,13 +5,13 @@ package io.airbyte.workers.temporal.sync; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.ReplicationOutput; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSyncInput; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.ContainerOrchestratorConfig; -import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityExecutionContext; import java.util.Map; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java index 84017b83e778..aa030f20a8ce 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RouterService.java @@ -5,7 +5,7 @@ package io.airbyte.workers.temporal.sync; import com.google.common.annotations.VisibleForTesting; -import io.airbyte.workers.temporal.TemporalJobType; +import io.airbyte.commons.temporal.TemporalJobType; import io.micronaut.context.annotation.Value; import io.micronaut.core.util.StringUtils; import java.util.Arrays; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java index 70d309886bfc..f88252e5003a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalAttemptExecutionTest.java @@ -17,6 +17,7 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.generated.AttemptApi; import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.commons.temporal.CancellationHandler; import io.airbyte.config.Configs; import io.airbyte.db.init.DatabaseInitializationException; import io.airbyte.persistence.job.models.JobRunConfig; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 5be8bd629a85..37455ce758b9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -11,7 +11,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -23,6 +22,11 @@ import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.commons.temporal.TemporalWorkflowUtils; +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow.JobInformation; +import io.airbyte.commons.temporal.scheduling.state.WorkflowState; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.FailureReason; import io.airbyte.config.JobCheckConnectionConfig; @@ -41,9 +45,6 @@ import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; -import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow; -import io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow.JobInformation; -import io.airbyte.workers.temporal.scheduling.state.WorkflowState; import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.airbyte.workers.temporal.sync.SyncWorkflow; import io.temporal.api.enums.v1.WorkflowExecutionStatus; @@ -61,7 +62,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; -import java.util.Set; import java.util.UUID; import java.util.function.Supplier; import org.junit.jupiter.api.BeforeEach; @@ -741,42 +741,6 @@ void testResetConnectionDeletedWorkflow() throws IOException { } - @Nested - class RestartPerStatus { - - private ConnectionManagerUtils mConnectionManagerUtils; - - @BeforeEach - public void init() throws IOException { - mConnectionManagerUtils = mock(ConnectionManagerUtils.class); - - final Path workspaceRoot = Files.createTempDirectory(Path.of("/tmp"), "temporal_client_test"); - temporalClient = spy( - new TemporalClient(workspaceRoot, workflowClient, workflowServiceStubs, streamResetPersistence, mConnectionManagerUtils, - streamResetRecordsHelper)); - } - - @Test - void testRestartFailed() { - final ConnectionManagerWorkflow mConnectionManagerWorkflow = mock(ConnectionManagerWorkflow.class); - - when(workflowClient.newWorkflowStub(any(), anyString())).thenReturn(mConnectionManagerWorkflow); - final UUID connectionId = UUID.fromString("ebbfdc4c-295b-48a0-844f-88551dfad3db"); - final Set workflowIds = Set.of(connectionId); - - doReturn(workflowIds) - .when(temporalClient).fetchWorkflowsByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); - doReturn(workflowIds) - .when(temporalClient).filterOutRunningWorkspaceId(workflowIds); - mockWorkflowStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); - temporalClient.restartWorkflowByStatus(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED); - verify(mConnectionManagerUtils).safeTerminateWorkflow(eq(workflowClient), eq(connectionId), - anyString()); - verify(mConnectionManagerUtils).startConnectionManagerNoSignal(eq(workflowClient), eq(connectionId)); - } - - } - @Test @DisplayName("Test manual operation on quarantined workflow causes a restart") void testManualOperationOnQuarantinedWorkflow() { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java index 52d21fe19d22..c4768ed6acf0 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowTest.java @@ -7,7 +7,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.workers.temporal.support.TemporalProxyHelper; import io.micronaut.context.BeanRegistration; import io.micronaut.inject.BeanIdentifier; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index c51e6575c682..37aa51078265 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -8,6 +8,13 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.airbyte.commons.temporal.TemporalJobType; +import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow; +import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; +import io.airbyte.commons.temporal.scheduling.state.WorkflowState; +import io.airbyte.commons.temporal.scheduling.state.listener.TestStateListener; +import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; +import io.airbyte.commons.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; @@ -19,7 +26,6 @@ import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.workers.WorkerConstants; -import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity; import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity; import io.airbyte.workers.temporal.scheduling.activities.AutoDisableConnectionActivity.AutoDisableConnectionActivityInput; @@ -42,10 +48,6 @@ import io.airbyte.workers.temporal.scheduling.activities.RouteToSyncTaskQueueActivity.RouteToSyncTaskQueueOutput; import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity; import io.airbyte.workers.temporal.scheduling.activities.WorkflowConfigActivity; -import io.airbyte.workers.temporal.scheduling.state.WorkflowState; -import io.airbyte.workers.temporal.scheduling.state.listener.TestStateListener; -import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.ChangedStateEvent; -import io.airbyte.workers.temporal.scheduling.state.listener.WorkflowStateChangedListener.StateField; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.DbtFailureSyncWorkflow; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.EmptySyncWorkflow; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.NormalizationFailureSyncWorkflow; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java index da8786a54de8..3559b434ed05 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/RecordMetricActivityImplTest.java @@ -9,11 +9,11 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.airbyte.commons.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.metrics.lib.MetricAttribute; import io.airbyte.metrics.lib.MetricClient; import io.airbyte.metrics.lib.MetricTags; import io.airbyte.metrics.lib.OssMetricsRegistry; -import io.airbyte.workers.temporal.scheduling.ConnectionUpdaterInput; import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity.FailureCause; import io.airbyte.workers.temporal.scheduling.activities.RecordMetricActivity.RecordMetricInput; import java.util.Optional; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/RouterServiceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/RouterServiceTest.java index 9eecf2dcab73..0f9165e99b15 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/RouterServiceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/RouterServiceTest.java @@ -7,7 +7,7 @@ import static io.airbyte.workers.temporal.sync.RouterService.MVP_DATA_PLANE_TASK_QUEUE; import static org.junit.jupiter.api.Assertions.assertEquals; -import io.airbyte.workers.temporal.TemporalJobType; +import io.airbyte.commons.temporal.TemporalJobType; import java.util.UUID; import org.junit.jupiter.api.Test; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java index 1979cb9a879e..fe60a4415ae0 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/sync/SyncWorkflowTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import io.airbyte.commons.temporal.TemporalUtils; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; import io.airbyte.config.OperatorDbtInput; @@ -28,7 +29,6 @@ import io.airbyte.persistence.job.models.JobRunConfig; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.TestConfigHelpers; -import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.support.TemporalProxyHelper; import io.micronaut.context.BeanRegistration; import io.micronaut.inject.BeanIdentifier; diff --git a/docker-compose.yaml b/docker-compose.yaml index 2e88f4a637a8..0f2bd38af1d6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -183,14 +183,27 @@ services: container_name: airbyte-cron restart: unless-stopped environment: + - AIRBYTE_VERSION=${VERSION} + - CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-} + - DATABASE_PASSWORD=${DATABASE_PASSWORD} + - DATABASE_URL=${DATABASE_URL} + - DATABASE_USER=${DATABASE_USER} - DB=postgresql - DB_PORT=${DATABASE_PORT} + - JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=${JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION:-} + - JOB_ERROR_REPORTING_STRATEGY=${JOB_ERROR_REPORTING_STRATEGY} + - JOB_ERROR_REPORTING_SENTRY_DSN=${JOB_ERROR_REPORTING_SENTRY_DSN} + - LOCAL_DOCKER_MOUNT=${LOCAL_DOCKER_MOUNT} + - LOCAL_ROOT=${LOCAL_ROOT} - LOG_LEVEL=${LOG_LEVEL} + - INTERNAL_API_HOST=${INTERNAL_API_HOST} - POSTGRES_PWD=${DATABASE_PASSWORD} - POSTGRES_SEEDS=${DATABASE_HOST} - POSTGRES_USER=${DATABASE_USER} - TEMPORAL_HISTORY_RETENTION_IN_DAYS=${TEMPORAL_HISTORY_RETENTION_IN_DAYS} + - SECRET_PERSISTENCE=${SECRET_PERSISTENCE} - WORKSPACE_ROOT=${WORKSPACE_ROOT} + - MICRONAUT_ENVIRONMENTS=${WORKERS_MICRONAUT_ENVIRONMENTS} volumes: - workspace:${WORKSPACE_ROOT} volumes: diff --git a/settings.gradle b/settings.gradle index 09a5cbf56b82..4206edff4c21 100644 --- a/settings.gradle +++ b/settings.gradle @@ -76,6 +76,7 @@ include ':airbyte-test-utils' // airbyte-workers has a lot of dependencies. include ':airbyte-workers' // reused by acceptance tests in connector base. include ':airbyte-analytics' // transitively used by airbyte-workers. +include ':airbyte-commons-temporal' include ':airbyte-config:config-persistence' // transitively used by airbyte-workers. include ':airbyte-persistence:job-persistence' // transitively used by airbyte-workers. include ':airbyte-db:jooq' // transitively used by airbyte-workers. @@ -88,13 +89,13 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" include ':airbyte-config:init' include ':airbyte-config:specs' include ':airbyte-container-orchestrator' + include ':airbyte-cron' include ':airbyte-metrics:reporter' include ':airbyte-server' include ':airbyte-temporal' include ':airbyte-tests' include ':airbyte-webapp' include ':airbyte-webapp-e2e-tests' - include ':airbyte-cron' } // connectors base @@ -149,3 +150,4 @@ if (!System.getenv().containsKey("SUB_BUILD") || System.getenv().get("SUB_BUILD" } } } +