[SPARK-24434][K8S] pod template files #22146
@@ -74,8 +74,14 @@ private[spark] object Constants {
val ENV_R_PRIMARY = "R_PRIMARY"
val ENV_R_ARGS = "R_APP_ARGS"

// Pod spec templates
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
val EXECUTOR_POD_SPEC_TEMPLATE_MOUNTHPATH = "/opt/spark/pod-template"

Review comment: Spelling, think we want `EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH`.
Review comment: Quick ping here.

val POD_TEMPLATE_VOLUME = "podspec-volume"

Review comment: nit: s/podspec-volume/pod-template-volume
Review comment: Ping here.

val POD_TEMPLATE_CONFIGMAP = "podspec-configmap"
val POD_TEMPLATE_KEY = "podspec-configmap-key"

// Miscellaneous
val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
val MEMORY_OVERHEAD_MIN_MIB = 384L
}
@@ -24,8 +24,9 @@ private[spark] case class KubernetesDriverSpec(
systemProperties: Map[String, String])

private[spark] object KubernetesDriverSpec {
def initialSpec(initialProps: Map[String, String]): KubernetesDriverSpec = KubernetesDriverSpec(
SparkPod.initialPod(),
Seq.empty,
initialProps)
def initialSpec(initialConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec =
KubernetesDriverSpec(
SparkPod.initialPod(),
Seq.empty,

Review comment: NIT: For clarity can you write as:

initialConf.sparkConf.getAll.toMap)
}
@@ -16,10 +16,16 @@
*/
package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf
import java.io.File

import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[spark] object KubernetesUtils {
private[spark] object KubernetesUtils extends Logging {

/**
* Extract and parse Spark configuration properties with a given name prefix and

@@ -59,5 +65,21 @@ private[spark] object KubernetesUtils {
}
}

def loadPodFromTemplate(kubernetesClient: KubernetesClient,

Review comment: IDEs don't handle this indentation properly I think - you want this:

templateFile: File,
containerName: String): SparkPod = {
try {
val pod = kubernetesClient.pods().load(templateFile).get()
val containers = pod.getSpec.getContainers.asScala
require(containers.map(_.getName).contains(containerName))
SparkPod(pod, containers.filter(_.getName == containerName).head)
} catch {
case e: Exception =>
logError(
s"Encountered exception while attempting to load initial pod spec from file", e)
throw new SparkException("Could not load driver pod from template file.", e)

Review comment: This error message is misleading: it is thrown when either the executor or the driver pod fails to load from its own template. Either remove "driver" or be more specific about whether it's the executor or the driver.

}
}

def parseMasterUrl(url: String): String = url.substring("k8s://".length)
}
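
For reference, a brief usage sketch of the helper above. The template path and container name are hypothetical, and the sketch is placed in the org.apache.spark.deploy.k8s package only because KubernetesUtils and SparkPod are private[spark]; it is not part of this change.

```scala
// Hedged usage sketch for loadPodFromTemplate; path and container name are illustrative.
package org.apache.spark.deploy.k8s

import java.io.File

import io.fabric8.kubernetes.client.DefaultKubernetesClient

object LoadTemplateSketch {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient() // auto-configures from kubeconfig / in-cluster env
    try {
      // pods().load(...).get() only parses the file; no API server call is made here.
      // Throws SparkException if the file cannot be parsed or the named container is absent.
      val sparkPod: SparkPod = KubernetesUtils.loadPodFromTemplate(
        client,
        new File("/tmp/driver-template.yml"),  // hypothetical template path
        "spark-kubernetes-driver")             // container expected in the template
      println(s"template container: ${sparkPod.container.getName}")
    } finally {
      client.close()
    }
  }
}
```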
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import java.io.File
import java.nio.charset.StandardCharsets

import com.google.common.io.Files
import io.fabric8.kubernetes.api.model.{Config => _, _}

Review comment: Instead of doing this, just import

import org.apache.spark.deploy.k8s._

Review comment: Do not wildcard import the package.

private[spark] class PodTemplateConfigMapStep(
conf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
extends KubernetesFeatureConfigStep {
def configurePod(pod: SparkPod): SparkPod = {
val podWithVolume = new PodBuilder(pod.pod)
.editSpec()
.addNewVolume()
.withName(Constants.POD_TEMPLATE_VOLUME)
.withNewConfigMap()
.withName(Constants.POD_TEMPLATE_CONFIGMAP)
.addNewItem()
.withKey(Constants.POD_TEMPLATE_KEY)
.withPath(Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
.endItem()
.endConfigMap()
.endVolume()
.endSpec()
.build()

val containerWithVolume = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(Constants.POD_TEMPLATE_VOLUME)
.withMountPath(Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTHPATH)
.endVolumeMount()
.build()
SparkPod(podWithVolume, containerWithVolume)
}

def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
(Constants.EXECUTOR_POD_SPEC_TEMPLATE_MOUNTHPATH + "/" +
Constants.EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))

def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
require(conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
val podTemplateFile = conf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
Seq(new ConfigMapBuilder()
.withNewMetadata()
.withName(Constants.POD_TEMPLATE_CONFIGMAP)
.endMetadata()
.addToData(Constants.POD_TEMPLATE_KEY, podTemplateString)
.build())
}
}
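
For context, this step ships the submission client's local executor template to the driver: the file's contents go into the ConfigMap above, the ConfigMap is mounted into the driver pod, and the executor template property is rewritten to the mounted path so the driver-side builder can read it. Below is a hedged sketch of writing such a template; the file path, label, container names, and the property key mentioned in the comments are illustrative assumptions, not code from this change.

```scala
// Hedged sketch: writing an illustrative executor pod template to disk. The
// resulting file would be referenced from the submission client, e.g. via
// spark.kubernetes.executor.podTemplateFile (key name assumed), and
// PodTemplateConfigMapStep then copies its contents into the ConfigMap above.
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object WriteExecutorTemplateSketch {
  val templateYaml: String =
    """apiVersion: v1
      |kind: Pod
      |metadata:
      |  labels:
      |    team: data-platform                 # illustrative extra label
      |spec:
      |  containers:
      |    - name: spark-kubernetes-executor   # container Spark fills in
      |    - name: sidecar-proxy               # illustrative side-car container
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    Files.write(
      Paths.get("/tmp/executor-template.yml"), // hypothetical local path
      templateYaml.getBytes(StandardCharsets.UTF_8))
  }
}
```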
@@ -16,9 +16,15 @@
*/
package org.apache.spark.deploy.k8s.submit

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep}
import java.io.File

import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{Config, Constants, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep, MountVolumesFeatureStep, PodTemplateConfigMapStep}
import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep}
import org.apache.spark.internal.Logging

private[spark] class KubernetesDriverBuilder(
provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep =

@@ -51,7 +57,11 @@ private[spark] class KubernetesDriverBuilder(
provideJavaStep: (
KubernetesConf[KubernetesDriverSpecificConf]
=> JavaDriverFeatureStep) =
new JavaDriverFeatureStep(_)) {
new JavaDriverFeatureStep(_),
podTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> PodTemplateConfigMapStep) =
new PodTemplateConfigMapStep(_),
provideInitialPod: () => SparkPod = SparkPod.initialPod) {

def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {

@@ -70,6 +80,10 @@ private[spark] class KubernetesDriverBuilder(
val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) {
Seq(provideVolumesStep(kubernetesConf))
} else Nil
val templateVolumeFeature = if (
kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
Seq(podTemplateConfigMapStep(kubernetesConf))
} else Nil

val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map {
case JavaMainAppResource(_) =>

@@ -81,9 +95,12 @@ private[spark] class KubernetesDriverBuilder(
.getOrElse(provideJavaStep(kubernetesConf))

val allFeatures = (baseFeatures :+ bindingsStep) ++
secretFeature ++ envSecretFeature ++ volumesFeature
secretFeature ++ envSecretFeature ++ volumesFeature ++ templateVolumeFeature

Review comment: Never mind, I misread this.

var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap)
var spec = KubernetesDriverSpec(
provideInitialPod(),
Seq.empty,

Review comment: ditto to above in setting as
Review comment: Ping on this

kubernetesConf.sparkConf.getAll.toMap)
for (feature <- allFeatures) {
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()

@@ -96,3 +113,24 @@
spec
}
}

private[spark] object KubernetesDriverBuilder extends Logging {
def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = {
conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
.map(new File(_))
.map(file => new KubernetesDriverBuilder(provideInitialPod = () => {
try {
KubernetesUtils.loadPodFromTemplate(
kubernetesClient,
file,
conf.get(Config.KUBERNETES_DRIVER_CONTAINER_NAME))
} catch {
case e: Exception =>
logError(
s"Encountered exception while attempting to load initial pod spec from file", e)
throw new SparkException("Could not load driver pod from template file.", e)
}
}))
.getOrElse(new KubernetesDriverBuilder())
}
}
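
A hedged sketch of the two paths through the companion apply above. The config key string and the template path are assumptions based on the option names in this change, and the sketch sits in the org.apache.spark.deploy.k8s.submit package only because the builder is private[spark].

```scala
// Hedged sketch of KubernetesDriverBuilder.apply: with no template configured it
// falls back to the default builder (SparkPod.initialPod()); with a template it
// loads the file lazily, when buildFromFeatures invokes provideInitialPod(), and
// rethrows parse failures as SparkException.
package org.apache.spark.deploy.k8s.submit

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.SparkConf

object DriverBuilderSketch {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient()

    // Default path: no template file set.
    val defaultBuilder = KubernetesDriverBuilder(client, new SparkConf(false))

    // Template path: the key string below is assumed from the option name.
    val templatedConf = new SparkConf(false)
      .set("spark.kubernetes.driver.podTemplateFile", "/tmp/driver-template.yml")
    val templatedBuilder = KubernetesDriverBuilder(client, templatedConf)

    client.close()
  }
}
```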
@@ -23,7 +23,7 @@ import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.client.Config

import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.{Constants, KubernetesUtils, SparkKubernetesClientFactory, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging

@@ -69,6 +69,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
defaultServiceAccountToken,
defaultServiceAccountCaCrt)

if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
KubernetesUtils.loadPodFromTemplate(
kubernetesClient,
new File(sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get),
sc.conf.get(KUBERNETES_EXECUTOR_CONTAINER_NAME))
}

val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
"kubernetes-executor-requests")

@@ -81,13 +88,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
.build[java.lang.Long, java.lang.Long]()
val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
sc.conf,
new KubernetesExecutorBuilder(),
KubernetesExecutorBuilder(kubernetesClient, sc.conf),

Review comment: I double checked and I don't think we use this variable inside ExecutorPodsLifecycleManager.

kubernetesClient,
snapshotsStore,
removedExecutorsCache)

val executorPodsAllocator = new ExecutorPodsAllocator(
sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())
sc.conf,
KubernetesExecutorBuilder(kubernetesClient, sc.conf),
kubernetesClient,
snapshotsStore,
new SystemClock())

val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
snapshotsStore,

Review comment: Is there any way to do this via the `containers` container array from the pod template?

Review comment: I prefer to make it explicit, rather than e.g. "pick the first container in the list".

Review comment: If you're saying that manipulating `containers` doesn't give specific control over which name is assigned to which container, then I agree.

Review comment: @mccheah it raises an interesting question: a user might use this feature to add containers, e.g. side-car proxies, service mesh, etc. In fact, I'd want users to be able to do this. If there are multiple containers defined, we may need a convention for identifying which one is the driver/executor. Alternatives might include additive-only, like labels, or disallowing, but supporting this seems very desirable.

Review comment: This feature should support using multiple containers, in which case the user needs to specify which container is running the Spark process. Using a configuration option for that seems like the most straightforward solution.

Review comment: Similarly, what if the user gives the driver container name in the driver template but forgets to specify `spark.kubernetes.driver.containerName`? Requiring users to set the container name explicitly and additionally through another config key sounds a bit awkward to me, particularly when there's only the Spark container itself.

Review comment: Is there confusion because there's an existing configuration option also? I think the existing configuration option sets the driver container name when no yml is specified. But perhaps the interpretation of this configuration value should change when the pod template is provided.

Review comment: Yes, the semantics of this key will change with the template option. So with the template, there are two sources of the driver container name, and we need to resolve conflicts in certain cases, e.g., if the name is specified in only one source or if the two sources mismatch.

Review comment: Wait, I don't think there's an existing configuration option. I believe the container name is currently just hard coded in `Constants`. However, I see your concern about using `spark.kubernetes.driver.containerName` for multiple purposes. In that case, it's definitely easier to reason about with fewer moving pieces, and it sounds like simplest is best. I'll remove the container name configuration and just stick with first container = driver container. This just means that the only way to configure the container name will be through a pod template. I'll also clarify in the docs.

Review comment: You are right. We were confusing the one for driver pod name with this one.
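
Since the thread settles on treating the first container in the template as the Spark container (after an explicit container-name option was considered), here is a hedged sketch contrasting the two strategies; it is illustrative only and not code from this change.

```scala
// Hedged sketch of the two container-selection strategies discussed above,
// for a template that defines a Spark container plus a side-car.
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, Pod}

object ContainerSelectionSketch {
  def selectSparkContainer(pod: Pod, configuredName: Option[String]): Container = {
    val containers = pod.getSpec.getContainers.asScala
    configuredName match {
      // Explicit option (e.g. spark.kubernetes.driver.containerName): pick by name,
      // falling back to the first container if the name is absent from the template.
      case Some(name) => containers.find(_.getName == name).getOrElse(containers.head)
      // Convention the thread settles on: the first container is the Spark container.
      case None => containers.head
    }
  }
}
```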