Skip to content

Commit

Permalink
[SC-5465] Throw errors for unsupported options in Kafka 0.8 source
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

A follow up PR for apache#161 to disallow unsupported options.

## How was this patch tested?

`test("unsupported options")`

Author: Shixiong Zhu <shixiong@databricks.com>

Closes apache#169 from zsxwing/kafka08-errors.
  • Loading branch information
zsxwing committed Jan 17, 2017
1 parent f9c9cd8 commit d7c59b2
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/*
* Copyright © 2016 Databricks, Inc.
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/*
* Copyright © 2016 Databricks, Inc.
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/*
* Copyright © 2016 Databricks, Inc.
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/*
* Copyright © 2016 Databricks, Inc.
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/*
* Copyright © 2016 Databricks, Inc.
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
Expand Down Expand Up @@ -150,6 +150,13 @@ private[kafka08] class KafkaSourceProvider extends StreamSourceProvider
s"Option 'kafka.bootstrap.servers' must be specified for " +
s"configuring Kafka consumer")
}

for (unsupportedOption <- UNSUPPORTED_OPTIONS) {
if (caseInsensitiveParams.contains(unsupportedOption.toLowerCase)) {
throw new IllegalArgumentException(
s"'$unsupportedOption' is not supported in Kafka 0.8 source")
}
}
}

override def shortName: String = "kafka08"
Expand All @@ -158,4 +165,5 @@ private[kafka08] class KafkaSourceProvider extends StreamSourceProvider
private[kafka08] object KafkaSourceProvider {
private val STRATEGY_OPTION_KEYS = Set("subscribe", "assign")
private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private val UNSUPPORTED_OPTIONS = Set("subscribePattern", "failOnDataLoss")
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/*
* Copyright © 2016 Databricks, Inc.
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/*
* Copyright © 2016 Databricks, Inc.
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/
/*
* Copyright © 2016 Databricks, Inc.
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
* License, Version 2.0, a copy of which you may obtain at
* http://www.apache.org/licenses/LICENSE-2.0
*/

package org.apache.spark.sql.kafka08

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
* License, Version 2.0, a copy of which you may obtain at
* http://www.apache.org/licenses/LICENSE-2.0
*/

package org.apache.spark.sql.kafka08

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
* License, Version 2.0, a copy of which you may obtain at
* http://www.apache.org/licenses/LICENSE-2.0
*/

package org.apache.spark.sql.kafka08

Expand Down Expand Up @@ -440,6 +448,26 @@ class KafkaSourceSuite extends KafkaSourceTest {
query.stop()
}

test("unsupported options") {
def testUnsupportedOption(key: String, value: String = "someValue"): Unit = {
val ex = intercept[IllegalArgumentException] {
val reader = spark
.readStream
.format("kafka08")
.option("subscribe", "topic")
.option("kafka.bootstrap.servers", "somehost")
.option(key, value)
reader.load()
}
for (msg <- Seq("0.8", "not supported", key)) {
assert(ex.getMessage.contains(msg))
}
}

testUnsupportedOption("subscribePattern")
testUnsupportedOption("failOnDataLoss")
}

private def testFromLatestOffsets(
topic: String,
addPartitions: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (C) 2016 Databricks, Inc.
*
* Portions of this software incorporate or are derived from software contained within Apache Spark,
* and this modified software differs from the Apache Spark software provided under the Apache
* License, Version 2.0, a copy of which you may obtain at
* http://www.apache.org/licenses/LICENSE-2.0
*/

package org.apache.spark.sql.kafka08

Expand Down

0 comments on commit d7c59b2

Please sign in to comment.