diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
index 85d74c27142ad..88c612c2d951a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
@@ -79,4 +79,13 @@ private[spark] object Kafka {
"For further details please see kafka documentation. Only used to obtain delegation token.")
.stringConf
.createOptional
+
+ val TOKEN_SASL_MECHANISM =
+ ConfigBuilder("spark.kafka.sasl.token.mechanism")
+ .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " +
+ "login module used for authentication a compatible mechanism has to be set here. " +
+ "For further details please see kafka documentation (sasl.mechanism). Only used to " +
+ "authenticate against Kafka broker with delegation token.")
+ .stringConf
+ .createWithDefault("SCRAM-SHA-512")
}
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 7040f8da2c614..3d64ec4cb55f7 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
-The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
Spark considers the following log in options, in order of preference:
-- **JAAS login configuration**
+- **JAAS login configuration**, please see example below.
- **Keytab file**, such as,
./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):
After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly.
Delegation token uses `SCRAM` login module for authentication and because of that the appropriate
-`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration):
-
-
-
-{% highlight scala %}
-
-// Setting on Kafka Source for Streaming Queries
-val df = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("subscribe", "topic1")
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
-
-// Setting on Kafka Source for Batch Queries
-val df = spark
- .read
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("subscribe", "topic1")
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
-
-// Setting on Kafka Sink for Streaming Queries
-val ds = df
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .writeStream
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("topic", "topic1")
- .start()
-
-// Setting on Kafka Sink for Batch Queries
-val ds = df
- .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
- .write
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .save()
-
-{% endhighlight %}
-
-
-{% highlight java %}
-
-// Setting on Kafka Source for Streaming Queries
-Dataset df = spark
- .readStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("subscribe", "topic1")
- .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Source for Batch Queries
-Dataset df = spark
- .read()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("subscribe", "topic1")
- .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Sink for Streaming Queries
-StreamingQuery ds = df
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .writeStream()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("topic", "topic1")
- .start();
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .write()
- .format("kafka")
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
- .option("topic", "topic1")
- .save();
-
-{% endhighlight %}
-
-
-{% highlight python %}
-
-// Setting on Kafka Source for Streaming Queries
-df = spark \
- .readStream \
- .format("kafka") \
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
- .option("subscribe", "topic1") \
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Source for Batch Queries
-df = spark \
- .read \
- .format("kafka") \
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
- .option("subscribe", "topic1") \
- .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-// Setting on Kafka Sink for Streaming Queries
-ds = df \
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
- .writeStream \
- .format("kafka") \
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
- .option("topic", "topic1") \
- .start()
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
- .write \
- .format("kafka") \
- .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
- .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
- .option("topic", "topic1") \
- .save()
-
-{% endhighlight %}
-
-
+`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
+must match with Kafka broker configuration.
When delegation token is available on an executor it can be overridden with JAAS login configuration.
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
index 74d5ef9c05f14..a9b76def4f8be 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.kafka010
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.kafka.common.security.scram.ScramLoginModule
import org.apache.spark.SparkConf
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 0ac330435e5c5..6a0c2088ac3d1 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
- .setTokenJaasConfigIfNeeded()
+ .setAuthenticationConfigIfNeeded()
.build()
def kafkaParamsForExecutors(
@@ -523,7 +524,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
- .setTokenJaasConfigIfNeeded()
+ .setAuthenticationConfigIfNeeded()
.build()
/**
@@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
this
}
- def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+ def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
// There are multiple possibilities to log in and applied in the following order:
// - JVM global security provided -> try to log in with JVM global security configuration
// which can be configured for example with 'java.security.auth.login.config'.
@@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging {
} else if (KafkaSecurityHelper.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
- val mechanism = kafkaParams
- .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+ set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
"Delegation token works only with SCRAM mechanism.")
- set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ set(SaslConfigs.SASL_MECHANISM, mechanism)
}
this
}
@@ -600,7 +601,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
ConfigUpdater("executor", specifiedKafkaParams)
.set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
.set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
- .setTokenJaasConfigIfNeeded()
+ .setAuthenticationConfigIfNeeded()
.build()
}