[SPARK-26322][SS] Add spark.kafka.token.sasl.mechanism to ease delegation token configuration. #23274

Closed · wants to merge 3 commits

Changes from 2 commits
@@ -79,4 +79,13 @@ private[spark] object Kafka {
"For further details please see kafka documentation. Only used to obtain delegation token.")
.stringConf
.createOptional

val TOKEN_SASL_MECHANISM =
ConfigBuilder("spark.kafka.token.sasl.mechanism")
.doc("SASL mechanism used for client connections with delegation token. Because SCRAM " +
"login module used for authentication a compatible mechanism has to be set here. " +
"For further details please see kafka documentation. Only used to authenticate against " +
"Kafka broker with delegation token.")
.stringConf
.createWithDefault("SCRAM-SHA-512")
}
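
For illustration (not part of this diff): a minimal sketch of how an application could set the new property once instead of repeating `kafka.sasl.mechanism` on every source and sink. The endpoint and mechanism values are placeholders, and SCRAM-SHA-256 is assumed to match the broker setup:

import org.apache.spark.sql.SparkSession

// The delegation token provider is active when spark.kafka.bootstrap.servers is set;
// spark.kafka.token.sasl.mechanism overrides the SCRAM-SHA-512 default.
val spark = SparkSession.builder()
  .appName("kafka-delegation-token-example")
  .config("spark.kafka.bootstrap.servers", "host1:port1,host2:port2")
  .config("spark.kafka.token.sasl.mechanism", "SCRAM-SHA-256")
  .getOrCreate()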
144 changes: 4 additions & 140 deletions docs/structured-streaming-kafka-integration.md
@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).

The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`,
The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
Spark considers the following login options, in order of preference:
- **JAAS login configuration**
- **JAAS login configuration**, please see the example below.
gaborgsomogyi (Contributor, Author):

Added this small pointer to make things more clear.

dongjoon-hyun (Member):

Hi, @gaborgsomogyi. Which example is this pointing to? The previous examples seem to have been removed.

gaborgsomogyi (Contributor, Author):

@dongjoon-hyun down below, with the same JAAS login configuration name.

- **Keytab file**, such as,

./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):

After obtaining the delegation token successfully, Spark distributes it across nodes and renews it accordingly.
Delegation token uses the `SCRAM` login module for authentication and because of that the appropriate
`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration):

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Setting on Kafka Source for Streaming Queries
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

// Setting on Kafka Source for Batch Queries
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

// Setting on Kafka Sink for Streaming Queries
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("topic", "topic1")
.start()

// Setting on Kafka Sink for Batch Queries
val ds = df
.selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.save()

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Setting on Kafka Source for Streaming Queries
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Setting on Kafka Source for Batch Queries
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Setting on Kafka Sink for Streaming Queries
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("topic", "topic1")
.start();

// Setting on Kafka Sink for Batch Queries
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("kafka.sasl.mechanism", "SCRAM-SHA-512")
.option("topic", "topic1")
.save();

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Setting on Kafka Source for Streaming Queries
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Setting on Kafka Source for Batch Queries
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Setting on Kafka Sink for Streaming Queries
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("topic", "topic1") \
.start()

# Setting on Kafka Sink for Batch Queries
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
.option("topic", "topic1") \
.save()

{% endhighlight %}
</div>
</div>
`spark.kafka.token.sasl.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
must match the Kafka broker configuration.
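
For illustration (not part of the diff): with the new property in place, a source needs no per-source `kafka.sasl.mechanism` option. A minimal Scala sketch, assuming a delegation token has already been obtained:

// No "kafka.sasl.mechanism" option here: when a delegation token is
// detected, Spark applies spark.kafka.token.sasl.mechanism
// (default SCRAM-SHA-512) together with the token-based JAAS config.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()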

When a delegation token is available on an executor, it can be overridden with a JAAS login configuration.

@@ -18,7 +18,6 @@
package org.apache.spark.sql.kafka010

import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.kafka.common.security.scram.ScramLoginModule

import org.apache.spark.SparkConf
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
.setTokenJaasConfigIfNeeded()
.setAuthenticationConfigIfNeeded()
.build()

def kafkaParamsForExecutors(
@@ -523,7 +524,7 @@
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
.setTokenJaasConfigIfNeeded()
.setAuthenticationConfigIfNeeded()
.build()

/**
@@ -556,7 +557,7 @@
this
}

def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
// There are multiple possibilities to log in and they are applied in the following order:
// - JVM global security provided -> try to log in with JVM global security configuration
// which can be configured for example with 'java.security.auth.login.config'.
Expand All @@ -568,11 +569,11 @@ private[kafka010] object KafkaSourceProvider extends Logging {
} else if (KafkaSecurityHelper.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
val mechanism = kafkaParams
.getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
"Delegation token works only with SCRAM mechanism.")
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
set(SaslConfigs.SASL_MECHANISM, mechanism)
}
this
}
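
To make the preference order concrete, here is a simplified standalone sketch of the decision `setAuthenticationConfigIfNeeded()` makes; the function name and map values are illustrative, not Spark internals:

// 1. A JVM-global security configuration wins if present,
// 2. otherwise a delegation token is used (SCRAM mechanisms only),
// 3. otherwise the Kafka parameters are left untouched.
def authParams(
    jvmGlobalSecurityConfigured: Boolean,
    tokenAvailable: Boolean,
    configuredMechanism: String): Map[String, String] = {
  if (jvmGlobalSecurityConfigured) {
    Map.empty // e.g. java.security.auth.login.config takes precedence
  } else if (tokenAvailable) {
    require(configuredMechanism.startsWith("SCRAM"),
      "Delegation token works only with SCRAM mechanism.")
    Map(
      "sasl.jaas.config" -> "<ScramLoginModule params built from the token>",
      "sasl.mechanism" -> configuredMechanism)
  } else {
    Map.empty // nothing to set; Kafka params stay as provided
  }
}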
@@ -600,7 +601,7 @@
ConfigUpdater("executor", specifiedKafkaParams)
.set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
.set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
.setTokenJaasConfigIfNeeded()
.setAuthenticationConfigIfNeeded()
.build()
}
