Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

socat gets more resources #19953

Merged
merged 4 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.TolerationPOJO;
import io.airbyte.metrics.lib.MetricClientFactory;
Expand Down Expand Up @@ -101,17 +103,19 @@
// TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700.
public class KubePodProcess extends Process implements KubePod {

private static final Configs configs = new EnvConfigs();

private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class);

public static final String MAIN_CONTAINER_NAME = "main";
public static final String INIT_CONTAINER_NAME = "init";
private static final String DEFAULT_MEMORY_REQUEST = "25Mi";
private static final String DEFAULT_MEMORY_LIMIT = "50Mi";
private static final String DEFAULT_CPU_REQUEST = "0.1";
private static final String DEFAULT_CPU_LIMIT = "0.2";

private static final ResourceRequirements DEFAULT_SIDECAR_RESOURCES = new ResourceRequirements()
.withMemoryLimit(DEFAULT_MEMORY_LIMIT).withMemoryRequest(DEFAULT_MEMORY_REQUEST)
.withCpuLimit(DEFAULT_CPU_LIMIT).withCpuRequest(DEFAULT_CPU_REQUEST);
.withMemoryLimit(configs.getSidecarKubeMemoryLimit()).withMemoryRequest(configs.getSidecarMemoryRequest())
.withCpuLimit(configs.getSidecarKubeCpuLimit()).withCpuRequest(configs.getSidecarKubeCpuRequest());
private static final ResourceRequirements DEFAULT_SOCAT_RESOURCES = new ResourceRequirements()
.withMemoryLimit(configs.getSidecarKubeMemoryLimit()).withMemoryRequest(configs.getSidecarMemoryRequest())
.withCpuLimit(configs.getSocatSidecarKubeCpuLimit()).withCpuRequest(configs.getSocatSidecarKubeCpuRequest());

