[SPARK-25877][k8s] Move all feature logic to feature classes. #23220

Status: Closed · 2 commits (changes shown from all commits)
HadoopConfExecutorFeatureStep.scala

@@ -31,10 +31,10 @@ private[spark] class HadoopConfExecutorFeatureStep(conf: KubernetesExecutorConf)
 
   override def configurePod(pod: SparkPod): SparkPod = {
     val hadoopConfDirCMapName = conf.getOption(HADOOP_CONFIG_MAP_NAME)
-    require(hadoopConfDirCMapName.isDefined,
-      "Ensure that the env `HADOOP_CONF_DIR` is defined either in the client or " +
-      " using pre-existing ConfigMaps")
-    logInfo("HADOOP_CONF_DIR defined")
-    HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
+    if (hadoopConfDirCMapName.isDefined) {
+      HadoopBootstrapUtil.bootstrapHadoopConfDir(None, None, hadoopConfDirCMapName, pod)
+    } else {
+      pod
+    }
   }
 }
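The behavioral change in this step: a missing HADOOP_CONFIG_MAP_NAME no longer aborts pod construction with an IllegalArgumentException from require; the step now degrades to a no-op and returns the pod unchanged. A minimal, self-contained Scala sketch of that contrast; the Pod type and config key below are hypothetical stand-ins, not Spark's SparkPod or KubernetesExecutorConf:

    // Hypothetical stand-ins; a behavior sketch, not Spark code.
    case class Pod(volumes: List[String])

    // Old shape: require() throws IllegalArgumentException when the
    // ConfigMap name is absent.
    def oldStyle(conf: Map[String, String], pod: Pod): Pod = {
      val cmap = conf.get("hadoopConfigMapName")
      require(cmap.isDefined, "hadoop ConfigMap not found")
      Pod(cmap.get :: pod.volumes)
    }

    // New shape: absence of the setting means "feature off"; the pod
    // passes through untouched.
    def newStyle(conf: Map[String, String], pod: Pod): Pod =
      conf.get("hadoopConfigMapName") match {
        case Some(cmap) => Pod(cmap :: pod.volumes)
        case None => pod
      }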
HadoopSparkUserExecutorFeatureStep.scala

@@ -28,7 +28,8 @@ private[spark] class HadoopSparkUserExecutorFeatureStep(conf: KubernetesExecutorConf)
   extends KubernetesFeatureConfigStep {
 
   override def configurePod(pod: SparkPod): SparkPod = {
-    val sparkUserName = conf.get(KERBEROS_SPARK_USER_NAME)
-    HadoopBootstrapUtil.bootstrapSparkUserPod(sparkUserName, pod)
+    conf.getOption(KERBEROS_SPARK_USER_NAME).map { user =>
+      HadoopBootstrapUtil.bootstrapSparkUserPod(user, pod)
+    }.getOrElse(pod)
   }
 }
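Same no-op-when-unset contract as above, here spelled with Option.map(...).getOrElse(...) rather than an if. A tiny stand-alone sketch showing the two spellings agree; the types are illustrative only:

    // map/getOrElse and pattern matching are interchangeable here; both
    // return the input unchanged when the optional setting is missing.
    object OptionPatternDemo extends App {
      def viaMap(user: Option[String], pod: String): String =
        user.map(u => s"$pod,user=$u").getOrElse(pod)

      def viaMatch(user: Option[String], pod: String): String = user match {
        case Some(u) => s"$pod,user=$u"
        case None => pod
      }

      assert(viaMap(Some("alice"), "p") == viaMatch(Some("alice"), "p"))
      assert(viaMap(None, "p") == viaMatch(None, "p"))
    }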
KerberosConfExecutorFeatureStep.scala

@@ -27,18 +27,20 @@ import org.apache.spark.internal.Logging
 private[spark] class KerberosConfExecutorFeatureStep(conf: KubernetesExecutorConf)
   extends KubernetesFeatureConfigStep with Logging {
 
-  private val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
-  require(maybeKrb5CMap.isDefined, "HADOOP_CONF_DIR ConfigMap not found")
-
   override def configurePod(pod: SparkPod): SparkPod = {
-    logInfo(s"Mounting Resources for Kerberos")
-    HadoopBootstrapUtil.bootstrapKerberosPod(
-      conf.get(KERBEROS_DT_SECRET_NAME),
-      conf.get(KERBEROS_DT_SECRET_KEY),
-      conf.get(KERBEROS_SPARK_USER_NAME),
-      None,
-      None,
-      maybeKrb5CMap,
-      pod)
+    val maybeKrb5CMap = conf.getOption(KRB5_CONFIG_MAP_NAME)
+    if (maybeKrb5CMap.isDefined) {
+      logInfo(s"Mounting Resources for Kerberos")
+      HadoopBootstrapUtil.bootstrapKerberosPod(
+        conf.get(KERBEROS_DT_SECRET_NAME),
+        conf.get(KERBEROS_DT_SECRET_KEY),
+        conf.get(KERBEROS_SPARK_USER_NAME),
+        None,
+        None,
+        maybeKrb5CMap,
+        pod)
+    } else {
+      pod
+    }
   }
 }
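Here the require also moves from construction time to call time: the old step threw as soon as it was instantiated (with a message naming HADOOP_CONF_DIR even though it checks the krb5 ConfigMap), whether or not configurePod was ever invoked. A self-contained sketch of the difference, with simplified stand-in types rather than Spark's:

    // Old shape: validation in the constructor body fails on `new`.
    class OldStep(conf: Map[String, String]) {
      private val krb5 = conf.get("krb5ConfigMap")
      require(krb5.isDefined, "krb5 ConfigMap not found") // throws on construction
      def configurePod(pod: String): String = s"$pod,krb5=${krb5.get}"
    }

    // New shape: the check happens where the pod is configured, and a
    // missing setting simply means the step does nothing.
    class NewStep(conf: Map[String, String]) {
      def configurePod(pod: String): String =
        conf.get("krb5ConfigMap").map(c => s"$pod,krb5=$c").getOrElse(pod)
    }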
PodTemplateConfigMapStep.scala

@@ -28,44 +28,60 @@ import org.apache.spark.deploy.k8s.Constants._
 
 private[spark] class PodTemplateConfigMapStep(conf: KubernetesConf)
   extends KubernetesFeatureConfigStep {
+
+  private val hasTemplate = conf.contains(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE)
+
   def configurePod(pod: SparkPod): SparkPod = {
-    val podWithVolume = new PodBuilder(pod.pod)
-      .editSpec()
-        .addNewVolume()
-          .withName(POD_TEMPLATE_VOLUME)
-          .withNewConfigMap()
-            .withName(POD_TEMPLATE_CONFIGMAP)
-            .addNewItem()
-              .withKey(POD_TEMPLATE_KEY)
-              .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
-            .endItem()
-          .endConfigMap()
-        .endVolume()
-      .endSpec()
-      .build()
+    if (hasTemplate) {
+      val podWithVolume = new PodBuilder(pod.pod)
+        .editSpec()
+          .addNewVolume()
+            .withName(POD_TEMPLATE_VOLUME)
+            .withNewConfigMap()
+              .withName(POD_TEMPLATE_CONFIGMAP)
+              .addNewItem()
+                .withKey(POD_TEMPLATE_KEY)
+                .withPath(EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)
+              .endItem()
+            .endConfigMap()
+          .endVolume()
+        .endSpec()
+        .build()
 
-    val containerWithVolume = new ContainerBuilder(pod.container)
-      .addNewVolumeMount()
-        .withName(POD_TEMPLATE_VOLUME)
-        .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
-      .endVolumeMount()
-      .build()
-    SparkPod(podWithVolume, containerWithVolume)
+      val containerWithVolume = new ContainerBuilder(pod.container)
+        .addNewVolumeMount()
+          .withName(POD_TEMPLATE_VOLUME)
+          .withMountPath(EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH)
+        .endVolumeMount()
+        .build()
+      SparkPod(podWithVolume, containerWithVolume)
+    } else {
+      pod
+    }
   }
 
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String](
-    KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
-      (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    if (hasTemplate) {
+      Map[String, String](
+        KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key ->
+          (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME))
+    } else {
+      Map.empty
+    }
+  }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
-    require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined)
-    val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
-    val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
-    Seq(new ConfigMapBuilder()
-      .withNewMetadata()
-        .withName(POD_TEMPLATE_CONFIGMAP)
-      .endMetadata()
-      .addToData(POD_TEMPLATE_KEY, podTemplateString)
-      .build())
+    if (hasTemplate) {
+      val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get
+      val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8)
+      Seq(new ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(POD_TEMPLATE_CONFIGMAP)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_KEY, podTemplateString)
+        .build())
+    } else {
+      Nil
+    }
  }
 }
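All three methods of the step interface are now gated on the same hasTemplate flag, so a disabled feature contributes nothing to the pod, the system properties, or the extra Kubernetes resources. A condensed, runnable sketch of that contract; the trait and strings below are simplified stand-ins for KubernetesFeatureConfigStep and the fabric8 builder output, not the real API:

    trait FeatureStep {
      def configurePod(pod: String): String = pod
      def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
      def getAdditionalKubernetesResources(): Seq[String] = Nil
    }

    class TemplateStep(conf: Map[String, String]) extends FeatureStep {
      // One flag decides all three contributions.
      private val hasTemplate = conf.contains("executorPodTemplateFile")

      override def configurePod(pod: String): String =
        if (hasTemplate) s"$pod,volume=podTemplate" else pod

      override def getAdditionalPodSystemProperties(): Map[String, String] =
        if (hasTemplate) Map("executorPodTemplateFile" -> "/mnt/template.yml")
        else Map.empty

      override def getAdditionalKubernetesResources(): Seq[String] =
        if (hasTemplate) Seq("ConfigMap(podTemplate)") else Nil
    }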
KubernetesClientApplication.scala

@@ -104,7 +104,7 @@ private[spark] class Client(
     watcher: LoggingPodStatusWatcher) extends Logging {
 
   def run(): Unit = {
-    val resolvedDriverSpec = builder.buildFromFeatures(conf)
+    val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
     val configMapName = s"${conf.resourceNamePrefix}-driver-conf-map"
     val configMap = buildConfigMap(configMapName, resolvedDriverSpec.systemProperties)
     // The include of the ENV_VAR for "SPARK_CONF_DIR" is to allow for the

@@ -232,7 +232,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
         None)) { kubernetesClient =>
       val client = new Client(
         kubernetesConf,
-        KubernetesDriverBuilder(kubernetesClient, kubernetesConf.sparkConf),
+        new KubernetesDriverBuilder(),
         kubernetesClient,
         waitForAppCompletion,
         watcher)
KubernetesDriverBuilder.scala

@@ -20,90 +20,49 @@ import java.io.File
 
 import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s._
 import org.apache.spark.deploy.k8s.features._
 
-private[spark] class KubernetesDriverBuilder(
-    provideBasicStep: (KubernetesDriverConf => BasicDriverFeatureStep) =
-      new BasicDriverFeatureStep(_),
-    provideCredentialsStep: (KubernetesDriverConf => DriverKubernetesCredentialsFeatureStep) =
-      new DriverKubernetesCredentialsFeatureStep(_),
-    provideServiceStep: (KubernetesDriverConf => DriverServiceFeatureStep) =
-      new DriverServiceFeatureStep(_),
-    provideSecretsStep: (KubernetesConf => MountSecretsFeatureStep) =
-      new MountSecretsFeatureStep(_),
-    provideEnvSecretsStep: (KubernetesConf => EnvSecretsFeatureStep) =
-      new EnvSecretsFeatureStep(_),
-    provideLocalDirsStep: (KubernetesConf => LocalDirsFeatureStep) =
-      new LocalDirsFeatureStep(_),
-    provideVolumesStep: (KubernetesConf => MountVolumesFeatureStep) =
-      new MountVolumesFeatureStep(_),
-    provideDriverCommandStep: (KubernetesDriverConf => DriverCommandFeatureStep) =
-      new DriverCommandFeatureStep(_),
-    provideHadoopGlobalStep: (KubernetesDriverConf => KerberosConfDriverFeatureStep) =
-      new KerberosConfDriverFeatureStep(_),
-    providePodTemplateConfigMapStep: (KubernetesConf => PodTemplateConfigMapStep) =
-      new PodTemplateConfigMapStep(_),
-    provideInitialPod: () => SparkPod = () => SparkPod.initialPod) {
+private[spark] class KubernetesDriverBuilder {
 
-  def buildFromFeatures(kubernetesConf: KubernetesDriverConf): KubernetesDriverSpec = {
-    val baseFeatures = Seq(
-      provideBasicStep(kubernetesConf),
-      provideCredentialsStep(kubernetesConf),
-      provideServiceStep(kubernetesConf),
-      provideLocalDirsStep(kubernetesConf))
-
-    val secretFeature = if (kubernetesConf.secretNamesToMountPaths.nonEmpty) {
-      Seq(provideSecretsStep(kubernetesConf))
-    } else Nil
-    val envSecretFeature = if (kubernetesConf.secretEnvNamesToKeyRefs.nonEmpty) {
-      Seq(provideEnvSecretsStep(kubernetesConf))
-    } else Nil
-    val volumesFeature = if (kubernetesConf.volumes.nonEmpty) {
-      Seq(provideVolumesStep(kubernetesConf))
-    } else Nil
-    val podTemplateFeature = if (
-      kubernetesConf.get(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
-      Seq(providePodTemplateConfigMapStep(kubernetesConf))
-    } else Nil
-
-    val driverCommandStep = provideDriverCommandStep(kubernetesConf)
-
-    val hadoopConfigStep = Some(provideHadoopGlobalStep(kubernetesConf))
+  def buildFromFeatures(
+      conf: KubernetesDriverConf,
+      client: KubernetesClient): KubernetesDriverSpec = {
+    val initialPod = conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
+      .map { file =>
+        KubernetesUtils.loadPodFromTemplate(
+          client,
+          new File(file),
+          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
+      }
+      .getOrElse(SparkPod.initialPod())
 
-    val allFeatures: Seq[KubernetesFeatureConfigStep] =
-      baseFeatures ++ Seq(driverCommandStep) ++
-      secretFeature ++ envSecretFeature ++ volumesFeature ++
-      hadoopConfigStep ++ podTemplateFeature
+    val features = Seq(
+      new BasicDriverFeatureStep(conf),
+      new DriverKubernetesCredentialsFeatureStep(conf),
+      new DriverServiceFeatureStep(conf),
+      new MountSecretsFeatureStep(conf),
+      new EnvSecretsFeatureStep(conf),
+      new LocalDirsFeatureStep(conf),
+      new MountVolumesFeatureStep(conf),
+      new DriverCommandFeatureStep(conf),
+      new KerberosConfDriverFeatureStep(conf),
+      new PodTemplateConfigMapStep(conf))
 
-    var spec = KubernetesDriverSpec(
-      provideInitialPod(),
+    val spec = KubernetesDriverSpec(
+      initialPod,
       driverKubernetesResources = Seq.empty,
-      kubernetesConf.sparkConf.getAll.toMap)
-    for (feature <- allFeatures) {
+      conf.sparkConf.getAll.toMap)
+
+    features.foldLeft(spec) { case (spec, feature) =>
       val configuredPod = feature.configurePod(spec.pod)
       val addedSystemProperties = feature.getAdditionalPodSystemProperties()
       val addedResources = feature.getAdditionalKubernetesResources()
-      spec = KubernetesDriverSpec(
+      KubernetesDriverSpec(
         configuredPod,
         spec.driverKubernetesResources ++ addedResources,
         spec.systemProperties ++ addedSystemProperties)
     }
-    spec
   }
 }
-
-private[spark] object KubernetesDriverBuilder {
-  def apply(kubernetesClient: KubernetesClient, conf: SparkConf): KubernetesDriverBuilder = {
-    conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_FILE)
-      .map(new File(_))
-      .map(file => new KubernetesDriverBuilder(provideInitialPod = () =>
-        KubernetesUtils.loadPodFromTemplate(
-          kubernetesClient,
-          file,
-          conf.get(Config.KUBERNETES_DRIVER_PODTEMPLATE_CONTAINER_NAME))
-      ))
-      .getOrElse(new KubernetesDriverBuilder())
-  }
-}
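With every step unconditionally in the list and each one responsible for its own no-op behavior, the builder reduces to a pure fold over the feature steps. A runnable sketch of that accumulation, using simplified stand-ins for KubernetesDriverSpec and KubernetesFeatureConfigStep rather than the Spark types:

    case class Spec(pod: String, resources: Seq[String], props: Map[String, String])

    trait Step {
      def configurePod(pod: String): String = pod
      def extraProps: Map[String, String] = Map.empty
      def extraResources: Seq[String] = Nil
    }

    // Each step transforms the pod and appends its properties and
    // resources; a disabled step returns its inputs unchanged, so the
    // fold needs no special cases.
    def build(initial: Spec, steps: Seq[Step]): Spec =
      steps.foldLeft(initial) { (spec, step) =>
        Spec(
          step.configurePod(spec.pod),
          spec.resources ++ step.extraResources,
          spec.props ++ step.extraProps)
      }

This also explains why the provide*Step constructor defaults (presumably there for test injection) can be deleted: the builder has nothing left to mock, and each step class can be tested directly.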
ExecutorPodsAllocator.scala

@@ -136,7 +136,8 @@ private[spark] class ExecutorPodsAllocator(
           newExecutorId.toString,
           applicationId,
           driverPod)
-        val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr)
+        val executorPod = executorBuilder.buildFromFeatures(executorConf, secMgr,
+          kubernetesClient)
         val podWithAttachedContainer = new PodBuilder(executorPod.pod)
           .editOrNewSpec()
           .addToContainers(executorPod.container)
KubernetesClusterManager.scala

@@ -95,7 +95,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
     val executorPodsAllocator = new ExecutorPodsAllocator(
       sc.conf,
       sc.env.securityManager,
-      KubernetesExecutorBuilder(kubernetesClient, sc.conf),
+      new KubernetesExecutorBuilder(),
       kubernetesClient,
       snapshotsStore,
       new SystemClock())
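The executor path mirrors the driver path: a no-argument builder plus a client passed at build time keeps the builder stateless, so one instance can serve any client. A stand-in sketch of the call pattern; the names below are illustrative, not the Spark API:

    trait KubeClient
    case class Pod(name: String)

    class ExecutorBuilder {
      // The client arrives per call; the builder captures no state.
      def buildFromFeatures(conf: Map[String, String], client: KubeClient): Pod =
        Pod(conf.getOrElse("executorPodName", "spark-exec"))
    }

    object Demo extends App {
      val client = new KubeClient {}
      val builder = new ExecutorBuilder() // nothing captured at construction
      println(builder.buildFromFeatures(Map("executorPodName" -> "exec-1"), client))
    }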