From 9fe7874db9d5a90b86561249187e70804b7058a5 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Thu, 29 Nov 2018 18:00:47 -0800
Subject: [PATCH] [SPARK-25501][SS] Add Kafka delegation token support.

## What changes were proposed in this pull request?

This PR adds Kafka delegation token support for Structured Streaming. Please see the relevant [SPIP](https://docs.google.com/document/d/1ouRayzaJf_N5VQtGhVq9FURXVmRpXzEEWYHob0ne3NY/edit?usp=sharing).

What this PR contains:
* Configuration parameters for the feature
* Delegation token fetching from the broker
* Usage of the token through dynamic JAAS configuration
* Minor refactoring of the existing code

What this PR doesn't contain:
* Documentation changes, because the design can still change

## How was this patch tested?

Existing tests plus a small number of additional unit tests. Because this is an external service integration, it was mainly tested on a cluster:
* 4-node cluster
* Kafka broker version 1.1.0
* Topic with 4 partitions
* security.protocol = SASL_SSL
* sasl.mechanism = SCRAM-SHA-256

An example of obtaining a token:
```
18/10/01 01:07:49 INFO kafka010.TokenUtil: TOKENID HMAC OWNER RENEWERS ISSUEDATE EXPIRYDATE MAXDATE
18/10/01 01:07:49 INFO kafka010.TokenUtil: D1-v__Q5T_uHx55rW16Jwg [hidden] User:user [] 2018-10-01T01:07 2018-10-02T01:07 2018-10-08T01:07
18/10/01 01:07:49 INFO security.KafkaDelegationTokenProvider: Get token from Kafka: Kind: KAFKA_DELEGATION_TOKEN, Service: kafka.server.delegation.token, Ident: 44 31 2d 76 5f 5f 51 35 54 5f 75 48 78 35 35 72 57 31 36 4a 77 67
```

An example of token usage:
```
18/10/01 01:08:07 INFO kafka010.KafkaSecurityHelper: Scram JAAS params: org.apache.kafka.common.security.scram.ScramLoginModule required tokenauth=true serviceName="kafka" username="D1-v__Q5T_uHx55rW16Jwg" password="[hidden]";
18/10/01 01:08:07 INFO kafka010.KafkaSourceProvider: Delegation token detected, using it for login.
```

Closes #22598 from gaborgsomogyi/SPARK-25501.
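For reference, here is a minimal usage sketch (not part of the patch) of how an application would exercise this feature. It assumes a Kerberized SASL_SSL cluster, that the job is submitted with `--principal`/`--keytab` so the delegation token manager can fetch a token at startup, and placeholder values for brokers, truststore and topic:

```
// Hypothetical submit-time configuration (all values are placeholders):
//   spark-submit \
//     --principal user@EXAMPLE.COM --keytab /path/to/user.keytab \
//     --conf spark.kafka.bootstrap.servers=broker1:9093 \
//     --conf spark.kafka.security.protocol=SASL_SSL \
//     --conf spark.kafka.sasl.kerberos.service.name=kafka \
//     --conf spark.kafka.ssl.truststore.location=/path/to/truststore.jks \
//     --conf spark.kafka.ssl.truststore.password=truststore-secret \
//     ...
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-delegation-token-example").getOrCreate()

// The Kafka source is configured exactly as before. When a delegation token is present and
// no JVM-global JAAS configuration is set, the source builds a SCRAM JAAS config from the
// token, so no jaas.conf has to be shipped to the executors.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9093")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
  .option("subscribe", "testtopic")
  .load()

df.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()
```

The `spark.kafka.*` settings are used only on the driver side to obtain the token; the sources and sinks then authenticate with the JAAS config generated from that token, which is why `kafka.sasl.mechanism` has to be a SCRAM mechanism enabled on the broker.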
Authored-by: Gabor Somogyi Signed-off-by: Marcelo Vanzin --- core/pom.xml | 13 + .../HadoopDelegationTokenManager.scala | 3 +- .../KafkaDelegationTokenProvider.scala | 61 +++++ .../deploy/security/KafkaTokenUtil.scala | 202 +++++++++++++++ .../apache/spark/internal/config/Kafka.scala | 82 ++++++ .../HadoopDelegationTokenManagerSuite.scala | 5 +- .../deploy/security/KafkaTokenUtilSuite.scala | 239 ++++++++++++++++++ external/kafka-0-10-sql/pom.xml | 2 - .../sql/kafka010/KafkaSecurityHelper.scala | 56 ++++ .../sql/kafka010/KafkaSourceProvider.scala | 82 +++--- .../kafka010/KafkaStreamingWriteSupport.scala | 22 +- .../kafka010/KafkaContinuousSinkSuite.scala | 4 +- .../kafka010/KafkaSecurityHelperSuite.scala | 100 ++++++++ external/kafka-0-10/pom.xml | 2 - pom.xml | 2 + 15 files changed, 825 insertions(+), 50 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala create mode 100644 core/src/main/scala/org/apache/spark/internal/config/Kafka.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala diff --git a/core/pom.xml b/core/pom.xml index 36d93212ba9f9..49b1a54e32598 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -408,6 +408,19 @@ provided + + + org.apache.kafka + kafka-clients + ${kafka.version} + provided + + target/scala-${scala.binary.version}/classes diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 1169b2878e993..126a6ab801369 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -274,7 +274,8 @@ private[spark] class HadoopDelegationTokenManager( new HadoopFSDelegationTokenProvider( () => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) // Filter out providers for which spark.security.credentials.{service}.enabled is false. providers diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala new file mode 100644 index 0000000000000..45995be630cc5 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import scala.language.existentials +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[security] class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + hadoopConf: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { + try { + logDebug("Attempting to fetch Kafka security token.") + val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf) + creds.addToken(token.getService, token) + return Some(nextRenewalDate) + } catch { + case NonFatal(e) => + logInfo(s"Failed to get token from service $serviceName", e) + } + None + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + hadoopConf: Configuration): Boolean = { + val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL) + sparkConf.contains(Kafka.BOOTSTRAP_SERVERS) && + (protocol == SASL_SSL.name || + protocol == SSL.name || + protocol == SASL_PLAINTEXT.name) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala new file mode 100644 index 0000000000000..c890cee59ffe0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.security + +import java.{ util => ju } +import java.text.SimpleDateFormat + +import scala.util.control.NonFatal + +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions} +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.JaasContext +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.token.delegation.DelegationToken + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[spark] object KafkaTokenUtil extends Logging { + val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN") + val TOKEN_SERVICE = new Text("kafka.server.delegation.token") + + private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier { + override def getKind: Text = TOKEN_KIND + } + + private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { + val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) + val createDelegationTokenOptions = new CreateDelegationTokenOptions() + val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) + val token = createResult.delegationToken().get() + printToken(token) + + (new Token[KafkaDelegationTokenIdentifier]( + token.tokenInfo.tokenId.getBytes, + token.hmacAsBase64String.getBytes, + TOKEN_KIND, + TOKEN_SERVICE + ), token.tokenInfo.expiryTimestamp) + } + + private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = { + val adminClientProperties = new ju.Properties + + val bootstrapServers = sparkConf.get(Kafka.BOOTSTRAP_SERVERS) + require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " + + "servers not configured.") + adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get) + + val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL) + adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol) + protocol match { + case SASL_SSL.name => + setTrustStoreProperties(sparkConf, adminClientProperties) + + case SSL.name => + setTrustStoreProperties(sparkConf, adminClientProperties) + setKeyStoreProperties(sparkConf, adminClientProperties) + logWarning("Obtaining kafka delegation token with SSL protocol. Please " + + "configure 2-way authentication on the broker side.") + + case SASL_PLAINTEXT.name => + logWarning("Obtaining kafka delegation token through plain communication channel. Please " + + "consider the security impact.") + } + + // There are multiple possibilities to log in and applied in the following order: + // - JVM global security provided -> try to log in with JVM global security configuration + // which can be configured for example with 'java.security.auth.login.config'. + // For this no additional parameter needed. + // - Keytab is provided -> try to log in with kerberos module and keytab using kafka's dynamic + // JAAS configuration. + // - Keytab not provided -> try to log in with kerberos module and ticket cache using kafka's + // dynamic JAAS configuration. 
+ // Kafka client is unable to use the subject from a JVM which has already logged in + // to the KDC (see KAFKA-7677) + if (isGlobalJaasConfigurationProvided) { + logDebug("JVM global security configuration detected, using it for login.") + } else { + adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM) + if (sparkConf.contains(KEYTAB)) { + logDebug("Keytab detected, using it for login.") + val jaasParams = getKeytabJaasParams(sparkConf) + adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + } else { + logDebug("Using ticket cache for login.") + val jaasParams = getTicketCacheJaasParams(sparkConf) + adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + } + } + + adminClientProperties + } + + def isGlobalJaasConfigurationProvided: Boolean = { + try { + JaasContext.loadClientContext(ju.Collections.emptyMap[String, Object]()) + true + } catch { + case NonFatal(_) => false + } + } + + private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = { + sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation => + properties.put("ssl.truststore.location", truststoreLocation) + } + sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword => + properties.put("ssl.truststore.password", truststorePassword) + } + } + + private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = { + sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation => + properties.put("ssl.keystore.location", keystoreLocation) + } + sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword => + properties.put("ssl.keystore.password", keystorePassword) + } + sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword => + properties.put("ssl.key.password", keyPassword) + } + } + + private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = { + val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME) + require(serviceName.nonEmpty, "Kerberos service name must be defined") + + val params = + s""" + |${getKrb5LoginModuleName} required + | useKeyTab=true + | serviceName="${serviceName.get}" + | keyTab="${sparkConf.get(KEYTAB).get}" + | principal="${sparkConf.get(PRINCIPAL).get}"; + """.stripMargin.replace("\n", "") + logDebug(s"Krb keytab JAAS params: $params") + params + } + + def getTicketCacheJaasParams(sparkConf: SparkConf): String = { + val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME) + require(serviceName.nonEmpty, "Kerberos service name must be defined") + + val params = + s""" + |${getKrb5LoginModuleName} required + | useTicketCache=true + | serviceName="${serviceName.get}"; + """.stripMargin.replace("\n", "") + logDebug(s"Krb ticket cache JAAS params: $params") + params + } + + /** + * The Krb5LoginModule package varies across JVMs. + * Please see Hadoop UserGroupInformation for further details.
+ */ + private def getKrb5LoginModuleName(): String = { + if (System.getProperty("java.vendor").contains("IBM")) { + "com.ibm.security.auth.module.Krb5LoginModule" + } else { + "com.sun.security.auth.module.Krb5LoginModule" + } + } + + private def printToken(token: DelegationToken): Unit = { + if (log.isDebugEnabled) { + val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm") + logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format( + "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE")) + val tokenInfo = token.tokenInfo + logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format( + tokenInfo.tokenId, + tokenInfo.owner, + tokenInfo.renewersAsString, + dateFormat.format(tokenInfo.issueTimestamp), + dateFormat.format(tokenInfo.expiryTimestamp), + dateFormat.format(tokenInfo.maxTimestamp))) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala new file mode 100644 index 0000000000000..85d74c27142ad --- /dev/null +++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.internal.config + +private[spark] object Kafka { + + val BOOTSTRAP_SERVERS = + ConfigBuilder("spark.kafka.bootstrap.servers") + .doc("A list of comma separated host/port pairs to use for establishing the initial " + + "connection to the Kafka cluster. For further details please see kafka documentation. " + + "Only used to obtain delegation token.") + .stringConf + .createOptional + + val SECURITY_PROTOCOL = + ConfigBuilder("spark.kafka.security.protocol") + .doc("Protocol used to communicate with brokers. For further details please see kafka " + + "documentation. Only used to obtain delegation token.") + .stringConf + .createWithDefault("SASL_SSL") + + val KERBEROS_SERVICE_NAME = + ConfigBuilder("spark.kafka.sasl.kerberos.service.name") + .doc("The Kerberos principal name that Kafka runs as. This can be defined either in " + + "Kafka's JAAS config or in Kafka's config. For further details please see kafka " + + "documentation. Only used to obtain delegation token.") + .stringConf + .createOptional + + val TRUSTSTORE_LOCATION = + ConfigBuilder("spark.kafka.ssl.truststore.location") + .doc("The location of the trust store file. For further details please see kafka " + + "documentation. Only used to obtain delegation token.") + .stringConf + .createOptional + + val TRUSTSTORE_PASSWORD = + ConfigBuilder("spark.kafka.ssl.truststore.password") + .doc("The store password for the trust store file. This is optional for client and only " + + "needed if ssl.truststore.location is configured.
For further details please see kafka " + + "documentation. Only used to obtain delegation token.") + .stringConf + .createOptional + + val KEYSTORE_LOCATION = + ConfigBuilder("spark.kafka.ssl.keystore.location") + .doc("The location of the key store file. This is optional for client and can be used for " + + "two-way authentication for client. For further details please see kafka documentation. " + + "Only used to obtain delegation token.") + .stringConf + .createOptional + + val KEYSTORE_PASSWORD = + ConfigBuilder("spark.kafka.ssl.keystore.password") + .doc("The store password for the key store file. This is optional for client and only " + + "needed if ssl.keystore.location is configured. For further details please see kafka " + + "documentation. Only used to obtain delegation token.") + .stringConf + .createOptional + + val KEY_PASSWORD = + ConfigBuilder("spark.kafka.ssl.key.password") + .doc("The password of the private key in the key store file. This is optional for client. " + + "For further details please see kafka documentation. Only used to obtain delegation token.") + .stringConf + .createOptional +} diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index e0e630e3be63b..def9e626a2df2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -19,8 +19,6 @@ package org.apache.spark.deploy.security import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.security.Credentials import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.util.Utils @@ -33,6 +31,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { assert(manager.isProviderLoaded("hadoopfs")) assert(manager.isProviderLoaded("hbase")) assert(manager.isProviderLoaded("hive")) + assert(manager.isProviderLoaded("kafka")) } test("disable hive credential provider") { @@ -41,6 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { assert(manager.isProviderLoaded("hadoopfs")) assert(manager.isProviderLoaded("hbase")) assert(!manager.isProviderLoaded("hive")) + assert(manager.isProviderLoaded("kafka")) } test("using deprecated configurations") { @@ -51,6 +51,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite { assert(!manager.isProviderLoaded("hadoopfs")) assert(manager.isProviderLoaded("hbase")) assert(!manager.isProviderLoaded("hive")) + assert(manager.isProviderLoaded("kafka")) } test("SPARK-23209: obtain tokens when Hive classes are not available") { diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala new file mode 100644 index 0000000000000..682bebde916fa --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import java.{ util => ju } +import javax.security.auth.login.{AppConfigurationEntry, Configuration} + +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.config.SaslConfigs +import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config._ + +class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { + private val bootStrapServers = "127.0.0.1:0" + private val trustStoreLocation = "/path/to/trustStore" + private val trustStorePassword = "trustStoreSecret" + private val keyStoreLocation = "/path/to/keyStore" + private val keyStorePassword = "keyStoreSecret" + private val keyPassword = "keySecret" + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "user@domain.com" + + private var sparkConf: SparkConf = null + + private class KafkaJaasConfiguration extends Configuration { + val entry = + new AppConfigurationEntry( + "DummyModule", + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + ju.Collections.emptyMap[String, Object]() + ) + + override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = { + if (name.equals("KafkaClient")) { + Array(entry) + } else { + null + } + } + } + + override def beforeEach(): Unit = { + super.beforeEach() + sparkConf = new SparkConf() + } + + override def afterEach(): Unit = { + try { + resetGlobalConfig() + } finally { + super.afterEach() + } + } + + private def setGlobalKafkaClientConfig(): Unit = { + Configuration.setConfiguration(new KafkaJaasConfiguration) + } + + private def resetGlobalConfig(): Unit = { + Configuration.setConfiguration(null) + } + + test("createAdminClientProperties without bootstrap servers should throw exception") { + val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.createAdminClientProperties(sparkConf) + } + assert(thrown.getMessage contains + "Tried to obtain kafka delegation token but bootstrap servers not configured.") + } + + test("createAdminClientProperties with SASL_PLAINTEXT protocol should not include " + + "keystore and truststore config") { + sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) + sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_PLAINTEXT.name) + sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation) + sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStoreLocation) + sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) + sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) + sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) + sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) + + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === SASL_PLAINTEXT.name) + 
assert(!adminClientProperties.containsKey("ssl.truststore.location")) + assert(!adminClientProperties.containsKey("ssl.truststore.password")) + assert(!adminClientProperties.containsKey("ssl.keystore.location")) + assert(!adminClientProperties.containsKey("ssl.keystore.password")) + assert(!adminClientProperties.containsKey("ssl.key.password")) + } + + test("createAdminClientProperties with SASL_SSL protocol should include truststore config") { + sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) + sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) + sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation) + sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword) + sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) + sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) + sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) + sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) + + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === SASL_SSL.name) + assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation) + assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword) + assert(!adminClientProperties.containsKey("ssl.keystore.location")) + assert(!adminClientProperties.containsKey("ssl.keystore.password")) + assert(!adminClientProperties.containsKey("ssl.key.password")) + } + + test("createAdminClientProperties with SSL protocol should include keystore and truststore " + + "config") { + sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) + sparkConf.set(Kafka.SECURITY_PROTOCOL, SSL.name) + sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation) + sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword) + sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation) + sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword) + sparkConf.set(Kafka.KEY_PASSWORD, keyPassword) + sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) + + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === SSL.name) + assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation) + assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword) + assert(adminClientProperties.get("ssl.keystore.location") === keyStoreLocation) + assert(adminClientProperties.get("ssl.keystore.password") === keyStorePassword) + assert(adminClientProperties.get("ssl.key.password") === keyPassword) + } + + test("createAdminClientProperties with global config should not set dynamic jaas config") { + sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) + sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) + setGlobalKafkaClientConfig() + + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === SASL_SSL.name) + assert(!adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM)) + 
assert(!adminClientProperties.containsKey(SaslConfigs.SASL_JAAS_CONFIG)) + } + + test("createAdminClientProperties with keytab should set keytab dynamic jaas config") { + sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) + sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) + sparkConf.set(KEYTAB, keytab) + sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) + sparkConf.set(PRINCIPAL, principal) + + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === SASL_SSL.name) + assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM)) + val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG) + assert(saslJaasConfig.contains("Krb5LoginModule required")) + assert(saslJaasConfig.contains("useKeyTab=true")) + } + + test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") { + sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers) + sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name) + sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) + + val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf) + + assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + === bootStrapServers) + assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) + === SASL_SSL.name) + assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM)) + val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG) + assert(saslJaasConfig.contains("Krb5LoginModule required")) + assert(saslJaasConfig.contains("useTicketCache=true")) + } + + test("isGlobalJaasConfigurationProvided without global config should return false") { + assert(!KafkaTokenUtil.isGlobalJaasConfigurationProvided) + } + + test("isGlobalJaasConfigurationProvided with global config should return true") { + setGlobalKafkaClientConfig() + + assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided) + } + + test("getKeytabJaasParams with keytab no service should throw exception") { + sparkConf.set(KEYTAB, keytab) + + val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.getKeytabJaasParams(sparkConf) + } + + assert(thrown.getMessage contains "Kerberos service name must be defined") + } + + test("getTicketCacheJaasParams without service should throw exception") { + val thrown = intercept[IllegalArgumentException] { + KafkaTokenUtil.getTicketCacheJaasParams(sparkConf) + } + + assert(thrown.getMessage contains "Kerberos service name must be defined") + } +} diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 1af407167597b..de8731c4b774b 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,8 +29,6 @@ spark-sql-kafka-0-10_2.12 sql-kafka-0-10 - - 2.1.0 jar Kafka 0.10+ Source for Structured Streaming diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala new file mode 100644 index 0000000000000..74d5ef9c05f14 --- /dev/null +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + *
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.kafka.common.security.scram.ScramLoginModule + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ + +private[kafka010] object KafkaSecurityHelper extends Logging { + def isTokenAvailable(): Boolean = { + UserGroupInformation.getCurrentUser().getCredentials.getToken( + KafkaTokenUtil.TOKEN_SERVICE) != null + } + + def getTokenJaasParams(sparkConf: SparkConf): String = { + val token = UserGroupInformation.getCurrentUser().getCredentials.getToken( + KafkaTokenUtil.TOKEN_SERVICE) + val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME) + require(serviceName.isDefined, "Kerberos service name must be defined") + val username = new String(token.getIdentifier) + val password = new String(token.getPassword) + + val loginModuleName = classOf[ScramLoginModule].getName + val params = + s""" + |$loginModuleName required + | tokenauth=true + | serviceName="${serviceName.get}" + | username="$username" + | password="$password"; + """.stripMargin.replace("\n", "") + logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}") + + params + } +} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index f770f0c2a04c2..0ac330435e5c5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -18,16 +18,19 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.{Locale, Optional, UUID} +import java.util.{Locale, UUID} import scala.collection.JavaConverters._ import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} +import org.apache.spark.SparkEnv +import org.apache.spark.deploy.security.KafkaTokenUtil import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext} +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ import org.apache.spark.sql.sources.v2._ @@ -80,12 +83,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath) val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } - val specifiedKafkaParams = - parameters - .keySet - .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) - .map { k => k.drop(6).toString -> parameters(k) } - .toMap + val specifiedKafkaParams = convertToSpecifiedParams(parameters) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) @@ -122,12 +120,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath) val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } - val specifiedKafkaParams = - parameters - .keySet - .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) - .map { k => k.drop(6).toString -> parameters(k) } - .toMap + val specifiedKafkaParams = convertToSpecifiedParams(parameters) val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) @@ -198,12 +191,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister parameters: Map[String, String]): BaseRelation = { validateBatchOptions(parameters) val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } - val specifiedKafkaParams = - parameters - .keySet - .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) - .map { k => k.drop(6).toString -> parameters(k) } - .toMap + val specifiedKafkaParams = convertToSpecifiedParams(parameters) val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) @@ -230,8 +218,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister outputMode: OutputMode): Sink = { val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim) val specifiedKafkaParams = kafkaParamsForProducer(parameters) - new KafkaSink(sqlContext, - new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic) + new KafkaSink(sqlContext, specifiedKafkaParams, defaultTopic) } override def createRelation( @@ -248,8 +235,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim) val specifiedKafkaParams = kafkaParamsForProducer(parameters) - KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, - new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic) + KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, specifiedKafkaParams, + topic) /* This method is suppose to return a relation that reads the data that was written. * We cannot support this for Kafka. Therefore, in order to make things consistent, @@ -274,13 +261,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister options: DataSourceOptions): StreamingWriteSupport = { import scala.collection.JavaConverters._ - val spark = SparkSession.getActiveSession.get val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim) // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable. 
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap) - KafkaWriter.validateQuery( - schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic) + KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic) new KafkaStreamingWriteSupport(topic, producerParams, schema) } @@ -481,6 +466,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { + private val serClassName = classOf[ByteArraySerializer].getName private val deserClassName = classOf[ByteArrayDeserializer].getName def getKafkaOffsetRangeLimit( @@ -515,6 +501,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { // If buffer config is not set, set it to reasonable value to work around // buffer issues (see KAFKA-3135) .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) + .setTokenJaasConfigIfNeeded() .build() def kafkaParamsForExecutors( @@ -536,6 +523,7 @@ private[kafka010] object KafkaSourceProvider extends Logging { // If buffer config is not set, set it to reasonable value to work around // buffer issues (see KAFKA-3135) .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer) + .setTokenJaasConfigIfNeeded() .build() /** @@ -568,11 +556,32 @@ private[kafka010] object KafkaSourceProvider extends Logging { this } + def setTokenJaasConfigIfNeeded(): ConfigUpdater = { + // There are multiple possibilities to log in and applied in the following order: + // - JVM global security provided -> try to log in with JVM global security configuration + // which can be configured for example with 'java.security.auth.login.config'. + // For this no additional parameter needed. + // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS + // configuration. 
+ if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) { + logDebug("JVM global security configuration detected, using it for login.") + } else if (KafkaSecurityHelper.isTokenAvailable()) { + logDebug("Delegation token detected, using it for login.") + val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) + val mechanism = kafkaParams + .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM) + require(mechanism.startsWith("SCRAM"), + "Delegation token works only with SCRAM mechanism.") + set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) + } + this + } + def build(): ju.Map[String, Object] = map } private[kafka010] def kafkaParamsForProducer( - parameters: Map[String, String]): Map[String, String] = { + parameters: Map[String, String]): ju.Map[String, Object] = { val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) { throw new IllegalArgumentException( @@ -580,17 +589,26 @@ private[kafka010] object KafkaSourceProvider extends Logging { + "are serialized with ByteArraySerializer.") } - if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) - { + if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) { throw new IllegalArgumentException( s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as " + "value are serialized with ByteArraySerializer.") } + + val specifiedKafkaParams = convertToSpecifiedParams(parameters) + + ConfigUpdater("executor", specifiedKafkaParams) + .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName) + .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName) + .setTokenJaasConfigIfNeeded() + .build() + } + + private def convertToSpecifiedParams(parameters: Map[String, String]): Map[String, String] = { parameters .keySet .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) .map { k => k.drop(6).toString -> parameters(k) } - .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName) + .toMap } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala index 927c56d9ce829..0d831c3884609 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.kafka010 -import scala.collection.JavaConverters._ +import java.{util => ju} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute @@ -41,10 +41,12 @@ case object KafkaWriterCommitMessage extends WriterCommitMessage * @param schema The schema of the input data. 
*/ class KafkaStreamingWriteSupport( - topic: Option[String], producerParams: Map[String, String], schema: StructType) + topic: Option[String], + producerParams: ju.Map[String, Object], + schema: StructType) extends StreamingWriteSupport { - validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic) + validateQuery(schema.toAttributes, producerParams, topic) override def createStreamingWriterFactory(): KafkaStreamWriterFactory = KafkaStreamWriterFactory(topic, producerParams, schema) @@ -62,7 +64,9 @@ class KafkaStreamingWriteSupport( * @param schema The schema of the input data. */ case class KafkaStreamWriterFactory( - topic: Option[String], producerParams: Map[String, String], schema: StructType) + topic: Option[String], + producerParams: ju.Map[String, Object], + schema: StructType) extends StreamingDataWriterFactory { override def createWriter( @@ -83,12 +87,12 @@ case class KafkaStreamWriterFactory( * @param inputSchema The attributes in the input data. */ class KafkaStreamDataWriter( - targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute]) + targetTopic: Option[String], + producerParams: ju.Map[String, Object], + inputSchema: Seq[Attribute]) extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - import scala.collection.JavaConverters._ - private lazy val producer = CachedKafkaProducer.getOrCreate( - new java.util.HashMap[String, Object](producerParams.asJava)) + private lazy val producer = CachedKafkaProducer.getOrCreate(producerParams) def write(row: InternalRow): Unit = { checkForErrors() @@ -112,7 +116,7 @@ class KafkaStreamDataWriter( if (producer != null) { producer.flush() checkForErrors() - CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava)) + CachedKafkaProducer.close(producerParams) } } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala index 3f6fcf6b2e52c..b21037b1340ce 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala @@ -409,7 +409,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { */ val topic = newTopic() testUtils.createTopic(topic, 1) - val options = new java.util.HashMap[String, String] + val options = new java.util.HashMap[String, Object] options.put("bootstrap.servers", testUtils.brokerAddress) options.put("buffer.memory", "16384") // min buffer size options.put("block.on.buffer.full", "true") @@ -417,7 +417,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest { options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) val inputSchema = Seq(AttributeReference("value", BinaryType)()) val data = new Array[Byte](15000) // large value - val writeTask = new KafkaStreamDataWriter(Some(topic), options.asScala.toMap, inputSchema) + val writeTask = new KafkaStreamDataWriter(Some(topic), options, inputSchema) try { val fieldTypes: Array[DataType] = Array(BinaryType) val converter = UnsafeProjection.create(fieldTypes) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala new file mode 100644 index 
0000000000000..772fe4614bad0 --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.UUID + +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.Token +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.security.KafkaTokenUtil +import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier +import org.apache.spark.internal.config._ + +class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach { + private val keytab = "/path/to/keytab" + private val kerberosServiceName = "kafka" + private val principal = "user@domain.com" + private val tokenId = "tokenId" + UUID.randomUUID().toString + private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString + + private var sparkConf: SparkConf = null + + override def beforeEach(): Unit = { + super.beforeEach() + sparkConf = new SparkConf() + } + + override def afterEach(): Unit = { + try { + resetUGI + } finally { + super.afterEach() + } + } + + private def addTokenToUGI(): Unit = { + val token = new Token[KafkaDelegationTokenIdentifier]( + tokenId.getBytes, + tokenPassword.getBytes, + KafkaTokenUtil.TOKEN_KIND, + KafkaTokenUtil.TOKEN_SERVICE + ) + val creds = new Credentials() + creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token) + UserGroupInformation.getCurrentUser.addCredentials(creds) + } + + private def resetUGI: Unit = { + UserGroupInformation.setLoginUser(null) + } + + test("isTokenAvailable without token should return false") { + assert(!KafkaSecurityHelper.isTokenAvailable()) + } + + test("isTokenAvailable with token should return true") { + addTokenToUGI() + + assert(KafkaSecurityHelper.isTokenAvailable()) + } + + test("getTokenJaasParams with token no service should throw exception") { + addTokenToUGI() + + val thrown = intercept[IllegalArgumentException] { + KafkaSecurityHelper.getTokenJaasParams(sparkConf) + } + + assert(thrown.getMessage contains "Kerberos service name must be defined") + } + + test("getTokenJaasParams with token should return scram module") { + addTokenToUGI() + sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName) + + val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf) + + assert(jaasParams.contains("ScramLoginModule required")) + assert(jaasParams.contains("tokenauth=true")) + assert(jaasParams.contains(tokenId)) + assert(jaasParams.contains(tokenPassword)) + } +} diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index ea18b7e035915..333572e99b1c7 
100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -28,8 +28,6 @@ spark-streaming-kafka-0-10_2.12 streaming-kafka-0-10 - - 2.1.0 jar Spark Integration for Kafka 0.10 diff --git a/pom.xml b/pom.xml index 3ca2f739ce0ea..dfc3c540dc18e 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,8 @@ 1.2.1.spark2 1.2.1 + + 2.1.0 10.12.1.1 1.10.0 1.5.3