Allow setting memory on the driver submission server. (apache#161)
* Allow setting memory on the driver submission server.

* Address comments

* Address comments
mccheah authored and foxish committed Jul 24, 2017
1 parent 6c42d4b commit bd3deca
Showing 5 changed files with 80 additions and 13 deletions.
docs/running-on-kubernetes.md: 17 additions, 1 deletion
@@ -201,13 +201,29 @@ from the other deployment modes. See the [configuration page](configuration.html
</tr>
<tr>
<td><code>spark.kubernetes.executor.memoryOverhead</code></td>
-<td>executorMemory * 0.10, with minimum of 384 </td>
+<td>executorMemory * 0.10, with minimum of 384</td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things
like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size
(typically 6-10%).
</td>
</tr>
+<tr>
+<td><code>spark.kubernetes.driver.submissionServerMemory</code></td>
+<td>256m</td>
+<td>
+The amount of memory to allocate for the driver submission server.
+</td>
+</tr>
+<tr>
+<td><code>spark.kubernetes.driver.memoryOverhead</code></td>
+<td>(driverMemory + driverSubmissionServerMemory) * 0.10, with minimum of 384</td>
+<td>
+The amount of off-heap memory (in megabytes) to be allocated for the driver and the driver submission server. This
+is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to
+grow with the driver size (typically 6-10%).
+</td>
+</tr>
<tr>
<td><code>spark.kubernetes.driver.labels</code></td>
<td>(none)</td>
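To make the new defaults concrete: with spark.driver.memory=1g and the default submission server memory of 256m, the 384 floor dominates the 10% factor. A minimal sketch of the arithmetic, using the MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN constants introduced below (input values are illustrative):

val driverMemoryMb = 1024L              // spark.driver.memory = 1g
val submissionServerMemoryMb = 256L     // spark.kubernetes.driver.submissionServerMemory default
val containerMemoryMb = driverMemoryMb + submissionServerMemoryMb    // 1280
val overheadMb = math.max((0.10 * containerMemoryMb).toInt, 384L)    // max(128, 384) = 384
val containerLimitMb = containerMemoryMb + overheadMb                // 1664, the pod's memory limit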
Client.scala
@@ -64,6 +64,19 @@ private[spark] class Client(
.map(_.split(","))
.getOrElse(Array.empty[String])

+// Memory settings
+private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
+private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
+private val driverSubmitServerMemoryString = sparkConf.get(
+KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
+KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
+private val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb
+private val memoryOverheadMb = sparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryMb).toInt,
+MEMORY_OVERHEAD_MIN))
+private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb

private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)

private val secretBase64String = {
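The two reads of KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY above are intentional: the typed read yields the value normalized to MiB as a Long, while the string read preserves the user's original form (for example "256m") so it can later be exported through SPARK_DRIVER_MEMORY. A minimal sketch of the distinction, assuming an in-scope SparkConf and the entry defined below in config.scala:

val sparkConf = new org.apache.spark.SparkConf()
  .set("spark.kubernetes.driver.submissionServerMemory", "512m")

// Typed read: bytesConf(ByteUnit.MiB) normalizes the byte string to MiB.
val serverMemoryMb: Long = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)  // 512

// Raw read: the string exactly as set, suitable for an environment variable.
val serverMemoryString: String = sparkConf.get(
  KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
  KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)                      // "512m"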
@@ -408,6 +421,12 @@ private[spark] class Client(
.withPath("/v1/submissions/ping")
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
.build()
+val driverMemoryQuantity = new QuantityBuilder(false)
+.withAmount(s"${driverContainerMemoryMb}M")
+.build()
+val driverMemoryLimitQuantity = new QuantityBuilder(false)
+.withAmount(s"${driverContainerMemoryWithOverhead}M")
+.build()
val driverPod = kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
@@ -442,7 +461,16 @@
.withName(ENV_SUBMISSION_SERVER_PORT)
.withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
+// Note that SPARK_DRIVER_MEMORY only affects the REST server via spark-class.
+.addNewEnv()
+.withName(ENV_DRIVER_MEMORY)
+.withValue(driverSubmitServerMemoryString)
+.endEnv()
.addToEnv(sslConfiguration.sslPodEnvVars: _*)
+.withNewResources()
+.addToRequests("memory", driverMemoryQuantity)
+.addToLimits("memory", driverMemoryLimitQuantity)
+.endResources()
.withPorts(containerPorts.asJava)
.withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
.endContainer()
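The resource stanza pins the driver container between a request that covers the two JVM heaps (driver plus submission server) and a limit that adds the off-heap allowance. A stripped-down sketch of the same fabric8 builder calls, with illustrative values:

import io.fabric8.kubernetes.api.model.QuantityBuilder

val requestMb = 1280L   // driverContainerMemoryMb (illustrative)
val limitMb = 1664L     // driverContainerMemoryWithOverhead (illustrative)

val memoryRequest = new QuantityBuilder(false).withAmount(s"${requestMb}M").build()
val memoryLimit = new QuantityBuilder(false).withAmount(s"${limitMb}M").build()
// Wired into the container via .withNewResources()
//   .addToRequests("memory", memoryRequest).addToLimits("memory", memoryLimit).endResources()

One caveat worth noting: in Kubernetes quantities the M suffix means megabytes (10^6 bytes) while Mi means mebibytes (2^20), so tagging MiB-denominated values with M slightly understates the intended size.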
config.scala (package object config)
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager
import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit

package object config {

@@ -104,7 +105,19 @@ package object config {
| overheads, etc. This tends to grow with the executor size
| (typically 6-10%).
""".stripMargin)
-.stringConf
+.bytesConf(ByteUnit.MiB)
.createOptional

+private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+.doc("""
+| The amount of off-heap memory (in megabytes) to be
+| allocated for the driver and the driver submission server.
+| This is memory that accounts for things like VM overheads,
+| interned strings, other native overheads, etc. This tends
+| to grow with the driver's memory size (typically 6-10%).
+""".stripMargin)
+.bytesConf(ByteUnit.MiB)
+.createOptional

private[spark] val KUBERNETES_DRIVER_LABELS =
@@ -177,6 +190,14 @@ package object config {
.stringConf
.createOptional

+private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
+ConfigBuilder("spark.kubernetes.driver.submissionServerMemory")
+.doc("""
+| The amount of memory to allocate for the driver submission server.
+""".stripMargin)
+.bytesConf(ByteUnit.MiB)
+.createWithDefaultString("256m")

private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
.doc("""
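An entry declared with bytesConf(ByteUnit.MiB) accepts any Spark byte string and normalizes it to MiB, while createWithDefaultString keeps the human-readable default available as defaultValueString. A small sketch of the behavior; the key below is illustrative, not one defined in this commit:

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

val EXAMPLE_MEMORY = ConfigBuilder("spark.example.memory")   // illustrative key
  .doc("Example memory setting.")
  .bytesConf(ByteUnit.MiB)
  .createWithDefaultString("256m")

// EXAMPLE_MEMORY.defaultValueString == "256m"
// Reading an unset conf yields 256L (MiB); a user-supplied "1g" yields 1024L.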
constants.scala (package object constants)
@@ -63,6 +63,7 @@ package object constants {
private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"

// Annotation keys
private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =
@@ -74,4 +75,6 @@
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
+private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
+private[spark] val MEMORY_OVERHEAD_MIN = 384L
}
KubernetesClusterSchedulerBackend.scala
@@ -60,15 +60,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
.getOrElse(
throw new SparkException("Must specify the driver pod name"))

-private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
-private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
+private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+private val executorMemoryString = conf.get(
+org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)

-private val memoryOverheadBytes = conf
+private val memoryOverheadMb = conf
.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
-.map(overhead => Utils.byteStringAsBytes(overhead))
-.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt,
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt,
MEMORY_OVERHEAD_MIN))
-private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes
+private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb

private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
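Replacing Utils.byteStringAsBytes with the typed EXECUTOR_MEMORY entry moves the arithmetic from bytes to MiB, which is what gives the 384 floor its intended meaning: read as bytes, max(overhead, 384) was effectively a no-op. A sketch under an assumed spark.executor.memory=2g:

// Before (bytes): Utils.byteStringAsBytes("2g") == 2147483648L, so the
// 384 floor never applied to any realistic executor size.
// After (MiB):
val executorMemoryMb = 2048L                                      // EXECUTOR_MEMORY in MiB
val overheadMb = math.max((0.10 * executorMemoryMb).toInt, 384L)  // max(204, 384) = 384
val executorMemoryWithOverhead = executorMemoryMb + overheadMb    // 2432, the pod's memory limit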

@@ -165,10 +166,10 @@
val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> applicationId()).asJava
val executorMemoryQuantity = new QuantityBuilder(false)
-.withAmount(executorMemoryBytes.toString)
+.withAmount(s"${executorMemoryMb}M")
.build()
val executorMemoryLimitQuantity = new QuantityBuilder(false)
-.withAmount(executorMemoryWithOverhead.toString)
+.withAmount(s"${executorMemoryWithOverhead}M")
.build()
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCores)
@@ -177,7 +178,7 @@
(ENV_EXECUTOR_PORT, executorPort.toString),
(ENV_DRIVER_URL, driverUrl),
(ENV_EXECUTOR_CORES, executorCores),
-(ENV_EXECUTOR_MEMORY, executorMemory),
+(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId()),
(ENV_EXECUTOR_ID, executorId)
).map(env => new EnvVarBuilder()
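ENV_EXECUTOR_MEMORY now carries executorMemoryString (for example "1g") rather than the bare MiB count, presumably because the value is ultimately consumed as a JVM-style size string, where "1g" and "1024" are not equivalent. A minimal sketch of how the (name, value) pairs become fabric8 env vars, with illustrative values:

import io.fabric8.kubernetes.api.model.EnvVarBuilder

val requiredEnv = Seq(
  ("SPARK_EXECUTOR_MEMORY", "1g"),   // executorMemoryString, not executorMemoryMb
  ("SPARK_EXECUTOR_ID", "1"))
  .map { case (name, value) =>
    new EnvVarBuilder().withName(name).withValue(value).build()
  }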
@@ -261,7 +262,5 @@

private object KubernetesClusterSchedulerBackend {
private val DEFAULT_STATIC_PORT = 10000
-private val MEMORY_OVERHEAD_FACTOR = 0.10
-private val MEMORY_OVERHEAD_MIN = 384L
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
}
