Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve JOB_POD variable naming + improve doc about memory management #9048

Merged
merged 10 commits into from
Dec 23, 2021
14 changes: 7 additions & 7 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ SUBMITTER_NUM_THREADS=10

# Job container images
# Usually you should not need to set these, they have defaults already set
JOB_POD_SOCAT_IMAGE=
JOB_POD_BUSYBOX_IMAGE=
JOB_POD_CURL_IMAGE=
JOB_KUBE_SOCAT_IMAGE=
JOB_KUBE_BUSYBOX_IMAGE=
JOB_KUBE_CURL_IMAGE=

# Miscellaneous
TRACKING_STRATEGY=segment
Expand All @@ -73,10 +73,10 @@ S3_PATH_STYLE_ACCESS=
GCS_LOG_BUCKET=

# Docker Resource Limits
JOB_POD_MAIN_CONTAINER_CPU_REQUEST=
JOB_POD_MAIN_CONTAINER_CPU_LIMIT=
JOB_POD_MAIN_CONTAINER_MEMORY_REQUEST=
JOB_POD_MAIN_CONTAINER_MEMORY_LIMIT=
JOB_MAIN_CONTAINER_CPU_REQUEST=
JOB_MAIN_CONTAINER_CPU_LIMIT=
JOB_MAIN_CONTAINER_MEMORY_REQUEST=
JOB_MAIN_CONTAINER_MEMORY_LIMIT=

