[SPARK-25501][SS] Add kafka delegation token support. #22598

Closed · wants to merge 25 commits
Changes from 23 commits

Commits (25)
f9b4685  [SPARK-25501][SS] Add kafka delegation token support. (gaborgsomogyi, Oct 1, 2018)
2a79d59  Review fixes: (gaborgsomogyi, Oct 2, 2018)
a4ab4f5  Review fixes: (gaborgsomogyi, Oct 8, 2018)
8c860fe  Small cosmetic change (gaborgsomogyi, Oct 8, 2018)
a2c4397  Leftover from last review fix commit (gaborgsomogyi, Oct 9, 2018)
fffd139  Review fixes: (gaborgsomogyi, Oct 10, 2018)
7c22433  Review fixes: (gaborgsomogyi, Oct 15, 2018)
d7af8ef  Compile fix (gaborgsomogyi, Oct 24, 2018)
1badd33  Merge branch 'master' into SPARK-25501 (gaborgsomogyi, Nov 5, 2018)
caf7f1f  Providing renewal date (gaborgsomogyi, Nov 5, 2018)
deea82f  Compile fix (gaborgsomogyi, Nov 6, 2018)
d938594  Move TokenUtil to core with provided kafka dependency to make DStream… (gaborgsomogyi, Nov 8, 2018)
17d25f2  Compile fix (gaborgsomogyi, Nov 8, 2018)
47d79ef  Compile fix (gaborgsomogyi, Nov 8, 2018)
b4541cd  Review fixes (gaborgsomogyi, Nov 15, 2018)
2a0cdb7  Review fixes: (gaborgsomogyi, Nov 15, 2018)
396c80a  Merge branch 'master' into SPARK-25501 (gaborgsomogyi, Nov 21, 2018)
5741917  Update kafka version to 2.1.0 because SPARK-25954 just resolved (gaborgsomogyi, Nov 21, 2018)
30df8f1  Add SSL protocol support with two-way authentication (gaborgsomogyi, Nov 22, 2018)
36d05d2  Review fixes: (gaborgsomogyi, Nov 27, 2018)
cbff31c  Add ticket cache dynamic configuration (gaborgsomogyi, Nov 28, 2018)
1eceaa9  Merge branch 'master' into SPARK-25501 (gaborgsomogyi, Nov 29, 2018)
2a0b1c3  Remove token cache usage without delegation token (gaborgsomogyi, Nov 29, 2018)
c0519f8  Review fixes: (gaborgsomogyi, Nov 29, 2018)
a122865  Remove unnecessary modifiers (gaborgsomogyi, Nov 29, 2018)
13 changes: 13 additions & 0 deletions core/pom.xml
@@ -408,6 +408,19 @@
<scope>provided</scope>
</dependency>

<!--
The following Kafka dependency is used to obtain delegation tokens.
Contributor commented:
I'm starting to think we should pull all this kerberos stuff into a separate module... then we wouldn't need this kind of hack in core.

That's obviously for a separate discussion, though.

Contributor (author) replied:

Yeah, fully agree. Neither copy-paste nor having a provided dependency in core is a clean solution.

Contributor commented:

+1 for keeping it in one place; that makes a security-critical piece of code easier to maintain. -1 if it ends up like UGI, which everyone is scared to go near.

In order to prevent spark-core from depending on kafka, these deps have been placed in the
"provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are
handled when the user uses neither the spark-streaming-kafka nor the spark-sql-kafka module.
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -274,7 +274,8 @@ private[spark] class HadoopDelegationTokenManager(
new HadoopFSDelegationTokenProvider(
() => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)
safeCreateProvider(new HBaseDelegationTokenProvider) ++
safeCreateProvider(new KafkaDelegationTokenProvider)

// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
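
For context, the surrounding safeCreateProvider helper (already present in this class, not changed by this diff) is what makes the "provided" Kafka dependency in core/pom.xml workable: when kafka-clients is missing at runtime, constructing KafkaDelegationTokenProvider throws NoClassDefFoundError and the provider is simply skipped. A rough sketch of that helper:

private def safeCreateProvider(
    createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
  try {
    Some(createFn)
  } catch {
    // NoClassDefFoundError is an Error, not an Exception, so Throwable is
    // caught here; this is what lets the Kafka provider drop out cleanly
    // when kafka-clients is not on the classpath.
    case t: Throwable =>
      logDebug(s"Failed to load built in provider.", t)
      None
  }
}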
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import scala.language.existentials
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL}

private[security] class KafkaDelegationTokenProvider
extends HadoopDelegationTokenProvider with Logging {

override def serviceName: String = "kafka"

override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
try {
logDebug("Attempting to fetch Kafka security token.")
val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
creds.addToken(token.getService, token)
return Some(nextRenewalDate)
} catch {
case NonFatal(e) =>
logInfo(s"Failed to get token from service $serviceName", e)
}
None
}

override def delegationTokensRequired(
sparkConf: SparkConf,
hadoopConf: Configuration): Boolean = {
val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
sparkConf.contains(KAFKA_BOOTSTRAP_SERVERS) &&
(protocol == SASL_SSL.name ||
protocol == SSL.name ||
protocol == SASL_PLAINTEXT.name)
}
}
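
To see when this provider activates, only the two settings checked above are needed. A minimal sketch (broker addresses are hypothetical):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Hypothetical broker list for a Kerberos-secured cluster.
  .set("spark.kafka.bootstrap.servers", "broker1:9092,broker2:9092")
  // SASL_SSL is the default; shown explicitly for clarity.
  .set("spark.kafka.security.protocol", "SASL_SSL")

// With these set, delegationTokensRequired returns true and the manager
// calls obtainDelegationTokens when credentials are fetched.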
@@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import java.{ util => ju }
import java.text.SimpleDateFormat

import scala.util.control.NonFatal

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.apache.kafka.common.security.token.delegation.DelegationToken

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._

private[spark] object KafkaTokenUtil extends Logging {
val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
val TOKEN_SERVICE = new Text("kafka.server.delegation.token")

private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
override def getKind: Text = TOKEN_KIND
}

private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
val createDelegationTokenOptions = new CreateDelegationTokenOptions()
val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
val token = createResult.delegationToken().get()
printToken(token)

(new Token[KafkaDelegationTokenIdentifier](
token.tokenInfo.tokenId.getBytes,
token.hmacAsBase64String.getBytes,
TOKEN_KIND,
TOKEN_SERVICE
), token.tokenInfo.expiryTimestamp)
}

private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = {
val adminClientProperties = new ju.Properties

val bootstrapServers = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS)
require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " +
"servers not configured.")
adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get)

val protocol = sparkConf.get(KAFKA_SECURITY_PROTOCOL)
adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol)
protocol match {
case SASL_SSL.name =>
setTrustStoreProperties(sparkConf, adminClientProperties)

case SSL.name =>
setTrustStoreProperties(sparkConf, adminClientProperties)
setKeyStoreProperties(sparkConf, adminClientProperties)
logWarning("Obtaining kafka delegation token with SSL protocol. Please " +
"configure 2-way authentication on the broker side.")

case SASL_PLAINTEXT.name =>
logWarning("Obtaining kafka delegation token through plain communication channel. Please " +
"consider the security impact.")
}

// There are multiple ways to log in, applied in the following order:
// - JVM global security provided -> try to log in with the JVM global security
//   configuration, which can be set for example via 'java.security.auth.login.config'.
//   No additional parameters are needed in this case.
// - Keytab is provided -> try to log in with the Kerberos module and keytab using
//   Kafka's dynamic JAAS configuration.
// - Keytab not provided -> try to log in with the Kerberos module and ticket cache
//   using Kafka's dynamic JAAS configuration.
if (isGlobalJaasConfigurationProvided) {
logDebug("JVM global security configuration detected, using it for login.")
} else {
adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
if (sparkConf.contains(KEYTAB)) {
logDebug("Keytab detected, using it for login.")
val jaasParams = getKeytabJaasParams(sparkConf)
adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
} else {
logDebug("Using ticket cache for login.")
val jaasParams = getTicketCacheJaasParams(sparkConf)
adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
}
}

adminClientProperties
}

def isGlobalJaasConfigurationProvided: Boolean = {
try {
JaasContext.loadClientContext(ju.Collections.emptyMap[String, Object]())
true
} catch {
case NonFatal(_) => false
}
}

private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
sparkConf.get(KAFKA_TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
properties.put("ssl.truststore.location", truststoreLocation)
}
sparkConf.get(KAFKA_TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
properties.put("ssl.truststore.password", truststorePassword)
}
}

private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
sparkConf.get(KAFKA_KEYSTORE_LOCATION).foreach { keystoreLocation =>
properties.put("ssl.keystore.location", keystoreLocation)
}
sparkConf.get(KAFKA_KEYSTORE_PASSWORD).foreach { keystorePassword =>
properties.put("ssl.keystore.password", keystorePassword)
}
sparkConf.get(KAFKA_KEY_PASSWORD).foreach { keyPassword =>
properties.put("ssl.key.password", keyPassword)
}
}

private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = {
val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
require(serviceName.nonEmpty, "Kerberos service name must be defined")

val params =
s"""
|${getKrb5LoginModuleName} required
| useKeyTab=true
| serviceName="${serviceName.get}"
| keyTab="${sparkConf.get(KEYTAB).get}"
| principal="${sparkConf.get(PRINCIPAL).get}";
""".stripMargin.replace("\n", "")
logDebug(s"Krb keytab JAAS params: $params")
params
}

def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
require(serviceName.nonEmpty, "Kerberos service name must be defined")

val params =
s"""
|${getKrb5LoginModuleName} required
| useTicketCache=true
| serviceName="${serviceName.get}";
""".stripMargin.replace("\n", "")
logDebug(s"Krb ticket cache JAAS params: $params")
params
}

/**
* The Krb5LoginModule package varies across JVMs.
* Please see Hadoop UserGroupInformation for further details.
*/
private def getKrb5LoginModuleName(): String = {
if (System.getProperty("java.vendor").contains("IBM")) {
"com.ibm.security.auth.module.Krb5LoginModule"
} else {
"com.sun.security.auth.module.Krb5LoginModule"
}
}

private def printToken(token: DelegationToken): Unit = {
if (log.isDebugEnabled) {
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
val tokenInfo = token.tokenInfo
logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
tokenInfo.tokenId,
tokenInfo.owner,
tokenInfo.renewersAsString,
dateFormat.format(tokenInfo.issueTimestamp),
dateFormat.format(tokenInfo.expiryTimestamp),
dateFormat.format(tokenInfo.maxTimestamp)))
}
}
}
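
For illustration, a sketch of the dynamic JAAS configuration built by getKeytabJaasParams on a HotSpot JVM. The principal, keytab path, and service name below are hypothetical, and since the helper is private[security] this would run inside org.apache.spark.deploy.security:

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL}

val conf = new SparkConf()
  .set("spark.kafka.sasl.kerberos.service.name", "kafka")
  .set(KEYTAB.key, "/etc/security/keytabs/spark.keytab")    // hypothetical
  .set(PRINCIPAL.key, "spark/host.example.com@EXAMPLE.COM") // hypothetical

// Yields a single-line string (newlines are stripped) roughly like:
// com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true
//   serviceName="kafka" keyTab="/etc/security/keytabs/spark.keytab"
//   principal="spark/host.example.com@EXAMPLE.COM";
val jaasParams = KafkaTokenUtil.getKeytabJaasParams(conf)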
61 changes: 61 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -688,4 +688,65 @@ package object config {
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val KAFKA_BOOTSTRAP_SERVERS =
ConfigBuilder("spark.kafka.bootstrap.servers")
.doc("A list of coma separated host/port pairs to use for establishing the initial " +
"connection to the Kafka cluster. For further details please see kafka documentation. " +
"Only used to obtain delegation token.")
.stringConf
.createOptional

private[spark] val KAFKA_SECURITY_PROTOCOL =
ConfigBuilder("spark.kafka.security.protocol")
.doc("Protocol used to communicate with brokers. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createWithDefault("SASL_SSL")

private[spark] val KAFKA_KERBEROS_SERVICE_NAME =
ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
.doc("The Kerberos principal name that Kafka runs as. This can be defined either in " +
"Kafka's JAAS config or in Kafka's config. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

private[spark] val KAFKA_TRUSTSTORE_LOCATION =
ConfigBuilder("spark.kafka.ssl.truststore.location")
.doc("The location of the trust store file. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

private[spark] val KAFKA_TRUSTSTORE_PASSWORD =
ConfigBuilder("spark.kafka.ssl.truststore.password")
.doc("The store password for the trust store file. This is optional for client and only " +
"needed if ssl.truststore.location is configured. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

private[spark] val KAFKA_KEYSTORE_LOCATION =
ConfigBuilder("spark.kafka.ssl.keystore.location")
.doc("The location of the key store file. This is optional for client and can be used for " +
"two-way authentication for client. For further details please see kafka documentation. " +
"Only used to obtain delegation token.")
.stringConf
.createOptional

private[spark] val KAFKA_KEYSTORE_PASSWORD =
ConfigBuilder("spark.kafka.ssl.keystore.password")
.doc("The store password for the key store file. This is optional for client and only " +
"needed if ssl.keystore.location is configured. For further details please see kafka " +
"documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

private[spark] val KAFKA_KEY_PASSWORD =
ConfigBuilder("spark.kafka.ssl.key.password")
.doc("The password of the private key in the key store file. This is optional for client. " +
"For further details please see kafka documentation. Only used to obtain delegation token.")
.stringConf
.createOptional
}
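
Putting the SSL-related settings together, a sketch of a two-way-authenticated SSL setup for token fetching (all endpoints, paths, and passwords below are hypothetical):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kafka.bootstrap.servers", "broker1:9093")                  // hypothetical
  .set("spark.kafka.security.protocol", "SSL")
  .set("spark.kafka.ssl.truststore.location", "/etc/ssl/truststore.jks") // hypothetical
  .set("spark.kafka.ssl.truststore.password", "truststore-secret")       // hypothetical
  // The keystore entries are what enable two-way authentication with the broker.
  .set("spark.kafka.ssl.keystore.location", "/etc/ssl/keystore.jks")     // hypothetical
  .set("spark.kafka.ssl.keystore.password", "keystore-secret")           // hypothetical
  .set("spark.kafka.ssl.key.password", "key-secret")                     // hypothetical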
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.security

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.util.Utils
@@ -33,6 +31,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(manager.isProviderLoaded("hive"))
assert(manager.isProviderLoaded("kafka"))
}

test("disable hive credential provider") {
@@ -41,6 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
assert(manager.isProviderLoaded("kafka"))
}

test("using deprecated configurations") {
@@ -51,6 +51,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
assert(!manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
assert(manager.isProviderLoaded("kafka"))
}

test("SPARK-23209: obtain tokens when Hive classes are not available") {