From af92539b7a7428f0c36a5a230f52bc55a4ae9e1e Mon Sep 17 00:00:00 2001
From: mccheah
Date: Fri, 3 Mar 2017 12:45:04 -0800
Subject: [PATCH] Allow setting memory on the driver submission server. (#161)

* Allow setting memory on the driver submission server.

* Address comments

* Address comments

(cherry picked from commit f6823f381633cd49c572c33e4835868dba0b549e)
---
 docs/running-on-kubernetes.md                 | 18 +++++++++++-
 .../spark/deploy/kubernetes/Client.scala      | 28 +++++++++++++++++++
 .../spark/deploy/kubernetes/config.scala      | 23 ++++++++++++++-
 .../spark/deploy/kubernetes/constants.scala   |  3 ++
 .../KubernetesClusterSchedulerBackend.scala   | 21 +++++++-------
 5 files changed, 80 insertions(+), 13 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index d024d427fea97..1824fd6d2de98 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -201,13 +201,29 @@ from the other deployment modes. See the [configuration page](configuration.html
 <tr>
   <td><code>spark.kubernetes.executor.memoryOverhead</code></td>
-  <td>executorMemory * 0.10, with minimum of 384 </td>
+  <td>executorMemory * 0.10, with minimum of 384</td>
   <td>
     The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for
     things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor
     size (typically 6-10%).
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.submissionServerMemory</code></td>
+  <td><code>256m</code></td>
+  <td>
+    The amount of memory to allocate for the driver submission server.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.memoryOverhead</code></td>
+  <td>(driverMemory + driverSubmissionServerMemory) * 0.10, with minimum of 384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated for the driver and the driver submission server.
+    This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This
+    tends to grow with the driver size (typically 6-10%).
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.labels</code></td>
   <td>(none)</td>
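Taken together, the two new properties size the driver pod as follows: the container's memory request is driverMemory + submissionServerMemory, and its limit adds the overhead, which defaults to 10% of that sum with a 384 MiB floor. A minimal sketch of that arithmetic with hypothetical values; the authoritative computation is in the Client.scala hunk below:

```scala
// Illustration of the driver sizing formula described above; the names and
// values here are examples, not the patch's actual code.
object DriverSizingExample extends App {
  val MEMORY_OVERHEAD_FACTOR = 0.10
  val MEMORY_OVERHEAD_MIN = 384L

  val driverMemoryMb = 1024L          // spark.driver.memory=1g
  val submissionServerMemoryMb = 256L // spark.kubernetes.driver.submissionServerMemory=256m
  val containerMemoryMb = driverMemoryMb + submissionServerMemoryMb

  val overheadMb = math.max(
    (MEMORY_OVERHEAD_FACTOR * containerMemoryMb).toLong, MEMORY_OVERHEAD_MIN)
  // 0.10 * 1280 = 128 < 384, so the floor applies.
  println(s"request=${containerMemoryMb}M limit=${containerMemoryMb + overheadMb}M")
  // prints: request=1280M limit=1664M
}
```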
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index af5623093382e..d38d84f7b3ed3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -64,6 +64,19 @@ private[spark] class Client(
     .map(_.split(","))
     .getOrElse(Array.empty[String])

+  // Memory settings
+  private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
+  private val driverSubmitServerMemoryString = sparkConf.get(
+    KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
+    KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
+  private val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb
+  private val memoryOverheadMb = sparkConf
+    .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryMb).toInt,
+      MEMORY_OVERHEAD_MIN))
+  private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb
+
   private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)

   private val secretBase64String = {
@@ -408,6 +421,12 @@ private[spark] class Client(
       .withPath("/v1/submissions/ping")
       .withNewPort(SUBMISSION_SERVER_PORT_NAME)
       .build()
+    val driverMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverContainerMemoryMb}M")
+      .build()
+    val driverMemoryLimitQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverContainerMemoryWithOverhead}M")
+      .build()
     val driverPod = kubernetesClient.pods().createNew()
       .withNewMetadata()
         .withName(kubernetesAppId)
@@ -442,7 +461,16 @@ private[spark] class Client(
           .withName(ENV_SUBMISSION_SERVER_PORT)
           .withValue(SUBMISSION_SERVER_PORT.toString)
           .endEnv()
+        // Note that SPARK_DRIVER_MEMORY only affects the REST server via spark-class.
+        .addNewEnv()
+          .withName(ENV_DRIVER_MEMORY)
+          .withValue(driverSubmitServerMemoryString)
+          .endEnv()
         .addToEnv(sslConfiguration.sslPodEnvVars: _*)
+        .withNewResources()
+          .addToRequests("memory", driverMemoryQuantity)
+          .addToLimits("memory", driverMemoryLimitQuantity)
+          .endResources()
         .withPorts(containerPorts.asJava)
         .withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
         .endContainer()
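From the user's side, the new knobs are ordinary Spark properties. A minimal sketch of setting them programmatically, with example values only; the same keys can equally be passed as --conf flags to spark-submit:

```scala
import org.apache.spark.SparkConf

object SubmitConfSketch {
  val conf = new SparkConf()
    .set("spark.driver.memory", "2g")
    // Heap given to the driver submission server JVM:
    .set("spark.kubernetes.driver.submissionServerMemory", "512m")
    // Optional: override the computed default of
    // (driverMemory + submissionServerMemory) * 0.10, minimum 384m:
    .set("spark.kubernetes.driver.memoryOverhead", "768m")
}
```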
""".stripMargin) - .stringConf + .bytesConf(ByteUnit.MiB) + .createOptional + + private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD = + ConfigBuilder("spark.kubernetes.driver.memoryOverhead") + .doc(""" + | The amount of off-heap memory (in megabytes) to be + | allocated for the driver and the driver submission server. + | This is memory that accounts for things like VM overheads, + | interned strings, other native overheads, etc. This tends + | to grow with the driver's memory size (typically 6-10%). + """.stripMargin) + .bytesConf(ByteUnit.MiB) .createOptional private[spark] val KUBERNETES_DRIVER_LABELS = @@ -177,6 +190,14 @@ package object config { .stringConf .createOptional + private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY = + ConfigBuilder("spark.kubernetes.driver.submissionServerMemory") + .doc(""" + | The amount of memory to allocate for the driver submission server. + """.stripMargin) + .bytesConf(ByteUnit.MiB) + .createWithDefaultString("256m") + private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT = ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort") .doc(""" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 10ddb12463894..4af065758e674 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -63,6 +63,7 @@ package object constants { private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY" private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" + private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY" // Annotation keys private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI = @@ -74,4 +75,6 @@ package object constants { private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit" private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" + private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10 + private[spark] val MEMORY_OVERHEAD_MIN = 384L } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 898b215b92d04..90907ff83ed84 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -60,15 +60,16 @@ private[spark] class KubernetesClusterSchedulerBackend( .getOrElse( throw new SparkException("Must specify the driver pod name")) - private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g") - private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory) + private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = conf.get( + org.apache.spark.internal.config.EXECUTOR_MEMORY.key, + org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) - private val memoryOverheadBytes = conf + 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 898b215b92d04..90907ff83ed84 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -60,15 +60,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .getOrElse(
       throw new SparkException("Must specify the driver pod name"))

-  private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
-  private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
+  private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
+  private val executorMemoryString = conf.get(
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
+    org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)

-  private val memoryOverheadBytes = conf
+  private val memoryOverheadMb = conf
     .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
-    .map(overhead => Utils.byteStringAsBytes(overhead))
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt,
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt,
       MEMORY_OVERHEAD_MIN))
-  private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes
+  private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb

   private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")

@@ -165,10 +166,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
       SPARK_APP_ID_LABEL -> applicationId()).asJava
     val executorMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(executorMemoryBytes.toString)
+      .withAmount(s"${executorMemoryMb}M")
       .build()
     val executorMemoryLimitQuantity = new QuantityBuilder(false)
-      .withAmount(executorMemoryWithOverhead.toString)
+      .withAmount(s"${executorMemoryWithOverhead}M")
       .build()
     val executorCpuQuantity = new QuantityBuilder(false)
       .withAmount(executorCores)
@@ -177,7 +178,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       (ENV_EXECUTOR_CORES, executorCores),
-      (ENV_EXECUTOR_MEMORY, executorMemory),
+      (ENV_EXECUTOR_MEMORY, executorMemoryString),
       (ENV_APPLICATION_ID, applicationId()),
       (ENV_EXECUTOR_ID, executorId)
     ).map(env => new EnvVarBuilder()
@@ -261,7 +262,5 @@ private[spark] class KubernetesClusterSchedulerBackend(

 private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
-  private val MEMORY_OVERHEAD_FACTOR = 0.10
-  private val MEMORY_OVERHEAD_MIN = 384L
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
 }
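As a worked check of the executor path after this change: with the default spark.executor.memory of 1g and no explicit overhead, the pod's memory request is 1024M and its limit is 1408M. Illustrative arithmetic only, restating the formula from the hunk above:

```scala
object ExecutorSizingCheck extends App {
  val MEMORY_OVERHEAD_FACTOR = 0.10
  val MEMORY_OVERHEAD_MIN = 384L

  val executorMemoryMb = 1024L // spark.executor.memory=1g
  val overheadMb = math.max(
    (MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toLong, MEMORY_OVERHEAD_MIN)
  assert(executorMemoryMb + overheadMb == 1408L) // 102 < 384, so the floor applies
  println(s"request=${executorMemoryMb}M limit=${executorMemoryMb + overheadMb}M")
}
```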