Skip to content

Commit

Permalink
Remove the dependency between the orchestrator container and the work…
Browse files Browse the repository at this point in the history
…er app (#17570)

* test [ci skip]

* Autogenerated files

* Add missing annotation

* Remove unused json2Schema block from worker

* Move tess

* Missing deps and format

* Fix test build

* Add missing dependencies

* PR comments

* Fix acceptance test and add the seed dependency

* Fix build

* Fix build again
  • Loading branch information
benmoriceau authored Oct 12, 2022
1 parent 63e39e6 commit f650005
Show file tree
Hide file tree
Showing 129 changed files with 143 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.temporal.config;

import io.airbyte.config.Configs.WorkerEnvironment;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.env.Environment;
import jakarta.inject.Singleton;

@Factory
public class CommonFactory {

@Singleton
public WorkerEnvironment workerEnvironment(final Environment environment) {
return environment.getActiveNames().contains(Environment.KUBERNETES) ? WorkerEnvironment.KUBERNETES : WorkerEnvironment.DOCKER;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.commons.temporal.sync;

import com.uber.m3.util.ImmutableSet;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
Expand Down
55 changes: 55 additions & 0 deletions airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import org.jsonschema2pojo.SourceType

plugins {
id "java-library"
id 'com.github.eirnym.js2p' version '1.0'
}

dependencies {
annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut

implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'io.temporal:temporal-sdk:1.8.1'
implementation 'org.apache.ant:ant:1.10.10'
implementation 'org.apache.commons:commons-text:1.9'

implementation project(':airbyte-commons-protocol')
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-protocol:protocol-models')

testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor

testImplementation libs.bundles.micronaut.test
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
testImplementation 'org.mockito:mockito-inline:4.7.0'
testImplementation libs.postgresql
testImplementation libs.platform.testcontainers
testImplementation libs.platform.testcontainers.postgresql

testImplementation project(':airbyte-commons-docker')
}

jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/workers_models")
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
removeOldOutput = true

targetPackage = 'io.airbyte.persistence.job.models'

useLongIntegers = true
generateBuilders = true
includeConstructors = false
includeSetters = true
}

Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.workers;

import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.storage.DocumentStoreClient;
import io.fabric8.kubernetes.client.KubernetesClient;

public record ContainerOrchestratorConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.workers;

import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.general.DefaultReplicationWorker;
import java.nio.file.Path;

public interface Worker<InputType, OutputType> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ public class WorkerConstants {

public static final String RESET_JOB_SOURCE_DOCKER_IMAGE_STUB = "airbyte_empty";

public static final String WORKER_ENVIRONMENT = "WORKER_ENVIRONMENT";

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.workers.*;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.workers.general.DocumentStoreClient;
import io.airbyte.workers.storage.DocumentStoreClient;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.ContainerPort;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.workers.storage;

import io.airbyte.commons.io.IOs;
import io.airbyte.workers.general.DocumentStoreClient;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.general;
package io.airbyte.workers.storage;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.google.cloud.storage.Storage;
import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig;
import io.airbyte.config.storage.DefaultGcsClientFactory;
import io.airbyte.workers.general.DocumentStoreClient;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.airbyte.config.storage.CloudStorageConfigs.S3Config;
import io.airbyte.config.storage.DefaultS3ClientFactory;
import io.airbyte.config.storage.MinioS3ClientFactory;
import io.airbyte.workers.general.DocumentStoreClient;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.workers.storage;

import io.airbyte.config.storage.CloudStorageConfigs;
import io.airbyte.workers.general.DocumentStoreClient;
import java.nio.file.Path;

public class StateClients {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.workers.sync;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.workers.sync;

import com.google.common.base.Stopwatch;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.temporal.TemporalUtils;
import io.airbyte.commons.temporal.sync.OrchestratorConstants;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.ContainerOrchestratorConfig;
import io.airbyte.workers.Worker;
import io.airbyte.workers.config.WorkerConfigurationBeanFactory;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.AsyncKubePodStatus;
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
Expand Down Expand Up @@ -104,7 +105,7 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// Manually add the worker environment to the env var map
envMap.put(WorkerConfigurationBeanFactory.WORKER_ENVIRONMENT, containerOrchestratorConfig.workerEnvironment().name());
envMap.put(WorkerConstants.WORKER_ENVIRONMENT, containerOrchestratorConfig.workerEnvironment().name());

final Map<String, String> fileMap = new HashMap<>(additionalFileMap);
fileMap.putAll(Map.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.workers.sync;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal.sync;
package io.airbyte.workers.sync;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.TemporalUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.internal;
package io.airbyte.workers.test_utils;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers;
package io.airbyte.workers.test_utils;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import io.airbyte.config.StandardSyncInput;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import io.airbyte.workers.internal.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.workers.internal.HeartbeatMonitor;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.TestConfigHelpers;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.internal.AirbyteMessageUtils;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.AirbyteMessageTracker;
import io.airbyte.workers.internal.AirbyteMessageUtils;
import io.airbyte.workers.internal.AirbyteSource;
import io.airbyte.workers.internal.NamespacingMapper;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import io.airbyte.protocol.models.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.helper.FailureHelper.ConnectorCommand;
import io.airbyte.workers.internal.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.internal.StateDeltaTracker.StateDeltaTrackerException;
import io.airbyte.workers.internal.state_aggregator.StateAggregator;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

package io.airbyte.workers.internal;

import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.airbyte.commons.json.Jsons;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.TestConfigHelpers;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import io.airbyte.workers.test_utils.TestConfigHelpers;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.IntegrationLauncher;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.protocol.models.AirbyteLogMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import org.junit.jupiter.api.Test;

class NamespacingMapperTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerNoStateMatchException;
import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerOomException;
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import org.junit.jupiter.api.BeforeEach;
Expand Down
3 changes: 2 additions & 1 deletion airbyte-container-orchestrator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ dependencies {
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-commons-worker')
implementation project(':airbyte-config:init')
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-persistence:job-persistence')
implementation project(':airbyte-workers')
implementation project(':airbyte-metrics:metrics-lib')

testImplementation 'org.mockito:mockito-inline:2.13.0'
Expand Down
Loading

0 comments on commit f650005

Please sign in to comment.