diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java index ae57868a6909..48a4d686b53f 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/process/KubePodProcess.java @@ -7,6 +7,8 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.TolerationPOJO; import io.airbyte.metrics.lib.MetricClientFactory; @@ -101,17 +103,19 @@ // TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700. public class KubePodProcess extends Process implements KubePod { + private static final Configs configs = new EnvConfigs(); + private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class); public static final String MAIN_CONTAINER_NAME = "main"; public static final String INIT_CONTAINER_NAME = "init"; - private static final String DEFAULT_MEMORY_REQUEST = "25Mi"; - private static final String DEFAULT_MEMORY_LIMIT = "50Mi"; - private static final String DEFAULT_CPU_REQUEST = "0.1"; - private static final String DEFAULT_CPU_LIMIT = "0.2"; + private static final ResourceRequirements DEFAULT_SIDECAR_RESOURCES = new ResourceRequirements() - .withMemoryLimit(DEFAULT_MEMORY_LIMIT).withMemoryRequest(DEFAULT_MEMORY_REQUEST) - .withCpuLimit(DEFAULT_CPU_LIMIT).withCpuRequest(DEFAULT_CPU_REQUEST); + .withMemoryLimit(configs.getSidecarKubeMemoryLimit()).withMemoryRequest(configs.getSidecarMemoryRequest()) + .withCpuLimit(configs.getSidecarKubeCpuLimit()).withCpuRequest(configs.getSidecarKubeCpuRequest()); + private static final ResourceRequirements DEFAULT_SOCAT_RESOURCES = new ResourceRequirements() + .withMemoryLimit(configs.getSidecarKubeMemoryLimit()).withMemoryRequest(configs.getSidecarMemoryRequest()) + .withCpuLimit(configs.getSocatSidecarKubeCpuLimit()).withCpuRequest(configs.getSocatSidecarKubeCpuRequest()); private static final String PIPES_DIR = "/pipes"; private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin"; @@ -446,13 +450,17 @@ public KubePodProcess(final boolean isOrchestrator, // Printing socat notice logs with socat -d -d // To print info logs as well use socat -d -d -d // more info: https://linux.die.net/man/1/socat - final io.fabric8.kubernetes.api.model.ResourceRequirements sidecarResources = getResourceRequirementsBuilder(DEFAULT_SIDECAR_RESOURCES).build(); + final io.fabric8.kubernetes.api.model.ResourceRequirements heartbeatSidecarResources = + getResourceRequirementsBuilder(DEFAULT_SIDECAR_RESOURCES).build(); + final io.fabric8.kubernetes.api.model.ResourceRequirements socatSidecarResources = + getResourceRequirementsBuilder(DEFAULT_SOCAT_RESOURCES).build(); + final Container remoteStdin = new ContainerBuilder() .withName("remote-stdin") .withImage(socatImage) .withCommand("sh", "-c", "socat -d -d TCP-L:9001 STDOUT > " + STDIN_PIPE_FILE) .withVolumeMounts(pipeVolumeMount, terminationVolumeMount) - .withResources(sidecarResources) + .withResources(socatSidecarResources) .withImagePullPolicy(sidecarImagePullPolicy) .build(); @@ -461,7 +469,7 @@ public KubePodProcess(final boolean isOrchestrator, .withImage(socatImage) .withCommand("sh", "-c", String.format("cat %s | socat -d -d -t 60 - TCP:%s:%s", STDOUT_PIPE_FILE, processRunnerHost, stdoutLocalPort)) .withVolumeMounts(pipeVolumeMount, terminationVolumeMount) - .withResources(sidecarResources) + .withResources(socatSidecarResources) .withImagePullPolicy(sidecarImagePullPolicy) .build(); @@ -470,7 +478,7 @@ public KubePodProcess(final boolean isOrchestrator, .withImage(socatImage) .withCommand("sh", "-c", String.format("cat %s | socat -d -d -t 60 - TCP:%s:%s", STDERR_PIPE_FILE, processRunnerHost, stderrLocalPort)) .withVolumeMounts(pipeVolumeMount, terminationVolumeMount) - .withResources(sidecarResources) + .withResources(socatSidecarResources) .withImagePullPolicy(sidecarImagePullPolicy) .build(); @@ -487,7 +495,7 @@ public KubePodProcess(final boolean isOrchestrator, .withCommand("sh") .withArgs("-c", heartbeatCommand) .withVolumeMounts(terminationVolumeMount) - .withResources(sidecarResources) + .withResources(heartbeatSidecarResources) .withImagePullPolicy(sidecarImagePullPolicy) .build(); diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index 8769119ddd27..64a51d1e171b 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -457,10 +457,40 @@ public interface Configs { String getJobKubeMainContainerImagePullSecret(); /** - * Define the Job pod socat image. + * Define the Memory request for the Sidecar + */ + String getSidecarMemoryRequest(); + + /** + * Define the Memory limit for the Sidecar + */ + String getSidecarKubeMemoryLimit(); + + /** + * Define the CPU request for the Sidecar + */ + String getSidecarKubeCpuRequest(); + + /** + * Define the CPU limit for the Sidecar + */ + String getSidecarKubeCpuLimit(); + + /** + * Define the CPU request for the SOCAT Sidecar */ String getJobKubeSocatImage(); + /** + * Define the CPU limit for the SOCAT Sidecar + */ + String getSocatSidecarKubeCpuLimit(); + + /** + * Define the Job pod socat image. + */ + String getSocatSidecarKubeCpuRequest(); + /** * Define the Job pod busybox image. */ diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index ef3d7b6750d6..dda293e2cb69 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -73,7 +73,17 @@ public class EnvConfigs implements Configs { public static final String JOB_KUBE_TOLERATIONS = "JOB_KUBE_TOLERATIONS"; public static final String JOB_KUBE_NODE_SELECTORS = "JOB_KUBE_NODE_SELECTORS"; public static final String JOB_KUBE_ANNOTATIONS = "JOB_KUBE_ANNOTATIONS"; + private static final String DEFAULT_SIDECAR_MEMORY_REQUEST = "25Mi"; + private static final String SIDECAR_MEMORY_REQUEST = "SIDECAR_MEMORY_REQUEST"; + private static final String DEFAULT_SIDECAR_KUBE_MEMORY_LIMIT = "50Mi"; + private static final String SIDECAR_KUBE_MEMORY_LIMIT = "SIDECAR_KUBE_MEMORY_LIMIT"; + private static final String DEFAULT_SIDECAR_KUBE_CPU_REQUEST = "0.1"; + private static final String SIDECAR_KUBE_CPU_REQUEST = "SIDECAR_KUBE_CPU_REQUEST"; + private static final String DEFAULT_SIDECAR_KUBE_CPU_LIMIT = "0.2"; + private static final String SIDECAR_KUBE_CPU_LIMIT = "SIDECAR_KUBE_CPU_LIMIT"; public static final String JOB_KUBE_SOCAT_IMAGE = "JOB_KUBE_SOCAT_IMAGE"; + private static final String SOCAT_KUBE_CPU_LIMIT = "SOCAT_KUBE_CPU_LIMIT"; + private static final String SOCAT_KUBE_CPU_REQUEST = "SOCAT_KUBE_CPU_REQUEST"; public static final String JOB_KUBE_BUSYBOX_IMAGE = "JOB_KUBE_BUSYBOX_IMAGE"; public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE"; public static final String SYNC_JOB_MAX_ATTEMPTS = "SYNC_JOB_MAX_ATTEMPTS"; @@ -725,11 +735,41 @@ public String getJobKubeMainContainerImagePullSecret() { return getEnvOrDefault(JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET, ""); } + @Override + public String getSidecarKubeCpuRequest() { + return getEnvOrDefault(SIDECAR_KUBE_CPU_REQUEST, DEFAULT_SIDECAR_KUBE_CPU_REQUEST); + } + + @Override + public String getSidecarKubeCpuLimit() { + return getEnvOrDefault(SIDECAR_KUBE_CPU_LIMIT, DEFAULT_SIDECAR_KUBE_CPU_LIMIT); + } + + @Override + public String getSidecarKubeMemoryLimit() { + return getEnvOrDefault(SIDECAR_KUBE_MEMORY_LIMIT, DEFAULT_SIDECAR_KUBE_MEMORY_LIMIT); + } + + @Override + public String getSidecarMemoryRequest() { + return getEnvOrDefault(SIDECAR_MEMORY_REQUEST, DEFAULT_SIDECAR_MEMORY_REQUEST); + } + @Override public String getJobKubeSocatImage() { return getEnvOrDefault(JOB_KUBE_SOCAT_IMAGE, DEFAULT_JOB_KUBE_SOCAT_IMAGE); } + @Override + public String getSocatSidecarKubeCpuRequest() { + return getEnvOrDefault(SOCAT_KUBE_CPU_REQUEST, getSidecarKubeCpuRequest()); + } + + @Override + public String getSocatSidecarKubeCpuLimit() { + return getEnvOrDefault(SOCAT_KUBE_CPU_LIMIT, getSidecarKubeCpuLimit()); + } + @Override public String getJobKubeBusyboxImage() { return getEnvOrDefault(JOB_KUBE_BUSYBOX_IMAGE, DEFAULT_JOB_KUBE_BUSYBOX_IMAGE);