Add parameter for driver pod name (apache#258)
* Add parameter for driver pod name

* Mark KUBERNETES_DRIVER_POD_NAME as not internal. Update documentation.

* Add test case for driver pod name

* Differentiate the driver pod name from the app id

* Replace 'spark.kubernetes.driver.pod.name' with KUBERNETES_DRIVER_POD_NAME

* Update README to mark the item complete
hustcat authored and foxish committed May 16, 2017
1 parent 546f09c commit eb45ae5
Showing 5 changed files with 34 additions and 8 deletions.
7 changes: 7 additions & 0 deletions docs/running-on-kubernetes.md
@@ -350,6 +350,13 @@ from the other deployment modes. See the [configuration page](configuration.html
     resource.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.pod.name</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Name of the driver pod. If not set, the driver pod name is set to "spark.app.name" suffixed by the current timestamp to avoid name conflicts.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.submission.waitAppCompletion</code></td>
   <td><code>true</code></td>
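The new option behaves like any other Spark setting at submission time. A minimal sketch of configuring it programmatically, assuming the remaining Kubernetes settings (master URL, Docker images, and so on) are supplied elsewhere; the name spark-pi-driver is only an illustrative value:

import org.apache.spark.SparkConf

// Illustrative configuration only; the app name and pod name are placeholder values.
val conf = new SparkConf()
  .setAppName("spark-pi")
  .set("spark.kubernetes.driver.pod.name", "spark-pi-driver") // explicit driver pod name
// If spark.kubernetes.driver.pod.name is left unset, the submission client falls back to
// "<spark.app.name>-<launchTime>", lower-cased with dots replaced by dashes, as shown below.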
@@ -267,7 +267,6 @@ package object config extends Logging {
   private[spark] val KUBERNETES_DRIVER_POD_NAME =
     ConfigBuilder("spark.kubernetes.driver.pod.name")
       .doc("Name of the driver pod.")
-      .internal()
       .stringConf
       .createOptional

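Dropping .internal() makes the key user-facing, which is why the documentation entry above is added in the same commit. Because the entry is built with .createOptional, reading it yields an Option[String] with no default; a minimal sketch of the lookup-with-fallback pattern the submission clients use below, with identifiers taken from the diff:

// Sketch only: KUBERNETES_DRIVER_POD_NAME has no default value, so the caller decides the
// fallback; here it is the generated "<appName>-<launchTime>" id.
val driverPodName: String = sparkConf.get(KUBERNETES_DRIVER_POD_NAME) // Option[String]
  .getOrElse(kubernetesAppId)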
@@ -51,6 +51,8 @@ private[spark] class Client(
   private val appName = sparkConf.getOption("spark.app.name")
     .getOrElse("spark")
   private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+  private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(kubernetesAppId)
   private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
   private val secretDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId"
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
@@ -150,7 +152,7 @@
       loggingInterval)
     Utils.tryWithResource(kubernetesClient
       .pods()
-      .withName(kubernetesAppId)
+      .withName(kubernetesDriverPodName)
       .watch(loggingWatch)) { _ =>
       val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() =>
         kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient))
@@ -247,7 +249,7 @@
       logWarning(s"Warning: Provided app id in spark.app.id as $id will be" +
         s" overridden as $kubernetesAppId")
     }
-    sparkConf.set(KUBERNETES_DRIVER_POD_NAME, kubernetesAppId)
+    sparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
     sparkConf.set(KUBERNETES_DRIVER_SERVICE_NAME, driverService.getMetadata.getName)
     sparkConf.set("spark.app.id", kubernetesAppId)
     sparkConf.setIfMissing("spark.app.name", appName)
@@ -314,7 +316,7 @@
     val podWatcher = new DriverPodReadyWatcher(podReadyFuture)
     Utils.tryWithResource(kubernetesClient
       .pods()
-      .withName(kubernetesAppId)
+      .withName(kubernetesDriverPodName)
      .watch(podWatcher)) { _ =>
       Utils.tryWithResource(kubernetesClient
         .services()
@@ -445,7 +447,7 @@
       .build()
     val driverPod = kubernetesClient.pods().createNew()
       .withNewMetadata()
-        .withName(kubernetesAppId)
+        .withName(kubernetesDriverPodName)
         .withLabels(driverKubernetesSelectors.asJava)
         .withAnnotations(customAnnotations.asJava)
         .endMetadata()
@@ -571,7 +573,7 @@
       kubernetesClient: KubernetesClient,
       e: Throwable): String = {
     val driverPod = try {
-      kubernetesClient.pods().withName(kubernetesAppId).get()
+      kubernetesClient.pods().withName(kubernetesDriverPodName).get()
     } catch {
       case throwable: Throwable =>
         logError(s"Timed out while waiting $driverSubmitTimeoutSecs seconds for the" +
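One detail worth noting in the @@ -247,7 +249,7 @@ hunk above: the resolved configuration now uses setIfMissing rather than set, so a pod name supplied by the user survives when the client propagates settings to the driver, while the generated fallback is still written when no name was given. A small sketch of the difference, using a hypothetical SparkConf purely for illustration:

import org.apache.spark.SparkConf

// Hypothetical conf: the user explicitly chose a driver pod name.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.pod.name", "my-driver")

conf.setIfMissing("spark.kubernetes.driver.pod.name", "generated-name") // keeps "my-driver"
conf.set("spark.kubernetes.driver.pod.name", "generated-name")          // would overwrite it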
@@ -58,6 +58,8 @@ private[spark] class Client(
   private val appName = sparkConf.getOption("spark.app.name")
     .getOrElse("spark")
   private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+  private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(kubernetesAppId)
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
   private val maybeStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
   private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
@@ -123,7 +125,7 @@
       .build()
     val basePod = new PodBuilder()
       .withNewMetadata()
-        .withName(kubernetesAppId)
+        .withName(kubernetesDriverPodName)
         .addToLabels(allLabels.asJava)
         .addToAnnotations(parsedCustomAnnotations.asJava)
         .endMetadata()
@@ -176,7 +178,7 @@
     if (resolvedFiles.nonEmpty) {
       resolvedSparkConf.set("spark.files", resolvedFiles.mkString(","))
     }
-    resolvedSparkConf.set(KUBERNETES_DRIVER_POD_NAME, kubernetesAppId)
+    resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
     resolvedSparkConf.set("spark.app.id", kubernetesAppId)
     // We don't need this anymore since we just set the JVM options on the environment
     resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
@@ -171,6 +171,22 @@ private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
       "Unexpected value for annotation2")
   }
 
+  test("Run with driver pod name") {
+    sparkConf.set(KUBERNETES_DRIVER_POD_NAME, "spark-pi")
+    new Client(
+      sparkConf = sparkConf,
+      mainClass = KubernetesSuite.SPARK_PI_MAIN_CLASS,
+      mainAppResource = KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+      appArgs = Array.empty[String]).run()
+    val driverPodMetadata = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withName("spark-pi")
+      .get()
+      .getMetadata()
+    val driverName = driverPodMetadata.getName
+    assert(driverName === "spark-pi", "Unexpected driver pod name.")
+  }
+
   test("Enable SSL on the driver submit server") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
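The integration test above exercises the feature end to end: it submits with a fixed pod name and reads the pod back through the Kubernetes client. A similar manual check, sketched with the fabric8 client the project already uses; the namespace and pod name are assumptions matching the test:

import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Assumes the current kube context points at the test cluster and that a job was
// submitted with spark.kubernetes.driver.pod.name=spark-pi in the "default" namespace.
val client = new DefaultKubernetesClient()
try {
  val driverPod = client.pods().inNamespace("default").withName("spark-pi").get()
  require(driverPod != null, "Driver pod 'spark-pi' was not created")
  println(s"Driver pod phase: ${driverPod.getStatus.getPhase}")
} finally {
  client.close()
}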