private static final String PIPES_DIR = "/pipes";
private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin";
Expand Down Expand Up @@ -446,13 +450,17 @@ public KubePodProcess(final boolean isOrchestrator,
// Printing socat notice logs with socat -d -d
// To print info logs as well use socat -d -d -d
// more info: https://linux.die.net/man/1/socat
final io.fabric8.kubernetes.api.model.ResourceRequirements sidecarResources = getResourceRequirementsBuilder(DEFAULT_SIDECAR_RESOURCES).build();
final io.fabric8.kubernetes.api.model.ResourceRequirements heartbeatSidecarResources =
getResourceRequirementsBuilder(DEFAULT_SIDECAR_RESOURCES).build();
final io.fabric8.kubernetes.api.model.ResourceRequirements socatSidecarResources =
getResourceRequirementsBuilder(DEFAULT_SOCAT_RESOURCES).build();

final Container remoteStdin = new ContainerBuilder()
.withName("remote-stdin")
.withImage(socatImage)
.withCommand("sh", "-c", "socat -d -d TCP-L:9001 STDOUT > " + STDIN_PIPE_FILE)
.withVolumeMounts(pipeVolumeMount, terminationVolumeMount)
.withResources(sidecarResources)
.withResources(socatSidecarResources)
.withImagePullPolicy(sidecarImagePullPolicy)
.build();

Expand All @@ -461,7 +469,7 @@ public KubePodProcess(final boolean isOrchestrator,
.withImage(socatImage)
.withCommand("sh", "-c", String.format("cat %s | socat -d -d -t 60 - TCP:%s:%s", STDOUT_PIPE_FILE, processRunnerHost, stdoutLocalPort))
.withVolumeMounts(pipeVolumeMount, terminationVolumeMount)
.withResources(sidecarResources)
.withResources(socatSidecarResources)
.withImagePullPolicy(sidecarImagePullPolicy)
.build();

Expand All @@ -470,7 +478,7 @@ public KubePodProcess(final boolean isOrchestrator,
.withImage(socatImage)
.withCommand("sh", "-c", String.format("cat %s | socat -d -d -t 60 - TCP:%s:%s", STDERR_PIPE_FILE, processRunnerHost, stderrLocalPort))
.withVolumeMounts(pipeVolumeMount, terminationVolumeMount)
.withResources(sidecarResources)
.withResources(socatSidecarResources)
.withImagePullPolicy(sidecarImagePullPolicy)
.build();

Expand All @@ -487,7 +495,7 @@ public KubePodProcess(final boolean isOrchestrator,
.withCommand("sh")
.withArgs("-c", heartbeatCommand)
.withVolumeMounts(terminationVolumeMount)
.withResources(sidecarResources)
.withResources(heartbeatSidecarResources)
.withImagePullPolicy(sidecarImagePullPolicy)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,40 @@ public interface Configs {
String getJobKubeMainContainerImagePullSecret();

/**
* Define the Job pod socat image.
* Define the Memory request for the Sidecar
*/
String getSidecarMemoryRequest();

/**
* Define the Memory limit for the Sidecar
*/
String getSidecarKubeMemoryLimit();

/**
* Define the CPU request for the Sidecar
*/
String getSidecarKubeCpuRequest();

/**
* Define the CPU limit for the Sidecar
*/
String getSidecarKubeCpuLimit();

/**
* Define the CPU request for the SOCAT Sidecar
*/
String getJobKubeSocatImage();

/**
* Define the CPU limit for the SOCAT Sidecar
*/
String getSocatSidecarKubeCpuLimit();

/**
* Define the Job pod socat image.
*/
String getSocatSidecarKubeCpuRequest();

/**
* Define the Job pod busybox image.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,17 @@ public class EnvConfigs implements Configs {
public static final String JOB_KUBE_TOLERATIONS = "JOB_KUBE_TOLERATIONS";
public static final String JOB_KUBE_NODE_SELECTORS = "JOB_KUBE_NODE_SELECTORS";
public static final String JOB_KUBE_ANNOTATIONS = "JOB_KUBE_ANNOTATIONS";
private static final String DEFAULT_SIDECAR_MEMORY_REQUEST = "25Mi";
evantahler marked this conversation as resolved.
Show resolved Hide resolved
private static final String SIDECAR_MEMORY_REQUEST = "SIDECAR_MEMORY_REQUEST";
private static final String DEFAULT_SIDECAR_KUBE_MEMORY_LIMIT = "50Mi";
private static final String SIDECAR_KUBE_MEMORY_LIMIT = "SIDECAR_KUBE_MEMORY_LIMIT";
private static final String DEFAULT_SIDECAR_KUBE_CPU_REQUEST = "0.1";
private static final String SIDECAR_KUBE_CPU_REQUEST = "SIDECAR_KUBE_CPU_REQUEST";
private static final String DEFAULT_SIDECAR_KUBE_CPU_LIMIT = "0.2";
private static final String SIDECAR_KUBE_CPU_LIMIT = "SIDECAR_KUBE_CPU_LIMIT";
public static final String JOB_KUBE_SOCAT_IMAGE = "JOB_KUBE_SOCAT_IMAGE";
private static final String SOCAT_KUBE_CPU_LIMIT = "SOCAT_KUBE_CPU_LIMIT";
private static final String SOCAT_KUBE_CPU_REQUEST = "SOCAT_KUBE_CPU_REQUEST";
public static final String JOB_KUBE_BUSYBOX_IMAGE = "JOB_KUBE_BUSYBOX_IMAGE";
public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE";
public static final String SYNC_JOB_MAX_ATTEMPTS = "SYNC_JOB_MAX_ATTEMPTS";
Expand Down Expand Up @@ -725,11 +735,39 @@ public String getJobKubeMainContainerImagePullSecret() {
return getEnvOrDefault(JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET, "");
}

@Override
public String getSidecarKubeCpuRequest() {
return getEnvOrDefault(SIDECAR_KUBE_CPU_REQUEST, DEFAULT_SIDECAR_KUBE_CPU_REQUEST);
}

@Override
public String getSidecarKubeCpuLimit() {
return getEnvOrDefault(SIDECAR_KUBE_CPU_LIMIT, DEFAULT_SIDECAR_KUBE_CPU_LIMIT);
}

@Override
public String getSidecarKubeMemoryLimit() {
return getEnvOrDefault(SIDECAR_KUBE_MEMORY_LIMIT, DEFAULT_SIDECAR_KUBE_MEMORY_LIMIT);
}

@Override
public String getSidecarMemoryRequest() {
evantahler marked this conversation as resolved.
Show resolved Hide resolved
return getEnvOrDefault(SIDECAR_MEMORY_REQUEST, DEFAULT_SIDECAR_MEMORY_REQUEST);
}

@Override
public String getJobKubeSocatImage() {
return getEnvOrDefault(JOB_KUBE_SOCAT_IMAGE, DEFAULT_JOB_KUBE_SOCAT_IMAGE);
}

@Override
public String getSocatSidecarKubeCpuRequest() {
return getEnvOrDefault(SOCAT_KUBE_CPU_REQUEST, getSidecarKubeCpuRequest());
}

@Override
public String getSocatSidecarKubeCpuLimit() { return getEnvOrDefault(SOCAT_KUBE_CPU_LIMIT, getSidecarKubeCpuLimit()); }

@Override
public String getJobKubeBusyboxImage() {
return getEnvOrDefault(JOB_KUBE_BUSYBOX_IMAGE, DEFAULT_JOB_KUBE_BUSYBOX_IMAGE);
Expand Down