[SPARK-25262][K8S] Allow SPARK_LOCAL_DIRS to be tmpfs backed on K8S
Adds a configuration option that enables SPARK_LOCAL_DIRS to be backed
by memory-backed emptyDir volumes rather than the default, which is
whatever the kubelet's node storage happens to be.
rvesse committed Sep 3, 2018
1 parent 39d3d6c commit 0e1238a
Showing 4 changed files with 58 additions and 0 deletions.
13 changes: 13 additions & 0 deletions docs/running-on-kubernetes.md
@@ -215,6 +215,19 @@ spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.clai

The configuration properties for mounting volumes into the executor pods use prefix `spark.kubernetes.executor.` instead of `spark.kubernetes.driver.`. For a complete list of available options for each supported type of volumes, please refer to the [Spark Properties](#spark-properties) section below.

## Local Storage

Spark uses temporary scratch space to spill data to disk during shuffles and other operations. When using Kubernetes as the resource manager, the pods will be created with an [emptyDir](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) volume mounted for each directory listed in `SPARK_LOCAL_DIRS`. If no directories are explicitly specified, a default directory is created and configured appropriately.

`emptyDir` volumes use the ephemeral storage feature of Kubernetes and do not persist beyond the life of the pod.
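
For illustration, the following sketch shows the kind of volume and mount Spark creates for each local directory, expressed with the same fabric8 Kubernetes client builders the K8S backend uses; the volume name and path here are hypothetical:

```scala
import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder}

// Hypothetical directory; Spark derives the real paths from SPARK_LOCAL_DIRS
// (or generates a default directory if none is specified).
val localDir = "/var/data/spark-local-dir-1"

// An emptyDir volume using the default medium, i.e. the node's backing storage.
val volume = new VolumeBuilder()
  .withName("spark-local-dir-1")
  .withNewEmptyDir()
  .endEmptyDir()
  .build()

// The corresponding mount placing that volume at the local directory path.
val mount = new VolumeMountBuilder()
  .withName("spark-local-dir-1")
  .withMountPath(localDir)
  .build()
```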

### Using RAM for local storage

As `emptyDir` volumes use the node's backing storage for ephemeral storage, this default behaviour may not be appropriate for some compute environments. For example, if you have diskless nodes with remote storage mounted over a network, having many executors doing IO to this remote storage may actually degrade performance.

In this case it may be desirable to set `spark.kubernetes.local.dirs.tmpfs=true` in your configuration, which will cause the `emptyDir` volumes to be configured as `tmpfs`, i.e. RAM-backed, volumes. When configured like this, Spark's local storage usage will count towards your pod's memory usage, so you may wish to increase your memory requests via the normal `spark.driver.memory` and `spark.executor.memory` configuration properties.
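
For example, a minimal sketch of enabling this when constructing a session programmatically (the application name and memory value are illustrative only, not recommendations):

```scala
import org.apache.spark.sql.SparkSession

// Enable tmpfs-backed local dirs and raise executor memory, since scratch
// space will now count towards each pod's memory usage.
val spark = SparkSession.builder()
  .appName("tmpfs-local-dirs-example") // hypothetical application name
  .config("spark.kubernetes.local.dirs.tmpfs", "true")
  .config("spark.executor.memory", "8g") // illustrative value
  .getOrCreate()
```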


## Introspection and Debugging

These are the different ways in which you can investigate a running/completed Spark application, monitor progress, and
9 changes: 9 additions & 0 deletions resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -225,6 +225,15 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")

val KUBERNETES_LOCAL_DIRS_TMPFS =
ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
.doc("If set to true then emptyDir volumes created to back spark.local.dirs will have " +
"their medium set to Memory so that they will be created as tmpfs (i.e. RAM) backed " +
"volumes. This may improve performance but scratch space usage will count towards " +
"your pods memory limit so you may wish to request more memory.")
.booleanConf
.createWithDefault(false)
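
A rough sketch of the behaviour this entry encodes, using only the public `SparkConf` API (internally Spark reads it through the typed entry, as the feature step below shows):

```scala
import org.apache.spark.SparkConf

// An unset property behaves as the entry's declared default, i.e. false.
val conf = new SparkConf()
val useTmpfs = conf.getBoolean("spark.kubernetes.local.dirs.tmpfs", defaultValue = false)
```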

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

6 changes: 6 additions & 0 deletions resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
@@ -22,6 +22,7 @@ import java.util.UUID
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._

private[spark] class LocalDirsFeatureStep(
conf: KubernetesConf[_ <: KubernetesRoleSpecificConf],
@@ -37,6 +38,7 @@ private[spark] class LocalDirsFeatureStep(
.orElse(conf.getOption("spark.local.dir"))
.getOrElse(defaultLocalDir)
.split(",")
private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)

override def configurePod(pod: SparkPod): SparkPod = {
val localDirVolumes = resolvedLocalDirs
@@ -45,6 +47,10 @@ private[spark] class LocalDirsFeatureStep(
new VolumeBuilder()
.withName(s"spark-local-dir-${index + 1}")
.withNewEmptyDir()
.withMedium(useLocalDirTmpFs match {
case true => "Memory" // Use tmpfs
case false => null // Default - use the node's backing storage
})
.endEmptyDir()
.build()
}
30 changes: 30 additions & 0 deletions resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala
@@ -18,10 +18,12 @@ package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model.{EnvVarBuilder, VolumeBuilder, VolumeMountBuilder}
import org.mockito.Mockito
import org.scalatest._
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._

class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
private val defaultLocalDir = "/var/data/default-local-dir"
@@ -111,4 +113,32 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
.withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2")
.build())
}

test("Use tmpfs to back default local dir") {
Mockito.doReturn(null).when(sparkConf).get("spark.local.dir")
Mockito.doReturn(null).when(sparkConf).getenv("SPARK_LOCAL_DIRS")
Mockito.doReturn(true).when(sparkConf).get(KUBERNETES_LOCAL_DIRS_TMPFS)
val stepUnderTest = new LocalDirsFeatureStep(kubernetesConf, defaultLocalDir)
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
assert(configuredPod.pod.getSpec.getVolumes.get(0) ===
new VolumeBuilder()
.withName(s"spark-local-dir-1")
.withNewEmptyDir()
.withMedium("Memory")
.endEmptyDir()
.build())
assert(configuredPod.container.getVolumeMounts.size === 1)
assert(configuredPod.container.getVolumeMounts.get(0) ===
new VolumeMountBuilder()
.withName(s"spark-local-dir-1")
.withMountPath(defaultLocalDir)
.build())
assert(configuredPod.container.getEnv.size === 1)
assert(configuredPod.container.getEnv.get(0) ===
new EnvVarBuilder()
.withName("SPARK_LOCAL_DIRS")
.withValue(defaultLocalDir)
.build())
}
}
