Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25501][SS] Add kafka delegation token support. #22598

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f9b4685
[SPARK-25501][SS] Add kafka delegation token support.
gaborgsomogyi Oct 1, 2018
2a79d59
Review fixes:
gaborgsomogyi Oct 2, 2018
a4ab4f5
Review fixes:
gaborgsomogyi Oct 8, 2018
8c860fe
Small cosmetic change
gaborgsomogyi Oct 8, 2018
a2c4397
Leftover from last review fix commit
gaborgsomogyi Oct 9, 2018
fffd139
Review fixes:
gaborgsomogyi Oct 10, 2018
7c22433
Review fixes:
gaborgsomogyi Oct 15, 2018
d7af8ef
Compile fix
gaborgsomogyi Oct 24, 2018
1badd33
Merge branch 'master' into SPARK-25501
gaborgsomogyi Nov 5, 2018
caf7f1f
Providing renewal date
gaborgsomogyi Nov 5, 2018
deea82f
Compile fix
gaborgsomogyi Nov 6, 2018
d938594
Move TokenUtil to core with provided kafka dependency to make DStream…
gaborgsomogyi Nov 8, 2018
17d25f2
Compile fix
gaborgsomogyi Nov 8, 2018
47d79ef
Compile fix
gaborgsomogyi Nov 8, 2018
b4541cd
Review fixes
gaborgsomogyi Nov 15, 2018
2a0cdb7
Review fixes:
gaborgsomogyi Nov 15, 2018
396c80a
Merge branch 'master' into SPARK-25501
gaborgsomogyi Nov 21, 2018
5741917
Update kafka version to 2.1.0 because SPARK-25954 just resolved
gaborgsomogyi Nov 21, 2018
30df8f1
Add SSL protocol support with two-way authentication
gaborgsomogyi Nov 22, 2018
36d05d2
Review fixes:
gaborgsomogyi Nov 27, 2018
cbff31c
Add ticket cache dynamic configuration
gaborgsomogyi Nov 28, 2018
1eceaa9
Merge branch 'master' into SPARK-25501
gaborgsomogyi Nov 29, 2018
2a0b1c3
Remove token cache usage without delegation token
gaborgsomogyi Nov 29, 2018
c0519f8
Review fixes:
gaborgsomogyi Nov 29, 2018
a122865
Remove unnecessary modifiers
gaborgsomogyi Nov 29, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)
safeCreateProvider(new HBaseDelegationTokenProvider) ++
safeCreateProvider(new KafkaDelegationTokenProvider)
gaborgsomogyi marked this conversation as resolved.
Show resolved Hide resolved

// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import scala.reflect.runtime.universe
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{KAFKA_BOOTSTRAP_SERVERS, KAFKA_SECURITY_PROTOCOL}
import org.apache.spark.util.Utils

/**
 * A [[HadoopDelegationTokenProvider]] that obtains Kafka delegation tokens.
 *
 * The Kafka-specific logic lives in the external kafka010 module, so
 * `TokenUtil.obtainToken` is invoked reflectively; when that module is not on
 * the classpath the provider logs the failure and obtains no token instead of
 * failing the application.
 */
private[security] class KafkaDelegationTokenProvider
  extends HadoopDelegationTokenProvider with Logging {

  override def serviceName: String = "kafka"

  /**
   * Fetches a Kafka delegation token via the reflectively loaded
   * `org.apache.spark.sql.kafka010.TokenUtil.obtainToken` and adds it to `creds`.
   *
   * @param hadoopConf Hadoop configuration (unused here, required by the trait).
   * @param sparkConf Spark configuration passed through to `obtainToken`.
   * @param creds credentials the obtained token is added to.
   * @return None — no renewal date is reported for Kafka tokens.
   */
  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    try {
      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
      val obtainToken = mirror.classLoader.
        loadClass("org.apache.spark.sql.kafka010.TokenUtil").
        getMethod("obtainToken", classOf[SparkConf])

      logDebug("Attempting to fetch Kafka security token.")
      // obtainToken is a static method, hence the null receiver.
      val token = obtainToken.invoke(null, sparkConf)
        .asInstanceOf[Token[_ <: TokenIdentifier]]
      creds.addToken(token.getService, token)
    } catch {
      case NonFatal(e) =>
        // Best effort: a missing kafka010 module or a broker problem must not fail
        // the application, but warn so operators can see no token was obtained.
        logWarning(s"Failed to get token from service $serviceName", e)
    }

    None
  }

  /**
   * Tokens are required only when a bootstrap server list is configured and the
   * security protocol is SASL-based (SASL_SSL / SASL_PLAINTEXT).
   */
  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = {
    sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined &&
      sparkConf.get(KAFKA_SECURITY_PROTOCOL).startsWith("SASL")
  }
}
37 changes: 37 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -647,4 +647,41 @@ package object config {
.stringConf
.toSequence
.createWithDefault(Nil)

// Kafka delegation token related configurations. These mirror the corresponding
// Kafka client settings; see the Kafka documentation for full semantics.

private[spark] val KAFKA_BOOTSTRAP_SERVERS =
  ConfigBuilder("spark.kafka.bootstrap.servers")
    .doc("A list of comma separated host/port pairs to use for establishing the initial " +
      "connection to the Kafka cluster. For further details please see kafka documentation.")
    .stringConf
    .createOptional

private[spark] val KAFKA_SECURITY_PROTOCOL =
  ConfigBuilder("spark.kafka.security.protocol")
    .doc("Protocol used to communicate with brokers. For further details please see kafka " +
      "documentation.")
    .stringConf
    .createWithDefault("SASL_SSL")

private[spark] val KAFKA_KERBEROS_SERVICE_NAME =
  ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
    .doc("The Kerberos principal name that Kafka runs as. This can be defined either in " +
      "Kafka's JAAS config or in Kafka's config. For further details please see kafka " +
      "documentation.")
    .stringConf
    .createOptional

private[spark] val KAFKA_TRUSTSTORE_LOCATION =
  ConfigBuilder("spark.kafka.ssl.truststore.location")
    .doc("The location of the trust store file. For further details please see kafka " +
      "documentation.")
    .stringConf
    .createOptional

private[spark] val KAFKA_TRUSTSTORE_PASSWORD =
  ConfigBuilder("spark.kafka.ssl.truststore.password")
    .doc("The store password for the trust store file. This is optional for client and only " +
      "needed if ssl.truststore.location is configured. For further details please see kafka " +
      "documentation.")
    .stringConf
    .createOptional
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hive") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("kafka") should not be (None)
gaborgsomogyi marked this conversation as resolved.
Show resolved Hide resolved
delegationTokenManager.getServiceDelegationTokenProvider("bogus") should be (None)
}

