Skip to content

Commit

Permalink
refactor the catalog discovery workflow (airbytehq#7350)
Browse files Browse the repository at this point in the history
* refactor the catalog discovery workflow

* Format
  • Loading branch information
benmoriceau authored and schlattk committed Jan 4, 2022
1 parent 61e07fb commit f241642
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.temporal.DiscoverCatalogWorkflow;
import io.airbyte.workers.temporal.SpecWorkflow;
import io.airbyte.workers.temporal.SyncWorkflow;
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivityImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
Expand Down Expand Up @@ -94,9 +95,9 @@ public void start() {
.registerActivitiesImplementations(new CheckConnectionActivityImpl(processFactory, secretsHydrator, workspaceRoot));

final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers()));
discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflow.WorkflowImpl.class);
discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class);
discoverWorker
.registerActivitiesImplementations(new DiscoverCatalogWorkflow.DiscoverCatalogActivityImpl(processFactory, secretsHydrator, workspaceRoot));
.registerActivitiesImplementations(new DiscoverCatalogActivityImpl(processFactory, secretsHydrator, workspaceRoot));

final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers()));
syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow;
import io.temporal.client.WorkflowClient;
import java.nio.file.Path;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.discover.catalog;

import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

@ActivityInterface
public interface DiscoverCatalogActivity {

@ActivityMethod
AirbyteCatalog run(JobRunConfig jobRunConfig,
IntegrationLauncherConfig launcherConfig,
StandardDiscoverCatalogInput config);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.discover.catalog;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.Worker;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteStreamFactory;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteStreamFactory;
import io.airbyte.workers.temporal.CancellationHandler;
import io.airbyte.workers.temporal.TemporalAttemptExecution;
import java.nio.file.Path;
import java.util.function.Supplier;

public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity {

private final ProcessFactory processFactory;
private final SecretsHydrator secretsHydrator;
private final Path workspaceRoot;

public DiscoverCatalogActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot) {
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
this.workspaceRoot = workspaceRoot;
}

public AirbyteCatalog run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig launcherConfig,
final StandardDiscoverCatalogInput config) {

final JsonNode fullConfig = secretsHydrator.hydrate(config.getConnectionConfiguration());

final StandardDiscoverCatalogInput input = new StandardDiscoverCatalogInput()
.withConnectionConfiguration(fullConfig);

final Supplier<StandardDiscoverCatalogInput> inputSupplier = () -> input;

final TemporalAttemptExecution<StandardDiscoverCatalogInput, AirbyteCatalog> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot,
jobRunConfig,
getWorkerFactory(launcherConfig),
inputSupplier,
new CancellationHandler.TemporalCancellationHandler());

return temporalAttemptExecution.get();
}

private CheckedSupplier<Worker<StandardDiscoverCatalogInput, AirbyteCatalog>, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) {
return () -> {
final IntegrationLauncher integrationLauncher =
new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(),
processFactory, WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS);
final AirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory();
return new DefaultDiscoverCatalogWorker(integrationLauncher, streamFactory);
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.discover.catalog;

import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

@WorkflowInterface
public interface DiscoverCatalogWorkflow {

@WorkflowMethod
AirbyteCatalog run(JobRunConfig jobRunConfig,
IntegrationLauncherConfig launcherConfig,
StandardDiscoverCatalogInput config);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.discover.catalog;

import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;

public class DiscoverCatalogWorkflowImpl implements DiscoverCatalogWorkflow {

final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofHours(2))
.setRetryOptions(TemporalUtils.NO_RETRY)
.build();
private final DiscoverCatalogActivity activity = Workflow.newActivityStub(DiscoverCatalogActivity.class, options);

@Override
public AirbyteCatalog run(final JobRunConfig jobRunConfig,
final IntegrationLauncherConfig launcherConfig,
final StandardDiscoverCatalogInput config) {
return activity.run(jobRunConfig, launcherConfig, config);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow;
import io.temporal.client.WorkflowClient;
import java.io.IOException;
import java.nio.file.Files;
Expand Down

0 comments on commit f241642

Please sign in to comment.