[SPARK-25815][K8S] Support kerberos in client mode, keytab-based token renewal.

This change hooks up the k8s backend to the updated HadoopDelegationTokenManager,
so that delegation tokens are also available in client mode, and keytab-based token
renewal is enabled.

The change re-works the k8s feature steps related to kerberos so
that the driver does all the credential management and provides all
the needed information to executors - so nothing needs to be added
to executor pods. This also makes cluster mode behave much more
like client mode, since no driver-related config steps are run in
the latter case.

The two main things that no longer need to happen in executors are:

- adding the Hadoop config to the executor pods: this is not needed
  since the Spark driver will serialize the Hadoop config and send
  it to executors when running tasks.

- mounting the kerberos config file in the executor pods: this is
  not needed once you remove the above. The Hadoop conf sent by
  the driver with the tasks is already resolved (i.e. has all the
  kerberos names properly defined), so executors do not need access
  to the kerberos realm information anymore.

The change also avoids creating delegation tokens unnecessarily:
they will only be created if neither a secret with tokens nor a
keytab is provided. In either of those cases, the driver code will
handle delegation tokens: in cluster mode by creating a secret and
stashing them, in client mode by using existing mechanisms to send
DTs to executors.
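
To make the rule above concrete, a minimal sketch (the predicate and its
arguments are illustrative stand-ins, not Spark's actual internals):

    // Illustrative only: tokens are created up front only when neither a
    // pre-existing token secret nor a keytab has been configured. With a
    // keytab, the driver's token manager handles creation and renewal instead.
    def needUpfrontDelegationTokens(hasTokenSecret: Boolean, hasKeytab: Boolean): Boolean =
      !hasTokenSecret && !hasKeytab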

One last feature: the change also allows defining a keytab with
a "local:" URI. This is supported in client mode (although that's
the same as not saying "local:"), and in k8s cluster mode. This
allows the keytab to be mounted onto the image from a pre-existing
secret, for example.
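
As a rough sketch of how a "local:" keytab URI is resolved (the path below is
made up; the scheme check and path extraction mirror the Utils.isLocalUri and
URI handling in the diff):

    import java.net.URI

    // A "local:" URI names a file already present inside the container image
    // (for example, mounted from a pre-existing secret), so no existence check
    // is done at submission time and only the path component is kept.
    val keytab = "local:///mnt/secrets/kerberos-keytab/user.keytab"  // illustrative path
    val isLocal = keytab.startsWith("local:")        // what Utils.isLocalUri checks
    val pathInContainer = new URI(keytab).getPath()  // "/mnt/secrets/kerberos-keytab/user.keytab"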

Finally, the new code always sets SPARK_USER in the driver and
executor pods. This is in line with how other resource managers
behave: the submitting user reflects which user will access
Hadoop services in the app. (With kerberos, that's overridden
by the logged in user.) That user is unrelated to the OS user
the app is running as inside the containers.
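
A minimal sketch of that wiring, using the fabric8 builder API as in the
feature steps changed below (the helper name and its parameters are made up
for illustration):

    import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

    // Add SPARK_USER to a driver or executor container spec so the app reports
    // the submitting (Hadoop) user, independent of the container's OS user.
    def withSparkUser(container: Container, submittingUser: String): Container =
      new ContainerBuilder(container)
        .addNewEnv()
          .withName("SPARK_USER")
          .withValue(submittingUser)
        .endEnv()
        .build()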

Tested:
- client and cluster mode with kinit
- cluster mode with keytab
- cluster mode with local: keytab
- YARN cluster with keytab (to make sure it isn't broken)

Closes #22911 from vanzin/SPARK-25815.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Marcelo Vanzin committed Dec 18, 2018
1 parent 428eb2a commit 4b3fe3a
Showing 25 changed files with 649 additions and 621 deletions.
29 changes: 15 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy

import java.io._
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.net.{URI, URL}
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.UUID
@@ -334,19 +334,20 @@ private[spark] class SparkSubmit extends Logging {
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient || isKubernetesCluster) {
if (args.principal != null) {
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sparkConf.set(KEYTAB, args.keytab)
sparkConf.set(PRINCIPAL, args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
// Kerberos is not supported in standalone mode, and keytab support is not yet available
// in Mesos cluster mode.
if (clusterManager != STANDALONE
&& !isMesosCluster
&& args.principal != null
&& args.keytab != null) {
// If client mode, make sure the keytab is just a local path.
if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
args.keytab = new URI(args.keytab).getPath()
}

if (!Utils.isLocalUri(args.keytab)) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}

@@ -18,6 +18,7 @@
package org.apache.spark.deploy.security

import java.io.File
import java.net.URI
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
@@ -71,11 +72,13 @@ private[spark] class HadoopDelegationTokenManager(
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

private val principal = sparkConf.get(PRINCIPAL).orNull
private val keytab = sparkConf.get(KEYTAB).orNull

// The keytab can be a local: URI for cluster mode, so translate it to a regular path. If it is
// needed later on, the code will check that it exists.
private val keytab = sparkConf.get(KEYTAB).map { uri => new URI(uri).getPath() }.orNull

require((principal == null) == (keytab == null),
"Both principal and keytab must be defined, or neither.")
require(keytab == null || new File(keytab).isFile(), s"Cannot find keytab at $keytab.")

private val delegationTokenProviders = loadProviders()
logDebug("Using the following builtin delegation token providers: " +
@@ -264,6 +267,7 @@

private def doLogin(): UserGroupInformation = {
logInfo(s"Attempting to login to KDC using principal: $principal")
require(new File(keytab).isFile(), s"Cannot find keytab at $keytab.")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -92,6 +92,9 @@ private[spark] object Utils extends Logging {
private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null

/** Scheme used for files that are locally available on worker nodes in the cluster. */
val LOCAL_SCHEME = "local"

/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -2829,6 +2832,11 @@
def isClientMode(conf: SparkConf): Boolean = {
"client".equals(conf.get(SparkLauncher.DEPLOY_MODE, "client"))
}

/** Returns whether the URI is a "local:" URI. */
def isLocalUri(uri: String): Boolean = {
uri.startsWith(s"$LOCAL_SCHEME:")
}
}

private[util] object CallerContext extends Logging {
@@ -87,25 +87,22 @@ private[spark] object Constants {
val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d

// Hadoop Configuration
val HADOOP_FILE_VOLUME = "hadoop-properties"
val HADOOP_CONF_VOLUME = "hadoop-properties"
val KRB_FILE_VOLUME = "krb5-file"
val HADOOP_CONF_DIR_PATH = "/opt/hadoop/conf"
val KRB_FILE_DIR_PATH = "/etc"
val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
val HADOOP_CONFIG_MAP_NAME =
"spark.kubernetes.executor.hadoopConfigMapName"
val KRB5_CONFIG_MAP_NAME =
"spark.kubernetes.executor.krb5ConfigMapName"

// Kerberos Configuration
val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = "delegation-tokens"
val KERBEROS_DT_SECRET_NAME =
"spark.kubernetes.kerberos.dt-secret-name"
val KERBEROS_DT_SECRET_KEY =
"spark.kubernetes.kerberos.dt-secret-key"
val KERBEROS_SPARK_USER_NAME =
"spark.kubernetes.kerberos.spark-user-name"
val KERBEROS_SECRET_KEY = "hadoop-tokens"
val KERBEROS_KEYTAB_VOLUME = "kerberos-keytab"
val KERBEROS_KEYTAB_MOUNT_POINT = "/mnt/secrets/kerberos-keytab"

// Hadoop credentials secrets for the Spark app.
val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
@@ -42,10 +42,6 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {

def appName: String = get("spark.app.name", "spark")

def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config"

def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file"

def namespace: String = get(KUBERNETES_NAMESPACE)

def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)
@@ -18,7 +18,30 @@ package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

private[spark] case class SparkPod(pod: Pod, container: Container)
private[spark] case class SparkPod(pod: Pod, container: Container) {

/**
* Convenience method to apply a series of chained transformations to a pod.
*
* Use it like:
*
* original.transform { case pod =>
*   // update pod and return new one
* }.transform { case pod =>
*   // more changes that create a new pod
* }.transform {
*   case pod if someCondition => // new pod
* }
*
* This makes it cleaner to apply multiple transformations, avoiding having to create
* a bunch of awkwardly-named local variables. Since the argument is a partial function,
* it can do matching without needing to exhaust all the possibilities. If the function
* is not applied, then the original pod will be kept.
*/
def transform(fn: PartialFunction[SparkPod, SparkPod]): SparkPod = fn.lift(this).getOrElse(this)

}


private[spark] object SparkPod {
def initialPod(): SparkPod = {
@@ -110,6 +110,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
.withContainerPort(driverUIPort)
.withProtocol("TCP")
.endPort()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(driverCustomEnvs.asJava)
.addNewEnv()
.withName(ENV_DRIVER_BIND_ADDRESS)
@@ -163,6 +163,10 @@ private[spark] class BasicExecutorFeatureStep(
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("cpu", executorCpuQuantity)
.endResources()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(executorEnv.asJava)
.withPorts(requiredPorts.asJava)
.addToArgs("executor")
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import java.io.File
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import com.google.common.io.Files
import io.fabric8.kubernetes.api.model._

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

/**
* Mounts the Hadoop configuration - either a pre-defined config map, or a local configuration
* directory - on the driver pod.
*/
private[spark] class HadoopConfDriverFeatureStep(conf: KubernetesConf)
extends KubernetesFeatureConfigStep {

private val confDir = Option(conf.sparkConf.getenv(ENV_HADOOP_CONF_DIR))
private val existingConfMap = conf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)

KubernetesUtils.requireNandDefined(
confDir,
existingConfMap,
"Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
"as the creation of an additional ConfigMap, when one is already specified is extraneous")

private lazy val confFiles: Seq[File] = {
val dir = new File(confDir.get)
if (dir.isDirectory) {
dir.listFiles.filter(_.isFile).toSeq
} else {
Nil
}
}

private def newConfigMapName: String = s"${conf.resourceNamePrefix}-hadoop-config"

private def hasHadoopConf: Boolean = confDir.isDefined || existingConfMap.isDefined

override def configurePod(original: SparkPod): SparkPod = {
original.transform { case pod if hasHadoopConf =>
val confVolume = if (confDir.isDefined) {
val keyPaths = confFiles.map { file =>
new KeyToPathBuilder()
.withKey(file.getName())
.withPath(file.getName())
.build()
}
new VolumeBuilder()
.withName(HADOOP_CONF_VOLUME)
.withNewConfigMap()
.withName(newConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
.build()
} else {
new VolumeBuilder()
.withName(HADOOP_CONF_VOLUME)
.withNewConfigMap()
.withName(existingConfMap.get)
.endConfigMap()
.build()
}

val podWithConf = new PodBuilder(pod.pod)
.editSpec()
.addNewVolumeLike(confVolume)
.endVolume()
.endSpec()
.build()

val containerWithMount = new ContainerBuilder(pod.container)
.addNewVolumeMount()
.withName(HADOOP_CONF_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.build()

SparkPod(podWithConf, containerWithMount)
}
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (confDir.isDefined) {
val fileMap = confFiles.map { file =>
(file.getName(), Files.toString(file, StandardCharsets.UTF_8))
}.toMap.asJava

Seq(new ConfigMapBuilder()
.withNewMetadata()
.withName(newConfigMapName)
.endMetadata()
.addToData(fileMap)
.build())
} else {
Nil
}
}

}

(The diffs for the remaining changed files, including files deleted by this commit, did not load and are not shown.)