# Max attempts per sync and max retries per attempt
SYNC_JOB_MAX_ATTEMPTS=3
Expand Down
24 changes: 12 additions & 12 deletions airbyte-config/models/src/main/java/io/airbyte/config/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,29 +85,29 @@ public interface Configs {

int getSyncJobMaxTimeoutDays();

List<TolerationPOJO> getJobPodTolerations();
List<TolerationPOJO> getJobKubeTolerations();

Map<String, String> getJobPodNodeSelectors();
Map<String, String> getJobKubeNodeSelectors();

String getJobPodMainContainerImagePullPolicy();
String getJobKubeMainContainerImagePullPolicy();

String getJobPodMainContainerImagePullSecret();
String getJobKubeMainContainerImagePullSecret();

String getJobPodSocatImage();
String getJobKubeSocatImage();

String getJobPodBusyboxImage();
String getJobKubeBusyboxImage();

String getJobPodCurlImage();
String getJobKubeCurlImage();

String getJobPodKubeNamespace();
String getJobKubeNamespace();

String getJobPodMainContainerCpuRequest();
String getJobMainContainerCpuRequest();

String getJobPodMainContainerCpuLimit();
String getJobMainContainerCpuLimit();

String getJobPodMainContainerMemoryRequest();
String getJobMainContainerMemoryRequest();

String getJobPodMainContainerMemoryLimit();
String getJobMainContainerMemoryLimit();

// Logging/Monitoring/Tracking
LogConfigs getLogConfigs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ public class EnvConfigs implements Configs {
public static final String LOG_LEVEL = "LOG_LEVEL";
public static final String S3_PATH_STYLE_ACCESS = "S3_PATH_STYLE_ACCESS";
public static final String WEBAPP_URL = "WEBAPP_URL";
public static final String JOB_POD_MAIN_CONTAINER_IMAGE_PULL_POLICY = "JOB_POD_MAIN_CONTAINER_IMAGE_PULL_POLICY";
public static final String JOB_POD_TOLERATIONS = "JOB_POD_TOLERATIONS";
public static final String JOB_POD_NODE_SELECTORS = "JOB_POD_NODE_SELECTORS";
public static final String JOB_POD_SOCAT_IMAGE = "JOB_POD_SOCAT_IMAGE";
public static final String JOB_POD_BUSYBOX_IMAGE = "JOB_POD_BUSYBOX_IMAGE";
public static final String JOB_POD_CURL_IMAGE = "JOB_POD_CURL_IMAGE";
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY";
public static final String JOB_KUBE_TOLERATIONS = "JOB_KUBE_TOLERATIONS";
public static final String JOB_KUBE_NODE_SELECTORS = "JOB_KUBE_NODE_SELECTORS";
public static final String JOB_KUBE_SOCAT_IMAGE = "JOB_KUBE_SOCAT_IMAGE";
public static final String JOB_KUBE_BUSYBOX_IMAGE = "JOB_KUBE_BUSYBOX_IMAGE";
public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE";
public static final String SYNC_JOB_MAX_ATTEMPTS = "SYNC_JOB_MAX_ATTEMPTS";
public static final String SYNC_JOB_MAX_TIMEOUT_DAYS = "SYNC_JOB_MAX_TIMEOUT_DAYS";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
Expand All @@ -73,14 +73,14 @@ public class EnvConfigs implements Configs {
public static final String MAX_SYNC_WORKERS = "MAX_SYNC_WORKERS";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String JOB_POD_KUBE_NAMESPACE = "JOB_POD_KUBE_NAMESPACE";
private static final String JOB_KUBE_NAMESPACE = "JOB_KUBE_NAMESPACE";
private static final String SUBMITTER_NUM_THREADS = "SUBMITTER_NUM_THREADS";
public static final String JOB_POD_MAIN_CONTAINER_CPU_REQUEST = "JOB_POD_MAIN_CONTAINER_CPU_REQUEST";
public static final String JOB_POD_MAIN_CONTAINER_CPU_LIMIT = "JOB_POD_MAIN_CONTAINER_CPU_LIMIT";
public static final String JOB_POD_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_POD_MAIN_CONTAINER_MEMORY_REQUEST";
public static final String JOB_POD_MAIN_CONTAINER_MEMORY_LIMIT = "JOB_POD_MAIN_CONTAINER_MEMORY_LIMIT";
public static final String JOB_MAIN_CONTAINER_CPU_REQUEST = "JOB_MAIN_CONTAINER_CPU_REQUEST";
public static final String JOB_MAIN_CONTAINER_CPU_LIMIT = "JOB_MAIN_CONTAINER_CPU_LIMIT";
public static final String JOB_MAIN_CONTAINER_MEMORY_REQUEST = "JOB_MAIN_CONTAINER_MEMORY_REQUEST";
public static final String JOB_MAIN_CONTAINER_MEMORY_LIMIT = "JOB_MAIN_CONTAINER_MEMORY_LIMIT";
private static final String SECRET_PERSISTENCE = "SECRET_PERSISTENCE";
public static final String JOB_POD_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_POD_MAIN_CONTAINER_IMAGE_PULL_SECRET";
public static final String JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET = "JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET";
private static final String PUBLISH_METRICS = "PUBLISH_METRICS";
private static final String CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION = "CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION";
private static final String CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS = "CONFIGS_DATABASE_INITIALIZATION_TIMEOUT_MS";
Expand All @@ -100,15 +100,15 @@ public class EnvConfigs implements Configs {

// defaults
private static final String DEFAULT_SPEC_CACHE_BUCKET = "io-airbyte-cloud-spec-cache";
public static final String DEFAULT_JOB_POD_KUBE_NAMESPACE = "default";
private static final String DEFAULT_JOB_POD_CPU_REQUIREMENT = null;
private static final String DEFAULT_JOB_POD_MEMORY_REQUIREMENT = null;
private static final String DEFAULT_JOB_POD_MAIN_CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent";
public static final String DEFAULT_JOB_KUBE_NAMESPACE = "default";
private static final String DEFAULT_JOB_CPU_REQUIREMENT = null;
private static final String DEFAULT_JOB_MEMORY_REQUIREMENT = null;
private static final String DEFAULT_JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent";
private static final String SECRET_STORE_GCP_PROJECT_ID = "SECRET_STORE_GCP_PROJECT_ID";
private static final String SECRET_STORE_GCP_CREDENTIALS = "SECRET_STORE_GCP_CREDENTIALS";
private static final String DEFAULT_JOB_POD_SOCAT_IMAGE = "alpine/socat:1.7.4.1-r1";
private static final String DEFAULT_JOB_POD_BUSYBOX_IMAGE = "busybox:1.28";
private static final String DEFAULT_JOB_POD_CURL_IMAGE = "curlimages/curl:7.77.0";
private static final String DEFAULT_JOB_KUBE_SOCAT_IMAGE = "alpine/socat:1.7.4.1-r1";
private static final String DEFAULT_JOB_KUBE_BUSYBOX_IMAGE = "busybox:1.28";
private static final String DEFAULT_JOB_KUBE_CURL_IMAGE = "curlimages/curl:7.77.0";
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
private static final long DEFAULT_MAXIMUM_WORKSPACE_SIZE_MB = 5000;
Expand Down Expand Up @@ -371,11 +371,11 @@ public int getSyncJobMaxTimeoutDays() {
* <p>
* key=airbyte-server,operator=Exists,effect=NoSchedule;key=airbyte-server,operator=Equals,value=true,effect=NoSchedule
*
* @return list of WorkerPodToleration parsed from env
* @return list of WorkerKubeToleration parsed from env
*/
@Override
public List<TolerationPOJO> getJobPodTolerations() {
final String tolerationsStr = getEnvOrDefault(JOB_POD_TOLERATIONS, "");
public List<TolerationPOJO> getJobKubeTolerations() {
final String tolerationsStr = getEnvOrDefault(JOB_KUBE_TOLERATIONS, "");

final Stream<String> tolerations = Strings.isNullOrEmpty(tolerationsStr) ? Stream.of()
: Splitter.on(";")
Expand Down Expand Up @@ -419,17 +419,17 @@ private TolerationPOJO parseToleration(final String tolerationStr) {
* @return map containing kv pairs of node selectors
*/
@Override
public Map<String, String> getJobPodNodeSelectors() {
public Map<String, String> getJobKubeNodeSelectors() {
return Splitter.on(",")
.splitToStream(getEnvOrDefault(JOB_POD_NODE_SELECTORS, ""))
.splitToStream(getEnvOrDefault(JOB_KUBE_NODE_SELECTORS, ""))
.filter(s -> !Strings.isNullOrEmpty(s) && s.contains("="))
.map(s -> s.split("="))
.collect(Collectors.toMap(s -> s[0], s -> s[1]));
}

@Override
public String getJobPodMainContainerImagePullPolicy() {
return getEnvOrDefault(JOB_POD_MAIN_CONTAINER_IMAGE_PULL_POLICY, DEFAULT_JOB_POD_MAIN_CONTAINER_IMAGE_PULL_POLICY);
public String getJobKubeMainContainerImagePullPolicy() {
return getEnvOrDefault(JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY, DEFAULT_JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY);
}

/**
Expand All @@ -438,48 +438,48 @@ public String getJobPodMainContainerImagePullPolicy() {
* no-op value.
*/
@Override
public String getJobPodMainContainerImagePullSecret() {
return getEnvOrDefault(JOB_POD_MAIN_CONTAINER_IMAGE_PULL_SECRET, "");
public String getJobKubeMainContainerImagePullSecret() {
return getEnvOrDefault(JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_SECRET, "");
}

@Override
public String getJobPodSocatImage() {
return getEnvOrDefault(JOB_POD_SOCAT_IMAGE, DEFAULT_JOB_POD_SOCAT_IMAGE);
public String getJobKubeSocatImage() {
return getEnvOrDefault(JOB_KUBE_SOCAT_IMAGE, DEFAULT_JOB_KUBE_SOCAT_IMAGE);
}

@Override
public String getJobPodBusyboxImage() {
return getEnvOrDefault(JOB_POD_BUSYBOX_IMAGE, DEFAULT_JOB_POD_BUSYBOX_IMAGE);
public String getJobKubeBusyboxImage() {
return getEnvOrDefault(JOB_KUBE_BUSYBOX_IMAGE, DEFAULT_JOB_KUBE_BUSYBOX_IMAGE);
}

@Override
public String getJobPodCurlImage() {
return getEnvOrDefault(JOB_POD_CURL_IMAGE, DEFAULT_JOB_POD_CURL_IMAGE);
public String getJobKubeCurlImage() {
return getEnvOrDefault(JOB_KUBE_CURL_IMAGE, DEFAULT_JOB_KUBE_CURL_IMAGE);
}

@Override
public String getJobPodKubeNamespace() {
return getEnvOrDefault(JOB_POD_KUBE_NAMESPACE, DEFAULT_JOB_POD_KUBE_NAMESPACE);
public String getJobKubeNamespace() {
return getEnvOrDefault(JOB_KUBE_NAMESPACE, DEFAULT_JOB_KUBE_NAMESPACE);
}

@Override
public String getJobPodMainContainerCpuRequest() {
return getEnvOrDefault(JOB_POD_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_POD_CPU_REQUIREMENT);
public String getJobMainContainerCpuRequest() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_REQUEST, DEFAULT_JOB_CPU_REQUIREMENT);
}

@Override
public String getJobPodMainContainerCpuLimit() {
return getEnvOrDefault(JOB_POD_MAIN_CONTAINER_CPU_LIMIT, DEFAULT_JOB_POD_CPU_REQUIREMENT);
public String getJobMainContainerCpuLimit() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_CPU_LIMIT, DEFAULT_JOB_CPU_REQUIREMENT);
}

@Override
public String getJobPodMainContainerMemoryRequest() {
return getEnvOrDefault(JOB_POD_MAIN_CONTAINER_MEMORY_REQUEST, DEFAULT_JOB_POD_MEMORY_REQUIREMENT);
public String getJobMainContainerMemoryRequest() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_MEMORY_REQUEST, DEFAULT_JOB_MEMORY_REQUIREMENT);
}

@Override
public String getJobPodMainContainerMemoryLimit() {
return getEnvOrDefault(JOB_POD_MAIN_CONTAINER_MEMORY_LIMIT, DEFAULT_JOB_POD_MEMORY_REQUIREMENT);
public String getJobMainContainerMemoryLimit() {
return getEnvOrDefault(JOB_MAIN_CONTAINER_MEMORY_LIMIT, DEFAULT_JOB_MEMORY_REQUIREMENT);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,45 +163,45 @@ void testTrackingStrategy() {
}

@Test
void testWorkerPodTolerations() {
when(function.apply(EnvConfigs.JOB_POD_TOLERATIONS)).thenReturn(null);
Assertions.assertEquals(config.getJobPodTolerations(), List.of());
void testworkerKubeTolerations() {
when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn(null);
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_POD_TOLERATIONS)).thenReturn(";;;");
Assertions.assertEquals(config.getJobPodTolerations(), List.of());
when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn(";;;");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_POD_TOLERATIONS)).thenReturn("key=k,value=v;");
Assertions.assertEquals(config.getJobPodTolerations(), List.of());
when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=k,value=v;");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of());

when(function.apply(EnvConfigs.JOB_POD_TOLERATIONS)).thenReturn("key=airbyte-server,operator=Exists,effect=NoSchedule");
Assertions.assertEquals(config.getJobPodTolerations(), List.of(new TolerationPOJO("airbyte-server", "NoSchedule", null, "Exists")));
when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=airbyte-server,operator=Exists,effect=NoSchedule");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(new TolerationPOJO("airbyte-server", "NoSchedule", null, "Exists")));

when(function.apply(EnvConfigs.JOB_POD_TOLERATIONS)).thenReturn("key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
Assertions.assertEquals(config.getJobPodTolerations(), List.of(new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));
when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS)).thenReturn("key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));

when(function.apply(EnvConfigs.JOB_POD_TOLERATIONS))
when(function.apply(EnvConfigs.JOB_KUBE_TOLERATIONS))
.thenReturn("key=airbyte-server,operator=Exists,effect=NoSchedule;key=airbyte-server,operator=Equals,value=true,effect=NoSchedule");
Assertions.assertEquals(config.getJobPodTolerations(), List.of(
Assertions.assertEquals(config.getJobKubeTolerations(), List.of(
new TolerationPOJO("airbyte-server", "NoSchedule", null, "Exists"),
new TolerationPOJO("airbyte-server", "NoSchedule", "true", "Equals")));
}

@Test
void testWorkerPodNodeSelectors() {
when(function.apply(EnvConfigs.JOB_POD_NODE_SELECTORS)).thenReturn(null);
Assertions.assertEquals(config.getJobPodNodeSelectors(), Map.of());
void testworkerKubeNodeSelectors() {
when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn(null);
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of());

when(function.apply(EnvConfigs.JOB_POD_NODE_SELECTORS)).thenReturn(",,,");
Assertions.assertEquals(config.getJobPodNodeSelectors(), Map.of());
when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn(",,,");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of());

when(function.apply(EnvConfigs.JOB_POD_NODE_SELECTORS)).thenReturn("key=k,,;$%&^#");
Assertions.assertEquals(config.getJobPodNodeSelectors(), Map.of("key", "k"));
when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("key=k,,;$%&^#");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("key", "k"));

when(function.apply(EnvConfigs.JOB_POD_NODE_SELECTORS)).thenReturn("one=two");
Assertions.assertEquals(config.getJobPodNodeSelectors(), Map.of("one", "two"));
when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("one=two");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("one", "two"));

when(function.apply(EnvConfigs.JOB_POD_NODE_SELECTORS)).thenReturn("airbyte=server,something=nothing");
Assertions.assertEquals(config.getJobPodNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
when(function.apply(EnvConfigs.JOB_KUBE_NODE_SELECTORS)).thenReturn("airbyte=server,something=nothing");
Assertions.assertEquals(config.getJobKubeNodeSelectors(), Map.of("airbyte", "server", "something", "nothing"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ private static ProcessFactory getProcessBuilderFactory(final Configs configs, fi
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + WorkerApp.KUBE_HEARTBEAT_PORT;
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobPodKubeNamespace());
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());

// this needs to have two ports for the source and two ports for the destination (all four must be
// exposed)
KubePortManagerSingleton.init(ReplicationLauncherWorker.PORTS);

return new KubeProcessFactory(workerConfigs, configs.getJobPodKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
} else {
return new DockerProcessFactory(
workerConfigs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ SUBMITTER_NUM_THREADS=10

# Job container images
# Usually you should not need to set these, they have defaults already set
JOB_SOCAT_IMAGE=
JOB_BUSYBOX_IMAGE=
JOB_CURL_IMAGE=
JOB_KUBE_SOCAT_IMAGE=
JOB_KUBE_BUSYBOX_IMAGE=
JOB_KUBE_CURL_IMAGE=

# Miscellaneous
TRACKING_STRATEGY=segment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ private static ProcessFactory getJobProcessFactory(final Configs configs) throws
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobPodKubeNamespace());
return new KubeProcessFactory(workerConfigs, configs.getJobPodKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());
return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
} else {
return new DockerProcessFactory(
workerConfigs,
Expand All @@ -244,8 +244,8 @@ private static ProcessFactory getOrchestratorProcessFactory(final Configs config
final KubernetesClient fabricClient = new DefaultKubernetesClient();
final String localIp = InetAddress.getLocalHost().getHostAddress();
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobPodKubeNamespace());
return new KubeProcessFactory(workerConfigs, configs.getJobPodKubeNamespace(), fabricClient, kubeHeartbeatUrl, true);
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());
return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, true);
} else {
return new DockerProcessFactory(
workerConfigs,
Expand Down
Loading