Skip to content

Commit

Permalink
[SPARK-25501][SS] Add kafka delegation token support.
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

It adds kafka delegation token support for structured streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing)

What this PR contains:
* Configuration parameters for the feature
* Delegation token fetching from broker
* Usage of token through dynamic JAAS configuration
* Minor refactoring in the existing code

What this PR doesn't contain:
* Documentation changes because design can change

## How was this patch tested?

Existing tests + added small amount of additional unit tests.

Because it's an external service integration mainly tested on cluster.
* 4 node cluster
* Kafka broker version 1.1.0
* Topic with 4 partitions
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-256

An example of obtaining a token:
```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID         HMAC                           OWNER           RENEWERS                  ISSUEDATE       EXPIRYDATE      MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user    []                        2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```

An example token usage:
```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```

Closes #22598 from gaborgsomogyi/SPARK-25501.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
gaborgsomogyi authored and Marcelo Vanzin committed Nov 30, 2018
1 parent f97326b commit 0166c73
Show file tree
Hide file tree
Showing 15 changed files with 825 additions and 50 deletions.
13 changes: 13 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,19 @@
<scope>provided</scope>
</dependency>

<!--
The following kafka dependency used to obtain delegation token.
In order to prevent spark-core from depending on kafka, these deps have been placed in the
"provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are
handled when the user explicitly use neither spark-streaming-kafka nor spark-sql-kafka modules.
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ private[spark] class HadoopDelegationTokenManager(
new HadoopFSDelegationTokenProvider(
() => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)
safeCreateProvider(new HBaseDelegationTokenProvider) ++
safeCreateProvider(new KafkaDelegationTokenProvider)

// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import scala.language.existentials
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._

private[security] class KafkaDelegationTokenProvider
extends HadoopDelegationTokenProvider with Logging {

override def serviceName: String = "kafka"

override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
try {
logDebug("Attempting to fetch Kafka security token.")
val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
creds.addToken(token.getService, token)
return Some(nextRenewalDate)
} catch {
case NonFatal(e) =>
logInfo(s"Failed to get token from service $serviceName", e)
}
None
}

override def delegationTokensRequired(
sparkConf: SparkConf,
hadoopConf: Configuration): Boolean = {
val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
sparkConf.contains(Kafka.BOOTSTRAP_SERVERS) &&
(protocol == SASL_SSL.name ||
protocol == SSL.name ||
protocol == SASL_PLAINTEXT.name)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import java.{ util => ju }
import java.text.SimpleDateFormat

import scala.util.control.NonFatal

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.apache.kafka.common.security.token.delegation.DelegationToken

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._

private[spark] object KafkaTokenUtil extends Logging {
val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
val TOKEN_SERVICE = new Text("kafka.server.delegation.token")

private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
override def getKind: Text = TOKEN_KIND
}

private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
val createDelegationTokenOptions = new CreateDelegationTokenOptions()
val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
val token = createResult.delegationToken().get()
printToken(token)

(new Token[KafkaDelegationTokenIdentifier](
token.tokenInfo.tokenId.getBytes,
token.hmacAsBase64String.getBytes,
TOKEN_KIND,
TOKEN_SERVICE
), token.tokenInfo.expiryTimestamp)
}

private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = {
val adminClientProperties = new ju.Properties

val bootstrapServers = sparkConf.get(Kafka.BOOTSTRAP_SERVERS)
require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " +
"servers not configured.")
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get)

val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol)
protocol match {
case SASL_SSL.name =>
setTrustStoreProperties(sparkConf, adminClientProperties)

case SSL.name =>
setTrustStoreProperties(sparkConf, adminClientProperties)
setKeyStoreProperties(sparkConf, adminClientProperties)
logWarning("Obtaining kafka delegation token with SSL protocol. Please " +
"configure 2-way authentication on the broker side.")

case SASL_PLAINTEXT.name =>
logWarning("Obtaining kafka delegation token through plain communication channel. Please " +
"consider the security impact.")
}

// There are multiple possibilities to log in and applied in the following order:
// - JVM global security provided -> try to log in with JVM global security configuration
// which can be configured for example with 'java.security.auth.login.config'.
// For this no additional parameter needed.
// - Keytab is provided -> try to log in with kerberos module and keytab using kafka's dynamic
// JAAS configuration.
// - Keytab not provided -> try to log in with kerberos module and ticket cache using kafka's
// dynamic JAAS configuration.
// Kafka client is unable to use subject from JVM which already logged in
// to kdc (see KAFKA-7677)
if (isGlobalJaasConfigurationProvided) {
logDebug("JVM global security configuration detected, using it for login.")
} else {
adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
if (sparkConf.contains(KEYTAB)) {
logDebug("Keytab detected, using it for login.")
val jaasParams = getKeytabJaasParams(sparkConf)
adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
} else {
logDebug("Using ticket cache for login.")
val jaasParams = getTicketCacheJaasParams(sparkConf)
adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
}
}

