diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index c7cf3c100ec5..2f71f4f57c2f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -15,12 +15,13 @@ import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.process.WorkerHeartbeatServer; -import io.airbyte.workers.temporal.CheckConnectionWorkflow; import io.airbyte.workers.temporal.DiscoverCatalogWorkflow; import io.airbyte.workers.temporal.SpecWorkflow; import io.airbyte.workers.temporal.SyncWorkflow; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl; +import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.kubernetes.client.openapi.ApiClient; @@ -88,9 +89,9 @@ public void start() { final Worker checkConnectionWorker = factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers())); - checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflow.WorkflowImpl.class); + checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class); checkConnectionWorker - .registerActivitiesImplementations(new CheckConnectionWorkflow.CheckConnectionActivityImpl(processFactory, secretsHydrator, workspaceRoot)); + .registerActivitiesImplementations(new CheckConnectionActivityImpl(processFactory, secretsHydrator, workspaceRoot)); final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers())); discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflow.WorkflowImpl.class); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java deleted file mode 100644 index dd6c1ef6f32a..000000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.config.StandardCheckConnectionInput; -import io.airbyte.config.StandardCheckConnectionOutput; -import io.airbyte.config.persistence.split_secrets.SecretsHydrator; -import io.airbyte.scheduler.models.IntegrationLauncherConfig; -import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.workers.DefaultCheckConnectionWorker; -import io.airbyte.workers.Worker; -import io.airbyte.workers.WorkerUtils; -import io.airbyte.workers.process.AirbyteIntegrationLauncher; -import io.airbyte.workers.process.IntegrationLauncher; -import io.airbyte.workers.process.ProcessFactory; -import io.temporal.activity.ActivityInterface; -import io.temporal.activity.ActivityMethod; -import io.temporal.activity.ActivityOptions; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; -import java.nio.file.Path; -import java.time.Duration; -import java.util.function.Supplier; - -@WorkflowInterface -public interface CheckConnectionWorkflow { - - @WorkflowMethod - StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardCheckConnectionInput connectionConfiguration); - - class WorkflowImpl implements CheckConnectionWorkflow { - - final ActivityOptions options = ActivityOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofHours(1)) - .setRetryOptions(TemporalUtils.NO_RETRY) - .build(); - private final CheckConnectionActivity activity = Workflow.newActivityStub(CheckConnectionActivity.class, options); - - @Override - public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final StandardCheckConnectionInput connectionConfiguration) { - return activity.run(jobRunConfig, launcherConfig, connectionConfiguration); - } - - } - - @ActivityInterface - interface CheckConnectionActivity { - - @ActivityMethod - StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardCheckConnectionInput connectionConfiguration); - - } - - class CheckConnectionActivityImpl implements CheckConnectionActivity { - - private final ProcessFactory processFactory; - private final SecretsHydrator secretsHydrator; - private final Path workspaceRoot; - - public CheckConnectionActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot) { - this.processFactory = processFactory; - this.secretsHydrator = secretsHydrator; - this.workspaceRoot = workspaceRoot; - } - - public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final StandardCheckConnectionInput connectionConfiguration) { - - final JsonNode fullConfig = secretsHydrator.hydrate(connectionConfiguration.getConnectionConfiguration()); - - final StandardCheckConnectionInput input = new StandardCheckConnectionInput() - .withConnectionConfiguration(fullConfig); - - final Supplier inputSupplier = () -> input; - - final TemporalAttemptExecution temporalAttemptExecution = - new TemporalAttemptExecution<>( - workspaceRoot, - jobRunConfig, - getWorkerFactory(launcherConfig), - inputSupplier, - new CancellationHandler.TemporalCancellationHandler()); - - return temporalAttemptExecution.get(); - } - - private CheckedSupplier, Exception> getWorkerFactory( - final IntegrationLauncherConfig launcherConfig) { - return () -> { - final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( - launcherConfig.getJobId(), - Math.toIntExact(launcherConfig.getAttemptId()), - launcherConfig.getDockerImage(), - processFactory, - WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); - - return new DefaultCheckConnectionWorker(integrationLauncher); - }; - } - - } - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 94510d187799..f0ceb91ef391 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -19,6 +19,7 @@ import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.temporal.client.WorkflowClient; import java.nio.file.Path; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java new file mode 100644 index 000000000000..131372124ccc --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.check.connection; + +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +@ActivityInterface +interface CheckConnectionActivity { + + @ActivityMethod + StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardCheckConnectionInput connectionConfiguration); + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java new file mode 100644 index 000000000000..438b2faad77b --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.check.connection; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.DefaultCheckConnectionWorker; +import io.airbyte.workers.Worker; +import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.process.AirbyteIntegrationLauncher; +import io.airbyte.workers.process.IntegrationLauncher; +import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; +import io.airbyte.workers.temporal.TemporalAttemptExecution; +import java.nio.file.Path; +import java.util.function.Supplier; + +public class CheckConnectionActivityImpl implements CheckConnectionActivity { + + private final ProcessFactory processFactory; + private final SecretsHydrator secretsHydrator; + private final Path workspaceRoot; + + public CheckConnectionActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot) { + this.processFactory = processFactory; + this.secretsHydrator = secretsHydrator; + this.workspaceRoot = workspaceRoot; + } + + public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardCheckConnectionInput connectionConfiguration) { + + final JsonNode fullConfig = secretsHydrator.hydrate(connectionConfiguration.getConnectionConfiguration()); + + final StandardCheckConnectionInput input = new StandardCheckConnectionInput() + .withConnectionConfiguration(fullConfig); + + final Supplier inputSupplier = () -> input; + + final TemporalAttemptExecution temporalAttemptExecution = + new TemporalAttemptExecution<>( + workspaceRoot, + jobRunConfig, + getWorkerFactory(launcherConfig), + inputSupplier, + new CancellationHandler.TemporalCancellationHandler()); + + return temporalAttemptExecution.get(); + } + + private CheckedSupplier, Exception> getWorkerFactory( + final IntegrationLauncherConfig launcherConfig) { + return () -> { + final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( + launcherConfig.getJobId(), + Math.toIntExact(launcherConfig.getAttemptId()), + launcherConfig.getDockerImage(), + processFactory, + WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); + + return new DefaultCheckConnectionWorker(integrationLauncher); + }; + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java new file mode 100644 index 000000000000..12968e632920 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.check.connection; + +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface CheckConnectionWorkflow { + + @WorkflowMethod + StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardCheckConnectionInput connectionConfiguration); + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java new file mode 100644 index 000000000000..0ffb08059192 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.check.connection; + +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.temporal.TemporalUtils; +import io.temporal.activity.ActivityOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class CheckConnectionWorkflowImpl implements CheckConnectionWorkflow { + + final ActivityOptions options = ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofHours(1)) + .setRetryOptions(TemporalUtils.NO_RETRY) + .build(); + private final CheckConnectionActivity activity = Workflow.newActivityStub(CheckConnectionActivity.class, options); + + @Override + public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardCheckConnectionInput connectionConfiguration) { + return activity.run(jobRunConfig, launcherConfig, connectionConfiguration); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 213649b422d4..4bee28e61941 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -24,6 +24,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.temporal.client.WorkflowClient; import java.io.IOException; import java.nio.file.Files;