Expand All @@ -59,6 +60,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None)
delegationTokenManager.getServiceDelegationTokenProvider("kafka") should not be (None)
}

test("using deprecated configurations") {
Expand All @@ -72,6 +74,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None)
delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None)
delegationTokenManager.getServiceDelegationTokenProvider("kafka") should not be (None)
}

test("verify no credentials are obtained") {
Expand All @@ -81,7 +84,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
hadoopFSsToAccess)
val creds = new Credentials()

// Tokens cannot be obtained from HDFS, Hive, HBase in unit tests.
// Tokens cannot be obtained from HDFS, Hive, HBase, Kafka in unit tests.
delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
val tokens = creds.getAllTokens
tokens.size() should be (0)
Expand Down Expand Up @@ -111,6 +114,16 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
creds.getAllTokens.size should be (0)
}

test("Obtain tokens For Kafka") {
  val hadoopConf = new Configuration()

  val kafkaTokenProvider = new KafkaDelegationTokenProvider()
  val creds = new Credentials()
  kafkaTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds)

  // The kafka010 TokenUtil class is not on the test classpath, so the provider
  // must fail gracefully and add no token to the credentials.
  creds.getAllTokens.size should be (0)
}

test("SPARK-23209: obtain tokens when Hive classes are not available") {
// This test needs a custom class loader to hide Hive classes which are in the classpath.
// Because the manager code loads the Hive provider directly instead of using reflection, we
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.nio.charset.StandardCharsets

import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.kafka.common.security.scram.ScramLoginModule

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._

/**
 * Helper that builds JAAS configuration strings for Kafka SASL authentication,
 * either from a Kerberos keytab/principal pair or from a previously obtained
 * delegation token (SCRAM).
 */
private[kafka010] object KafkaSecurityHelper extends Logging {

  /**
   * Builds Krb5LoginModule JAAS parameters from the configured keytab/principal.
   *
   * @return Some(params) when a keytab is configured, None otherwise.
   * @throws IllegalArgumentException if the Kafka Kerberos service name or the
   *                                  principal is missing while a keytab is set.
   */
  def getKeytabJaasParams(sparkConf: SparkConf): Option[String] = {
    val keytab = sparkConf.get(KEYTAB)
    if (keytab.isDefined) {
      val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
      require(serviceName.isDefined, "Kerberos service name must be defined")
      val principal = sparkConf.get(PRINCIPAL)
      require(principal.isDefined, "Principal must be defined")

      // Collapse the multi-line template to the single-line form JAAS expects.
      val params =
        s"""
        |${getKrb5LoginModuleName} required
        | useKeyTab=true
        | serviceName="${serviceName.get}"
        | keyTab="${keytab.get}"
        | principal="${principal.get}";
        """.stripMargin.replace("\n", "")
      logDebug(s"Krb JAAS params: $params")
      Some(params)
    } else {
      None
    }
  }

  /** IBM JVMs ship their own Krb5LoginModule implementation; pick the right class. */
  private def getKrb5LoginModuleName: String = {
    if (System.getProperty("java.vendor").contains("IBM")) {
      "com.ibm.security.auth.module.Krb5LoginModule"
    } else {
      "com.sun.security.auth.module.Krb5LoginModule"
    }
  }

  /**
   * Builds SCRAM JAAS parameters from the delegation token stored in the current
   * user's credentials, if one is present.
   */
  def getTokenJaasParams(sparkConf: SparkConf): Option[String] = {
    val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
      TokenUtil.TOKEN_SERVICE)
    if (token != null) {
      Some(getScramJaasParams(sparkConf, token))
    } else {
      None
    }
  }

  private def getScramJaasParams(
      sparkConf: SparkConf,
      token: Token[_ <: TokenIdentifier]): String = {
    val serviceName = sparkConf.get(KAFKA_KERBEROS_SERVICE_NAME)
    require(serviceName.isDefined, "Kerberos service name must be defined")
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    val username = new String(token.getIdentifier, StandardCharsets.UTF_8)
    val password = new String(token.getPassword, StandardCharsets.UTF_8)

    val loginModuleName = classOf[ScramLoginModule].getName
    val params =
      s"""
      |$loginModuleName required
      | tokenauth=true
      | serviceName="${serviceName.get}"
      | username="$username"
      | password="$password";
      """.stripMargin.replace("\n", "")
    // Redact the password before logging.
    logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")

    params
  }
}
Loading