adminClientProperties
}

def isGlobalJaasConfigurationProvided: Boolean = {
try {
JaasContext.loadClientContext(ju.Collections.emptyMap[String, Object]())
true
} catch {
case NonFatal(_) => false
}
}

private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
properties.put("ssl.truststore.location", truststoreLocation)
}
sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
properties.put("ssl.truststore.password", truststorePassword)
}
}

private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
properties.put("ssl.keystore.location", keystoreLocation)
}
sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
properties.put("ssl.keystore.password", keystorePassword)
}
sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
properties.put("ssl.key.password", keyPassword)
}
}

private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = {
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
require(serviceName.nonEmpty, "Kerberos service name must be defined")

val params =
s"""
|${getKrb5LoginModuleName} required
| useKeyTab=true
| serviceName="${serviceName.get}"
| keyTab="${sparkConf.get(KEYTAB).get}"
| principal="${sparkConf.get(PRINCIPAL).get}";
""".stripMargin.replace("\n", "")
logDebug(s"Krb keytab JAAS params: $params")
params
}

def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
require(serviceName.nonEmpty, "Kerberos service name must be defined")

val params =
s"""
|${getKrb5LoginModuleName} required
| useTicketCache=true
| serviceName="${serviceName.get}";
""".stripMargin.replace("\n", "")
logDebug(s"Krb ticket cache JAAS params: $params")
params
}

/**
* Krb5LoginModule package vary in different JVMs.
* Please see Hadoop UserGroupInformation for further details.
*/
private def getKrb5LoginModuleName(): String = {
if (System.getProperty("java.vendor").contains("IBM")) {
"com.ibm.security.auth.module.Krb5LoginModule"
} else {
"com.sun.security.auth.module.Krb5LoginModule"
}
}

private def printToken(token: DelegationToken): Unit = {
if (log.isDebugEnabled) {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
val tokenInfo = token.tokenInfo
logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
tokenInfo.tokenId,
tokenInfo.owner,
tokenInfo.renewersAsString,
dateFormat.format(tokenInfo.issueTimestamp),
dateFormat.format(tokenInfo.expiryTimestamp),
dateFormat.format(tokenInfo.maxTimestamp)))
}
}
}
82 changes: 82 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.config

private[spark] object Kafka {

val BOOTSTRAP_SERVERS =
ConfigBuilder("spark.kafka.bootstrap.servers")
.doc("A list of coma separated host/port pairs to use for establishing the initial " +
"connection to the Kafka cluster. For further details please see kafka documentation. " +
"Only used to obtain delegation token.")
.stringConf
.createOptional

val SECURITY_PROTOCOL =
ConfigBuilder("spark.kafka.security.protocol")
.doc("Protocol used to communicate with brokers. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createWithDefault("SASL_SSL")

val KERBEROS_SERVICE_NAME =
ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
.doc("The Kerberos principal name that Kafka runs as. This can be defined either in " +
"Kafka's JAAS config or in Kafka's config. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

val TRUSTSTORE_LOCATION =
ConfigBuilder("spark.kafka.ssl.truststore.location")
.doc("The location of the trust store file. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

val TRUSTSTORE_PASSWORD =
ConfigBuilder("spark.kafka.ssl.truststore.password")
.doc("The store password for the trust store file. This is optional for client and only " +
"needed if ssl.truststore.location is configured. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

val KEYSTORE_LOCATION =
ConfigBuilder("spark.kafka.ssl.keystore.location")
.doc("The location of the key store file. This is optional for client and can be used for " +
"two-way authentication for client. For further details please see kafka documentation. " +
"Only used to obtain delegation token.")
.stringConf
.createOptional

val KEYSTORE_PASSWORD =
ConfigBuilder("spark.kafka.ssl.keystore.password")
.doc("The store password for the key store file. This is optional for client and only " +
"needed if ssl.keystore.location is configured. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

val KEY_PASSWORD =
ConfigBuilder("spark.kafka.ssl.key.password")
.doc("The password of the private key in the key store file. This is optional for client. " +
"For further details please see kafka documentation. Only used to obtain delegation token.")
.stringConf
.createOptional
}
Loading

0 comments on commit 0166c73

Please sign in to comment.