[SPARK-24434][K8S] pod template files
## What changes were proposed in this pull request?

New feature to pass podspec files for driver and executor pods.

## How was this patch tested?
new unit and integration tests

- [x] more overwrites in integration tests
- [ ] invalid template integration test, documentation

Author: Onur Satici <osatici@palantir.com>
Author: Yifei Huang <yifeih@palantir.com>
Author: onursatici <onursatici@gmail.com>

Closes apache#22146 from onursatici/pod-template.
onursatici authored and tuyide committed Sep 10, 2019
1 parent c3e32bf commit 476c180
Showing 24 changed files with 1,127 additions and 46 deletions.
221 changes: 221 additions & 0 deletions docs/running-on-kubernetes.md
@@ -202,6 +202,22 @@ To use a secret through an environment variable use the following options to the
--conf spark.kubernetes.executor.secretKeyRef.ENV_NAME=name:key
```

## Pod Template
Kubernetes allows defining pods from [template files](https://kubernetes.io/docs/concepts/workloads/pods/pod-overview/#pod-templates).
Spark users can similarly use template files to define the driver or executor pod configurations that Spark configurations do not support.
To do so, specify the spark properties `spark.kubernetes.driver.podTemplateFile` and `spark.kubernetes.executor.podTemplateFile`
to point to local files accessible to the `spark-submit` process. To allow the driver pod to access the executor pod template
file, the file will be automatically mounted onto a volume in the driver pod when it is created.
Spark does not do any validation after unmarshalling these template files and relies on the Kubernetes API server for validation.

It is important to note that Spark is opinionated about certain pod configurations, so there are values in the
pod template that will always be overwritten by Spark. Specifying a pod template file therefore only changes the
starting point of the pod-building process: Spark starts from the template pod instead of an empty pod.
For details, see the [full list](#pod-template-properties) of pod template values that will be overwritten by Spark.

Pod template files can also define multiple containers. In such cases, Spark assumes that the first container in
the list is the driver or executor container, unless a specific container is named via
`spark.kubernetes.driver.podTemplateContainerName` or `spark.kubernetes.executor.podTemplateContainerName`.
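
For illustration, a minimal executor pod template might look like the following. This is only a sketch: the file name, labels, sidecar, and scheduler name are hypothetical, and any Spark-managed fields in it would still be overwritten as described in the [full list](#pod-template-properties).

```yaml
# executor-pod-template.yaml -- illustrative example only
apiVersion: v1
kind: Pod
metadata:
  labels:
    team: data-platform              # extra label; Spark adds its own labels on top
spec:
  schedulerName: my-custom-scheduler # not in the list of Spark-managed fields, so kept as-is
  containers:
    - name: spark-executor           # first container: treated as the executor container
      resources:
        requests:
          ephemeral-storage: 1Gi     # fields Spark does not manage are left untouched
    - name: logging-sidecar          # additional containers are carried over unchanged
      image: busybox
      command: ["sh", "-c", "tail -F /var/log/spark/*.log"]
```

Such a file would be referenced with `--conf spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml` when running `spark-submit`.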

## Using Kubernetes Volumes

Starting with Spark 2.4.0, users can mount the following types of Kubernetes [volumes](https://kubernetes.io/docs/concepts/storage/volumes/) into the driver and executor pods:
@@ -819,4 +835,209 @@ specific to Spark on Kubernetes.
This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.krb5.path</code></td>
<td><code>(none)</code></td>
<td>
Specify the local location of the krb5.conf file to be mounted on the driver and executors for Kerberos interaction.
It is important to note that the KDC defined needs to be visible from inside the containers.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.krb5.configMapName</code></td>
<td><code>(none)</code></td>
<td>
Specify the name of the ConfigMap, containing the krb5.conf file, to be mounted on the driver and executors
for Kerberos interaction. The KDC defined needs to be visible from inside the containers. The ConfigMap must also
be in the same namespace of the driver and executor pods.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.hadoop.configMapName</code></td>
<td><code>(none)</code></td>
<td>
Specify the name of the ConfigMap, containing the HADOOP_CONF_DIR files, to be mounted on the driver
and executors for custom Hadoop configuration.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokenSecret.name</code></td>
<td><code>(none)</code></td>
<td>
Specify the name of the secret where your existing delegation tokens are stored. This removes the need for the job user
to provide any kerberos credentials for launching a job.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokenSecret.itemKey</code></td>
<td><code>(none)</code></td>
<td>
Specify the item key of the data where your existing delegation tokens are stored. This removes the need for the job user
to provide any kerberos credentials for launching a job.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.podTemplateFile</code></td>
<td>(none)</td>
<td>
Specify the local file that contains the driver [pod template](#pod-template). For example
<code>spark.kubernetes.driver.podTemplateFile=/path/to/driver-pod-template.yaml</code>
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.podTemplateFile</code></td>
<td>(none)</td>
<td>
Specify the local file that contains the executor [pod template](#pod-template). For example
<code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml</code>
</td>
</tr>
</table>

### Pod template properties

See the tables below for the full list of pod specification values that will be overwritten by Spark.

#### Pod Metadata

<table class="table">
<tr><th>Pod metadata key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>name</td>
<td>Value of <code>spark.kubernetes.driver.pod.name</code></td>
<td>
The driver pod name will be overwritten with either the configured or default value of
<code>spark.kubernetes.driver.pod.name</code>. The executor pod names will be unaffected.
</td>
</tr>
<tr>
<td>namespace</td>
<td>Value of <code>spark.kubernetes.namespace</code></td>
<td>
Spark makes strong assumptions about the driver and executor namespaces. Both driver and executor namespaces will
be replaced by either the configured or default spark conf value.
</td>
</tr>
<tr>
<td>labels</td>
<td>Adds the labels from <code>spark.kubernetes.{driver,executor}.label.*</code></td>
<td>
Spark will add additional labels specified by the spark configuration.
</td>
</tr>
<tr>
<td>annotations</td>
<td>Adds the annotations from <code>spark.kubernetes.{driver,executor}.annotation.*</code></td>
<td>
Spark will add additional annotations specified by the spark configuration.
</td>
</tr>
</table>

#### Pod Spec

<table class="table">
<tr><th>Pod spec key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>imagePullSecrets</td>
<td>Adds image pull secrets from <code>spark.kubernetes.container.image.pullSecrets</code></td>
<td>
Additional pull secrets will be added from the spark configuration to both driver and executor pods.
</td>
</tr>
<tr>
<td>nodeSelector</td>
<td>Adds node selectors from <code>spark.kubernetes.node.selector.*</code></td>
<td>
Additional node selectors will be added from the spark configuration to both driver and executor pods.
</td>
</tr>
<tr>
<td>restartPolicy</td>
<td><code>"never"</code></td>
<td>
Spark assumes that both drivers and executors never restart.
</td>
</tr>
<tr>
<td>serviceAccount</td>
<td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
<td>
Spark will override <code>serviceAccount</code> with the value of the spark configuration, but only for
driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
</td>
</tr>
<tr>
<td>serviceAccountName</td>
<td>Value of <code>spark.kubernetes.authenticate.driver.serviceAccountName</code></td>
<td>
Spark will override <code>serviceAccountName</code> with the value of the spark configuration, but only for
driver pods, and only if the spark configuration is specified. Executor pods will remain unaffected.
</td>
</tr>
<tr>
<td>volumes</td>
<td>Adds volumes from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.path</code></td>
<td>
Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing
spark conf and pod template files.
</td>
</tr>
</table>

#### Container spec

The following affect the driver and executor containers. All other containers in the pod spec will be unaffected.

<table class="table">
<tr><th>Container spec key</th><th>Modified value</th><th>Description</th></tr>
<tr>
<td>env</td>
<td>Adds env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code></td>
<td>
Spark will add driver env variables from <code>spark.kubernetes.driverEnv.[EnvironmentVariableName]</code>, and
executor env variables from <code>spark.executorEnv.[EnvironmentVariableName]</code>.
</td>
</tr>
<tr>
<td>image</td>
<td>Value of <code>spark.kubernetes.{driver,executor}.container.image</code></td>
<td>
The image will be defined by the spark configurations.
</td>
</tr>
<tr>
<td>imagePullPolicy</td>
<td>Value of <code>spark.kubernetes.container.image.pullPolicy</code></td>
<td>
Spark will override the pull policy for both driver and executors.
</td>
</tr>
<tr>
<td>name</td>
<td>See description.</td>
<td>
The container name will be assigned by Spark ("spark-kubernetes-driver" for the driver container, and
"spark-kubernetes-executor" for each executor container) if not defined by the pod template. If the container is
defined by the template, the template's name will be used.
</td>
</tr>
<tr>
<td>resources</td>
<td>See description</td>
<td>
The cpu limits are set by <code>spark.kubernetes.{driver,executor}.limit.cores</code>. The cpu request is set by
<code>spark.{driver,executor}.cores</code>. The memory request and limit are set by summing the values of
<code>spark.{driver,executor}.memory</code> and <code>spark.{driver,executor}.memoryOverhead</code>. See the
illustrative example below this table.
</td>
</tr>
<tr>
<td>volumeMounts</td>
<td>Adds volume mounts from <code>spark.kubernetes.{driver,executor}.volumes.[VolumeType].[VolumeName].mount.{path,readOnly}</code></td>
<td>
Spark will add volumes as specified by the spark conf, as well as additional volumes necessary for passing
spark conf and pod template files.
</td>
</tr>
</table>
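
As an illustrative example (all values hypothetical): with `spark.driver.cores=1`, `spark.kubernetes.driver.limit.cores=1`, `spark.driver.memory=2g` and `spark.driver.memoryOverhead=512m`, the driver container would end up with roughly the following resources, regardless of what the pod template declared:

```yaml
resources:
  requests:
    cpu: "1"           # from spark.driver.cores
    memory: "2560Mi"   # 2048Mi memory + 512Mi overhead
  limits:
    cpu: "1"           # from spark.kubernetes.driver.limit.cores
    memory: "2560Mi"
```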
@@ -232,6 +232,39 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_LOCAL_DIRS_TMPFS =
ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
.doc("If set to true then emptyDir volumes created to back SPARK_LOCAL_DIRS will have " +
"their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " +
"volumes. This may improve performance but scratch space usage will count towards " +
"your pods memory limit so you may wish to request more memory.")
.booleanConf
.createWithDefault(false)

val KUBERNETES_DRIVER_PODTEMPLATE_FILE =
ConfigBuilder("spark.kubernetes.driver.podTemplateFile")
.doc("File containing a template pod spec for the driver")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_PODTEMPLATE_FILE =
ConfigBuilder("spark.kubernetes.executor.podTemplateFile")
.doc("File containing a template pod spec for executors")
.stringConf
.createOptional

val KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME =
ConfigBuilder("spark.kubernetes.driver.podTemplateContainerName")
.doc("container name to be used as a basis for the driver in the given pod template")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_PODTEMPLATE_CONTAINER_NAME =
ConfigBuilder("spark.kubernetes.executor.podTemplateContainerName")
.doc("container name to be used as a basis for executors in the given pod template")
.stringConf
.createOptional

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

@@ -74,8 +74,16 @@ private[spark] object Constants {
val ENV_R_PRIMARY = "R_PRIMARY"
val ENV_R_ARGS = "R_APP_ARGS"

// Pod spec templates
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH = "/opt/spark/pod-template"
val POD_TEMPLATE_VOLUME = "pod-template-volume"
val POD_TEMPLATE_CONFIGMAP = "podspec-configmap"
val POD_TEMPLATE_KEY = "podspec-configmap-key"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
@@ -22,10 +22,3 @@ private[spark] case class KubernetesDriverSpec(
pod: SparkPod,
driverKubernetesResources: Seq[HasMetadata],
systemProperties: Map[String, String])

private[spark] object KubernetesDriverSpec {
def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
SparkPod.initialPod(),
Seq.empty,
initialProps)
}
@@ -16,10 +16,18 @@
*/
package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf
import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[spark] object KubernetesUtils {
private[spark] object KubernetesUtils extends Logging {

/**
* Extract and parse Spark configuration properties with a given name prefix and
@@ -59,5 +67,46 @@ private[spark] object KubernetesUtils {
}
}

def loadPodFromTemplate(
kubernetesClient: KubernetesClient,
templateFile: File,
containerName: Option[String]): SparkPod = {
try {
val pod = kubernetesClient.pods().load(templateFile).get()
selectSparkContainer(pod, containerName)
} catch {
case e: Exception =>
logError(
s"Encountered exception while attempting to load initial pod spec from file", e)
throw new SparkException("Could not load pod from template file.", e)
}
}

def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
def selectNamedContainer(
containers: List[Container], name: String): Option[(Container, List[Container])] =
containers.partition(_.getName == name) match {
case (sparkContainer :: Nil, rest) => Some((sparkContainer, rest))
case _ =>
logWarning(
s"specified container ${name} not found on pod template, " +
s"falling back to taking the first container")
Option.empty
}
val containers = pod.getSpec.getContainers.asScala.toList
containerName
.flatMap(selectNamedContainer(containers, _))
.orElse(containers.headOption.map((_, containers.tail)))
.map {
case (sparkContainer: Container, rest: List[Container]) => SparkPod(
new PodBuilder(pod)
.editSpec()
.withContainers(rest.asJava)
.endSpec()
.build(),
sparkContainer)
}.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
}

def parseMasterUrl(url: String): String = url.substring("k8s://".length)
}
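
A usage sketch, again not part of this commit, showing the container-selection behavior above; the object name and container names are made up, and it is assumed to live in the same package so the `private[spark]` API is visible.

```scala
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.PodBuilder

object PodTemplateSelectionSketch {
  def main(args: Array[String]): Unit = {
    // A template pod that declares a sidecar first and the intended Spark container second.
    val templatePod = new PodBuilder()
      .withNewMetadata().withName("template").endMetadata()
      .withNewSpec()
        .addNewContainer().withName("logging-sidecar").withImage("busybox").endContainer()
        .addNewContainer().withName("spark-main").withImage("spark:latest").endContainer()
      .endSpec()
      .build()

    // With an explicit name (e.g. from spark.kubernetes.driver.podTemplateContainerName),
    // the named container becomes the Spark container and the sidecar stays in the pod spec.
    val named = KubernetesUtils.selectSparkContainer(templatePod, Some("spark-main"))
    assert(named.container.getName == "spark-main")

    // Without a name, the first container in the template is assumed to be Spark's.
    val first = KubernetesUtils.selectSparkContainer(templatePod, None)
    assert(first.container.getName == "logging-sidecar")
  }
}
```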
@@ -80,7 +80,7 @@ private[spark] class BasicDriverFeatureStep(
)
val driverUIPort = SparkUI.getUIPort(conf.sparkConf)
val driverContainer = new ContainerBuilder(pod.container)
.withName(DRIVER_CONTAINER_NAME)
.withName(Option(pod.container.getName).getOrElse(DEFAULT_DRIVER_CONTAINER_NAME))
.withImage(driverContainerImage)
.withImagePullPolicy(conf.imagePullPolicy())
.addNewPort()
@@ -105,7 +105,7 @@
.withNewFieldRef("v1", "status.podIP")
.build())
.endEnv()
.withNewResources()
.editOrNewResources()
.addToRequests("cpu", driverCpuQuantity)
.addToLimits(maybeCpuLimitQuantity.toMap.asJava)
.addToRequests("memory", driverMemoryQuantity)
@@ -119,9 +119,9 @@
.addToLabels(conf.roleLabels.asJava)
.addToAnnotations(conf.roleAnnotations.asJava)
.endMetadata()
.withNewSpec()
.editOrNewSpec()
.withRestartPolicy("Never")
.withNodeSelector(conf.nodeSelector().asJava)
.addToNodeSelector(conf.nodeSelector().asJava)
.addToImagePullSecrets(conf.imagePullSecrets(): _*)
.endSpec()
.build()