From ebf83520ec5418769b90dc5465d6096bc44439ba Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 10 Aug 2022 14:16:41 -0700 Subject: [PATCH] Make it possible to specify normalization pod resources. (#15495) Today we are running into OOM exceptions with normalization. Normalization itself also inherits the destination's resource requirements. After work to bring destination memory usage down, this is no longer ideal, since most destinations use less memory than normalization needs. This PR makes it possible to specify, via env vars, the resources provided to the normalization pod. Notes: - Add env vars. Default to the various job main container resources if these are not set. - Instead of using the destination's memory, use the normalization-specific env vars. --- .env | 4 +++ .../main/java/io/airbyte/config/Configs.java | 28 ++++++++++++++-- .../java/io/airbyte/config/EnvConfigs.java | 25 +++++++++++++++ .../ContainerOrchestratorApp.java | 1 - .../java/io/airbyte/workers/WorkerApp.java | 32 ++++++++++++------- .../workers/process/KubeProcessFactory.java | 2 +- .../sync/NormalizationActivityImpl.java | 1 - .../temporal/sync/SyncWorkflowImpl.java | 25 ++++++++++++--- .../workers/temporal/SyncWorkflowTest.java | 3 +- docker-compose.yaml | 4 +++ kube/overlays/dev/.env | 6 ++++ kube/overlays/stable/.env | 5 +++ kube/resources/worker.yaml | 25 +++++++++++++++ 13 files changed, 139 insertions(+), 22 deletions(-) diff --git a/.env b/.env index a8f8f95d6482..8efe0cda076a 100644 --- a/.env +++ b/.env @@ -67,6 +67,10 @@ JOB_MAIN_CONTAINER_CPU_LIMIT= JOB_MAIN_CONTAINER_MEMORY_REQUEST= JOB_MAIN_CONTAINER_MEMORY_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST= +NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST= ### LOGGING/MONITORING/TRACKING ### TRACKING_STRATEGY=segment diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java 
b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index f00605241179..692c396afdfe 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -332,17 +332,41 @@ public interface Configs { String getCheckJobMainContainerCpuLimit(); /** - * Define the job container's minimum RAM usage. Defaults to + * Define the check job container's minimum RAM usage. Defaults to * {@link #getJobMainContainerMemoryRequest()} if not set. Internal-use only. */ String getCheckJobMainContainerMemoryRequest(); /** - * Define the job container's maximum RAM usage. Defaults to + * Define the check job container's maximum RAM usage. Defaults to * {@link #getJobMainContainerMemoryLimit()} if not set. Internal-use only. */ String getCheckJobMainContainerMemoryLimit(); + /** + * Define the normalization job container's minimum CPU request. Defaults to + * {@link #getJobMainContainerCpuRequest()} if not set. Internal-use only. + */ + String getNormalizationJobMainContainerCpuRequest(); + + /** + * Define the normalization job container's maximum CPU usage. Defaults to + * {@link #getJobMainContainerCpuLimit()} if not set. Internal-use only. + */ + String getNormalizationJobMainContainerCpuLimit(); + + /** + * Define the normalization job container's minimum RAM usage. Defaults to + * {@link #getJobMainContainerMemoryRequest()} if not set. Internal-use only. + */ + String getNormalizationJobMainContainerMemoryRequest(); + + /** + * Define the normalization job container's maximum RAM usage. Defaults to + * {@link #getJobMainContainerMemoryLimit()} if not set. Internal-use only. + */ + String getNormalizationJobMainContainerMemoryLimit(); + /** * Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration * contains k=v pairs mentioning some/all of key, effect, operator and value and separated by `,`. 
diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 76e6990230cd..9af20b09d7bd 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -153,6 +153,11 @@ public class EnvConfigs implements Configs { static final String CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST = "CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST"; static final String CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT = "CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT"; + static final String NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST = "NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST"; + static final String NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT = "NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT"; + static final String NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST = "NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST"; + static final String NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT = "NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT"; + // defaults private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache"; public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default"; @@ -766,6 +771,26 @@ public String getCheckJobMainContainerMemoryLimit() { return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT, getJobMainContainerMemoryLimit()); } + @Override + public String getNormalizationJobMainContainerCpuRequest() { + return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST, getJobMainContainerCpuRequest()); + } + + @Override + public String getNormalizationJobMainContainerCpuLimit() { + return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT, getJobMainContainerCpuLimit()); + } + + @Override + public String getNormalizationJobMainContainerMemoryRequest() { + return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST, 
getJobMainContainerMemoryRequest()); + } + + @Override + public String getNormalizationJobMainContainerMemoryLimit() { + return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT, getJobMainContainerMemoryLimit()); + } + @Override public LogConfigs getLogConfigs() { return logConfigs; diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java index fb726c0402bd..cd0570bfc535 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ContainerOrchestratorApp.java @@ -196,7 +196,6 @@ private static JobOrchestrator getJobOrchestrator(final Configs configs, final ProcessFactory processFactory, final String application, final FeatureFlags featureFlags) { - return switch (application) { case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(configs, workerConfigs, processFactory, featureFlags); case NormalizationLauncherWorker.NORMALIZATION -> new NormalizationJobOrchestrator(configs, workerConfigs, processFactory); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index c82d3785a7ef..c4c75b3346e4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -223,9 +223,10 @@ private void registerConnectionManager(final WorkerFactory factory) { private void registerSync(final WorkerFactory factory) { final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl(replicationWorkerConfigs, replicationProcessFactory); - final NormalizationActivityImpl normalizationActivity = getNormalizationActivityImpl( - defaultWorkerConfigs, - 
defaultProcessFactory); + // Note that the configuration injected here is for the normalization orchestrator, and not the + // normalization pod itself. + // Configuration for the normalization pod is injected via the SyncWorkflowImpl. + final NormalizationActivityImpl normalizationActivity = getNormalizationActivityImpl(defaultWorkerConfigs, defaultProcessFactory); final DbtTransformationActivityImpl dbtTransformationActivity = getDbtActivityImpl( defaultWorkerConfigs, @@ -314,6 +315,15 @@ private DbtTransformationActivityImpl getDbtActivityImpl(final WorkerConfigs wor airbyteVersion); } + /** + * Return either a docker or kubernetes process factory depending on the environment in + * {@link Configs}. + * + * @param configs used to determine which process factory to create. + * @param workerConfigs used to create the process factory. + * @return either a {@link DockerProcessFactory} or a {@link KubeProcessFactory}. + * @throws IOException + */ private static ProcessFactory getJobProcessFactory(final Configs configs, final WorkerConfigs workerConfigs) throws IOException { if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) { final KubernetesClient fabricClient = new DefaultKubernetesClient(); @@ -341,14 +351,14 @@ private static WorkerOptions getWorkerOptions(final int max) { .build(); } - public static record ContainerOrchestratorConfig( - String namespace, - DocumentStoreClient documentStoreClient, - KubernetesClient kubernetesClient, - String secretName, - String secretMountPath, - String containerOrchestratorImage, - String googleApplicationCredentials) {} + public record ContainerOrchestratorConfig( + String namespace, + DocumentStoreClient documentStoreClient, + KubernetesClient kubernetesClient, + String secretName, + String secretMountPath, + String containerOrchestratorImage, + String googleApplicationCredentials) {} static Optional getContainerOrchestratorConfig(final Configs configs) { if 
(configs.getContainerOrchestratorEnabled()) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java index 72c52a97c180..99227c82cc8f 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/process/KubeProcessFactory.java @@ -97,7 +97,7 @@ public Process create( try { // used to differentiate source and destination processes with the same id and attempt final String podName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, KUBE_NAME_LEN_LIMIT); - LOGGER.info("Attempting to start pod = {} for {}", podName, imageName); + LOGGER.info("Attempting to start pod = {} for {} with resources {}", podName, imageName, resourceRequirements); final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take(); LOGGER.info("{} stdoutLocalPort = {}", podName, stdoutLocalPort); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java index fb07a62db54f..36c08b4cd7ac 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/NormalizationActivityImpl.java @@ -128,7 +128,6 @@ private CheckedSupplier, Except throws IOException { final var jobScope = jobPersistence.getJob(Long.parseLong(jobRunConfig.getJobId())).getScope(); final var connectionId = UUID.fromString(jobScope); - return () -> new NormalizationLauncherWorker( connectionId, destinationLauncherConfig, diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index e60ff71409bf..0ca129ae780a 100644 --- 
a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -4,9 +4,12 @@ package io.airbyte.workers.temporal.sync; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; import io.airbyte.config.OperatorDbtInput; +import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; @@ -55,11 +58,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) { for (final StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) { if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) { - final NormalizationInput normalizationInput = new NormalizationInput() - .withDestinationConfiguration(syncInput.getDestinationConfiguration()) - .withCatalog(syncOutput.getOutputCatalog()) - .withResourceRequirements(syncInput.getDestinationResourceRequirements()); - + final Configs configs = new EnvConfigs(); + final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput, configs); final NormalizationSummary normalizationSummary = normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); syncOutput = syncOutput.withNormalizationSummary(normalizationSummary); @@ -80,4 +80,19 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, return syncOutput; } + private NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput, + final StandardSyncOutput syncOutput, + final Configs configs) { + final ResourceRequirements resourceReqs = new ResourceRequirements() + 
.withCpuRequest(configs.getNormalizationJobMainContainerCpuRequest()) + .withCpuLimit(configs.getNormalizationJobMainContainerCpuLimit()) + .withMemoryRequest(configs.getNormalizationJobMainContainerMemoryRequest()) + .withMemoryLimit(configs.getNormalizationJobMainContainerMemoryLimit()); + + return new NormalizationInput() + .withDestinationConfiguration(syncInput.getDestinationConfiguration()) + .withCatalog(syncOutput.getOutputCatalog()) + .withResourceRequirements(resourceReqs); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java index 2258e03ecc99..dc979922d58a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/SyncWorkflowTest.java @@ -100,7 +100,8 @@ public void setUp() { normalizationInput = new NormalizationInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) - .withCatalog(syncInput.getCatalog()); + .withCatalog(syncInput.getCatalog()) + .withResourceRequirements(new ResourceRequirements()); operatorDbtInput = new OperatorDbtInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) diff --git a/docker-compose.yaml b/docker-compose.yaml index 13566de5e479..61502aa9a1c6 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -77,6 +77,10 @@ services: - MAX_DISCOVER_WORKERS=${MAX_DISCOVER_WORKERS} - MAX_SPEC_WORKERS=${MAX_SPEC_WORKERS} - MAX_SYNC_WORKERS=${MAX_SYNC_WORKERS} + - NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT=${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT} + - NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST=${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST} + - NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT=${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT} + - NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST=${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST} - 
SECRET_PERSISTENCE=${SECRET_PERSISTENCE} - SYNC_JOB_MAX_ATTEMPTS=${SYNC_JOB_MAX_ATTEMPTS} - SYNC_JOB_MAX_TIMEOUT_DAYS=${SYNC_JOB_MAX_TIMEOUT_DAYS} diff --git a/kube/overlays/dev/.env b/kube/overlays/dev/.env index d62225479835..230efad8e4ec 100644 --- a/kube/overlays/dev/.env +++ b/kube/overlays/dev/.env @@ -56,6 +56,11 @@ JOB_MAIN_CONTAINER_CPU_LIMIT= JOB_MAIN_CONTAINER_MEMORY_REQUEST= JOB_MAIN_CONTAINER_MEMORY_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST= +NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST= + # Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= JOB_KUBE_ANNOTATIONS= @@ -66,6 +71,7 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY= # Launch a separate pod to orchestrate sync steps CONTAINER_ORCHESTRATOR_ENABLED=true +CONTAINER_ORCHESTRATOR_IMAGE= # Open Telemetry Configuration METRIC_CLIENT= diff --git a/kube/overlays/stable/.env b/kube/overlays/stable/.env index b48947558c89..e1b01c39f270 100644 --- a/kube/overlays/stable/.env +++ b/kube/overlays/stable/.env @@ -56,6 +56,11 @@ JOB_MAIN_CONTAINER_CPU_LIMIT= JOB_MAIN_CONTAINER_MEMORY_REQUEST= JOB_MAIN_CONTAINER_MEMORY_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST= +NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT= +NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST= + # Worker pod tolerations, annotations and node selectors JOB_KUBE_TOLERATIONS= JOB_KUBE_ANNOTATIONS= diff --git a/kube/resources/worker.yaml b/kube/resources/worker.yaml index f5906abd1914..02ceacb266e2 100644 --- a/kube/resources/worker.yaml +++ b/kube/resources/worker.yaml @@ -119,6 +119,26 @@ spec: configMapKeyRef: name: airbyte-env key: JOB_MAIN_CONTAINER_MEMORY_LIMIT + - name: NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST + - name: 
NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT + - name: NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST + valueFrom: + configMapKeyRef: + name: airbyte-env + key: NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST + - name: NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT + valueFrom: + configMapKeyRef: + name: airbyte-env + key: NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT - name: S3_LOG_BUCKET valueFrom: configMapKeyRef: @@ -210,6 +230,11 @@ spec: configMapKeyRef: name: airbyte-env key: CONTAINER_ORCHESTRATOR_ENABLED + - name: CONTAINER_ORCHESTRATOR_IMAGE + valueFrom: + configMapKeyRef: + name: airbyte-env + key: CONTAINER_ORCHESTRATOR_IMAGE - name: CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION valueFrom: configMapKeyRef: