[SPARK-26322][SS] Add spark.kafka.sasl.token.mechanism to ease delegation token configuration.

## What changes were proposed in this pull request?

When a Kafka delegation token is obtained, a SCRAM `sasl.mechanism` has to be configured for authentication. This can be configured on the related source/sink, which is inconvenient from the user's perspective. Such granularity is not required, and this configuration can be implemented with one central parameter.

In this PR, `spark.kafka.sasl.token.mechanism` is added to configure this centrally (default: `SCRAM-SHA-512`).
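
As a minimal sketch (the application name and the `SCRAM-SHA-256` value below are illustrative assumptions, not part of this PR), the mechanism can now be set once per application instead of on every source and sink:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup: the broker only supports SCRAM-SHA-256, so the
// SCRAM-SHA-512 default is overridden once, centrally.
val spark = SparkSession.builder()
  .appName("kafka-delegation-token-example")
  .config("spark.kafka.sasl.token.mechanism", "SCRAM-SHA-256")
  .getOrCreate()
```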

## How was this patch tested?

Existing unit tests + manual testing on cluster.

Closes apache#23274 from gaborgsomogyi/SPARK-26322.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
gaborgsomogyi authored and jackylee-ch committed Feb 18, 2019
1 parent 1306fdf commit a50ea0e
Showing 3 changed files with 21 additions and 147 deletions.
@@ -79,4 +79,13 @@ private[spark] object Kafka {
        "For further details please see kafka documentation. Only used to obtain delegation token.")
      .stringConf
      .createOptional
+
+  val TOKEN_SASL_MECHANISM =
+    ConfigBuilder("spark.kafka.sasl.token.mechanism")
+      .doc("SASL mechanism used for client connections with delegation token. Because SCRAM " +
+        "login module is used for authentication, a compatible mechanism has to be set here. " +
+        "For further details please see kafka documentation (sasl.mechanism). Only used to " +
+        "authenticate against Kafka broker with delegation token.")
+      .stringConf
+      .createWithDefault("SCRAM-SHA-512")
 }
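
For illustration only (not part of the diff): since the entry is created with `createWithDefault`, reading it from a `SparkConf` that never sets it returns the default. The entry is `private[spark]`, so this sketch assumes Spark-internal code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Kafka

val conf = new SparkConf()
// Nothing set by the user, so the ConfigEntry default applies.
assert(conf.get(Kafka.TOKEN_SASL_MECHANISM) == "SCRAM-SHA-512")
```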
144 changes: 4 additions & 140 deletions docs/structured-streaming-kafka-integration.md
@@ -642,9 +642,9 @@ This way the application can be configured via Spark parameters and may not need
configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).

-The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers`,
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
Spark considers the following login options, in order of preference:
-- **JAAS login configuration**
+- **JAAS login configuration**, please see example below.
- **Keytab file**, such as,

./bin/spark-submit \
@@ -669,144 +669,8 @@ Kafka broker configuration):

After obtaining a delegation token successfully, Spark distributes it across nodes and renews it accordingly.
Delegation token uses the `SCRAM` login module for authentication and because of that the appropriate
-`sasl.mechanism` has to be configured on source/sink (it must match with Kafka broker configuration):
-
-<div class="codetabs">
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-
-// Setting on Kafka Source for Streaming Queries
-val df = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Source for Batch Queries
-val df = spark
-  .read
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-
-// Setting on Kafka Sink for Streaming Queries
-val ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start()
-
-// Setting on Kafka Sink for Batch Queries
-val ds = df
-  .selectExpr("topic1", "CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .save()
-
-{% endhighlight %}
-</div>
-<div data-lang="java" markdown="1">
-{% highlight java %}
-
-// Setting on Kafka Source for Streaming Queries
-Dataset<Row> df = spark
-  .readStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Source for Batch Queries
-Dataset<Row> df = spark
-  .read()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("subscribe", "topic1")
-  .load();
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
-
-// Setting on Kafka Sink for Streaming Queries
-StreamingQuery ds = df
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .writeStream()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .start();
-
-// Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .write()
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
-  .option("topic", "topic1")
-  .save();
-
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-
-# Setting on Kafka Source for Streaming Queries
-df = spark \
-  .readStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-# Setting on Kafka Source for Batch Queries
-df = spark \
-  .read \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("subscribe", "topic1") \
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
-# Setting on Kafka Sink for Streaming Queries
-ds = df \
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .writeStream \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .start()
-
-# Setting on Kafka Sink for Batch Queries
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
-  .write \
-  .format("kafka") \
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
-  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
-  .option("topic", "topic1") \
-  .save()
-
-{% endhighlight %}
-</div>
-</div>
+`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
+must match the Kafka broker configuration.

When a delegation token is available on an executor, it can be overridden with a JAAS login configuration; for example:
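
A minimal sketch of such an override (the `ScramLoginModule` class name is standard Kafka; the credentials are placeholders, and `spark` is an existing session):

```scala
// kafka.-prefixed source options are passed through to the Kafka client,
// so an explicit sasl.jaas.config takes precedence over the token login.
val jaas = "org.apache.kafka.common.security.scram.ScramLoginModule required " +
  "username=\"<username>\" password=\"<password>\";"

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
  .option("kafka.sasl.jaas.config", jaas)
  .option("subscribe", "topic1")
  .load()
```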

@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -501,7 +502,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
      // If buffer config is not set, set it to reasonable value to work around
      // buffer issues (see KAFKA-3135)
      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
      .build()

  def kafkaParamsForExecutors(
@@ -523,7 +524,7 @@
      // If buffer config is not set, set it to reasonable value to work around
      // buffer issues (see KAFKA-3135)
      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
      .build()

/**
@@ -556,7 +557,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
      this
    }

-    def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
      // There are multiple possibilities to log in, applied in the following order:
      // - JVM global security provided -> try to log in with JVM global security configuration
      //   which can be configured for example with 'java.security.auth.login.config'.
@@ -568,11 +569,11 @@
      } else if (KafkaSecurityHelper.isTokenAvailable()) {
        logDebug("Delegation token detected, using it for login.")
        val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        val mechanism = kafkaParams
-          .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
        require(mechanism.startsWith("SCRAM"),
          "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
        set(SaslConfigs.SASL_MECHANISM, mechanism)
      }
      this
    }
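
To make the new failure mode concrete, a sketch of the guard's behavior (plain Scala `require` semantics, not the project's test suite):

```scala
// E.g. a misconfigured spark.kafka.sasl.token.mechanism=PLAIN trips the guard:
val mechanism = "PLAIN"
require(mechanism.startsWith("SCRAM"),
  "Delegation token works only with SCRAM mechanism.")
// => java.lang.IllegalArgumentException: requirement failed:
//    Delegation token works only with SCRAM mechanism.
```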
@@ -600,7 +601,7 @@
    ConfigUpdater("executor", specifiedKafkaParams)
      .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
      .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
-      .setTokenJaasConfigIfNeeded()
+      .setAuthenticationConfigIfNeeded()
      .build()
  }

