diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java index 384084d6cf04..d496d4edd340 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/Configs.java @@ -401,6 +401,32 @@ public interface Configs { */ MaxWorkersConfig getMaxWorkers(); + /** + * Define if the worker should run get spec workflows. Defaults to true. Internal-use only. + */ + boolean shouldRunGetSpecWorkflows(); + + /** + * Define if the worker should run check connection workflows. Defaults to true. Internal-use only. + */ + boolean shouldRunCheckConnectionWorkflows(); + + /** + * Define if the worker should run discover workflows. Defaults to true. Internal-use only. + */ + boolean shouldRunDiscoverWorkflows(); + + /** + * Define if the worker should run sync workflows. Defaults to true. Internal-use only. + */ + boolean shouldRunSyncWorkflows(); + + /** + * Define if the worker should run connection manager workflows. Defaults to true. Internal-use + * only. + */ + boolean shouldRunConnectionManagerWorkflows(); + // Worker - Kube only /** * Define the local ports the Airbyte Worker pod uses to connect to the various Job pods. diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java index 3d4f9fb3ebfd..353b7c300177 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -110,6 +110,12 @@ public class EnvConfigs implements Configs { public static final String ACTIVITY_MAX_ATTEMPT = "ACTIVITY_MAX_ATTEMPT"; public static final String ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS = "ACTIVITY_DELAY_IN_SECOND_BETWEEN_ATTEMPTS"; + private static final String SHOULD_RUN_GET_SPEC_WORKFLOWS = "SHOULD_RUN_GET_SPEC_WORKFLOWS"; + private static final String SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS = "SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS"; + private static final String SHOULD_RUN_DISCOVER_WORKFLOWS = "SHOULD_RUN_DISCOVER_WORKFLOWS"; + private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS"; + private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS"; + // job-type-specific overrides public static final String SPEC_JOB_KUBE_NODE_SELECTORS = "SPEC_JOB_KUBE_NODE_SELECTORS"; public static final String CHECK_JOB_KUBE_NODE_SELECTORS = "CHECK_JOB_KUBE_NODE_SELECTORS"; @@ -692,6 +698,31 @@ public MaxWorkersConfig getMaxWorkers() { Math.toIntExact(getEnvOrDefault(MAX_SYNC_WORKERS, DEFAULT_MAX_SYNC_WORKERS))); } + @Override + public boolean shouldRunGetSpecWorkflows() { + return getEnvOrDefault(SHOULD_RUN_GET_SPEC_WORKFLOWS, true); + } + + @Override + public boolean shouldRunCheckConnectionWorkflows() { + return getEnvOrDefault(SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS, true); + } + + @Override + public boolean shouldRunDiscoverWorkflows() { + return getEnvOrDefault(SHOULD_RUN_DISCOVER_WORKFLOWS, true); + } + + @Override + public boolean shouldRunSyncWorkflows() { + return getEnvOrDefault(SHOULD_RUN_SYNC_WORKFLOWS, true); + } + + @Override + public boolean shouldRunConnectionManagerWorkflows() { + return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true); + } + @Override public Set getTemporalWorkerPorts() { final var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, ""); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index 0a991de43e3a..d904fec044d1 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -965,35 +965,66 @@ public void testDowntimeDuringSync() throws Exception { final String connectionName = "test-connection"; final UUID sourceId = createPostgresSource().getSourceId(); final UUID destinationId = createDestination().getDestinationId(); - final UUID operationId = createOperation().getOperationId(); final AirbyteCatalog catalog = discoverSourceSchema(sourceId); final SyncMode syncMode = SyncMode.FULL_REFRESH; final DestinationSyncMode destinationSyncMode = DestinationSyncMode.OVERWRITE; catalog.getStreams().forEach(s -> s.getConfig().syncMode(syncMode).destinationSyncMode(destinationSyncMode)); - final UUID connectionId = - createConnection(connectionName, sourceId, destinationId, List.of(operationId), catalog, null).getConnectionId(); - final JobInfoRead connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + for (final var input : List.of("KILL_BOTH_NON_SYNC_SLIGHTLY_FIRST", "KILL_ONLY_SYNC", "KILL_ONLY_NON_SYNC")) { + LOGGER.info("Checking " + input); - Thread.sleep(5000); + final UUID connectionId = + createConnection(connectionName, sourceId, destinationId, List.of(), catalog, null).getConnectionId(); - LOGGER.info("Scaling down workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); - Thread.sleep(1000); + JobInfoRead connectionSyncRead = null; - LOGGER.info("Scaling up workers..."); - kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + while (connectionSyncRead == null) { - waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - assertSourceAndDestinationDbInSync(false); + try { + connectionSyncRead = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + } catch (Exception e) { + LOGGER.error("retrying after error", e); + } + } + + Thread.sleep(10000); + + switch (input) { + case "KILL_BOTH_NON_SYNC_SLIGHTLY_FIRST" -> { + LOGGER.info("Scaling down both workers at roughly the same time..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + + LOGGER.info("Scaling up both workers..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_ONLY_SYNC" -> { + LOGGER.info("Scaling down only sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(0, true); + + LOGGER.info("Scaling up sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-sync-worker").scale(1); + } + case "KILL_ONLY_NON_SYNC" -> { + LOGGER.info("Scaling down only non-sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(0, true); + + LOGGER.info("Scaling up non-sync worker..."); + kubernetesClient.apps().deployments().inNamespace("default").withName("airbyte-worker").scale(1); + } + } + + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - final long numAttempts = apiClient.getJobsApi() - .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())) - .getAttempts() - .size(); + final long numAttempts = apiClient.getJobsApi() + .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())) + .getAttempts() + .size(); - // it should be able to accomplish the resume without an additional attempt! - assertEquals(1, numAttempts); + // it should be able to accomplish the resume without an additional attempt! + assertEquals(1, numAttempts); + } } @Test diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 0ff7b17b6959..22a4444445b2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -134,72 +134,35 @@ public void start() { final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService)); - final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers())); - specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class); - specWorker.registerActivitiesImplementations( - new SpecActivityImpl(specWorkerConfigs, specProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence, - airbyteVersion)); + if (configs.shouldRunGetSpecWorkflows()) { + registerGetSpec(factory); + } - final Worker checkConnectionWorker = - factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers())); - checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class); - checkConnectionWorker - .registerActivitiesImplementations( - new CheckConnectionActivityImpl(checkWorkerConfigs, checkProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, - jobPersistence, airbyteVersion)); + if (configs.shouldRunCheckConnectionWorkflows()) { + registerCheckConnection(factory); + } - final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers())); - discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class); - discoverWorker - .registerActivitiesImplementations( - new DiscoverCatalogActivityImpl(discoverWorkerConfigs, discoverProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, - logConfigs, - jobPersistence, airbyteVersion)); + if (configs.shouldRunDiscoverWorkflows()) { + registerDiscover(factory); + } - final NormalizationActivityImpl normalizationActivity = - new NormalizationActivityImpl( - containerOrchestratorConfig, - defaultWorkerConfigs, - defaultProcessFactory, - secretsHydrator, - workspaceRoot, - workerEnvironment, - logConfigs, - jobPersistence, - airbyteVersion); - final DbtTransformationActivityImpl dbtTransformationActivity = - new DbtTransformationActivityImpl( - containerOrchestratorConfig, - defaultWorkerConfigs, - defaultProcessFactory, - secretsHydrator, - workspaceRoot, - workerEnvironment, - logConfigs, - jobPersistence, - airbyteVersion); - new PersistStateActivityImpl(workspaceRoot, configRepository); - final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository); - final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); - final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl( - containerOrchestratorConfig, - replicationWorkerConfigs, - replicationProcessFactory, - secretsHydrator, - workspaceRoot, - workerEnvironment, - logConfigs, - jobPersistence, - airbyteVersion); - syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class); + if (configs.shouldRunSyncWorkflows()) { + registerSync(factory); + } - syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity); + if (configs.shouldRunConnectionManagerWorkflows()) { + registerConnectionManager(factory); + } + factory.start(); + } + + private void registerConnectionManager(final WorkerFactory factory) { final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements()); final Worker connectionUpdaterWorker = factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); - connectionUpdaterWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SyncWorkflowImpl.class); + connectionUpdaterWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); connectionUpdaterWorker.registerActivitiesImplementations( new GenerateInputActivityImpl( jobPersistence), @@ -214,29 +177,57 @@ public void start() { configRepository, jobCreator), new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()), - new ConnectionDeletionActivityImpl(connectionHelper), - replicationActivity, - normalizationActivity, - dbtTransformationActivity, - persistStateActivity); + new ConnectionDeletionActivityImpl(connectionHelper)); + } - factory.start(); + private void registerSync(final WorkerFactory factory) { + final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl(replicationWorkerConfigs, replicationProcessFactory); + + final NormalizationActivityImpl normalizationActivity = getNormalizationActivityImpl( + defaultWorkerConfigs, + defaultProcessFactory); + + final DbtTransformationActivityImpl dbtTransformationActivity = getDbtActivityImpl( + defaultWorkerConfigs, + defaultProcessFactory); + + final PersistStateActivityImpl persistStateActivity = new PersistStateActivityImpl(workspaceRoot, configRepository); + + final Worker syncWorker = factory.newWorker(TemporalJobType.SYNC.name(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); + syncWorker.registerWorkflowImplementationTypes(SyncWorkflowImpl.class); + syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity); } - /** - * Switches behavior based on containerOrchestratorEnabled to decide whether to use new container - * launching or not. - */ - private ReplicationActivityImpl getReplicationActivityImpl( - final Optional containerOrchestratorConfig, - final WorkerConfigs workerConfigs, - final ProcessFactory jobProcessFactory, - final SecretsHydrator secretsHydrator, - final Path workspaceRoot, - final WorkerEnvironment workerEnvironment, - final LogConfigs logConfigs, - final JobPersistence jobPersistence, - final String airbyteVersion) { + private void registerDiscover(final WorkerFactory factory) { + final Worker discoverWorker = factory.newWorker(TemporalJobType.DISCOVER_SCHEMA.name(), getWorkerOptions(maxWorkers.getMaxDiscoverWorkers())); + discoverWorker.registerWorkflowImplementationTypes(DiscoverCatalogWorkflowImpl.class); + discoverWorker + .registerActivitiesImplementations( + new DiscoverCatalogActivityImpl(discoverWorkerConfigs, discoverProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, + logConfigs, + jobPersistence, airbyteVersion)); + } + + private void registerCheckConnection(final WorkerFactory factory) { + final Worker checkConnectionWorker = + factory.newWorker(TemporalJobType.CHECK_CONNECTION.name(), getWorkerOptions(maxWorkers.getMaxCheckWorkers())); + checkConnectionWorker.registerWorkflowImplementationTypes(CheckConnectionWorkflowImpl.class); + checkConnectionWorker + .registerActivitiesImplementations( + new CheckConnectionActivityImpl(checkWorkerConfigs, checkProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs, + jobPersistence, airbyteVersion)); + } + + private void registerGetSpec(final WorkerFactory factory) { + final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers())); + specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class); + specWorker.registerActivitiesImplementations( + new SpecActivityImpl(specWorkerConfigs, specProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence, + airbyteVersion)); + } + + private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs workerConfigs, + final ProcessFactory jobProcessFactory) { return new ReplicationActivityImpl( containerOrchestratorConfig, @@ -250,6 +241,36 @@ private ReplicationActivityImpl getReplicationActivityImpl( airbyteVersion); } + private NormalizationActivityImpl getNormalizationActivityImpl(final WorkerConfigs workerConfigs, + final ProcessFactory jobProcessFactory) { + + return new NormalizationActivityImpl( + containerOrchestratorConfig, + workerConfigs, + jobProcessFactory, + secretsHydrator, + workspaceRoot, + workerEnvironment, + logConfigs, + jobPersistence, + airbyteVersion); + } + + private DbtTransformationActivityImpl getDbtActivityImpl(final WorkerConfigs workerConfigs, + final ProcessFactory jobProcessFactory) { + + return new DbtTransformationActivityImpl( + containerOrchestratorConfig, + workerConfigs, + jobProcessFactory, + secretsHydrator, + workspaceRoot, + workerEnvironment, + logConfigs, + jobPersistence, + airbyteVersion); + } + private static ProcessFactory getJobProcessFactory(final Configs configs, final WorkerConfigs workerConfigs) throws IOException { if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { final KubernetesClient fabricClient = new DefaultKubernetesClient(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 2f914c2bbe76..f631fd762c95 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -54,6 +54,8 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow public static final long NON_RUNNING_JOB_ID = -1; public static final int NON_RUNNING_ATTEMPT_ID = -1; + private static final int TASK_QUEUE_CHANGE_CURRENT_VERSION = 1; + private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener()); private final WorkflowInternalState workflowInternalState = new WorkflowInternalState(); @@ -453,10 +455,19 @@ private void reportJobStarting() { * make sense. >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999 */ private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) { + int taskQueueChangeVersion = + Workflow.getVersion("task_queue_change_from_connection_updater_to_sync", Workflow.DEFAULT_VERSION, TASK_QUEUE_CHANGE_CURRENT_VERSION); + + String taskQueue = TemporalJobType.SYNC.name(); + + if (taskQueueChangeVersion < TASK_QUEUE_CHANGE_CURRENT_VERSION) { + taskQueue = TemporalJobType.CONNECTION_UPDATER.name(); + } + final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class, ChildWorkflowOptions.newBuilder() .setWorkflowId("sync_" + workflowInternalState.getJobId()) - .setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()) + .setTaskQueue(taskQueue) // This will cancel the child workflow when the parent is terminated .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_REQUEST_CANCEL) .build()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java index 10a379d665c5..69cde0603974 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/shared/ActivityConfiguration.java @@ -6,15 +6,17 @@ import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; +import io.airbyte.workers.WorkerException; import io.airbyte.workers.temporal.TemporalUtils; import io.temporal.activity.ActivityCancellationType; import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; import java.time.Duration; /** * Shared temporal workflow configuration in order to ensure that * {@link io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflow} and - * {@link io.airbyte.workers.temporal.sync.SyncWorkflow} configurations are on sync, expecialy for + * {@link io.airbyte.workers.temporal.sync.SyncWorkflow} configurations are on sync, especially for * the grace period. */ public class ActivityConfiguration { @@ -24,12 +26,20 @@ public class ActivityConfiguration { private static final int MAX_SYNC_TIMEOUT_DAYS = configs.getSyncJobMaxTimeoutDays(); private static final Duration DB_INTERACTION_TIMEOUT = Duration.ofSeconds(configs.getMaxActivityTimeoutSecond()); + // retry infinitely if the worker is killed without exceptions and dies due to timeouts + // but fail for everything thrown by the call itself which is rethrown as runtime exceptions + private static final RetryOptions ORCHESTRATOR_RETRY = RetryOptions.newBuilder() + .setDoNotRetry(RuntimeException.class.getName(), WorkerException.class.getName()) + .build(); + + private static final RetryOptions RETRY_POLICY = new EnvConfigs().getContainerOrchestratorEnabled() ? ORCHESTRATOR_RETRY : TemporalUtils.NO_RETRY; + public static final ActivityOptions LONG_RUN_OPTIONS = ActivityOptions.newBuilder() .setScheduleToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setStartToCloseTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setScheduleToStartTimeout(Duration.ofDays(MAX_SYNC_TIMEOUT_DAYS)) .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED) - .setRetryOptions(TemporalUtils.NO_RETRY) + .setRetryOptions(RETRY_POLICY) .setHeartbeatTimeout(TemporalUtils.HEARTBEAT_TIMEOUT) .build(); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index dc1daef75858..566b1c318935 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -86,7 +86,6 @@ public class ConnectionManagerWorkflowTest { Mockito.mock(JobCreationAndStatusUpdateActivity.class, Mockito.withSettings().withoutAnnotations()); private TestWorkflowEnvironment testEnv; - private Worker worker; private WorkflowClient client; private ConnectionManagerWorkflow workflow; @@ -131,14 +130,16 @@ class AsynchronousWorkflow { @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); - // Register your workflow implementations - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, EmptySyncWorkflow.class); - client = testEnv.getWorkflowClient(); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(EmptySyncWorkflow.class); - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + managerWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); + managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + + client = testEnv.getWorkflowClient(); testEnv.start(); workflow = client @@ -414,14 +415,16 @@ class SynchronousWorkflow { @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); - // Register your workflow implementations - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); - client = testEnv.getWorkflowClient(); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(SleepingSyncWorkflow.class); - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + managerWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); + managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + + client = testEnv.getWorkflowClient(); testEnv.start(); workflow = client @@ -711,10 +714,13 @@ class SyncWorkflowReplicationFailuresRecorded { @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + + final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + managerWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); + managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + client = testEnv.getWorkflowClient(); - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, mGenerateInputActivityImpl, - mJobCreationAndStatusUpdateActivity); workflow = client.newWorkflowStub(ConnectionManagerWorkflow.class, WorkflowOptions.newBuilder().setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name()).build()); @@ -726,7 +732,9 @@ public void setup() { unit = TimeUnit.SECONDS) @DisplayName("Test that source and destination failures are recorded") public void testSourceAndDestinationFailuresRecorded() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SourceAndDestinationFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(SourceAndDestinationFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -753,7 +761,9 @@ public void testSourceAndDestinationFailuresRecorded() { unit = TimeUnit.SECONDS) @DisplayName("Test that normalization failure is recorded") public void testNormalizationFailure() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, NormalizationFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(NormalizationFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -779,7 +789,9 @@ public void testNormalizationFailure() { unit = TimeUnit.SECONDS) @DisplayName("Test that dbt failure is recorded") public void testDbtFailureRecorded() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, DbtFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(DbtFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -805,7 +817,9 @@ public void testDbtFailureRecorded() { unit = TimeUnit.SECONDS) @DisplayName("Test that persistence failure is recorded") public void testPersistenceFailureRecorded() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, PersistFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(PersistFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -831,7 +845,9 @@ public void testPersistenceFailureRecorded() { unit = TimeUnit.SECONDS) @DisplayName("Test that replication worker failure is recorded") public void testReplicationFailureRecorded() { - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, ReplicateFailureSyncWorkflow.class); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(ReplicateFailureSyncWorkflow.class); + testEnv.start(); final UUID testId = UUID.randomUUID(); @@ -861,14 +877,16 @@ class StuckWorkflow { @BeforeEach public void setup() { testEnv = TestWorkflowEnvironment.newInstance(); - worker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); - // Register your workflow implementations - worker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SleepingSyncWorkflow.class); - client = testEnv.getWorkflowClient(); + final Worker syncWorker = testEnv.newWorker(TemporalJobType.SYNC.name()); + syncWorker.registerWorkflowImplementationTypes(SleepingSyncWorkflow.class); - worker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, + final Worker managerWorker = testEnv.newWorker(TemporalJobType.CONNECTION_UPDATER.name()); + managerWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class); + managerWorker.registerActivitiesImplementations(mConfigFetchActivity, mConnectionDeletionActivity, mGenerateInputActivityImpl, mJobCreationAndStatusUpdateActivity); + + client = testEnv.getWorkflowClient(); testEnv.start(); workflow = client @@ -880,7 +898,6 @@ public void setup() { } public static Stream getSetupFailingFailingActivityBeforeRun() { - Thread.currentThread().run(); return Stream.of( Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any())) .thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))), @@ -969,7 +986,6 @@ void testCanGetUnstuck() { } public static Stream getSetupFailingFailingActivityAfterRun() { - Thread.currentThread().run(); return Stream.of( Arguments.of((Consumer) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")), new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", "")) diff --git a/kube/overlays/dev-integration-test-schedulerv2/kustomization.yaml b/kube/overlays/dev-integration-test-schedulerv2/kustomization.yaml index c6a280178193..06c4a8222a5c 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/kustomization.yaml +++ b/kube/overlays/dev-integration-test-schedulerv2/kustomization.yaml @@ -6,6 +6,10 @@ namespace: default bases: - ../../resources +resources: + - sync-only-worker.yaml + - temporal-ui.yaml + images: - name: airbyte/db newTag: dev diff --git a/kube/overlays/dev-integration-test-schedulerv2/sync-only-worker.yaml b/kube/overlays/dev-integration-test-schedulerv2/sync-only-worker.yaml new file mode 100644 index 000000000000..c9547abdbc91 --- /dev/null +++ b/kube/overlays/dev-integration-test-schedulerv2/sync-only-worker.yaml @@ -0,0 +1,230 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-sync-worker +spec: + replicas: 1 + selector: + matchLabels: + airbyte: sync-worker + template: + metadata: + labels: + airbyte: sync-worker + spec: + serviceAccountName: airbyte-admin + automountServiceAccountToken: true + containers: + - name: airbyte-worker-container + image: airbyte/worker + env: + - name: AIRBYTE_VERSION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: AIRBYTE_VERSION + - name: CONFIG_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONFIG_ROOT + - name: DATABASE_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_HOST + - name: DATABASE_PORT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_PORT + - name: DATABASE_PASSWORD + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: DATABASE_PASSWORD + - name: DATABASE_URL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: DATABASE_URL + - name: DATABASE_USER + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: DATABASE_USER + - name: TRACKING_STRATEGY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TRACKING_STRATEGY + - name: WORKSPACE_DOCKER_MOUNT + value: workspace + - name: WORKSPACE_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WORKSPACE_ROOT + - name: WORKER_ENVIRONMENT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WORKER_ENVIRONMENT + - name: LOCAL_ROOT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: LOCAL_ROOT + - name: WEBAPP_URL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: WEBAPP_URL + - name: TEMPORAL_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_HOST + - name: TEMPORAL_WORKER_PORTS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_WORKER_PORTS + - name: LOG_LEVEL + valueFrom: + configMapKeyRef: + name: airbyte-env + key: LOG_LEVEL + - name: JOB_KUBE_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: SUBMITTER_NUM_THREADS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: SUBMITTER_NUM_THREADS + - name: JOB_MAIN_CONTAINER_CPU_REQUEST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_MAIN_CONTAINER_CPU_REQUEST + - name: JOB_MAIN_CONTAINER_CPU_LIMIT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_MAIN_CONTAINER_CPU_LIMIT + - name: JOB_MAIN_CONTAINER_MEMORY_REQUEST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_MAIN_CONTAINER_MEMORY_REQUEST + - name: JOB_MAIN_CONTAINER_MEMORY_LIMIT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_MAIN_CONTAINER_MEMORY_LIMIT + - name: S3_LOG_BUCKET + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_LOG_BUCKET + - name: S3_LOG_BUCKET_REGION + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_LOG_BUCKET_REGION + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: AWS_ACCESS_KEY_ID + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: AWS_SECRET_ACCESS_KEY + - name: S3_MINIO_ENDPOINT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_MINIO_ENDPOINT + - name: S3_PATH_STYLE_ACCESS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: S3_PATH_STYLE_ACCESS + - name: GOOGLE_APPLICATION_CREDENTIALS + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: GOOGLE_APPLICATION_CREDENTIALS + - name: GCS_LOG_BUCKET + valueFrom: + configMapKeyRef: + name: airbyte-env + key: GCS_LOG_BUCKET + - name: INTERNAL_API_HOST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: INTERNAL_API_HOST + - name: JOB_KUBE_TOLERATIONS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_TOLERATIONS + - name: JOB_KUBE_NODE_SELECTORS + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_NODE_SELECTORS + - name: JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY + valueFrom: + configMapKeyRef: + name: airbyte-env + key: JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY + # todo: add other state storage keys + - name: STATE_STORAGE_MINIO_BUCKET_NAME + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STATE_STORAGE_MINIO_BUCKET_NAME + - name: STATE_STORAGE_MINIO_ENDPOINT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: STATE_STORAGE_MINIO_ENDPOINT + - name: STATE_STORAGE_MINIO_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: STATE_STORAGE_MINIO_ACCESS_KEY + - name: STATE_STORAGE_MINIO_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-secrets + key: STATE_STORAGE_MINIO_SECRET_ACCESS_KEY + - name: CONTAINER_ORCHESTRATOR_ENABLED + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONTAINER_ORCHESTRATOR_ENABLED + - name: SHOULD_RUN_GET_SPEC_WORKFLOWS + value: "false" + - name: SHOULD_RUN_CHECK_CONNECTION_WORKFLOWS + value: "false" + - name: SHOULD_RUN_DISCOVER_WORKFLOWS + value: "false" + - name: SHOULD_RUN_SYNC_WORKFLOWS + value: "true" + - name: SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS + value: "false" + volumeMounts: + - name: gcs-log-creds-volume + mountPath: /secrets/gcs-log-creds + readOnly: true + volumes: + - name: gcs-log-creds-volume + secret: + secretName: gcs-log-creds diff --git a/kube/overlays/dev-integration-test-schedulerv2/temporal-ui.yaml b/kube/overlays/dev-integration-test-schedulerv2/temporal-ui.yaml new file mode 100644 index 000000000000..541c3ad2ca12 --- /dev/null +++ b/kube/overlays/dev-integration-test-schedulerv2/temporal-ui.yaml @@ -0,0 +1,37 @@ +apiVersion: v1 +kind: Service +metadata: + name: airbyte-temporal-ui +spec: + type: NodePort + ports: + - port: 8088 + protocol: TCP + selector: + airbyte: temporal-ui +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: airbyte-temporal-ui +spec: + replicas: 1 + selector: + matchLabels: + airbyte: temporal-ui + template: + metadata: + labels: + airbyte: temporal-ui + spec: + containers: + - name: airbyte-temporal-ui-container + image: temporalio/web:1.13.0 + env: + - name: TEMPORAL_GRPC_ENDPOINT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: TEMPORAL_HOST + - name: TEMPORAL_PERMIT_WRITE_API + value: "true" diff --git a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml index 8e790435392e..17e02669f730 100644 --- a/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml +++ b/kube/overlays/dev-integration-test-schedulerv2/worker-patch.yaml @@ -18,3 +18,5 @@ spec: configMapKeyRef: name: airbyte-env key: REPLICATION_WORKER_STATUS_CHECK_INTERVAL + - name: SHOULD_RUN_SYNC_WORKFLOWS + value: "false"