Skip to content

Commit

Permalink
Remove token cache usage without delegation token
Browse files Browse the repository at this point in the history
  • Loading branch information
gaborgsomogyi committed Nov 29, 2018
1 parent 1eceaa9 commit 2a0b1c3
Showing 1 changed file with 8 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -563,24 +563,16 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// For this no additional parameter needed.
// - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
// configuration.
// - Token not provided -> try to log in with kerberos module and ticket cache using kafka's
// dynamic JAAS configuration.
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
logDebug("JVM global security configuration detected, using it for login.")
} else {
if (KafkaSecurityHelper.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
val mechanism = kafkaParams
.getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
"Delegation token works only with SCRAM mechanism.")
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
} else {
logDebug("Using ticket cache for login.")
val jaasParams = KafkaTokenUtil.getTicketCacheJaasParams(SparkEnv.get.conf)
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
}
} else if (KafkaSecurityHelper.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
val mechanism = kafkaParams
.getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
"Delegation token works only with SCRAM mechanism.")
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
}
this
}
Expand Down

0 comments on commit 2a0b1c3

Please sign in to comment.