Skip to content

Commit

Permalink
Make it possible to specify normalization pod resources. (#15495)
Browse files Browse the repository at this point in the history
Today we are running into OOM exceptions with normalization. Normalization itself also inherits the destination's resource requirements. After work to bring destination memory usage down, this is no longer ideal, since most destinations use less memory than normalization needs.

This PR makes it possible to specify the general resource the normalization pod is provided via env vars.

Notes:
- Add env vars. Default to the various job main container resources if these are not set.
- Instead of using the destination's memory, use the normalization specify env vars.
  • Loading branch information
davinchia authored and girarda committed Aug 11, 2022
1 parent 563870c commit ebf8352
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 22 deletions.
4 changes: 4 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ JOB_MAIN_CONTAINER_CPU_LIMIT=
JOB_MAIN_CONTAINER_MEMORY_REQUEST=
JOB_MAIN_CONTAINER_MEMORY_LIMIT=

NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT=
NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST=
NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT=
NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST=

### LOGGING/MONITORING/TRACKING ###
TRACKING_STRATEGY=segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,17 +332,41 @@ public interface Configs {
String getCheckJobMainContainerCpuLimit();

/**
* Define the job container's minimum RAM usage. Defaults to
* Define the check job container's minimum RAM usage. Defaults to
* {@link #getJobMainContainerMemoryRequest()} if not set. Internal-use only.
*/
String getCheckJobMainContainerMemoryRequest();

/**
* Define the job container's maximum RAM usage. Defaults to
* Define the check job container's maximum RAM usage. Defaults to
* {@link #getJobMainContainerMemoryLimit()} if not set. Internal-use only.
*/
String getCheckJobMainContainerMemoryLimit();

/**
* Define the normalization job container's minimum CPU request. Defaults to
* {@link #getJobMainContainerCpuRequest()} if not set. Internal-use only.
*/
String getNormalizationJobMainContainerCpuRequest();

/**
* Define the normalization job container's maximum CPU usage. Defaults to
* {@link #getJobMainContainerCpuLimit()} if not set. Internal-use only.
*/
String getNormalizationJobMainContainerCpuLimit();

/**
* Define the normalization job container's minimum RAM usage. Defaults to
* {@link #getJobMainContainerMemoryRequest()} if not set. Internal-use only.
*/
String getNormalizationJobMainContainerMemoryRequest();

/**
* Define the normalization job container's maximum RAM usage. Defaults to
* {@link #getJobMainContainerMemoryLimit()} if not set. Internal-use only.
*/
String getNormalizationJobMainContainerMemoryLimit();

/**
* Define one or more Job pod tolerations. Tolerations are separated by ';'. Each toleration
* contains k=v pairs mentioning some/all of key, effect, operator and value and separated by `,`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ public class EnvConfigs implements Configs {
static final String CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST = "CHECK_JOB_MAIN_CONTAINER_MEMORY_REQUEST";
static final String CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT = "CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT";

static final String NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST = "NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST";
static final String NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT = "NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT";
static final String NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST = "NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST";
static final String NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT = "NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT";

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default";
Expand Down Expand Up @@ -766,6 +771,26 @@ public String getCheckJobMainContainerMemoryLimit() {
return getEnvOrDefault(CHECK_JOB_MAIN_CONTAINER_MEMORY_LIMIT, getJobMainContainerMemoryLimit());
}

@Override
public String getNormalizationJobMainContainerCpuRequest() {
return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST, getJobMainContainerCpuRequest());
}

@Override
public String getNormalizationJobMainContainerCpuLimit() {
return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT, getJobMainContainerCpuLimit());
}

@Override
public String getNormalizationJobMainContainerMemoryRequest() {
return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST, getJobMainContainerMemoryRequest());
}

@Override
public String getNormalizationJobMainContainerMemoryLimit() {
return getEnvOrDefault(NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT, getJobMainContainerMemoryLimit());
}

@Override
public LogConfigs getLogConfigs() {
return logConfigs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ private static JobOrchestrator<?> getJobOrchestrator(final Configs configs,
final ProcessFactory processFactory,
final String application,
final FeatureFlags featureFlags) {

return switch (application) {
case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(configs, workerConfigs, processFactory, featureFlags);
case NormalizationLauncherWorker.NORMALIZATION -> new NormalizationJobOrchestrator(configs, workerConfigs, processFactory);
Expand Down
32 changes: 21 additions & 11 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,10 @@ private void registerConnectionManager(final WorkerFactory factory) {
private void registerSync(final WorkerFactory factory) {
final ReplicationActivityImpl replicationActivity = getReplicationActivityImpl(replicationWorkerConfigs, replicationProcessFactory);

final NormalizationActivityImpl normalizationActivity = getNormalizationActivityImpl(
defaultWorkerConfigs,
defaultProcessFactory);
// Note that the configuration injected here is for the normalization orchestrator, and not the
// normalization pod itself.
// Configuration for the normalization pod is injected via the SyncWorkflowImpl.
final NormalizationActivityImpl normalizationActivity = getNormalizationActivityImpl(defaultWorkerConfigs, defaultProcessFactory);

final DbtTransformationActivityImpl dbtTransformationActivity = getDbtActivityImpl(
defaultWorkerConfigs,
Expand Down Expand Up @@ -314,6 +315,15 @@ private DbtTransformationActivityImpl getDbtActivityImpl(final WorkerConfigs wor
airbyteVersion);
}

/**
* Return either a docker or kubernetes process factory depending on the environment in
* {@link WorkerConfigs}
*
* @param configs used to determine which process factory to create.
* @param workerConfigs used to create the process factory.
* @return either a {@link DockerProcessFactory} or a {@link KubeProcessFactory}.
* @throws IOException
*/
private static ProcessFactory getJobProcessFactory(final Configs configs, final WorkerConfigs workerConfigs) throws IOException {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
final KubernetesClient fabricClient = new DefaultKubernetesClient();
Expand Down Expand Up @@ -341,14 +351,14 @@ private static WorkerOptions getWorkerOptions(final int max) {
.build();
}

public static record ContainerOrchestratorConfig(
String namespace,
DocumentStoreClient documentStoreClient,
KubernetesClient kubernetesClient,
String secretName,
String secretMountPath,
String containerOrchestratorImage,
String googleApplicationCredentials) {}
public record ContainerOrchestratorConfig(
String namespace,
DocumentStoreClient documentStoreClient,
KubernetesClient kubernetesClient,
String secretName,
String secretMountPath,
String containerOrchestratorImage,
String googleApplicationCredentials) {}

static Optional<ContainerOrchestratorConfig> getContainerOrchestratorConfig(final Configs configs) {
if (configs.getContainerOrchestratorEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public Process create(
try {
// used to differentiate source and destination processes with the same id and attempt
final String podName = ProcessFactory.createProcessName(imageName, jobType, jobId, attempt, KUBE_NAME_LEN_LIMIT);
LOGGER.info("Attempting to start pod = {} for {}", podName, imageName);
LOGGER.info("Attempting to start pod = {} for {} with resources {}", podName, imageName, resourceRequirements);

final int stdoutLocalPort = KubePortManagerSingleton.getInstance().take();
LOGGER.info("{} stdoutLocalPort = {}", podName, stdoutLocalPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ private CheckedSupplier<Worker<NormalizationInput, NormalizationSummary>, Except
throws IOException {
final var jobScope = jobPersistence.getJob(Long.parseLong(jobRunConfig.getJobId())).getScope();
final var connectionId = UUID.fromString(jobScope);

return () -> new NormalizationLauncherWorker(
connectionId,
destinationLauncherConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

package io.airbyte.workers.temporal.sync;

import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.OperatorDbtInput;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
Expand Down Expand Up @@ -55,11 +58,8 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) {
for (final StandardSyncOperation standardSyncOperation : syncInput.getOperationSequence()) {
if (standardSyncOperation.getOperatorType() == OperatorType.NORMALIZATION) {
final NormalizationInput normalizationInput = new NormalizationInput()
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
.withCatalog(syncOutput.getOutputCatalog())
.withResourceRequirements(syncInput.getDestinationResourceRequirements());

final Configs configs = new EnvConfigs();
final NormalizationInput normalizationInput = generateNormalizationInput(syncInput, syncOutput, configs);
final NormalizationSummary normalizationSummary =
normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput);
syncOutput = syncOutput.withNormalizationSummary(normalizationSummary);
Expand All @@ -80,4 +80,19 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig,
return syncOutput;
}

private NormalizationInput generateNormalizationInput(final StandardSyncInput syncInput,
final StandardSyncOutput syncOutput,
final Configs configs) {
final ResourceRequirements resourceReqs = new ResourceRequirements()
.withCpuRequest(configs.getNormalizationJobMainContainerCpuRequest())
.withCpuLimit(configs.getNormalizationJobMainContainerCpuLimit())
.withMemoryRequest(configs.getNormalizationJobMainContainerMemoryRequest())
.withMemoryLimit(configs.getNormalizationJobMainContainerMemoryLimit());

return new NormalizationInput()
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
.withCatalog(syncOutput.getOutputCatalog())
.withResourceRequirements(resourceReqs);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public void setUp() {

normalizationInput = new NormalizationInput()
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
.withCatalog(syncInput.getCatalog());
.withCatalog(syncInput.getCatalog())
.withResourceRequirements(new ResourceRequirements());

operatorDbtInput = new OperatorDbtInput()
.withDestinationConfiguration(syncInput.getDestinationConfiguration())
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ services:
- MAX_DISCOVER_WORKERS=${MAX_DISCOVER_WORKERS}
- MAX_SPEC_WORKERS=${MAX_SPEC_WORKERS}
- MAX_SYNC_WORKERS=${MAX_SYNC_WORKERS}
- NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT=${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT}
- NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST=${NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST}
- NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT=${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT}
- NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST=${NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST}
- SECRET_PERSISTENCE=${SECRET_PERSISTENCE}
- SYNC_JOB_MAX_ATTEMPTS=${SYNC_JOB_MAX_ATTEMPTS}
- SYNC_JOB_MAX_TIMEOUT_DAYS=${SYNC_JOB_MAX_TIMEOUT_DAYS}
Expand Down
6 changes: 6 additions & 0 deletions kube/overlays/dev/.env
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ JOB_MAIN_CONTAINER_CPU_LIMIT=
JOB_MAIN_CONTAINER_MEMORY_REQUEST=
JOB_MAIN_CONTAINER_MEMORY_LIMIT=

NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT=
NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST=
NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT=
NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST=

# Worker pod tolerations, annotations and node selectors
JOB_KUBE_TOLERATIONS=
JOB_KUBE_ANNOTATIONS=
Expand All @@ -66,6 +71,7 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY=

# Launch a separate pod to orchestrate sync steps
CONTAINER_ORCHESTRATOR_ENABLED=true
CONTAINER_ORCHESTRATOR_IMAGE=

# Open Telemetry Configuration
METRIC_CLIENT=
Expand Down
5 changes: 5 additions & 0 deletions kube/overlays/stable/.env
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ JOB_MAIN_CONTAINER_CPU_LIMIT=
JOB_MAIN_CONTAINER_MEMORY_REQUEST=
JOB_MAIN_CONTAINER_MEMORY_LIMIT=

NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT=
NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST=
NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT=
NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST=

# Worker pod tolerations, annotations and node selectors
JOB_KUBE_TOLERATIONS=
JOB_KUBE_ANNOTATIONS=
Expand Down
25 changes: 25 additions & 0 deletions kube/resources/worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,26 @@ spec:
configMapKeyRef:
name: airbyte-env
key: JOB_MAIN_CONTAINER_MEMORY_LIMIT
- name: NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST
valueFrom:
configMapKeyRef:
name: airbyte-env
key: NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_REQUEST
- name: NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT
valueFrom:
configMapKeyRef:
name: airbyte-env
key: NORMALIZATION_JOB_MAIN_CONTAINER_MEMORY_LIMIT
- name: NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST
valueFrom:
configMapKeyRef:
name: airbyte-env
key: NORMALIZATION_JOB_MAIN_CONTAINER_CPU_REQUEST
- name: NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT
valueFrom:
configMapKeyRef:
name: airbyte-env
key: NORMALIZATION_JOB_MAIN_CONTAINER_CPU_LIMIT
- name: S3_LOG_BUCKET
valueFrom:
configMapKeyRef:
Expand Down Expand Up @@ -210,6 +230,11 @@ spec:
configMapKeyRef:
name: airbyte-env
key: CONTAINER_ORCHESTRATOR_ENABLED
- name: CONTAINER_ORCHESTRATOR_IMAGE
valueFrom:
configMapKeyRef:
name: airbyte-env
key: CONTAINER_ORCHESTRATOR_IMAGE
- name: CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION
valueFrom:
configMapKeyRef:
Expand Down

0 comments on commit ebf8352

Please sign in to comment.