From c689e17e80101b81735483a5c5985a027296d409 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 25 Oct 2021 13:29:10 -0700 Subject: [PATCH] refactor the catalog discovery workflow (#7350) * refactor the catalog discovery workflow * Format --- .../java/io/airbyte/workers/WorkerApp.java | 7 +- .../temporal/DiscoverCatalogWorkflow.java | 112 ------------------ .../workers/temporal/TemporalClient.java | 1 + .../catalog/DiscoverCatalogActivity.java | 22 ++++ .../catalog/DiscoverCatalogActivityImpl.java | 70 +++++++++++ .../catalog/DiscoverCatalogWorkflow.java | 22 ++++ .../catalog/DiscoverCatalogWorkflowImpl.java | 31 +++++ .../workers/temporal/TemporalClientTest.java | 1 + 8 files changed, 151 insertions(+), 115 deletions(-) delete mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/DiscoverCatalogWorkflow.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivity.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflow.java create mode 100644 airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 2f71f4f57c2f..cdc5d5d031af 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -15,13 +15,14 @@ import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; import io.airbyte.workers.process.WorkerHeartbeatServer; -import io.airbyte.workers.temporal.DiscoverCatalogWorkflow; import io.airbyte.workers.temporal.SpecWorkflow; import 
io.airbyte.workers.temporal.SyncWorkflow; import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl; +import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivityImpl; +import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl; import io.fabric8.kubernetes.client.DefaultKubernetesClient; import io.fabric8.kubernetes.client.KubernetesClient; import io.kubernetes.client.openapi.ApiClient; @@ -94,9 +95,9 @@ public void start() { .registerActivitiesImplementations(new CheckConnectionActivityImpl(processFactory, secretsHydrator, workspaceRoot)); final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers())); - discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflow.WorkflowImpl.class); + discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class); discoverWorker - .registerActivitiesImplementations(new DiscoverCatalogWorkflow.DiscoverCatalogActivityImpl(processFactory, secretsHydrator, workspaceRoot)); + .registerActivitiesImplementations(new DiscoverCatalogActivityImpl(processFactory, secretsHydrator, workspaceRoot)); final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/DiscoverCatalogWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/DiscoverCatalogWorkflow.java deleted file mode 100644 index 87465d3bb42f..000000000000 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/DiscoverCatalogWorkflow.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2021 
Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.workers.temporal; - -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.commons.functional.CheckedSupplier; -import io.airbyte.config.StandardDiscoverCatalogInput; -import io.airbyte.config.persistence.split_secrets.SecretsHydrator; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.scheduler.models.IntegrationLauncherConfig; -import io.airbyte.scheduler.models.JobRunConfig; -import io.airbyte.workers.DefaultDiscoverCatalogWorker; -import io.airbyte.workers.Worker; -import io.airbyte.workers.WorkerUtils; -import io.airbyte.workers.process.AirbyteIntegrationLauncher; -import io.airbyte.workers.process.IntegrationLauncher; -import io.airbyte.workers.process.ProcessFactory; -import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory; -import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory; -import io.temporal.activity.ActivityInterface; -import io.temporal.activity.ActivityMethod; -import io.temporal.activity.ActivityOptions; -import io.temporal.workflow.Workflow; -import io.temporal.workflow.WorkflowInterface; -import io.temporal.workflow.WorkflowMethod; -import java.nio.file.Path; -import java.time.Duration; -import java.util.function.Supplier; - -@WorkflowInterface -public interface DiscoverCatalogWorkflow { - - @WorkflowMethod - AirbyteCatalog run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardDiscoverCatalogInput config); - - class WorkflowImpl implements DiscoverCatalogWorkflow { - - final ActivityOptions options = ActivityOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofHours(2)) - .setRetryOptions(TemporalUtils.NO_RETRY) - .build(); - private final DiscoverCatalogActivity activity = Workflow.newActivityStub(DiscoverCatalogActivity.class, options); - - @Override - public AirbyteCatalog run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final 
StandardDiscoverCatalogInput config) { - return activity.run(jobRunConfig, launcherConfig, config); - } - - } - - @ActivityInterface - interface DiscoverCatalogActivity { - - @ActivityMethod - AirbyteCatalog run(JobRunConfig jobRunConfig, - IntegrationLauncherConfig launcherConfig, - StandardDiscoverCatalogInput config); - - } - - class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity { - - private final ProcessFactory processFactory; - private final SecretsHydrator secretsHydrator; - private final Path workspaceRoot; - - public DiscoverCatalogActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot) { - this.processFactory = processFactory; - this.secretsHydrator = secretsHydrator; - this.workspaceRoot = workspaceRoot; - } - - public AirbyteCatalog run(final JobRunConfig jobRunConfig, - final IntegrationLauncherConfig launcherConfig, - final StandardDiscoverCatalogInput config) { - - final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration()); - - final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput() - .withConnectionConfiguration(fullConfig); - - final Supplier<StandardDiscoverCatalogInput> inputSupplier = () -> input; - - final TemporalAttemptExecution<StandardDiscoverCatalogInput, AirbyteCatalog> temporalAttemptExecution = new TemporalAttemptExecution<>( - workspaceRoot, - jobRunConfig, - getWorkerFactory(launcherConfig), - inputSupplier, - new CancellationHandler.TemporalCancellationHandler()); - - return temporalAttemptExecution.get(); - } - - private CheckedSupplier<Worker<StandardDiscoverCatalogInput, AirbyteCatalog>, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) { - return () -> { - final IntegrationLauncher integrationLauncher = - new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), - processFactory, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); - final AirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory(); - return new
DefaultDiscoverCatalogWorker(integrationLauncher, streamFactory); - }; - } - - } - -} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index f0ceb91ef391..67ad35b5cb0e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -20,6 +20,7 @@ import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; +import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; import io.temporal.client.WorkflowClient; import java.nio.file.Path; import java.util.UUID; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivity.java new file mode 100644 index 000000000000..1fb6a3069e07 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivity.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.workers.temporal.discover.catalog; + +import io.airbyte.config.StandardDiscoverCatalogInput; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; + +@ActivityInterface +public interface DiscoverCatalogActivity { + + @ActivityMethod + AirbyteCatalog run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardDiscoverCatalogInput config); + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java new file mode 100644 index 000000000000..9b972a2f3426 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogActivityImpl.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.workers.temporal.discover.catalog; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.functional.CheckedSupplier; +import io.airbyte.config.StandardDiscoverCatalogInput; +import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.DefaultDiscoverCatalogWorker; +import io.airbyte.workers.Worker; +import io.airbyte.workers.WorkerUtils; +import io.airbyte.workers.process.AirbyteIntegrationLauncher; +import io.airbyte.workers.process.IntegrationLauncher; +import io.airbyte.workers.process.ProcessFactory; +import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory; +import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory; +import io.airbyte.workers.temporal.CancellationHandler; +import io.airbyte.workers.temporal.TemporalAttemptExecution; +import java.nio.file.Path; +import java.util.function.Supplier; + +public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity { + + private final ProcessFactory processFactory; + private final SecretsHydrator secretsHydrator; + private final Path workspaceRoot; + + public DiscoverCatalogActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot) { + this.processFactory = processFactory; + this.secretsHydrator = secretsHydrator; + this.workspaceRoot = workspaceRoot; + } + + public AirbyteCatalog run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardDiscoverCatalogInput config) { + + final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration()); + + final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput() + .withConnectionConfiguration(fullConfig); + + final Supplier<StandardDiscoverCatalogInput> inputSupplier = () -> input; + +
final TemporalAttemptExecution<StandardDiscoverCatalogInput, AirbyteCatalog> temporalAttemptExecution = new TemporalAttemptExecution<>( + workspaceRoot, + jobRunConfig, + getWorkerFactory(launcherConfig), + inputSupplier, + new CancellationHandler.TemporalCancellationHandler()); + + return temporalAttemptExecution.get(); + } + + private CheckedSupplier<Worker<StandardDiscoverCatalogInput, AirbyteCatalog>, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) { + return () -> { + final IntegrationLauncher integrationLauncher = + new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(), + processFactory, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS); + final AirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory(); + return new DefaultDiscoverCatalogWorker(integrationLauncher, streamFactory); + }; + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflow.java new file mode 100644 index 000000000000..3639caa4946f --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflow.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+ */ + +package io.airbyte.workers.temporal.discover.catalog; + +import io.airbyte.config.StandardDiscoverCatalogInput; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; + +@WorkflowInterface +public interface DiscoverCatalogWorkflow { + + @WorkflowMethod + AirbyteCatalog run(JobRunConfig jobRunConfig, + IntegrationLauncherConfig launcherConfig, + StandardDiscoverCatalogInput config); + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java new file mode 100644 index 000000000000..da16dcc26225 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/discover/catalog/DiscoverCatalogWorkflowImpl.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.workers.temporal.discover.catalog; + +import io.airbyte.config.StandardDiscoverCatalogInput; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.scheduler.models.IntegrationLauncherConfig; +import io.airbyte.scheduler.models.JobRunConfig; +import io.airbyte.workers.temporal.TemporalUtils; +import io.temporal.activity.ActivityOptions; +import io.temporal.workflow.Workflow; +import java.time.Duration; + +public class DiscoverCatalogWorkflowImpl implements DiscoverCatalogWorkflow { + + final ActivityOptions options = ActivityOptions.newBuilder() + .setScheduleToCloseTimeout(Duration.ofHours(2)) + .setRetryOptions(TemporalUtils.NO_RETRY) + .build(); + private final DiscoverCatalogActivity activity = Workflow.newActivityStub(DiscoverCatalogActivity.class, options); + + @Override + public AirbyteCatalog run(final JobRunConfig jobRunConfig, + final IntegrationLauncherConfig launcherConfig, + final StandardDiscoverCatalogInput config) { + return activity.run(jobRunConfig, launcherConfig, config); + } + +} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java index 4bee28e61941..a06127b51dca 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/TemporalClientTest.java @@ -25,6 +25,7 @@ import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow; +import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow; import io.temporal.client.WorkflowClient; import java.io.IOException; import java.nio.file.Files;