Skip to content

Commit

Permalink
Review fixes:
Browse files Browse the repository at this point in the history
 * Removed kafka.version from external modules
 * Some minor beautifying
  • Loading branch information
gaborgsomogyi committed Nov 27, 2018
1 parent 30df8f1 commit 36d05d2
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 24 deletions.
2 changes: 0 additions & 2 deletions external/kafka-0-10-sql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<properties>
<sbt.project.name>sql-kafka-0-10</sbt.project.name>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.1.0</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Kafka 0.10+ Source for Structured Streaming</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,15 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
sparkConf = new SparkConf()
}

private def addTokenToUGI: Unit = {
override def afterEach(): Unit = {
try {
resetUGI
} finally {
super.afterEach()
}
}

private def addTokenToUGI(): Unit = {
val token = new Token[KafkaDelegationTokenIdentifier](
tokenId.getBytes,
tokenPassword.getBytes,
Expand All @@ -64,31 +72,23 @@ class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
}

test("getTokenJaasParams with token no service should throw exception") {
try {
addTokenToUGI

val thrown = intercept[IllegalArgumentException] {
KafkaSecurityHelper.getTokenJaasParams(sparkConf)
}
addTokenToUGI

assert(thrown.getMessage contains "Kerberos service name must be defined")
} finally {
resetUGI
val thrown = intercept[IllegalArgumentException] {
KafkaSecurityHelper.getTokenJaasParams(sparkConf)
}

assert(thrown.getMessage contains "Kerberos service name must be defined")
}

test("getTokenJaasParams with token should return scram module") {
try {
addTokenToUGI
sparkConf.set(KAFKA_KERBEROS_SERVICE_NAME, kerberosServiceName)
addTokenToUGI
sparkConf.set(KAFKA_KERBEROS_SERVICE_NAME, kerberosServiceName)

val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)

assert(jaasParams.get.contains("ScramLoginModule"))
assert(jaasParams.get.contains(tokenId))
assert(jaasParams.get.contains(tokenPassword))
} finally {
resetUGI
}
assert(jaasParams.get.contains("ScramLoginModule"))
assert(jaasParams.get.contains(tokenId))
assert(jaasParams.get.contains(tokenPassword))
}
}
2 changes: 0 additions & 2 deletions external/kafka-0-10/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<properties>
<sbt.project.name>streaming-kafka-0-10</sbt.project.name>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.1.0</kafka.version>
</properties>
<packaging>jar</packaging>
<name>Spark Integration for Kafka 0.10</name>
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
<hive.version>1.2.1.spark2</hive.version>
<!-- Version used for internal directory structure -->
<hive.version.short>1.2.1</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.1.0</kafka.version>
<derby.version>10.12.1.1</derby.version>
<parquet.version>1.10.0</parquet.version>
Expand Down

0 comments on commit 36d05d2

Please sign in to comment.