From 829d60a07f8c0e4c2b2762bf724efee729dfaed3 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 25 Oct 2021 13:45:01 -0700 Subject: [PATCH] refactor spec workflow (#7354) --- .../java/io/airbyte/workers/WorkerApp.java | 7 +- .../workers/temporal/SpecWorkflow.java | 95 ------------------- .../workers/temporal/TemporalClient.java | 1 + .../workers/temporal/spec/SpecActivity.java | 19 ++++ .../temporal/spec/SpecActivityImpl.java | 59 ++++++++++++ .../workers/temporal/spec/SpecWorkflow.java | 19 ++++ .../temporal/spec/SpecWorkflowImpl.java | 28 ++++++ .../workers/temporal/TemporalClientTest.java | 1 + 8 files changed, 131 insertions(+), 98 deletions(-) delete mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/SpecWorkflow.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivity.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflow.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index cdc5d5d031af..62bbeca9b46a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -15,7 +15,6 @@ import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.process.WorkerHeartbeatServer; -import io.airbyte.workers.temporal.SpecWorkflow; import io.airbyte.workers.temporal.SyncWorkflow; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalUtils; @@ -23,6 +22,8 @@ import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivityImpl; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl; +import io.airbyte.workers.temporal.spec.SpecActivityImpl; +import io.airbyte.workers.temporal.spec.SpecWorkflowImpl; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.kubernetes.client.openapi.ApiClient; @@ -85,8 +86,8 @@ public void start() { final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService)); final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers())); - specWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class); - specWorker.registerActivitiesImplementations(new SpecWorkflow.SpecActivityImpl(processFactory, workspaceRoot)); + specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class); + specWorker.registerActivitiesImplementations(new SpecActivityImpl(processFactory, workspaceRoot)); final Worker checkConnectionWorker = factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers())); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SpecWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SpecWorkflow.java deleted file mode 100644 index 669d7da6fc57..000000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SpecWorkflow.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal; - -import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.config.JobGetSpecConfig; -import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.scheduler.models.IntegrationLauncherConfig; -import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.workers.DefaultGetSpecWorker; -import io.airbyte.workers.Worker; -import io.airbyte.workers.WorkerUtils; -import io.airbyte.workers.process.AirbyteIntegrationLauncher; -import io.airbyte.workers.process.IntegrationLauncher; -import io.airbyte.workers.process.ProcessFactory; -import io.temporal.activity.ActivityInterface; -import io.temporal.activity.ActivityMethod; -import io.temporal.activity.ActivityOptions; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; -import java.nio.file.Path; -import java.time.Duration; -import java.util.function.Supplier; - -@WorkflowInterface -public interface SpecWorkflow { - - @WorkflowMethod - ConnectorSpecification run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig); - - class WorkflowImpl implements SpecWorkflow { - - final ActivityOptions options = ActivityOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofHours(1)) - .setRetryOptions(TemporalUtils.NO_RETRY) - .build(); - private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options); - - @Override - public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { - return activity.run(jobRunConfig, launcherConfig); - } - - } - - @ActivityInterface - interface SpecActivity { - - @ActivityMethod - ConnectorSpecification run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig); - - } - - class SpecActivityImpl implements SpecActivity { - - private final ProcessFactory processFactory; - private final Path workspaceRoot; - - public SpecActivityImpl(final ProcessFactory processFactory, final Path workspaceRoot) { - this.processFactory = processFactory; - this.workspaceRoot = workspaceRoot; - } - - public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { - final Supplier inputSupplier = () -> new JobGetSpecConfig().withDockerImage(launcherConfig.getDockerImage()); - - final TemporalAttemptExecution temporalAttemptExecution = new TemporalAttemptExecution<>( - workspaceRoot, - jobRunConfig, - getWorkerFactory(launcherConfig), - inputSupplier, - new CancellationHandler.TemporalCancellationHandler()); - - return temporalAttemptExecution.get(); - } - - private CheckedSupplier, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) { - return () -> { - final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( - launcherConfig.getJobId(), - launcherConfig.getAttemptId().intValue(), - launcherConfig.getDockerImage(), - processFactory, - WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); - - return new DefaultGetSpecWorker(integrationLauncher); - }; - } - - } - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 67ad35b5cb0e..671969f6116d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -21,6 +21,7 @@ import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; +import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.temporal.client.WorkflowClient; import java.nio.file.Path; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivity.java new file mode 100644 index 000000000000..4b8443f802d0 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivity.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.spec; + +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +@ActivityInterface +public interface SpecActivity { + + @ActivityMethod + ConnectorSpecification run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig); + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java new file mode 100644 index 000000000000..2cae77228900 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecActivityImpl.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.spec; + +import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.config.JobGetSpecConfig; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.DefaultGetSpecWorker; +import io.airbyte.workers.Worker; +import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.process.AirbyteIntegrationLauncher; +import io.airbyte.workers.process.IntegrationLauncher; +import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.temporal.CancellationHandler; +import io.airbyte.workers.temporal.TemporalAttemptExecution; +import java.nio.file.Path; +import java.util.function.Supplier; + +public class SpecActivityImpl implements SpecActivity { + + private final ProcessFactory processFactory; + private final Path workspaceRoot; + + public SpecActivityImpl(final ProcessFactory processFactory, final Path workspaceRoot) { + this.processFactory = processFactory; + this.workspaceRoot = workspaceRoot; + } + + public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { + final Supplier inputSupplier = () -> new JobGetSpecConfig().withDockerImage(launcherConfig.getDockerImage()); + + final TemporalAttemptExecution temporalAttemptExecution = new TemporalAttemptExecution<>( + workspaceRoot, + jobRunConfig, + getWorkerFactory(launcherConfig), + inputSupplier, + new CancellationHandler.TemporalCancellationHandler()); + + return temporalAttemptExecution.get(); + } + + private CheckedSupplier, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) { + return () -> { + final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher( + launcherConfig.getJobId(), + launcherConfig.getAttemptId().intValue(), + launcherConfig.getDockerImage(), + processFactory, + WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); + + return new DefaultGetSpecWorker(integrationLauncher); + }; + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflow.java new file mode 100644 index 000000000000..cf86c66f6220 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflow.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.spec; + +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface SpecWorkflow { + + @WorkflowMethod + ConnectorSpecification run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig); + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java new file mode 100644 index 000000000000..181a53913f18 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/spec/SpecWorkflowImpl.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.workers.temporal.spec; + +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.temporal.TemporalUtils; +import io.temporal.activity.ActivityOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class SpecWorkflowImpl implements SpecWorkflow { + + final ActivityOptions options = ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofHours(1)) + .setRetryOptions(TemporalUtils.NO_RETRY) + .build(); + private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options); + + @Override + public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) { + return activity.run(jobRunConfig, launcherConfig); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index a06127b51dca..a560698d951a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -26,6 +26,7 @@ import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; +import io.airbyte.workers.temporal.spec.SpecWorkflow; import io.temporal.client.WorkflowClient; import java.io.IOException; import java.nio.file.Files;