From e5676f3b72622bb3124735234be18fd046836596 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 22 Oct 2021 16:05:37 -0700 Subject: [PATCH 1/2] Split the check connection workflow into multiple files. --- .../java/io/airbyte/workers/WorkerApp.java | 7 +- .../temporal/CheckConnectionWorkflow.java | 115 ------------------ .../workers/temporal/TemporalClient.java | 4 +- .../connection/CheckConnectionActivity.java | 18 +++ .../CheckConnectionActivityImpl.java | 69 +++++++++++ .../connection/CheckConnectionWorkflow.java | 17 +++ .../CheckConnectionWorkflowImpl.java | 26 ++++ .../workers/temporal/TemporalClientTest.java | 1 + 8 files changed, 137 insertions(+), 120 deletions(-) delete mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index c7cf3c100ec5..2f71f4f57c2f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -15,12 +15,13 @@ import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.process.WorkerHeartbeatServer; -import io.airbyte.workers.temporal.CheckConnectionWorkflow; import io.airbyte.workers.temporal.DiscoverCatalogWorkflow; import io.airbyte.workers.temporal.SpecWorkflow; import io.airbyte.workers.temporal.SyncWorkflow; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalUtils; +import io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl; +import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.kubernetes.client.openapi.ApiClient; @@ -88,9 +89,9 @@ public void start() { final Worker checkConnectionWorker = factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers())); - checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflow.WorkflowImpl.class); + checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class); checkConnectionWorker - .registerActivitiesImplementations(new CheckConnectionWorkflow.CheckConnectionActivityImpl(processFactory, secretsHydrator, workspaceRoot)); + .registerActivitiesImplementations(new CheckConnectionActivityImpl(processFactory, secretsHydrator, workspaceRoot)); final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers())); discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflow.WorkflowImpl.class); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java deleted file mode 100644 index dd6c1ef6f32a..000000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/CheckConnectionWorkflow.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.config.StandardCheckConnectionInput; -import io.airbyte.config.StandardCheckConnectionOutput; -import io.airbyte.config.persistence.split_secrets.SecretsHydrator; -import io.airbyte.scheduler.models.IntegrationLauncherConfig; -import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.workers.DefaultCheckConnectionWorker; -import io.airbyte.workers.Worker; -import io.airbyte.workers.WorkerUtils; -import io.airbyte.workers.process.AirbyteIntegrationLauncher; -import io.airbyte.workers.process.IntegrationLauncher; -import io.airbyte.workers.process.ProcessFactory; -import io.temporal.activity.ActivityInterface; -import io.temporal.activity.ActivityMethod; -import io.temporal.activity.ActivityOptions; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; -import java.nio.file.Path; -import java.time.Duration; -import java.util.function.Supplier; - -@WorkflowInterface -public interface CheckConnectionWorkflow { - - @WorkflowMethod - StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardCheckConnectionInput connectionConfiguration); - - class WorkflowImpl implements CheckConnectionWorkflow { - - final ActivityOptions options = ActivityOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofHours(1)) - .setRetryOptions(TemporalUtils.NO_RETRY) - .build(); - private final CheckConnectionActivity activity = Workflow.newActivityStub(CheckConnectionActivity.class, options); - - @Override - public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final StandardCheckConnectionInput connectionConfiguration) { - return activity.run(jobRunConfig, launcherConfig, connectionConfiguration); - } - - } - - @ActivityInterface - interface CheckConnectionActivity { - - @ActivityMethod - StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardCheckConnectionInput connectionConfiguration); - - } - - class CheckConnectionActivityImpl implements CheckConnectionActivity { - - private final ProcessFactory processFactory; - private final SecretsHydrator secretsHydrator; - private final Path workspaceRoot; - - public CheckConnectionActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot) { - this.processFactory = processFactory; - this.secretsHydrator = secretsHydrator; - this.workspaceRoot = workspaceRoot; - } - - public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final StandardCheckConnectionInput connectionConfiguration) { - - final JsonNode fullConfig = secretsHydrator.hydrate(connectionConfiguration.getConnectionConfiguration()); - - final StandardCheckConnectionInput input = new StandardCheckConnectionInput() - .withConnectionConfiguration(fullConfig); - - final Supplier inputSupplier = () -> input; - - final TemporalAttemptExecution temporalAttemptExecution = - new TemporalAttemptExecution<>( - workspaceRoot, - jobRunConfig, - getWorkerFactory(launcherConfig), - inputSupplier, - new CancellationHandler.TemporalCancellationHandler()); - - return temporalAttemptExecution.get(); - } - - private CheckedSupplier, Exception> getWorkerFactory( - final IntegrationLauncherConfig launcherConfig) { - return () -> { - final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( - launcherConfig.getJobId(), - Math.toIntExact(launcherConfig.getAttemptId()), - launcherConfig.getDockerImage(), - processFactory, - WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); - - return new DefaultCheckConnectionWorker(integrationLauncher); - }; - } - - } - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 94510d187799..03655a8ea563 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -19,6 +19,7 @@ import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.temporal.client.WorkflowClient; import java.nio.file.Path; import java.util.UUID; @@ -114,8 +115,7 @@ private T getWorkflowStub(final Class workflowClass, final TemporalJobTyp return client.newWorkflowStub(workflowClass, TemporalUtils.getWorkflowOptions(jobType)); } - @VisibleForTesting - TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier executor) { + @VisibleForTesting TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier executor) { final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig); final Path logPath = WorkerUtils.getLogPath(jobRoot); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java new file mode 100644 index 000000000000..f62f95100d91 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java @@ -0,0 +1,18 @@ +package io.airbyte.workers.temporal.check.connection; + +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +@ActivityInterface +interface CheckConnectionActivity { + + @ActivityMethod + StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardCheckConnectionInput connectionConfiguration); + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java new file mode 100644 index 000000000000..734b54e191bb --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -0,0 +1,69 @@ +package io.airbyte.workers.temporal.check.connection; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.DefaultCheckConnectionWorker; +import io.airbyte.workers.Worker; +import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.process.AirbyteIntegrationLauncher; +import io.airbyte.workers.process.IntegrationLauncher; +import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; +import io.airbyte.workers.temporal.TemporalAttemptExecution; +import java.nio.file.Path; +import java.util.function.Supplier; + +public class CheckConnectionActivityImpl implements CheckConnectionActivity { + + private final ProcessFactory processFactory; + private final SecretsHydrator secretsHydrator; + private final Path workspaceRoot; + + public CheckConnectionActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot) { + this.processFactory = processFactory; + this.secretsHydrator = secretsHydrator; + this.workspaceRoot = workspaceRoot; + } + + public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardCheckConnectionInput connectionConfiguration) { + + final JsonNode fullConfig = secretsHydrator.hydrate(connectionConfiguration.getConnectionConfiguration()); + + final StandardCheckConnectionInput input = new StandardCheckConnectionInput() + .withConnectionConfiguration(fullConfig); + + final Supplier inputSupplier = () -> input; + + final TemporalAttemptExecution temporalAttemptExecution = + new TemporalAttemptExecution<>( + workspaceRoot, + jobRunConfig, + getWorkerFactory(launcherConfig), + inputSupplier, + new CancellationHandler.TemporalCancellationHandler()); + + return temporalAttemptExecution.get(); + } + + private CheckedSupplier, Exception> getWorkerFactory( + final IntegrationLauncherConfig launcherConfig) { + return () -> { + final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( + launcherConfig.getJobId(), + Math.toIntExact(launcherConfig.getAttemptId()), + launcherConfig.getDockerImage(), + processFactory, + WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); + + return new DefaultCheckConnectionWorker(integrationLauncher); + }; + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java new file mode 100644 index 000000000000..4dfd291bdb79 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java @@ -0,0 +1,17 @@ +package io.airbyte.workers.temporal.check.connection; + +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface CheckConnectionWorkflow { + + @WorkflowMethod + StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardCheckConnectionInput connectionConfiguration); +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java new file mode 100644 index 000000000000..6dc3c9d0afad --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java @@ -0,0 +1,26 @@ +package io.airbyte.workers.temporal.check.connection; + +import io.airbyte.config.StandardCheckConnectionInput; +import io.airbyte.config.StandardCheckConnectionOutput; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.temporal.TemporalUtils; +import io.temporal.activity.ActivityOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class CheckConnectionWorkflowImpl implements CheckConnectionWorkflow { + + final ActivityOptions options = ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofHours(1)) + .setRetryOptions(TemporalUtils.NO_RETRY) + .build(); + private final CheckConnectionActivity activity = Workflow.newActivityStub(CheckConnectionActivity.class, options); + + @Override + public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardCheckConnectionInput connectionConfiguration) { + return activity.run(jobRunConfig, launcherConfig, connectionConfiguration); + } +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 213649b422d4..4bee28e61941 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -24,6 +24,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.temporal.client.WorkflowClient; import java.io.IOException; import java.nio.file.Files; From a68873e1f6e7abd02035351a41ebb8f23f2d030c Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 25 Oct 2021 10:05:12 -0700 Subject: [PATCH 2/2] Format --- .../java/io/airbyte/workers/temporal/TemporalClient.java | 3 ++- .../temporal/check/connection/CheckConnectionActivity.java | 4 ++++ .../check/connection/CheckConnectionActivityImpl.java | 6 +++++- .../temporal/check/connection/CheckConnectionWorkflow.java | 5 +++++ .../check/connection/CheckConnectionWorkflowImpl.java | 5 +++++ 5 files changed, 21 insertions(+), 2 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 03655a8ea563..f0ceb91ef391 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -115,7 +115,8 @@ private T getWorkflowStub(final Class workflowClass, final TemporalJobTyp return client.newWorkflowStub(workflowClass, TemporalUtils.getWorkflowOptions(jobType)); } - @VisibleForTesting TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier executor) { + @VisibleForTesting + TemporalResponse execute(final JobRunConfig jobRunConfig, final Supplier executor) { final Path jobRoot = WorkerUtils.getJobRoot(workspaceRoot, jobRunConfig); final Path logPath = WorkerUtils.getLogPath(jobRoot); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java index f62f95100d91..131372124ccc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivity.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.workers.temporal.check.connection; import io.airbyte.config.StandardCheckConnectionInput; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java index 734b54e191bb..438b2faad77b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionActivityImpl.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.workers.temporal.check.connection; import com.fasterxml.jackson.databind.JsonNode; @@ -53,7 +57,7 @@ public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, } private CheckedSupplier, Exception> getWorkerFactory( - final IntegrationLauncherConfig launcherConfig) { + final IntegrationLauncherConfig launcherConfig) { return () -> { final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( launcherConfig.getJobId(), diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java index 4dfd291bdb79..12968e632920 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflow.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.workers.temporal.check.connection; import io.airbyte.config.StandardCheckConnectionInput; @@ -14,4 +18,5 @@ public interface CheckConnectionWorkflow { StandardCheckConnectionOutput run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig, StandardCheckConnectionInput connectionConfiguration); + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java index 6dc3c9d0afad..0ffb08059192 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/check/connection/CheckConnectionWorkflowImpl.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.workers.temporal.check.connection; import io.airbyte.config.StandardCheckConnectionInput; @@ -23,4 +27,5 @@ public StandardCheckConnectionOutput run(final JobRunConfig jobRunConfig, final StandardCheckConnectionInput connectionConfiguration) { return activity.run(jobRunConfig, launcherConfig, connectionConfiguration); } + }