From 476c1808274cb1176006bb5ca0d57e9661d5d6d2 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 30 Oct 2018 13:52:44 -0700 Subject: [PATCH] [SPARK-24434][K8S] pod template files ## What changes were proposed in this pull request? New feature to pass podspec files for driver and executor pods. ## How was this patch tested? new unit and integration tests - [x] more overwrites in integration tests - [ ] invalid template integration test, documentation Author: Onur Satici Author: Yifei Huang Author: onursatici Closes #22146 from onursatici/pod-template. --- docs/running-on-kubernetes.md | 221 ++++++++++++++++++ .../org/apache/spark/deploy/k8s/Config.scala | 33 +++ .../apache/spark/deploy/k8s/Constants.scala | 10 +- .../deploy/k8s/KubernetesDriverSpec.scala | 7 - .../spark/deploy/k8s/KubernetesUtils.scala | 53 ++++- .../k8s/features/BasicDriverFeatureStep.scala | 8 +- .../features/BasicExecutorFeatureStep.scala | 10 +- .../features/PodTemplateConfigMapStep.scala | 72 ++++++ .../submit/KubernetesClientApplication.scala | 7 +- .../k8s/submit/KubernetesDriverBuilder.scala | 52 ++++- .../k8s/ExecutorPodsLifecycleManager.scala | 1 - .../k8s/KubernetesClusterManager.scala | 14 +- .../k8s/KubernetesExecutorBuilder.scala | 39 +++- .../deploy/k8s/KubernetesUtilsSuite.scala | 68 ++++++ .../BasicDriverFeatureStepSuite.scala | 2 +- .../PodTemplateConfigMapStepSuite.scala | 97 ++++++++ .../submit/KubernetesDriverBuilderSuite.scala | 178 +++++++++++++- .../k8s/submit/PodBuilderSuiteUtils.scala | 142 +++++++++++ .../ExecutorPodsLifecycleManagerSuite.scala | 4 - .../k8s/KubernetesExecutorBuilderSuite.scala | 41 +++- .../src/test/resources/driver-template.yml | 26 +++ .../src/test/resources/executor-template.yml | 25 ++ .../k8s/integrationtest/KubernetesSuite.scala | 8 +- .../integrationtest/PodTemplateSuite.scala | 55 +++++ 24 files changed, 1127 insertions(+), 46 deletions(-) create mode 100644 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala create mode 100644 resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml create mode 100644 resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 02770439f5388..8efb11389ed34 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -202,6 +202,22 @@ To use a secret through an environment variable use the following options to the --conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key ``` +## Pod Template +Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates). +Spark users can similarly use template files to define the driver or executor pod configurations that Spark configurations do not support. +To do so, specify the spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile` +to point to local files accessible to the `spark-submit` process. To allow the driver pod access the executor pod template +file, the file will be automatically mounted onto a volume in the driver pod when it's created. 
+Spark does not do any validation after unmarshalling these template files and relies on the Kubernetes API server for validation. + +It is important to note that Spark is opinionated about certain pod configurations so there are values in the +pod template that will always be overwritten by Spark. Therefore, users of this feature should note that specifying +the pod template file only lets Spark start with a template pod instead of an empty pod during the pod-building process. +For details, see the [full list](#pod-template-properties) of pod template values that will be overwritten by spark. + +Pod template files can also define multiple containers. In such cases, Spark will always assume that the first container in +the list will be the driver or executor container. + ## Using Kubernetes Volumes Starting with Spark 2.4.0, users can mount the following types of Kubernetes [volumes](https://kubernetes.io/docs/concepts/storage/volumes/) into the driver and executor pods: @@ -819,4 +835,209 @@ specific to Spark on Kubernetes. This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3. + + spark.kubernetes.kerberos.krb5.path + (none) + + Specify the local location of the krb5.conf file to be mounted on the driver and executors for Kerberos interaction. + It is important to note that the KDC defined needs to be visible from inside the containers. + + + + spark.kubernetes.kerberos.krb5.configMapName + (none) + + Specify the name of the ConfigMap, containing the krb5.conf file, to be mounted on the driver and executors + for Kerberos interaction. The KDC defined needs to be visible from inside the containers. The ConfigMap must also + be in the same namespace of the driver and executor pods. + + + + spark.kubernetes.hadoop.configMapName + (none) + + Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, to be mounted on the driver + and executors for custom Hadoop configuration. 
+ + + + spark.kubernetes.kerberos.tokenSecret.name + (none) + + Specify the name of the secret where your existing delegation tokens are stored. This removes the need for the job user + to provide any kerberos credentials for launching a job. + + + + spark.kubernetes.kerberos.tokenSecret.itemKey + (none) + + Specify the item key of the data where your existing delegation tokens are stored. This removes the need for the job user + to provide any kerberos credentials for launching a job. + + + + spark.kubernetes.driver.podTemplateFile + (none) + + Specify the local file that contains the driver [pod template](#pod-template). For example + spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml` + + + + spark.kubernetes.executor.podTemplateFile + (none) + + Specify the local file that contains the executor [pod template](#pod-template). For example + spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml` + + + + +#### Pod template properties + +See the below table for the full list of pod specifications that will be overwritten by spark. + +### Pod Metadata + + + + + + + + + + + + + + + + + + + + + + + +
Pod metadata keyModified valueDescription
nameValue of spark.kubernetes.driver.pod.name + The driver pod name will be overwritten with either the configured or default value of + spark.kubernetes.driver.pod.name. The executor pod names will be unaffected. +
namespaceValue of spark.kubernetes.namespace + Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will + be replaced by either the configured or default spark conf value. +
labelsAdds the labels from spark.kubernetes.{driver,executor}.label.* + Spark will add additional labels specified by the spark configuration. +
annotationsAdds the annotations from spark.kubernetes.{driver,executor}.annotation.* + Spark will add additional annotations specified by the spark configuration. + 
+ +### Pod Spec + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Pod spec keyModified valueDescription
imagePullSecretsAdds image pull secrets from spark.kubernetes.container.image.pullSecrets + Additional pull secrets will be added from the spark configuration to both driver and executor pods. + 
nodeSelectorAdds node selectors from spark.kubernetes.node.selector.* + Additional node selectors will be added from the spark configuration to both driver and executor pods. + 
restartPolicy"Never" + Spark assumes that both drivers and executors never restart. + 
serviceAccountValue of spark.kubernetes.authenticate.driver.serviceAccountName + Spark will override serviceAccount with the value of the spark configuration for only + driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected. +
serviceAccountNameValue of spark.kubernetes.authenticate.driver.serviceAccountName + Spark will override serviceAccountName with the value of the spark configuration for only + driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected. +
volumesAdds volumes from spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path + Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing + spark conf and pod template files. +
+ +### Container spec + +The following affect the driver and executor containers. All other containers in the pod spec will be unaffected. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Container spec keyModified valueDescription
envAdds env variables from spark.kubernetes.driverEnv.[EnvironmentVariableName] + Spark will add driver env variables from spark.kubernetes.driverEnv.[EnvironmentVariableName], and + executor env variables from spark.executorEnv.[EnvironmentVariableName]. +
imageValue of spark.kubernetes.{driver,executor}.container.image + The image will be defined by the spark configurations. +
imagePullPolicyValue of spark.kubernetes.container.image.pullPolicy + Spark will override the pull policy for both driver and executors. +
nameSee description. + The container name will be assigned by spark ("spark-kubernetes-driver" for the driver container, and + "executor" for each executor container) if not defined by the pod template. If the container is defined by the + template, the template's name will be used. +
resourcesSee description + The cpu limits are set by spark.kubernetes.{driver,executor}.limit.cores. The cpu is set by + spark.{driver,executor}.cores. The memory request and limit are set by summing the values of + spark.{driver,executor}.memory and spark.{driver,executor}.memoryOverhead. + +
volumeMountsAdds volume mounts from spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.{path,readOnly} + Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing + spark conf and pod template files. + 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index c7338a721595f..a8e546d6b9f86 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -232,6 +232,39 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_LOCAL_DIRS_TMPFS = + ConfigBuilder("spark.kubernetes.local.dirs.tmpfs") + .doc("If set to true then emptyDir volumes created to back SPARK_LOCAL_DIRS will have " + + "their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " + + "volumes. This may improve performance but scratch space usage will count towards " + + "your pods memory limit so you may wish to request more memory.") + .booleanConf + .createWithDefault(false) + + val KUBERNETES_DRIVER_PODTEMPLATE_FILE = + ConfigBuilder("spark.kubernetes.driver.podTemplateFile") + .doc("File containing a template pod spec for the driver") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE = + ConfigBuilder("spark.kubernetes.executor.podTemplateFile") + .doc("File containing a template pod spec for executors") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME = + ConfigBuilder("spark.kubernetes.driver.podTemplateContainerName") + .doc("container name to be used as a basis for the driver in the given pod template") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME = + ConfigBuilder("spark.kubernetes.executor.podTemplateContainerName") + .doc("container name to be used as a basis for executors in the given pod template") + .stringConf + .createOptional + val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = "spark.kubernetes.authenticate.submission" diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 8202d874a4626..a9bcf49627124 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -74,8 +74,16 @@ private[spark] object Constants { val ENV_R_PRIMARY = "R_PRIMARY" val ENV_R_ARGS = "R_APP_ARGS" + // Pod spec templates + val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml" + val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH = "/opt/spark/pod-template" + val POD_TEMPLATE_VOLUME = "pod-template-volume" + val POD_TEMPLATE_CONFIGMAP = "podspec-configmap" + val POD_TEMPLATE_KEY = "podspec-configmap-key" + // Miscellaneous val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" - val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" + val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" + val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor" val MEMORY_OVERHEAD_MIN_MIB = 384L } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala index 0c5ae022f4070..fce8c6a4bf494 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala @@ -22,10 +22,3 @@ private[spark] case class KubernetesDriverSpec( pod: SparkPod, driverKubernetesResources: Seq[HasMetadata], systemProperties: Map[String, String]) - -private[spark] object KubernetesDriverSpec { - def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec( - SparkPod.initialPod(), - 
Seq.empty, - initialProps) -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 588cd9d40f9a0..9a09442eb1d35 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,10 +16,18 @@ */ package org.apache.spark.deploy.k8s -import org.apache.spark.SparkConf +import java.io.File + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] object KubernetesUtils { +private[spark] object KubernetesUtils extends Logging { /** * Extract and parse Spark configuration properties with a given name prefix and @@ -59,5 +67,46 @@ private[spark] object KubernetesUtils { } } + def loadPodFromTemplate( + kubernetesClient: KubernetesClient, + templateFile: File, + containerName: Option[String]): SparkPod = { + try { + val pod = kubernetesClient.pods().load(templateFile).get() + selectSparkContainer(pod, containerName) + } catch { + case e: Exception => + logError( + s"Encountered exception while attempting to load initial pod spec from file", e) + throw new SparkException("Could not load pod from template file.", e) + } + } + + def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = { + def selectNamedContainer( + containers: List[Container], name: String): Option[(Container, List[Container])] = + containers.partition(_.getName == name) match { + case (sparkContainer :: Nil, 
rest) => Some((sparkContainer, rest)) + case _ => + logWarning( + s"specified container ${name} not found on pod template, " + + s"falling back to taking the first container") + Option.empty + } + val containers = pod.getSpec.getContainers.asScala.toList + containerName + .flatMap(selectNamedContainer(containers, _)) + .orElse(containers.headOption.map((_, containers.tail))) + .map { + case (sparkContainer: Container, rest: List[Container]) => SparkPod( + new PodBuilder(pod) + .editSpec() + .withContainers(rest.asJava) + .endSpec() + .build(), + sparkContainer) + }.getOrElse(SparkPod(pod, new ContainerBuilder().build())) + } + def parseMasterUrl(url: String): String = url.substring("k8s://".length) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 575bc54ffe2bb..96b14a0d82b4c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -80,7 +80,7 @@ private[spark] class BasicDriverFeatureStep( ) val driverUIPort = SparkUI.getUIPort(conf.sparkConf) val driverContainer = new ContainerBuilder(pod.container) - .withName(DRIVER_CONTAINER_NAME) + .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME)) .withImage(driverContainerImage) .withImagePullPolicy(conf.imagePullPolicy()) .addNewPort() @@ -105,7 +105,7 @@ private[spark] class BasicDriverFeatureStep( .withNewFieldRef("v1", "status.podIP") .build()) .endEnv() - .withNewResources() + .editOrNewResources() .addToRequests("cpu", driverCpuQuantity) .addToLimits(maybeCpuLimitQuantity.toMap.asJava) .addToRequests("memory", driverMemoryQuantity) @@ -119,9 +119,9 @@ private[spark] class BasicDriverFeatureStep( 
.addToLabels(conf.roleLabels.asJava) .addToAnnotations(conf.roleAnnotations.asJava) .endMetadata() - .withNewSpec() + .editOrNewSpec() .withRestartPolicy("Never") - .withNodeSelector(conf.nodeSelector().asJava) + .addToNodeSelector(conf.nodeSelector().asJava) .addToImagePullSecrets(conf.imagePullSecrets(): _*) .endSpec() .build() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index d89995ba5e4f4..1dab2a834f3e7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -139,10 +139,10 @@ private[spark] class BasicExecutorFeatureStep( } val executorContainer = new ContainerBuilder(pod.container) - .withName("executor") + .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME)) .withImage(executorContainerImage) .withImagePullPolicy(kubernetesConf.imagePullPolicy()) - .withNewResources() + .editOrNewResources() .addToRequests("memory", executorMemoryQuantity) .addToLimits("memory", executorMemoryQuantity) .addToRequests("cpu", executorCpuQuantity) @@ -173,14 +173,14 @@ private[spark] class BasicExecutorFeatureStep( val executorPod = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(name) - .withLabels(kubernetesConf.roleLabels.asJava) - .withAnnotations(kubernetesConf.roleAnnotations.asJava) + .addToLabels(kubernetesConf.roleLabels.asJava) + .addToAnnotations(kubernetesConf.roleAnnotations.asJava) .addToOwnerReferences(ownerReference.toSeq: _*) .endMetadata() .editOrNewSpec() .withHostname(hostname) .withRestartPolicy("Never") - .withNodeSelector(kubernetesConf.nodeSelector().asJava) + .addToNodeSelector(kubernetesConf.nodeSelector().asJava) 
.addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) .endSpec() .build() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala new file mode 100644 index 0000000000000..96a8013246b74 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.features + +import java.io.File +import java.nio.charset.StandardCharsets + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder} + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ + +private[spark] class PodTemplateConfigMapStep( + conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + def configurePod(pod: SparkPod): SparkPod = { + val podWithVolume = new PodBuilder(pod.pod) + .editSpec() + .addNewVolume() + .withName(POD_TEMPLATE_VOLUME) + .withNewConfigMap() + .withName(POD_TEMPLATE_CONFIGMAP) + .addNewItem() + .withKey(POD_TEMPLATE_KEY) + .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME) + .endItem() + .endConfigMap() + .endVolume() + .endSpec() + .build() + + val containerWithVolume = new ContainerBuilder(pod.container) + .addNewVolumeMount() + .withName(POD_TEMPLATE_VOLUME) + .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH) + .endVolumeMount() + .build() + SparkPod(podWithVolume, containerWithVolume) + } + + def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( + KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> + (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) + + def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) + val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get + val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8) + Seq(new ConfigMapBuilder() + .withNewMetadata() + .withName(POD_TEMPLATE_CONFIGMAP) + .endMetadata() + .addToData(POD_TEMPLATE_KEY, podTemplateString) + .build()) + } +} diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index edeaa380194ac..b18de6b148e43 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.k8s.submit import java.io.StringWriter +import java.util.{Collections, Locale, Properties, UUID} import java.util.{Collections, UUID} import java.util.Properties @@ -222,8 +223,8 @@ private[spark] class KubernetesClientApplication extends SparkApplication { clientArguments.mainAppResource, clientArguments.mainClass, clientArguments.driverArgs, - clientArguments.maybePyFiles) - val builder = new KubernetesDriverBuilder + clientArguments.maybePyFiles, + clientArguments.hadoopConfigDir) val namespace = kubernetesConf.namespace() // The master URL has been checked for validity already in SparkSubmit. // We just need to get rid of the "k8s://" prefix here. 
@@ -240,7 +241,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication { None, None)) { kubernetesClient => val client = new Client( - builder, + KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf), kubernetesConf, kubernetesClient, waitForAppCompletion, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 8f3f18ffadc3b..5565cd74280e6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -16,8 +16,13 @@ */ package org.apache.spark.deploy.k8s.submit -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf} -import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep} +import java.io.File + +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.features._ import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} private[spark] class KubernetesDriverBuilder( @@ -51,7 +56,15 @@ private[spark] class KubernetesDriverBuilder( provideJavaStep: ( KubernetesConf[KubernetesDriverSpecificConf] => JavaDriverFeatureStep) = - new JavaDriverFeatureStep(_)) { + new JavaDriverFeatureStep(_), + provideHadoopGlobalStep: ( + 
KubernetesConf[KubernetesDriverSpecificConf] + => KerberosConfDriverFeatureStep) = + new KerberosConfDriverFeatureStep(_), + providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] + => PodTemplateConfigMapStep) = + new PodTemplateConfigMapStep(_), + provideInitialPod: () => SparkPod = SparkPod.initialPod) { def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = { @@ -70,6 +83,10 @@ private[spark] class KubernetesDriverBuilder( val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { Seq(provideVolumesStep(kubernetesConf)) } else Nil + val podTemplateFeature = if ( + kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) { + Seq(providePodTemplateConfigMapStep(kubernetesConf)) + } else Nil val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map { case JavaMainAppResource(_) => @@ -80,10 +97,19 @@ private[spark] class KubernetesDriverBuilder( provideRStep(kubernetesConf)} .getOrElse(provideJavaStep(kubernetesConf)) - val allFeatures = (baseFeatures :+ bindingsStep) ++ - secretFeature ++ envSecretFeature ++ volumesFeature + val maybeHadoopConfigStep = + kubernetesConf.hadoopConfSpec.map { _ => + provideHadoopGlobalStep(kubernetesConf)} - var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) + val allFeatures: Seq[KubernetesFeatureConfigStep] = + (baseFeatures :+ bindingsStep) ++ + secretFeature ++ envSecretFeature ++ volumesFeature ++ + maybeHadoopConfigStep.toSeq ++ podTemplateFeature + + var spec = KubernetesDriverSpec( + provideInitialPod(), + driverKubernetesResources = Seq.empty, + kubernetesConf.sparkConf.getAll.toMap) for (feature <- allFeatures) { val configuredPod = feature.configurePod(spec.pod) val addedSystemProperties = feature.getAdditionalPodSystemProperties() @@ -96,3 +122,17 @@ private[spark] class KubernetesDriverBuilder( spec } } + +private[spark] object KubernetesDriverBuilder { + def 
apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = { + conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE) + .map(new File(_)) + .map(file => new KubernetesDriverBuilder(provideInitialPod = () => + KubernetesUtils.loadPodFromTemplate( + kubernetesClient, + file, + conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME)) + )) + .getOrElse(new KubernetesDriverBuilder()) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index 14814635732d4..8f1daac638273 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -30,7 +30,6 @@ import org.apache.spark.util.Utils private[spark] class ExecutorPodsLifecycleManager( conf: SparkConf, - executorBuilder: KubernetesExecutorBuilder, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, // Use a best-effort to track which executors have been removed already. 
It's not generally diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 9999c62c878df..ce10f766334ff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -69,6 +69,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit defaultServiceAccountToken, defaultServiceAccountCaCrt) + if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) { + KubernetesUtils.loadPodFromTemplate( + kubernetesClient, + new File(sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get), + sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)) + } + val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( "kubernetes-executor-requests") @@ -81,13 +88,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit .build[java.lang.Long, java.lang.Long]() val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager( sc.conf, - new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, removedExecutorsCache) val executorPodsAllocator = new ExecutorPodsAllocator( - sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock()) + sc.conf, + KubernetesExecutorBuilder(kubernetesClient, sc.conf), + kubernetesClient, + snapshotsStore, + new SystemClock()) val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 364b6fb367722..c616e1735d6f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -16,7 +16,13 @@ */ package org.apache.spark.scheduler.cluster.k8s -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import java.io.File + +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep} @@ -35,7 +41,20 @@ private[spark] class KubernetesExecutorBuilder( new LocalDirsFeatureStep(_), provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] => MountVolumesFeatureStep) = - new MountVolumesFeatureStep(_)) { + new MountVolumesFeatureStep(_), + provideHadoopConfStep: ( + KubernetesConf[KubernetesExecutorSpecificConf] + => HadoopConfExecutorFeatureStep) = + new HadoopConfExecutorFeatureStep(_), + provideKerberosConfStep: ( + KubernetesConf[KubernetesExecutorSpecificConf] + => KerberosConfExecutorFeatureStep) = + new KerberosConfExecutorFeatureStep(_), + provideHadoopSparkUserStep: ( + KubernetesConf[KubernetesExecutorSpecificConf] + => HadoopSparkUserExecutorFeatureStep) = + new HadoopSparkUserExecutorFeatureStep(_), + provideInitialPod: () => SparkPod = SparkPod.initialPod) { def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = { @@ -53,10 +72,24 @@ private[spark] class KubernetesExecutorBuilder( val allFeatures = 
baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature - var executorPod = SparkPod.initialPod() + var executorPod = provideInitialPod() for (feature <- allFeatures) { executorPod = feature.configurePod(executorPod) } executorPod } } + +private[spark] object KubernetesExecutorBuilder { + def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesExecutorBuilder = { + conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) + .map(new File(_)) + .map(file => new KubernetesExecutorBuilder(provideInitialPod = () => + KubernetesUtils.loadPodFromTemplate( + kubernetesClient, + file, + conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME)) + )) + .getOrElse(new KubernetesExecutorBuilder()) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala new file mode 100644 index 0000000000000..7c231586af935 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.k8s + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, PodBuilder} + +import org.apache.spark.SparkFunSuite + +class KubernetesUtilsSuite extends SparkFunSuite { + private val HOST = "test-host" + private val POD = new PodBuilder() + .withNewSpec() + .withHostname(HOST) + .withContainers( + new ContainerBuilder().withName("first").build(), + new ContainerBuilder().withName("second").build()) + .endSpec() + .build() + + test("Selects the given container as spark container.") { + val sparkPod = KubernetesUtils.selectSparkContainer(POD, Some("second")) + assert(sparkPod.pod.getSpec.getHostname == HOST) + assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("first")) + assert(sparkPod.container.getName == "second") + } + + test("Selects the first container if no container name is given.") { + val sparkPod = KubernetesUtils.selectSparkContainer(POD, Option.empty) + assert(sparkPod.pod.getSpec.getHostname == HOST) + assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("second")) + assert(sparkPod.container.getName == "first") + } + + test("Falls back to the first container if given container name does not exist.") { + val sparkPod = KubernetesUtils.selectSparkContainer(POD, Some("does-not-exist")) + assert(sparkPod.pod.getSpec.getHostname == HOST) + assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("second")) + assert(sparkPod.container.getName == "first") + } + + test("constructs spark pod correctly with pod template with no containers") { + val noContainersPod = new PodBuilder(POD).editSpec().withContainers().endSpec().build() + val sparkPod = KubernetesUtils.selectSparkContainer(noContainersPod, Some("does-not-exist")) + assert(sparkPod.pod.getSpec.getHostname == HOST) + assert(sparkPod.container.getName == null) + val sparkPodWithNoContainerName = + 
KubernetesUtils.selectSparkContainer(noContainersPod, Option.empty) + assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST) + assert(sparkPodWithNoContainerName.container.getName == null) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 0968cce971c31..d8f426e4eadb1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -83,7 +83,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val basePod = SparkPod.initialPod() val configuredPod = featureStep.configurePod(basePod) - assert(configuredPod.container.getName === DRIVER_CONTAINER_NAME) + assert(configuredPod.container.getName === DEFAULT_DRIVER_CONTAINER_NAME) assert(configuredPod.container.getImage === "spark-driver:latest") assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala new file mode 100644 index 0000000000000..d7bbbd121af72 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import java.io.{File, PrintWriter} +import java.nio.file.Files + +import io.fabric8.kubernetes.api.model.ConfigMap +import org.mockito.Mockito +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ + +class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { + private var sparkConf: SparkConf = _ + private var kubernetesConf : KubernetesConf[_ <: KubernetesRoleSpecificConf] = _ + private var templateFile: File = _ + + before { + sparkConf = Mockito.mock(classOf[SparkConf]) + kubernetesConf = KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + None, + "app-name", + "main", + Seq.empty), + "resource", + "app-id", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + templateFile = Files.createTempFile("pod-template", "yml").toFile + templateFile.deleteOnExit() + Mockito.doReturn(Option(templateFile.getAbsolutePath)).when(sparkConf) + .get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) + } + + test("Mounts executor template volume if config specified") { + val writer = new PrintWriter(templateFile) + writer.write("pod-template-contents") + writer.close() + + val step = new PodTemplateConfigMapStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + 
assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val volume = configuredPod.pod.getSpec.getVolumes.get(0) + assert(volume.getName === Constants.POD_TEMPLATE_VOLUME) + assert(volume.getConfigMap.getName === Constants.POD_TEMPLATE_CONFIGMAP) + assert(volume.getConfigMap.getItems.size() === 1) + assert(volume.getConfigMap.getItems.get(0).getKey === Constants.POD_TEMPLATE_KEY) + assert(volume.getConfigMap.getItems.get(0).getPath === + Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME) + + assert(configuredPod.container.getVolumeMounts.size() === 1) + val volumeMount = configuredPod.container.getVolumeMounts.get(0) + assert(volumeMount.getMountPath === Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH) + assert(volumeMount.getName === Constants.POD_TEMPLATE_VOLUME) + + val resources = step.getAdditionalKubernetesResources() + assert(resources.size === 1) + assert(resources.head.getMetadata.getName === Constants.POD_TEMPLATE_CONFIGMAP) + assert(resources.head.isInstanceOf[ConfigMap]) + val configMap = resources.head.asInstanceOf[ConfigMap] + assert(configMap.getData.size() === 1) + assert(configMap.getData.containsKey(Constants.POD_TEMPLATE_KEY)) + assert(configMap.getData.containsValue("pod-template-contents")) + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties.size === 1) + assert(systemProperties.contains(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key)) + assert(systemProperties.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key).get === + (Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + + Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 4117c5487a41e..0654fbae192df 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -16,8 +16,13 @@ */ package org.apache.spark.deploy.k8s.submit -import org.apache.spark.{SparkConf, SparkFunSuite} +import io.fabric8.kubernetes.api.model.PodBuilder +import io.fabric8.kubernetes.client.KubernetesClient +import org.mockito.Mockito._ + +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE} import org.apache.spark.deploy.k8s.features._ import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep} import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} @@ -34,6 +39,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val R_STEP_TYPE = "r-bindings" private val ENV_SECRETS_STEP_TYPE = "env-secrets" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" + private val TEMPLATE_VOLUME_STEP_TYPE = "template-volume" private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep]) @@ -65,6 +71,10 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) + private val templateVolumeStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + TEMPLATE_VOLUME_STEP_TYPE, classOf[PodTemplateConfigMapStep] + ) + private val builderUnderTest: 
KubernetesDriverBuilder = new KubernetesDriverBuilder( _ => basicFeatureStep, @@ -76,7 +86,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => mountVolumesStep, _ => pythonStep, _ => rStep, - _ => javaStep) + _ => javaStep, + _ => hadoopGlobalStep, + _ => templateVolumeStep) test("Apply fundamental steps all the time.") { val conf = KubernetesConf( @@ -242,6 +254,99 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { R_STEP_TYPE) } + test("Apply template volume step if executor template is present.") { + val sparkConf = spy(new SparkConf(false)) + doReturn(Option("filename")).when(sparkConf) + .get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) + val conf = KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + Some(JavaMainAppResource("example.jar")), + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + JAVA_STEP_TYPE, + TEMPLATE_VOLUME_STEP_TYPE) + } + + test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") { + val conf = KubernetesConf( + new SparkConf(false), + KubernetesDriverSpecificConf( + None, + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + hadoopConfSpec = Some( + HadoopConfSpec( + Some("/var/hadoop-conf"), + None))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + JAVA_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) + } + + test("Apply HadoopSteps if HADOOP_CONF ConfigMap is defined.") { + val conf = KubernetesConf( + new SparkConf(false), + KubernetesDriverSpecificConf( + None, + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + 
Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + hadoopConfSpec = Some( + HadoopConfSpec( + None, + Some("pre-defined-configMapName")))) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + JAVA_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) + } + private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*) : Unit = { assert(resolvedSpec.systemProperties.size === stepTypes.size) @@ -252,4 +357,73 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { assert(resolvedSpec.systemProperties(stepType) === stepType) } } + + test("Start with empty pod if template is not specified") { + val kubernetesClient = mock(classOf[KubernetesClient]) + val driverBuilder = KubernetesDriverBuilder.apply(kubernetesClient, new SparkConf()) + verify(kubernetesClient, never()).pods() + } + + test("Starts with template if specified") { + val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient() + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml") + val kubernetesConf = new KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + Some(JavaMainAppResource("example.jar")), + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + val driverSpec = KubernetesDriverBuilder + .apply(kubernetesClient, sparkConf) + .buildFromFeatures(kubernetesConf) + PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(driverSpec.pod) + } + + test("Throws on misconfigured pod template") { + val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient( + new PodBuilder() + .withNewMetadata() + .addToLabels("test-label-key", "test-label-value") + .endMetadata() + .build()) + val sparkConf = new 
SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml") + val kubernetesConf = new KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + Some(JavaMainAppResource("example.jar")), + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + val exception = intercept[SparkException] { + KubernetesDriverBuilder + .apply(kubernetesClient, sparkConf) + .buildFromFeatures(kubernetesConf) + } + assert(exception.getMessage.contains("Could not load pod from template file.")) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala new file mode 100644 index 0000000000000..c92e9e6e3b6b3 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.submit + +import java.io.File + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource} +import org.mockito.Matchers.any +import org.mockito.Mockito.{mock, when} +import org.scalatest.FlatSpec +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.SparkPod + +object PodBuilderSuiteUtils extends FlatSpec { + + def loadingMockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = { + val kubernetesClient = mock(classOf[KubernetesClient]) + val pods = + mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]]) + val podResource = mock(classOf[PodResource[Pod, DoneablePod]]) + when(kubernetesClient.pods()).thenReturn(pods) + when(pods.load(any(classOf[File]))).thenReturn(podResource) + when(podResource.get()).thenReturn(pod) + kubernetesClient + } + + def verifyPodWithSupportedFeatures(pod: SparkPod): Unit = { + val metadata = pod.pod.getMetadata + assert(metadata.getLabels.containsKey("test-label-key")) + assert(metadata.getAnnotations.containsKey("test-annotation-key")) + assert(metadata.getNamespace === "namespace") + assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference")) + val spec = pod.pod.getSpec + assert(!spec.getContainers.asScala.exists(_.getName == "executor-container")) + assert(spec.getDnsPolicy === "dns-policy") + assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname"))) + assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference")) + assert(spec.getInitContainers.asScala.exists(_.getName == "init-container")) + assert(spec.getNodeName == "node-name") + assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value") + assert(spec.getSchedulerName === "scheduler") + assert(spec.getSecurityContext.getRunAsUser === 1000L) + 
assert(spec.getServiceAccount === "service-account") + assert(spec.getSubdomain === "subdomain") + assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key")) + assert(spec.getVolumes.asScala.exists(_.getName == "test-volume")) + val container = pod.container + assert(container.getName === "executor-container") + assert(container.getArgs.contains("arg")) + assert(container.getCommand.equals(List("command").asJava)) + assert(container.getEnv.asScala.exists(_.getName == "env-key")) + assert(container.getResources.getLimits.get("gpu") === + new QuantityBuilder().withAmount("1").build()) + assert(container.getSecurityContext.getRunAsNonRoot) + assert(container.getStdin) + assert(container.getTerminationMessagePath === "termination-message-path") + assert(container.getTerminationMessagePolicy === "termination-message-policy") + assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume")) + + } + + + def podWithSupportedFeatures(): Pod = new PodBuilder() + .withNewMetadata() + .addToLabels("test-label-key", "test-label-value") + .addToAnnotations("test-annotation-key", "test-annotation-value") + .withNamespace("namespace") + .addNewOwnerReference() + .withController(true) + .withName("owner-reference") + .endOwnerReference() + .endMetadata() + .withNewSpec() + .withDnsPolicy("dns-policy") + .withHostAliases(new HostAliasBuilder().withHostnames("hostname").build()) + .withImagePullSecrets( + new LocalObjectReferenceBuilder().withName("local-reference").build()) + .withInitContainers(new ContainerBuilder().withName("init-container").build()) + .withNodeName("node-name") + .withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava) + .withSchedulerName("scheduler") + .withNewSecurityContext() + .withRunAsUser(1000L) + .endSecurityContext() + .withServiceAccount("service-account") + .withSubdomain("subdomain") + .withTolerations(new TolerationBuilder() + .withKey("toleration-key") + .withOperator("Equal") + 
.withEffect("NoSchedule") + .build()) + .addNewVolume() + .withNewHostPath() + .withPath("/test") + .endHostPath() + .withName("test-volume") + .endVolume() + .addNewContainer() + .withArgs("arg") + .withCommand("command") + .addNewEnv() + .withName("env-key") + .withValue("env-value") + .endEnv() + .withImagePullPolicy("Always") + .withName("executor-container") + .withNewResources() + .withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava) + .endResources() + .withNewSecurityContext() + .withRunAsNonRoot(true) + .endSecurityContext() + .withStdin(true) + .withTerminationMessagePath("termination-message-path") + .withTerminationMessagePolicy("termination-message-policy") + .addToVolumeMounts( + new VolumeMountBuilder() + .withName("test-volume") + .withMountPath("/test") + .build()) + .endContainer() + .endSpec() + .build() + +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala index 562ace9f49d4d..8206e25fa7c63 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala @@ -44,9 +44,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte @Mock private var podOperations: PODS = _ - @Mock - private var executorBuilder: KubernetesExecutorBuilder = _ - @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _ @@ -63,7 +60,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer()) eventHandlerUnderTest = new ExecutorPodsLifecycleManager( new SparkConf(), - 
executorBuilder, kubernetesClient, snapshotsStore, removedExecutorsCache) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 44fe4a24e1102..c69f9e5026970 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -16,11 +16,14 @@ */ package org.apache.spark.scheduler.cluster.k8s -import io.fabric8.kubernetes.api.model.PodBuilder +import io.fabric8.kubernetes.api.model.{Config => _, _} +import io.fabric8.kubernetes.client.KubernetesClient +import org.mockito.Mockito.{mock, never, verify} import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ +import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val BASIC_STEP_TYPE = "basic" @@ -119,4 +122,40 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType) } } + + test("Starts with empty executor pod if template is not specified") { + val kubernetesClient = mock(classOf[KubernetesClient]) + val executorBuilder = KubernetesExecutorBuilder.apply(kubernetesClient, new SparkConf()) + verify(kubernetesClient, never()).pods() + } + + test("Starts with executor template if specified") { + val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient() + val sparkConf = new SparkConf(false) + .set("spark.driver.host", "https://driver.host.com") + .set(Config.CONTAINER_IMAGE, "spark-executor:latest") + .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, 
"template-file.yaml") + val kubernetesConf = KubernetesConf( + sparkConf, + KubernetesExecutorSpecificConf( + "executor-id", Some(new PodBuilder() + .withNewMetadata() + .withName("driver") + .endMetadata() + .build())), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + Seq.empty[String], + Option.empty) + val sparkPod = KubernetesExecutorBuilder + .apply(kubernetesClient, sparkConf) + .buildFromFeatures(kubernetesConf) + PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod) + } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml new file mode 100644 index 0000000000000..0c185be81d59e --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: v1 +kind: Pod +metadata: + labels: + template-label-key: driver-template-label-value +spec: + containers: + - name: test-driver-container + image: will-be-overwritten + diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml new file mode 100644 index 0000000000000..0282e23a39bd2 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements.  See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.  You may obtain a copy of the License at +# +#    http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +apiVersion: v1 +kind: Pod +metadata: + labels: + template-label-key: executor-template-label-value +spec: + containers: + - name: test-executor-container + image: will-be-overwritten diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index e6840ce818c1f..808fec168b4cb 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite + with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with Logging with Eventually with Matchers { import KubernetesSuite._ @@ -283,21 +283,21 @@ private[spark] class KubernetesSuite extends SparkFunSuite protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === image) - assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor") assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount === baseMemory) } protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage) - assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor") 
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount === standardNonJVMMemory) } protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = { assert(executorPod.getSpec.getContainers.get(0).getImage === rImage) - assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor") assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount === standardNonJVMMemory) } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala new file mode 100644 index 0000000000000..e5a847e7210cb --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.File + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag + +private[spark] trait PodTemplateSuite { k8sSuite: KubernetesSuite => + + import PodTemplateSuite._ + + test("Start pod creation from template", k8sTestTag) { + sparkAppConf + .set("spark.kubernetes.driver.podTemplateFile", DRIVER_TEMPLATE_FILE.getAbsolutePath) + .set("spark.kubernetes.executor.podTemplateFile", EXECUTOR_TEMPLATE_FILE.getAbsolutePath) + runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { + assert(driverPod.getMetadata.getName === driverPodName) + assert(driverPod.getSpec.getContainers.get(0).getImage === image) + assert(driverPod.getSpec.getContainers.get(0).getName === "test-driver-container") + assert(driverPod.getMetadata.getLabels.containsKey(LABEL_KEY)) + assert(driverPod.getMetadata.getLabels.get(LABEL_KEY) === "driver-template-label-value") + }, + executorPodChecker = (executorPod: Pod) => { + assert(executorPod.getSpec.getContainers.get(0).getImage === image) + assert(executorPod.getSpec.getContainers.get(0).getName === "test-executor-container") + assert(executorPod.getMetadata.getLabels.containsKey(LABEL_KEY)) + assert(executorPod.getMetadata.getLabels.get(LABEL_KEY) === "executor-template-label-value") + } + ) + } +} + +private[spark] object PodTemplateSuite { + val LABEL_KEY = "template-label-key" + val DRIVER_TEMPLATE_FILE = new File(getClass.getResource("/driver-template.yml").getFile) + val EXECUTOR_TEMPLATE_FILE = new File(getClass.getResource("/executor-template.yml").getFile) +}