diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 7093ee5a9686d..2917197a2e2ec 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -186,6 +186,22 @@ To use a secret through an environment variable use the following options to the
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
```
+## Pod Template
+Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates).
+Spark users can similarly use template files to define the driver or executor pod configurations that Spark configurations do not support.
+To do so, specify the spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile`
+to point to local files accessible to the `spark-submit` process. To allow the driver pod to access the executor pod template
+file, the file will be automatically mounted onto a volume in the driver pod when it's created.
+Spark does not do any validation after unmarshalling these template files and relies on the Kubernetes API server for validation.
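+
+As an illustrative sketch (not shipped with Spark), a driver pod template passed through
+`spark.kubernetes.driver.podTemplateFile` might look like the following; the label and security
+context here are arbitrary examples of fields that Spark leaves in place:
+
+```yaml
+apiVersion: v1
+kind: Pod
+metadata:
+  labels:
+    team: data-platform              # example label; Spark adds its own labels alongside it
+spec:
+  securityContext:
+    runAsNonRoot: true               # example field that Spark does not manage
+  containers:
+    - name: spark-kubernetes-driver  # treated as the driver container
+```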
+
+It is important to note that Spark is opinionated about certain pod configurations so there are values in the
+pod template that will always be overwritten by Spark. Therefore, users of this feature should note that specifying
+the pod template file only lets Spark start with a template pod instead of an empty pod during the pod-building process.
+For details, see the [full list](#pod-template-properties) of pod template values that will be overwritten by Spark.
+
+Pod template files can also define multiple containers. In such cases, Spark will always assume that the first container in
+the list will be the driver or executor container.
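+
+For example, in a sketch like the following (the container and image names are purely illustrative),
+Spark would use the first container, `custom-executor`, as the executor container and leave the
+sidecar untouched:
+
+```yaml
+apiVersion: v1
+kind: Pod
+spec:
+  containers:
+    - name: custom-executor          # first container: used as the Spark executor container
+      image: will-be-overwritten
+    - name: logging-sidecar          # additional containers are carried over unchanged
+      image: registry.example.com/log-shipper:latest
+```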
+
## Using Kubernetes Volumes
Starting with Spark 2.4.0, users can mount the following types of Kubernetes [volumes](https://kubernetes.io/docs/concepts/storage/volumes/) into the driver and executor pods:
@@ -863,4 +879,168 @@ specific to Spark on Kubernetes.
to provide any kerberos credentials for launching a job.
+
+<tr>
+  <td><code>spark.kubernetes.driver.podTemplateFile</code></td>
+  <td>(none)</td>
+  <td>
+   Specify the local file that contains the driver <a href="#pod-template">pod template</a>. For example
+   <code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml</code>
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.podTemplateFile</code></td>
+  <td>(none)</td>
+  <td>
+   Specify the local file that contains the executor <a href="#pod-template">pod template</a>. For example
+   <code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml</code>
+  </td>
+</tr>
+
+### Pod template properties
+
+See the table below for the full list of pod specifications that will be overwritten by Spark.
+
+#### Pod Metadata
+
+<table class="table">
+<tr><th>Pod metadata key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>name</td>
+  <td>Value of <code>spark.kubernetes.driver.pod.name</code></td>
+  <td>
+    The driver pod name will be overwritten with either the configured or default value of
+    <code>spark.kubernetes.driver.pod.name</code>. The executor pod names will be unaffected.
+  </td>
+</tr>
+<tr>
+  <td>namespace</td>
+  <td>Value of <code>spark.kubernetes.namespace</code></td>
+  <td>
+    Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will
+    be replaced by either the configured or default spark conf value.
+  </td>
+</tr>
+<tr>
+  <td>labels</td>
+  <td>Adds the labels from <code>spark.kubernetes.{driver,executor}.label.*</code></td>
+  <td>
+    Spark will add additional labels specified by the spark configuration.
+  </td>
+</tr>
+<tr>
+  <td>annotations</td>
+  <td>Adds the annotations from <code>spark.kubernetes.{driver,executor}.annotation.*</code></td>
+  <td>
+    Spark will add additional annotations specified by the spark configuration.
+  </td>
+</tr>
+</table>
+
+#### Pod Spec
+
+<table class="table">
+<tr><th>Pod spec key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>imagePullSecrets</td>
+  <td>Adds image pull secrets from <code>spark.kubernetes.container.image.pullSecrets</code></td>
+  <td>
+    Additional pull secrets will be added from the spark configuration to both driver and executor pods.
+  </td>
+</tr>
+<tr>
+  <td>nodeSelector</td>
+  <td>Adds node selectors from <code>spark.kubernetes.node.selector.*</code></td>
+  <td>
+    Additional node selectors will be added from the spark configuration to both driver and executor pods.
+  </td>
+</tr>
+<tr>
+  <td>restartPolicy</td>
+  <td><code>"Never"</code></td>
+  <td>
+    Spark assumes that both drivers and executors never restart.
+  </td>
+</tr>
+<tr>
+  <td>serviceAccount</td>
+  <td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
+  <td>
+    Spark will override <code>serviceAccount</code> with the value of the spark configuration for only
+    driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
+  </td>
+</tr>
+<tr>
+  <td>serviceAccountName</td>
+  <td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
+  <td>
+    Spark will override <code>serviceAccountName</code> with the value of the spark configuration for only
+    driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
+  </td>
+</tr>
+<tr>
+  <td>volumes</td>
+  <td>Adds volumes from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path</code></td>
+  <td>
+    Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing
+    spark conf and pod template files.
+  </td>
+</tr>
+</table>
+
+#### Container spec
+
+The following affect the driver and executor containers. All other containers in the pod spec will be unaffected.
+
+<table class="table">
+<tr><th>Container spec key</th><th>Modified value</th><th>Description</th></tr>
+<tr>
+  <td>env</td>
+  <td>Adds env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
+  <td>
+    Spark will add driver env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code>, and
+    executor env variables from <code>spark.executorEnv.[EnvironmentVariableName]</code>.
+  </td>
+</tr>
+<tr>
+  <td>image</td>
+  <td>Value of <code>spark.kubernetes.{driver,executor}.container.image</code></td>
+  <td>
+    The image will be defined by the spark configurations.
+  </td>
+</tr>
+<tr>
+  <td>imagePullPolicy</td>
+  <td>Value of <code>spark.kubernetes.container.image.pullPolicy</code></td>
+  <td>
+    Spark will override the pull policy for both driver and executors.
+  </td>
+</tr>
+<tr>
+  <td>name</td>
+  <td>See description.</td>
+  <td>
+    The container name will be assigned by spark ("spark-kubernetes-driver" for the driver container, and
+    "spark-kubernetes-executor" for each executor container) if not defined by the pod template. If the container
+    is defined by the template, the template's name will be used.
+  </td>
+</tr>
+<tr>
+  <td>resources</td>
+  <td>See description</td>
+  <td>
+    The cpu limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The cpu request is set by
+    <code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
+    <code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>.
+  </td>
+</tr>
+<tr>
+  <td>volumeMounts</td>
+  <td>Adds volume mounts from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.{path,readOnly}</code></td>
+  <td>
+    Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing
+    spark conf and pod template files.
+  </td>
+</tr>
+</table>
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index fff8fa4340c35..862f1d63ed39f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -278,6 +278,30 @@ private[spark] object Config extends Logging {
.booleanConf
.createWithDefault(false)
+ val KUBERNETES_DRIVER_PODTEMPLATE_FILE =
+ ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
+ .doc("File containing a template pod spec for the driver")
+ .stringConf
+ .createOptional
+
+ val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE =
+ ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
+ .doc("File containing a template pod spec for executors")
+ .stringConf
+ .createOptional
+
+ val KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME =
+ ConfigBuilder("spark.kubernetes.driver.podTemplateContainerName")
+ .doc("container name to be used as a basis for the driver in the given pod template")
+ .stringConf
+ .createOptional
+
+ val KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME =
+ ConfigBuilder("spark.kubernetes.executor.podTemplateContainerName")
+ .doc("container name to be used as a basis for executors in the given pod template")
+ .stringConf
+ .createOptional
+
val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 172a9054bb4f2..1c6d53c16871e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -76,9 +76,17 @@ private[spark] object Constants {
val ENV_R_PRIMARY = "R_PRIMARY"
val ENV_R_ARGS = "R_APP_ARGS"
+ // Pod spec templates
+ val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
+ val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH = "/opt/spark/pod-template"
+ val POD_TEMPLATE_VOLUME = "pod-template-volume"
+ val POD_TEMPLATE_CONFIGMAP = "podspec-configmap"
+ val POD_TEMPLATE_KEY = "podspec-configmap-key"
+
// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
- val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
+ val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
+ val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
val MEMORY_OVERHEAD_MIN_MIB = 384L
// Hadoop Configuration
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
index 0c5ae022f4070..fce8c6a4bf494 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesDriverSpec.scala
@@ -22,10 +22,3 @@ private[spark] case class KubernetesDriverSpec(
pod: SparkPod,
driverKubernetesResources: Seq[HasMetadata],
systemProperties: Map[String, String])
-
-private[spark] object KubernetesDriverSpec {
- def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
- SparkPod.initialPod(),
- Seq.empty,
- initialProps)
-}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 0f740454fafc4..6fafac3ee13c9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -16,14 +16,18 @@
*/
package org.apache.spark.deploy.k8s
+import java.io.File
+
import scala.collection.JavaConverters._
-import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
-private[spark] object KubernetesUtils {
+private[spark] object KubernetesUtils extends Logging {
/**
* Extract and parse Spark configuration properties with a given name prefix and
@@ -82,6 +86,47 @@ private[spark] object KubernetesUtils {
}
}
+ def loadPodFromTemplate(
+ kubernetesClient: KubernetesClient,
+ templateFile: File,
+ containerName: Option[String]): SparkPod = {
+ try {
+ val pod = kubernetesClient.pods().load(templateFile).get()
+ selectSparkContainer(pod, containerName)
+ } catch {
+ case e: Exception =>
+ logError(
+ s"Encountered exception while attempting to load initial pod spec from file", e)
+ throw new SparkException("Could not load pod from template file.", e)
+ }
+ }
+
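+  /**
+   * Splits the given pod into the container Spark should use and the remaining containers. If a
+   * container name is provided and found in the pod, that container is selected; otherwise the
+   * first container is used. If the pod defines no containers, an empty container is returned.
+   */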
+ def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
+ def selectNamedContainer(
+ containers: List[Container], name: String): Option[(Container, List[Container])] =
+ containers.partition(_.getName == name) match {
+ case (sparkContainer :: Nil, rest) => Some((sparkContainer, rest))
+ case _ =>
+ logWarning(
+ s"specified container ${name} not found on pod template, " +
+ s"falling back to taking the first container")
+ Option.empty
+ }
+ val containers = pod.getSpec.getContainers.asScala.toList
+ containerName
+ .flatMap(selectNamedContainer(containers, _))
+ .orElse(containers.headOption.map((_, containers.tail)))
+ .map {
+ case (sparkContainer: Container, rest: List[Container]) => SparkPod(
+ new PodBuilder(pod)
+ .editSpec()
+ .withContainers(rest.asJava)
+ .endSpec()
+ .build(),
+ sparkContainer)
+ }.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
+ }
+
def parseMasterUrl(url: String): String = url.substring("k8s://".length)
def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 575bc54ffe2bb..96b14a0d82b4c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -80,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(
)
val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
val driverContainer = new ContainerBuilder(pod.container)
- .withName(DRIVER_CONTAINER_NAME)
+ .withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
.withImage(driverContainerImage)
.withImagePullPolicy(conf.imagePullPolicy())
.addNewPort()
@@ -105,7 +105,7 @@ private[spark] class BasicDriverFeatureStep(
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
- .withNewResources()
+ .editOrNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
.addToRequests("memory", driverMemoryQuantity)
@@ -119,9 +119,9 @@ private[spark] class BasicDriverFeatureStep(
.addToLabels(conf.roleLabels.asJava)
.addToAnnotations(conf.roleAnnotations.asJava)
.endMetadata()
- .withNewSpec()
+ .editOrNewSpec()
.withRestartPolicy("Never")
- .withNodeSelector(conf.nodeSelector().asJava)
+ .addToNodeSelector(conf.nodeSelector().asJava)
.addToImagePullSecrets(conf.imagePullSecrets(): _*)
.endSpec()
.build()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index d89995ba5e4f4..1dab2a834f3e7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -139,10 +139,10 @@ private[spark] class BasicExecutorFeatureStep(
}
val executorContainer = new ContainerBuilder(pod.container)
- .withName("executor")
+ .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
.withImage(executorContainerImage)
.withImagePullPolicy(kubernetesConf.imagePullPolicy())
- .withNewResources()
+ .editOrNewResources()
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("cpu", executorCpuQuantity)
@@ -173,14 +173,14 @@ private[spark] class BasicExecutorFeatureStep(
val executorPod = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(name)
- .withLabels(kubernetesConf.roleLabels.asJava)
- .withAnnotations(kubernetesConf.roleAnnotations.asJava)
+ .addToLabels(kubernetesConf.roleLabels.asJava)
+ .addToAnnotations(kubernetesConf.roleAnnotations.asJava)
.addToOwnerReferences(ownerReference.toSeq: _*)
.endMetadata()
.editOrNewSpec()
.withHostname(hostname)
.withRestartPolicy("Never")
- .withNodeSelector(kubernetesConf.nodeSelector().asJava)
+ .addToNodeSelector(kubernetesConf.nodeSelector().asJava)
.addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
.endSpec()
.build()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
new file mode 100644
index 0000000000000..96a8013246b74
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import com.google.common.io.Files
+import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, ContainerBuilder, HasMetadata, PodBuilder}
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+
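+/**
+ * Mounts the user-provided executor pod template file into the driver pod through a ConfigMap and
+ * points spark.kubernetes.executor.podTemplateFile at the mounted copy, so that the driver can
+ * load the same template when it builds executor pods.
+ */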
+private[spark] class PodTemplateConfigMapStep(
+ conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
+ extends KubernetesFeatureConfigStep {
+ def configurePod(pod: SparkPod): SparkPod = {
+ val podWithVolume = new PodBuilder(pod.pod)
+ .editSpec()
+ .addNewVolume()
+ .withName(POD_TEMPLATE_VOLUME)
+ .withNewConfigMap()
+ .withName(POD_TEMPLATE_CONFIGMAP)
+ .addNewItem()
+ .withKey(POD_TEMPLATE_KEY)
+ .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
+ .endItem()
+ .endConfigMap()
+ .endVolume()
+ .endSpec()
+ .build()
+
+ val containerWithVolume = new ContainerBuilder(pod.container)
+ .addNewVolumeMount()
+ .withName(POD_TEMPLATE_VOLUME)
+ .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
+ .endVolumeMount()
+ .build()
+ SparkPod(podWithVolume, containerWithVolume)
+ }
+
+ def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
+ KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
+ (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+
+ def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
+ require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
+ val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+ val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
+ Seq(new ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(POD_TEMPLATE_CONFIGMAP)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_KEY, podTemplateString)
+ .build())
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index c658756cc165b..4b58f8ba3c9bd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -17,7 +17,8 @@
package org.apache.spark.deploy.k8s.submit
import java.io.StringWriter
-import java.util.{Collections, Locale, UUID}
+import java.util.{Collections, Locale, Properties, UUID}
import java.util.Properties
import io.fabric8.kubernetes.api.model._
@@ -227,7 +228,6 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
clientArguments.driverArgs,
clientArguments.maybePyFiles,
clientArguments.hadoopConfigDir)
- val builder = new KubernetesDriverBuilder
val namespace = kubernetesConf.namespace()
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
@@ -244,7 +244,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
None,
None)) { kubernetesClient =>
val client = new Client(
- builder,
+ KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf),
kubernetesConf,
kubernetesClient,
waitForAppCompletion,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index b0b53321abd25..5565cd74280e6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -16,7 +16,12 @@
*/
package org.apache.spark.deploy.k8s.submit
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
+import java.io.File
+
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
@@ -55,7 +60,11 @@ private[spark] class KubernetesDriverBuilder(
provideHadoopGlobalStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> KerberosConfDriverFeatureStep) =
- new KerberosConfDriverFeatureStep(_)) {
+ new KerberosConfDriverFeatureStep(_),
+ providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
+ => PodTemplateConfigMapStep) =
+ new PodTemplateConfigMapStep(_),
+ provideInitialPod: () => SparkPod = SparkPod.initialPod) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
@@ -74,6 +83,10 @@ private[spark] class KubernetesDriverBuilder(
val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
+ val podTemplateFeature = if (
+ kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
+ Seq(providePodTemplateConfigMapStep(kubernetesConf))
+ } else Nil
val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
case JavaMainAppResource(_) =>
@@ -86,14 +99,17 @@ private[spark] class KubernetesDriverBuilder(
val maybeHadoopConfigStep =
kubernetesConf.hadoopConfSpec.map { _ =>
- provideHadoopGlobalStep(kubernetesConf)}
+ provideHadoopGlobalStep(kubernetesConf)}
val allFeatures: Seq[KubernetesFeatureConfigStep] =
(baseFeatures :+ bindingsStep) ++
secretFeature ++ envSecretFeature ++ volumesFeature ++
- maybeHadoopConfigStep.toSeq
+ maybeHadoopConfigStep.toSeq ++ podTemplateFeature
- var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
+ var spec = KubernetesDriverSpec(
+ provideInitialPod(),
+ driverKubernetesResources = Seq.empty,
+ kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()
@@ -106,3 +122,17 @@ private[spark] class KubernetesDriverBuilder(
spec
}
}
+
+private[spark] object KubernetesDriverBuilder {
+ def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = {
+ conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
+ .map(new File(_))
+ .map(file => new KubernetesDriverBuilder(provideInitialPod = () =>
+ KubernetesUtils.loadPodFromTemplate(
+ kubernetesClient,
+ file,
+ conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
+ ))
+ .getOrElse(new KubernetesDriverBuilder())
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 1a75ae00cbd98..77a1d6cfae3bd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -31,7 +31,6 @@ import org.apache.spark.util.Utils
private[spark] class ExecutorPodsLifecycleManager(
conf: SparkConf,
- executorBuilder: KubernetesExecutorBuilder,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore,
// Use a best-effort to track which executors have been removed already. It's not generally
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 9999c62c878df..ce10f766334ff 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -69,6 +69,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
defaultServiceAccountToken,
defaultServiceAccountCaCrt)
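+    // Eagerly load the executor pod template here (the result is intentionally discarded) so that
+    // a malformed or unreadable template fails fast at startup rather than at executor allocation.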
+ if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
+ KubernetesUtils.loadPodFromTemplate(
+ kubernetesClient,
+ new File(sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get),
+ sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
+ }
+
val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-executor-requests")
@@ -81,13 +88,16 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
.build[java.lang.Long, java.lang.Long]()
val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
sc.conf,
- new KubernetesExecutorBuilder(),
kubernetesClient,
snapshotsStore,
removedExecutorsCache)
val executorPodsAllocator = new ExecutorPodsAllocator(
- sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())
+ sc.conf,
+ KubernetesExecutorBuilder(kubernetesClient, sc.conf),
+ kubernetesClient,
+ snapshotsStore,
+ new SystemClock())
val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 6199a8ae30430..089f84dec277f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -16,7 +16,12 @@
*/
package org.apache.spark.scheduler.cluster.k8s
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import java.io.File
+
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features._
@@ -35,19 +40,20 @@ private[spark] class KubernetesExecutorBuilder(
new LocalDirsFeatureStep(_),
provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> MountVolumesFeatureStep) =
- new MountVolumesFeatureStep(_),
+ new MountVolumesFeatureStep(_),
provideHadoopConfStep: (
KubernetesConf[KubernetesExecutorSpecificConf]
- => HadoopConfExecutorFeatureStep) =
- new HadoopConfExecutorFeatureStep(_),
+ => HadoopConfExecutorFeatureStep) =
+ new HadoopConfExecutorFeatureStep(_),
provideKerberosConfStep: (
KubernetesConf[KubernetesExecutorSpecificConf]
- => KerberosConfExecutorFeatureStep) =
- new KerberosConfExecutorFeatureStep(_),
+ => KerberosConfExecutorFeatureStep) =
+ new KerberosConfExecutorFeatureStep(_),
provideHadoopSparkUserStep: (
KubernetesConf[KubernetesExecutorSpecificConf]
- => HadoopSparkUserExecutorFeatureStep) =
- new HadoopSparkUserExecutorFeatureStep(_)) {
+ => HadoopSparkUserExecutorFeatureStep) =
+ new HadoopSparkUserExecutorFeatureStep(_),
+ provideInitialPod: () => SparkPod = SparkPod.initialPod) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
@@ -85,10 +91,24 @@ private[spark] class KubernetesExecutorBuilder(
volumesFeature ++
maybeHadoopConfFeatureSteps
- var executorPod = SparkPod.initialPod()
+ var executorPod = provideInitialPod()
for (feature <- allFeatures) {
executorPod = feature.configurePod(executorPod)
}
executorPod
}
}
+
+private[spark] object KubernetesExecutorBuilder {
+ def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesExecutorBuilder = {
+ conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+ .map(new File(_))
+ .map(file => new KubernetesExecutorBuilder(provideInitialPod = () =>
+ KubernetesUtils.loadPodFromTemplate(
+ kubernetesClient,
+ file,
+ conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME))
+ ))
+ .getOrElse(new KubernetesExecutorBuilder())
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
new file mode 100644
index 0000000000000..7c231586af935
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsSuite.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, PodBuilder}
+
+import org.apache.spark.SparkFunSuite
+
+class KubernetesUtilsSuite extends SparkFunSuite {
+ private val HOST = "test-host"
+ private val POD = new PodBuilder()
+ .withNewSpec()
+ .withHostname(HOST)
+ .withContainers(
+ new ContainerBuilder().withName("first").build(),
+ new ContainerBuilder().withName("second").build())
+ .endSpec()
+ .build()
+
+ test("Selects the given container as spark container.") {
+ val sparkPod = KubernetesUtils.selectSparkContainer(POD, Some("second"))
+ assert(sparkPod.pod.getSpec.getHostname == HOST)
+ assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("first"))
+ assert(sparkPod.container.getName == "second")
+ }
+
+ test("Selects the first container if no container name is given.") {
+ val sparkPod = KubernetesUtils.selectSparkContainer(POD, Option.empty)
+ assert(sparkPod.pod.getSpec.getHostname == HOST)
+ assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("second"))
+ assert(sparkPod.container.getName == "first")
+ }
+
+ test("Falls back to the first container if given container name does not exist.") {
+ val sparkPod = KubernetesUtils.selectSparkContainer(POD, Some("does-not-exist"))
+ assert(sparkPod.pod.getSpec.getHostname == HOST)
+ assert(sparkPod.pod.getSpec.getContainers.asScala.toList.map(_.getName) == List("second"))
+ assert(sparkPod.container.getName == "first")
+ }
+
+ test("constructs spark pod correctly with pod template with no containers") {
+ val noContainersPod = new PodBuilder(POD).editSpec().withContainers().endSpec().build()
+ val sparkPod = KubernetesUtils.selectSparkContainer(noContainersPod, Some("does-not-exist"))
+ assert(sparkPod.pod.getSpec.getHostname == HOST)
+ assert(sparkPod.container.getName == null)
+ val sparkPodWithNoContainerName =
+ KubernetesUtils.selectSparkContainer(noContainersPod, Option.empty)
+ assert(sparkPodWithNoContainerName.pod.getSpec.getHostname == HOST)
+ assert(sparkPodWithNoContainerName.container.getName == null)
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index eebdd157da638..5c6bcc72158be 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -84,7 +84,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
val basePod = SparkPod.initialPod()
val configuredPod = featureStep.configurePod(basePod)
- assert(configuredPod.container.getName === DRIVER_CONTAINER_NAME)
+ assert(configuredPod.container.getName === DEFAULT_DRIVER_CONTAINER_NAME)
assert(configuredPod.container.getImage === "spark-driver:latest")
assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
new file mode 100644
index 0000000000000..d7bbbd121af72
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import java.io.{File, PrintWriter}
+import java.nio.file.Files
+
+import io.fabric8.kubernetes.api.model.ConfigMap
+import org.mockito.Mockito
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s._
+
+class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter {
+ private var sparkConf: SparkConf = _
+ private var kubernetesConf : KubernetesConf[_ <: KubernetesRoleSpecificConf] = _
+ private var templateFile: File = _
+
+ before {
+ sparkConf = Mockito.mock(classOf[SparkConf])
+ kubernetesConf = KubernetesConf(
+ sparkConf,
+ KubernetesDriverSpecificConf(
+ None,
+ "app-name",
+ "main",
+ Seq.empty),
+ "resource",
+ "app-id",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ Option.empty)
+ templateFile = Files.createTempFile("pod-template", "yml").toFile
+ templateFile.deleteOnExit()
+ Mockito.doReturn(Option(templateFile.getAbsolutePath)).when(sparkConf)
+ .get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+ }
+
+ test("Mounts executor template volume if config specified") {
+ val writer = new PrintWriter(templateFile)
+ writer.write("pod-template-contents")
+ writer.close()
+
+ val step = new PodTemplateConfigMapStep(kubernetesConf)
+ val configuredPod = step.configurePod(SparkPod.initialPod())
+
+ assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
+ val volume = configuredPod.pod.getSpec.getVolumes.get(0)
+ assert(volume.getName === Constants.POD_TEMPLATE_VOLUME)
+ assert(volume.getConfigMap.getName === Constants.POD_TEMPLATE_CONFIGMAP)
+ assert(volume.getConfigMap.getItems.size() === 1)
+ assert(volume.getConfigMap.getItems.get(0).getKey === Constants.POD_TEMPLATE_KEY)
+ assert(volume.getConfigMap.getItems.get(0).getPath ===
+ Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
+
+ assert(configuredPod.container.getVolumeMounts.size() === 1)
+ val volumeMount = configuredPod.container.getVolumeMounts.get(0)
+ assert(volumeMount.getMountPath === Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
+ assert(volumeMount.getName === Constants.POD_TEMPLATE_VOLUME)
+
+ val resources = step.getAdditionalKubernetesResources()
+ assert(resources.size === 1)
+ assert(resources.head.getMetadata.getName === Constants.POD_TEMPLATE_CONFIGMAP)
+ assert(resources.head.isInstanceOf[ConfigMap])
+ val configMap = resources.head.asInstanceOf[ConfigMap]
+ assert(configMap.getData.size() === 1)
+ assert(configMap.getData.containsKey(Constants.POD_TEMPLATE_KEY))
+ assert(configMap.getData.containsValue("pod-template-contents"))
+
+ val systemProperties = step.getAdditionalPodSystemProperties()
+ assert(systemProperties.size === 1)
+ assert(systemProperties.contains(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key))
+ assert(systemProperties.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key).get ===
+ (Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" +
+ Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
index 051d7b6994f5d..84968c3523fc0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala
@@ -16,8 +16,13 @@
*/
package org.apache.spark.deploy.k8s.submit
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.mockito.Mockito._
+
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE}
import org.apache.spark.deploy.k8s.features._
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
@@ -34,6 +39,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val ENV_SECRETS_STEP_TYPE = "env-secrets"
private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global"
private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes"
+ private val TEMPLATE_VOLUME_STEP_TYPE = "template-volume"
private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep])
@@ -68,6 +74,10 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep])
+ private val templateVolumeStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType(
+ TEMPLATE_VOLUME_STEP_TYPE, classOf[PodTemplateConfigMapStep]
+ )
+
private val builderUnderTest: KubernetesDriverBuilder =
new KubernetesDriverBuilder(
_ => basicFeatureStep,
@@ -80,7 +90,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
_ => pythonStep,
_ => rStep,
_ => javaStep,
- _ => hadoopGlobalStep)
+ _ => hadoopGlobalStep,
+ _ => templateVolumeStep)
test("Apply fundamental steps all the time.") {
val conf = KubernetesConf(
@@ -252,6 +263,37 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
R_STEP_TYPE)
}
+ test("Apply template volume step if executor template is present.") {
+ val sparkConf = spy(new SparkConf(false))
+ doReturn(Option("filename")).when(sparkConf)
+ .get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+ val conf = KubernetesConf(
+ sparkConf,
+ KubernetesDriverSpecificConf(
+ Some(JavaMainAppResource("example.jar")),
+ "test-app",
+ "main",
+ Seq.empty),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ Option.empty)
+ validateStepTypesApplied(
+ builderUnderTest.buildFromFeatures(conf),
+ BASIC_STEP_TYPE,
+ CREDENTIALS_STEP_TYPE,
+ SERVICE_STEP_TYPE,
+ LOCAL_DIRS_STEP_TYPE,
+ JAVA_STEP_TYPE,
+ TEMPLATE_VOLUME_STEP_TYPE)
+ }
+
test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") {
val conf = KubernetesConf(
new SparkConf(false),
@@ -314,7 +356,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
HADOOP_GLOBAL_STEP_TYPE)
}
-
private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*)
: Unit = {
assert(resolvedSpec.systemProperties.size === stepTypes.size)
@@ -325,4 +366,73 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite {
assert(resolvedSpec.systemProperties(stepType) === stepType)
}
}
+
+ test("Start with empty pod if template is not specified") {
+ val kubernetesClient = mock(classOf[KubernetesClient])
+ val driverBuilder = KubernetesDriverBuilder.apply(kubernetesClient, new SparkConf())
+ verify(kubernetesClient, never()).pods()
+ }
+
+ test("Starts with template if specified") {
+ val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient()
+ val sparkConf = new SparkConf(false)
+ .set(CONTAINER_IMAGE, "spark-driver:latest")
+ .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
+ val kubernetesConf = new KubernetesConf(
+ sparkConf,
+ KubernetesDriverSpecificConf(
+ Some(JavaMainAppResource("example.jar")),
+ "test-app",
+ "main",
+ Seq.empty),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ Option.empty)
+ val driverSpec = KubernetesDriverBuilder
+ .apply(kubernetesClient, sparkConf)
+ .buildFromFeatures(kubernetesConf)
+ PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(driverSpec.pod)
+ }
+
+ test("Throws on misconfigured pod template") {
+ val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient(
+ new PodBuilder()
+ .withNewMetadata()
+ .addToLabels("test-label-key", "test-label-value")
+ .endMetadata()
+ .build())
+ val sparkConf = new SparkConf(false)
+ .set(CONTAINER_IMAGE, "spark-driver:latest")
+ .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml")
+ val kubernetesConf = new KubernetesConf(
+ sparkConf,
+ KubernetesDriverSpecificConf(
+ Some(JavaMainAppResource("example.jar")),
+ "test-app",
+ "main",
+ Seq.empty),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ Option.empty)
+ val exception = intercept[SparkException] {
+ KubernetesDriverBuilder
+ .apply(kubernetesClient, sparkConf)
+ .buildFromFeatures(kubernetesConf)
+ }
+ assert(exception.getMessage.contains("Could not load pod from template file."))
+ }
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala
new file mode 100644
index 0000000000000..c92e9e6e3b6b3
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/PodBuilderSuiteUtils.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
+import org.mockito.Matchers.any
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.FlatSpec
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.SparkPod
+
+object PodBuilderSuiteUtils extends FlatSpec {
+
+ def loadingMockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
+ val kubernetesClient = mock(classOf[KubernetesClient])
+ val pods =
+ mock(classOf[MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]]])
+ val podResource = mock(classOf[PodResource[Pod, DoneablePod]])
+ when(kubernetesClient.pods()).thenReturn(pods)
+ when(pods.load(any(classOf[File]))).thenReturn(podResource)
+ when(podResource.get()).thenReturn(pod)
+ kubernetesClient
+ }
+
+ def verifyPodWithSupportedFeatures(pod: SparkPod): Unit = {
+ val metadata = pod.pod.getMetadata
+ assert(metadata.getLabels.containsKey("test-label-key"))
+ assert(metadata.getAnnotations.containsKey("test-annotation-key"))
+ assert(metadata.getNamespace === "namespace")
+ assert(metadata.getOwnerReferences.asScala.exists(_.getName == "owner-reference"))
+ val spec = pod.pod.getSpec
+ assert(!spec.getContainers.asScala.exists(_.getName == "executor-container"))
+ assert(spec.getDnsPolicy === "dns-policy")
+ assert(spec.getHostAliases.asScala.exists(_.getHostnames.asScala.exists(_ == "hostname")))
+ assert(spec.getImagePullSecrets.asScala.exists(_.getName == "local-reference"))
+ assert(spec.getInitContainers.asScala.exists(_.getName == "init-container"))
+ assert(spec.getNodeName == "node-name")
+ assert(spec.getNodeSelector.get("node-selector-key") === "node-selector-value")
+ assert(spec.getSchedulerName === "scheduler")
+ assert(spec.getSecurityContext.getRunAsUser === 1000L)
+ assert(spec.getServiceAccount === "service-account")
+ assert(spec.getSubdomain === "subdomain")
+ assert(spec.getTolerations.asScala.exists(_.getKey == "toleration-key"))
+ assert(spec.getVolumes.asScala.exists(_.getName == "test-volume"))
+ val container = pod.container
+ assert(container.getName === "executor-container")
+ assert(container.getArgs.contains("arg"))
+ assert(container.getCommand.equals(List("command").asJava))
+ assert(container.getEnv.asScala.exists(_.getName == "env-key"))
+ assert(container.getResources.getLimits.get("gpu") ===
+ new QuantityBuilder().withAmount("1").build())
+ assert(container.getSecurityContext.getRunAsNonRoot)
+ assert(container.getStdin)
+ assert(container.getTerminationMessagePath === "termination-message-path")
+ assert(container.getTerminationMessagePolicy === "termination-message-policy")
+ assert(pod.container.getVolumeMounts.asScala.exists(_.getName == "test-volume"))
+
+ }
+
+
+ def podWithSupportedFeatures(): Pod = new PodBuilder()
+ .withNewMetadata()
+ .addToLabels("test-label-key", "test-label-value")
+ .addToAnnotations("test-annotation-key", "test-annotation-value")
+ .withNamespace("namespace")
+ .addNewOwnerReference()
+ .withController(true)
+ .withName("owner-reference")
+ .endOwnerReference()
+ .endMetadata()
+ .withNewSpec()
+ .withDnsPolicy("dns-policy")
+ .withHostAliases(new HostAliasBuilder().withHostnames("hostname").build())
+ .withImagePullSecrets(
+ new LocalObjectReferenceBuilder().withName("local-reference").build())
+ .withInitContainers(new ContainerBuilder().withName("init-container").build())
+ .withNodeName("node-name")
+ .withNodeSelector(Map("node-selector-key" -> "node-selector-value").asJava)
+ .withSchedulerName("scheduler")
+ .withNewSecurityContext()
+ .withRunAsUser(1000L)
+ .endSecurityContext()
+ .withServiceAccount("service-account")
+ .withSubdomain("subdomain")
+ .withTolerations(new TolerationBuilder()
+ .withKey("toleration-key")
+ .withOperator("Equal")
+ .withEffect("NoSchedule")
+ .build())
+ .addNewVolume()
+ .withNewHostPath()
+ .withPath("/test")
+ .endHostPath()
+ .withName("test-volume")
+ .endVolume()
+ .addNewContainer()
+ .withArgs("arg")
+ .withCommand("command")
+ .addNewEnv()
+ .withName("env-key")
+ .withValue("env-value")
+ .endEnv()
+ .withImagePullPolicy("Always")
+ .withName("executor-container")
+ .withNewResources()
+ .withLimits(Map("gpu" -> new QuantityBuilder().withAmount("1").build()).asJava)
+ .endResources()
+ .withNewSecurityContext()
+ .withRunAsNonRoot(true)
+ .endSecurityContext()
+ .withStdin(true)
+ .withTerminationMessagePath("termination-message-path")
+ .withTerminationMessagePolicy("termination-message-policy")
+ .addToVolumeMounts(
+ new VolumeMountBuilder()
+ .withName("test-volume")
+ .withMountPath("/test")
+ .build())
+ .endContainer()
+ .endSpec()
+ .build()
+
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index d8409383b4a1c..3995b2afe7c45 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -45,9 +45,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
@Mock
private var podOperations: PODS = _
- @Mock
- private var executorBuilder: KubernetesExecutorBuilder = _
-
@Mock
private var schedulerBackend: KubernetesClusterSchedulerBackend = _
@@ -64,7 +61,6 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
new SparkConf(),
- executorBuilder,
kubernetesClient,
snapshotsStore,
removedExecutorsCache)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
index b572dac2bf624..fb2509fc1bda5 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala
@@ -16,12 +16,15 @@
*/
package org.apache.spark.scheduler.cluster.k8s
-import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.api.model.{Config => _, _}
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.mockito.Mockito.{mock, never, verify}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features._
+import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils
class KubernetesExecutorBuilderSuite extends SparkFunSuite {
private val BASIC_STEP_TYPE = "basic"
@@ -193,4 +196,40 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite {
assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType)
}
}
+
+ test("Starts with empty executor pod if template is not specified") {
+ val kubernetesClient = mock(classOf[KubernetesClient])
+ val executorBuilder = KubernetesExecutorBuilder.apply(kubernetesClient, new SparkConf())
+ verify(kubernetesClient, never()).pods()
+ }
+
+ test("Starts with executor template if specified") {
+ val kubernetesClient = PodBuilderSuiteUtils.loadingMockKubernetesClient()
+ val sparkConf = new SparkConf(false)
+ .set("spark.driver.host", "https://driver.host.com")
+ .set(Config.CONTAINER_IMAGE, "spark-executor:latest")
+ .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "template-file.yaml")
+ val kubernetesConf = KubernetesConf(
+ sparkConf,
+ KubernetesExecutorSpecificConf(
+ "executor-id", Some(new PodBuilder()
+ .withNewMetadata()
+ .withName("driver")
+ .endMetadata()
+ .build())),
+ "prefix",
+ "appId",
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Map.empty,
+ Nil,
+ Seq.empty[String],
+ Option.empty)
+ val sparkPod = KubernetesExecutorBuilder
+ .apply(kubernetesClient, sparkConf)
+ .buildFromFeatures(kubernetesConf)
+ PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod)
+ }
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml
new file mode 100644
index 0000000000000..0c185be81d59e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/driver-template.yml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: v1
+kind: Pod
+metadata:
+ labels:
+ template-label-key: driver-template-label-value
+spec:
+ containers:
+ - name: test-driver-container
+ image: will-be-overwritten
+
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml
new file mode 100644
index 0000000000000..0282e23a39bd2
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/executor-template.yml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+apiVersion: v1
+kind: Pod
+metadata:
+ labels:
+ template-label-key: executor-template-label-value
+spec:
+ containers:
+ - name: test-executor-container
+ image: will-be-overwritten
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index c99a907f98d0a..e2e5880255e2c 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
private[spark] class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
- with PythonTestsSuite with ClientModeTestsSuite
+ with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
with Logging with Eventually with Matchers {
import KubernetesSuite._
@@ -288,21 +288,21 @@ private[spark] class KubernetesSuite extends SparkFunSuite
protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === image)
- assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+ assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor")
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== baseMemory)
}
protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
- assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+ assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor")
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== standardNonJVMMemory)
}
protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === rImage)
- assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+ assert(executorPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-executor")
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== standardNonJVMMemory)
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala
new file mode 100644
index 0000000000000..e5a847e7210cb
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PodTemplateSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
+
+private[spark] trait PodTemplateSuite { k8sSuite: KubernetesSuite =>
+
+ import PodTemplateSuite._
+
+ test("Start pod creation from template", k8sTestTag) {
+ sparkAppConf
+ .set("spark.kubernetes.driver.podTemplateFile", DRIVER_TEMPLATE_FILE.getAbsolutePath)
+ .set("spark.kubernetes.executor.podTemplateFile", EXECUTOR_TEMPLATE_FILE.getAbsolutePath)
+ runSparkPiAndVerifyCompletion(
+ driverPodChecker = (driverPod: Pod) => {
+ assert(driverPod.getMetadata.getName === driverPodName)
+ assert(driverPod.getSpec.getContainers.get(0).getImage === image)
+ assert(driverPod.getSpec.getContainers.get(0).getName === "test-driver-container")
+ assert(driverPod.getMetadata.getLabels.containsKey(LABEL_KEY))
+ assert(driverPod.getMetadata.getLabels.get(LABEL_KEY) === "driver-template-label-value")
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ assert(executorPod.getSpec.getContainers.get(0).getImage === image)
+ assert(executorPod.getSpec.getContainers.get(0).getName === "test-executor-container")
+ assert(executorPod.getMetadata.getLabels.containsKey(LABEL_KEY))
+ assert(executorPod.getMetadata.getLabels.get(LABEL_KEY) === "executor-template-label-value")
+ }
+ )
+ }
+}
+
+private[spark] object PodTemplateSuite {
+ val LABEL_KEY = "template-label-key"
+ val DRIVER_TEMPLATE_FILE = new File(getClass.getResource("/driver-template.yml").getFile)
+ val EXECUTOR_TEMPLATE_FILE = new File(getClass.getResource("/executor-template.yml").getFile)
+}