Skip to content

Commit

Permalink
refactor spec workflow (#7354)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau authored Oct 25, 2021
1 parent c689e17 commit 98d87b1
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
import io.airbyte.workers.process.KubeProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.process.WorkerHeartbeatServer;
import io.airbyte.workers.temporal.SpecWorkflow;
import io.airbyte.workers.temporal.SyncWorkflow;
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.TemporalUtils;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivityImpl;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogActivityImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl;
import io.airbyte.workers.temporal.spec.SpecActivityImpl;
import io.airbyte.workers.temporal.spec.SpecWorkflowImpl;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.kubernetes.client.openapi.ApiClient;
Expand Down Expand Up @@ -85,8 +86,8 @@ public void start() {
final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService));

final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers()));
specWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class);
specWorker.registerActivitiesImplementations(new SpecWorkflow.SpecActivityImpl(processFactory, workspaceRoot));
specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class);
specWorker.registerActivitiesImplementations(new SpecActivityImpl(processFactory, workspaceRoot));

final Worker checkConnectionWorker =
factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers()));
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow;
import io.airbyte.workers.temporal.spec.SpecWorkflow;
import io.temporal.client.WorkflowClient;
import java.nio.file.Path;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.spec;

import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

@ActivityInterface
public interface SpecActivity {

@ActivityMethod
ConnectorSpecification run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.spec;

import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.Worker;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.CancellationHandler;
import io.airbyte.workers.temporal.TemporalAttemptExecution;
import java.nio.file.Path;
import java.util.function.Supplier;

public class SpecActivityImpl implements SpecActivity {

private final ProcessFactory processFactory;
private final Path workspaceRoot;

public SpecActivityImpl(final ProcessFactory processFactory, final Path workspaceRoot) {
this.processFactory = processFactory;
this.workspaceRoot = workspaceRoot;
}

public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) {
final Supplier<JobGetSpecConfig> inputSupplier = () -> new JobGetSpecConfig().withDockerImage(launcherConfig.getDockerImage());

final TemporalAttemptExecution<JobGetSpecConfig, ConnectorSpecification> temporalAttemptExecution = new TemporalAttemptExecution<>(
workspaceRoot,
jobRunConfig,
getWorkerFactory(launcherConfig),
inputSupplier,
new CancellationHandler.TemporalCancellationHandler());

return temporalAttemptExecution.get();
}

private CheckedSupplier<Worker<JobGetSpecConfig, ConnectorSpecification>, Exception> getWorkerFactory(final IntegrationLauncherConfig launcherConfig) {
return () -> {
final IntegrationLauncher integrationLauncher = new AirbyteIntegrationLauncher(
launcherConfig.getJobId(),
launcherConfig.getAttemptId().intValue(),
launcherConfig.getDockerImage(),
processFactory,
WorkerUtils.DEFAULT_RESOURCE_REQUIREMENTS);

return new DefaultGetSpecWorker(integrationLauncher);
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.spec;

import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

@WorkflowInterface
public interface SpecWorkflow {

@WorkflowMethod
ConnectorSpecification run(JobRunConfig jobRunConfig, IntegrationLauncherConfig launcherConfig);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.spec;

import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;

public class SpecWorkflowImpl implements SpecWorkflow {

final ActivityOptions options = ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofHours(1))
.setRetryOptions(TemporalUtils.NO_RETRY)
.build();
private final SpecActivity activity = Workflow.newActivityStub(SpecActivity.class, options);

@Override
public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) {
return activity.run(jobRunConfig, launcherConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.airbyte.scheduler.models.JobRunConfig;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflow;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflow;
import io.airbyte.workers.temporal.spec.SpecWorkflow;
import io.temporal.client.WorkflowClient;
import java.io.IOException;
import java.nio.file.Files;
Expand Down

0 comments on commit 98d87b1

Please sign in to comment.