Skip to content

Commit

Permalink
K8S-1087 (apache#602)
Browse files Browse the repository at this point in the history
* K8S-1087

- mount metrics_ticket implicitly to spark pods from mapr-server-secrets

* K8S-1087
- fix tickets mounting conflict
- move unnecessary config values to constants
  • Loading branch information
AlexNavara authored and ekrivokonmapr committed Nov 6, 2023
1 parent cccf6e5 commit 7f35c27
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -676,11 +676,6 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("mapr-user-secret")

val MAPR_TICKET_SECRET_KEY =
ConfigBuilder("spark.mapr.ticket.secret.key")
.stringConf
.createWithDefault("CONTAINER_TICKET")

val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
"spark.kubernetes.authenticate.submission"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ private[spark] object Constants {
val APP_RESOURCE_TYPE_PYTHON = "python"
val APP_RESOURCE_TYPE_R = "r"

val MAPR_TICKETFILE_LOCATION = "MAPR_TICKETFILE_LOCATION"
val MAPR_SSL_LOCATION = "MAPR_SSL_LOCATION"
// MAPR-SPECIFIC
val ENV_MAPR_TICKETFILE_LOCATION = "MAPR_TICKETFILE_LOCATION"
val MAPR_USER_TICKET_SUBPATH = "CONTAINER_TICKET"
val MAPR_USER_TICKET_MOUNT_PATH = "/tmp/maprticket/CONTAINER_TICKET"

val ENV_MAPR_METRICSFILE_LOCATION = "MAPR_METRICSFILE_LOCATION"
val MAPR_METRICS_TICKET_SUBPATH = "maprmetricsticket"
val MAPR_METRICS_TICKET_MOUNT_PATH = "/tmp/maprticket/METRICS_TICKET"
val MAPR_SERVER_SECRET = "mapr-server-secrets"

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata, PodBuilder, VolumeBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata, PodBuilder}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
Expand All @@ -14,55 +14,96 @@ private[spark] class MaprConfigFeatureStep(
val sparkConf: SparkConf = conf.sparkConf

override def configurePod(pod: SparkPod): SparkPod = {
val clusterEnvs = sparkConf.getAllWithPrefix(KUBERNETES_CLUSTER_ENV_KEY).toSeq
.map { case (name, value) =>
new EnvVarBuilder()
.withName(name)
.withValue(value)
.build()
}

val clusterConfMap = sparkConf.get(MAPR_CLUSTER_CONFIGMAP).toString
val userSecret = sparkConf.get(MAPR_USER_SECRET).toString
val userSecretVolumeName = s"$userSecret-volume"
val userSecretMountPath = "/tmp/maprticket"
val ticketFileLocation = s"$userSecretMountPath/${sparkConf.get(MAPR_TICKET_SECRET_KEY)}"

val maprPod = new PodBuilder(pod.pod)
.editOrNewSpec()
.addToVolumes(
new VolumeBuilder()
.withName(userSecretVolumeName)
.withNewSecret()
.withSecretName(userSecret)
.endSecret()
.build())
val podBuilder = new PodBuilder(pod.pod)
val containerBuilder = new ContainerBuilder(pod.container)

applyUserSecret(podBuilder, containerBuilder)
applyMetricsTicket(podBuilder, containerBuilder)
applyClusterConfigMap(podBuilder, containerBuilder)
addClusterEnvs(podBuilder, containerBuilder)

SparkPod(podBuilder.build(), containerBuilder.build())
}

private def applyUserSecret(podBuilder: PodBuilder, containerBuilder: ContainerBuilder) = {
val userSecretName = sparkConf.get(MAPR_USER_SECRET).toString
val userSecretVolumeName = s"$userSecretName-volume"

podBuilder.editOrNewSpec()
.addNewVolume()
.withName(userSecretVolumeName)
.withNewSecret()
.withSecretName(userSecretName)
.endSecret()
.endVolume()
.endSpec()
.build()

val maprContainer = new ContainerBuilder(pod.container)
.addAllToEnv(clusterEnvs.asJava)
containerBuilder
.addNewEnv()
.withName(MAPR_TICKETFILE_LOCATION)
.withValue(ticketFileLocation)
.withName(ENV_MAPR_TICKETFILE_LOCATION)
.withValue(MAPR_USER_TICKET_MOUNT_PATH)
.endEnv()
.addNewEnvFrom()
.withNewSecretRef()
.withName(userSecretName)
.endSecretRef()
.endEnvFrom()
.addNewVolumeMount()
.withName(userSecretVolumeName)
.withMountPath(userSecretMountPath)
.withMountPath(MAPR_USER_TICKET_MOUNT_PATH)
.withSubPath(MAPR_USER_TICKET_SUBPATH)
.endVolumeMount()
}

private def applyMetricsTicket(podBuilder: PodBuilder, containerBuilder: ContainerBuilder) = {
val serverSecretName = MAPR_SERVER_SECRET
val serverSecretVolume = s"$serverSecretName-volume"

podBuilder.editOrNewSpec()
.addNewVolume()
.withName(serverSecretVolume)
.withNewSecret()
.withSecretName(serverSecretName)
.endSecret()
.endVolume()
.endSpec()

containerBuilder
.addNewEnv()
.withName(ENV_MAPR_METRICSFILE_LOCATION)
.withValue(MAPR_METRICS_TICKET_MOUNT_PATH)
.endEnv()
.addNewVolumeMount()
.withName(serverSecretVolume)
.withMountPath(MAPR_METRICS_TICKET_MOUNT_PATH)
.withSubPath(MAPR_METRICS_TICKET_SUBPATH)
.endVolumeMount()
}

private def applyClusterConfigMap(podBuilder: PodBuilder, containerBuilder: ContainerBuilder) = {
val clusterConfMap = sparkConf.get(MAPR_CLUSTER_CONFIGMAP).toString

containerBuilder
.addNewEnvFrom()
.withNewConfigMapRef()
.withName(clusterConfMap)
.endConfigMapRef()
.endEnvFrom()
.addNewEnvFrom()
.withNewSecretRef()
.withName(userSecret)
.endSecretRef()
.endEnvFrom()
.build()
.endConfigMapRef()
.endEnvFrom()
}

SparkPod(maprPod, maprContainer)
private def addClusterEnvs(podBuilder: PodBuilder, containerBuilder: ContainerBuilder) = {
val clusterEnvs = sparkConf.getAllWithPrefix(KUBERNETES_CLUSTER_ENV_KEY).toSeq
.map { case (name, value) =>
new EnvVarBuilder()
.withName(name)
.withValue(value)
.build()
}

containerBuilder
.addAllToEnv(clusterEnvs.asJava)
.build()
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
Expand Down

0 comments on commit 7f35c27

Please sign in to comment.