From 320040a90c58c87ce1b21ba6fdc16b703122e8b1 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 7 Dec 2018 16:02:17 +0100 Subject: [PATCH 1/3] [SPARK-26322][SS] Add spark.kafka.token.sasl.mechanism to ease delegation token configuration. --- .../apache/spark/internal/config/Kafka.scala | 9 ++ .../structured-streaming-kafka-integration.md | 143 +----------------- .../sql/kafka010/KafkaSecurityHelper.scala | 1 - .../sql/kafka010/KafkaSourceProvider.scala | 15 +- 4 files changed, 20 insertions(+), 148 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala index 85d74c27142ad..0f37ce2fd9707 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala @@ -79,4 +79,13 @@ private[spark] object Kafka { "For further details please see kafka documentation. Only used to obtain delegation token.") .stringConf .createOptional + + val TOKEN_SASL_MECHANISM = + ConfigBuilder("spark.kafka.token.sasl.mechanism") + .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " + + "login module used for authentication a compatible mechanism has to be set here. " + + "For further details please see kafka documentation. Only used to authenticate against " + + "Kafka broker with delegation token.") + .stringConf + .createWithDefault("SCRAM-SHA-512") } diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 7040f8da2c614..31e536b8c6ce6 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). -The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`, +The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` set, Spark considers the following log in options, in order of preference: -- **JAAS login configuration** +- **JAAS login configuration**, please see example below. - **Keytab file**, such as, ./bin/spark-submit \ @@ -669,144 +669,7 @@ Kafka broker configuration): After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly. Delegation token uses `SCRAM` login module for authentication and because of that the appropriate -`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration): - -
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// Setting on Kafka Source for Streaming Queries
-val df = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Source for Batch Queries
-val df = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Sink for Streaming Queries
-val ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-val ds = df
-  .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .save()
-
-{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// Setting on Kafka Source for Streaming Queries
-Dataset<Row> df = spark
-  .readStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Source for Batch Queries
-Dataset<Row> df = spark
-  .read()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Sink for Streaming Queries
-StreamingQuery ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start();
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .save();
-
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-
-# Setting on Kafka Source for Streaming Queries
-df = spark \
-  .readStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-# Setting on Kafka Source for Batch Queries
-df = spark \
-  .read \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-# Setting on Kafka Sink for Streaming Queries
-ds = df \
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .writeStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .start()
-
-# Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .write \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .save()
-
-{% endhighlight %}
-</div>
-</div>
+`spark.kafka.token.sasl.mechanism` (default: `SCRAM-SHA-512`) has to be configured (it must match with Kafka broker configuration). When delegation token is available on an executor it can be overridden with JAAS login configuration. diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala index 74d5ef9c05f14..a9b76def4f8be 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.kafka010 import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.kafka.common.security.scram.ScramLoginModule import org.apache.spark.SparkConf diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 0ac330435e5c5..6a0c2088ac3d1 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe import org.apache.spark.SparkEnv import org.apache.spark.deploy.security.KafkaTokenUtil import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ @@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { // If buffer config is not set, set it to reasonable value to work around // buffer issues (see KAFKA-3135) .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) - .setTokenJaasConfigIfNeeded() + .setAuthenticationConfigIfNeeded() .build() def kafkaParamsForExecutors( @@ -523,7 +524,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { // If buffer config is not set, set it to reasonable value to work around // buffer issues (see KAFKA-3135) .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) - .setTokenJaasConfigIfNeeded() + .setAuthenticationConfigIfNeeded() .build() /** @@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { this } - def setTokenJaasConfigIfNeeded(): ConfigUpdater = { + def setAuthenticationConfigIfNeeded(): ConfigUpdater = { // There are multiple possibilities to log in and applied in the following order: // - JVM global security provided -> try to log in with JVM global security configuration // which can be configured for example with 'java.security.auth.login.config'. 
@@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging { } else if (KafkaSecurityHelper.isTokenAvailable()) { logDebug("Delegation token detected, using it for login.") val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) - val mechanism = kafkaParams - .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM) + set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM) require(mechanism.startsWith("SCRAM"), "Delegation token works only with SCRAM mechanism.") - set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + set(SaslConfigs.SASL_MECHANISM, mechanism) } this } @@ -600,7 +601,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { ConfigUpdater("executor", specifiedKafkaParams) .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName) .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName) - .setTokenJaasConfigIfNeeded() + .setAuthenticationConfigIfNeeded() .build() } From 1a6ae00e5d070c4016a450e3be2e862d2b060fa3 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 11 Dec 2018 10:23:22 +0100 Subject: [PATCH 2/3] Review fixes. --- docs/structured-streaming-kafka-integration.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index 31e536b8c6ce6..ee81a4b77c3f0 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -642,7 +642,7 @@ This way the application can be configured via Spark parameters and may not need configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). -The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` set, +The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set, Spark considers the following log in options, in order of preference: - **JAAS login configuration**, please see example below. - **Keytab file**, such as, @@ -669,7 +669,8 @@ Kafka broker configuration): After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly. Delegation token uses `SCRAM` login module for authentication and because of that the appropriate -`spark.kafka.token.sasl.mechanism` (default: `SCRAM-SHA-512`) has to be configured (it must match with Kafka broker configuration). +`spark.kafka.token.sasl.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter +must match with Kafka broker configuration. When delegation token is available on an executor it can be overridden with JAAS login configuration. 
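
As an illustration of the behaviour the two patches above introduce, here is a minimal sketch (not part of the patch series) of the delegation-token login resolution now performed in `setAuthenticationConfigIfNeeded()`: the SASL mechanism is read from a Spark configuration entry instead of the user-supplied `kafka.*` parameters, and anything other than SCRAM is rejected. The plain `Map` arguments stand in for `SparkEnv.get.conf` and the Kafka parameter map, and the key shown is the patch-1 name, which the next patch renames to `spark.kafka.sasl.token.mechanism`:

{% highlight scala %}
// Sketch of the SASL property resolution after these patches.
// jaasParams would come from KafkaSecurityHelper.getTokenJaasParams.
def tokenSaslProperties(
    conf: Map[String, String],
    jaasParams: String): Map[String, String] = {
  // The mechanism now comes from Spark configuration (with a
  // SCRAM-SHA-512 default), not from the source/sink options.
  val mechanism = conf.getOrElse("spark.kafka.token.sasl.mechanism", "SCRAM-SHA-512")
  // Delegation token authentication is implemented with the SCRAM
  // login module, so a non-SCRAM mechanism cannot work.
  require(mechanism.startsWith("SCRAM"),
    "Delegation token works only with SCRAM mechanism.")
  Map(
    "sasl.jaas.config" -> jaasParams,
    "sasl.mechanism" -> mechanism)
}
{% endhighlight %}
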
From de35aa2479400c11a48658501a838bebd83553cd Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 11 Dec 2018 19:07:22 +0100 Subject: [PATCH 3/3] Change spark.kafka.token.sasl.mechanism to spark.kafka.sasl.token.mechanism --- .../main/scala/org/apache/spark/internal/config/Kafka.scala | 6 +++--- docs/structured-streaming-kafka-integration.md | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala index 0f37ce2fd9707..88c612c2d951a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala @@ -81,11 +81,11 @@ private[spark] object Kafka { .createOptional val TOKEN_SASL_MECHANISM = - ConfigBuilder("spark.kafka.token.sasl.mechanism") + ConfigBuilder("spark.kafka.sasl.token.mechanism") .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " + "login module used for authentication a compatible mechanism has to be set here. " + - "For further details please see kafka documentation. Only used to authenticate against " + - "Kafka broker with delegation token.") + "For further details please see kafka documentation (sasl.mechanism). Only used to " + + "authenticate against Kafka broker with delegation token.") .stringConf .createWithDefault("SCRAM-SHA-512") } diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md index ee81a4b77c3f0..3d64ec4cb55f7 100644 --- a/docs/structured-streaming-kafka-integration.md +++ b/docs/structured-streaming-kafka-integration.md @@ -669,7 +669,7 @@ Kafka broker configuration): After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly. Delegation token uses `SCRAM` login module for authentication and because of that the appropriate -`spark.kafka.token.sasl.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter +`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter must match with Kafka broker configuration. When delegation token is available on an executor it can be overridden with JAAS login configuration.
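
As a closing illustration, a minimal sketch (not part of the patch series) of how an application would rely on the renamed option end to end; `<KEYTAB_FILE>`, `<PRINCIPAL>` and the host/topic names are placeholders, and the option only needs to be set when the brokers expect a SCRAM mechanism other than the `SCRAM-SHA-512` default:

{% highlight scala %}
// Submitted, for example, with:
//   ./bin/spark-submit \
//     --keytab <KEYTAB_FILE> \
//     --principal <PRINCIPAL> \
//     --conf spark.kafka.bootstrap.servers=host1:port1,host2:port2 \
//     --conf spark.kafka.sasl.token.mechanism=SCRAM-SHA-256 \
//     ...
// No kafka.sasl.mechanism or JAAS option is needed on the source itself:
// Spark obtains the delegation token, distributes it to the executors and
// configures SCRAM login from the option above.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
{% endhighlight %}
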