diff --git a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/JsonUtils.scala b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/JsonUtils.scala index 68b8a28a6ed69..5a966eec33992 100644 --- a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/JsonUtils.scala +++ b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/JsonUtils.scala @@ -15,7 +15,7 @@ * limitations under the License. */ /* - * Copyright © 2016 Databricks, Inc. + * Copyright (C) 2016 Databricks, Inc. * * Portions of this software incorporate or are derived from software contained within Apache Spark, * and this modified software differs from the Apache Spark software provided under the Apache diff --git a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaCluster.scala b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaCluster.scala index 77a6db83e5af2..ce11f42ea180d 100644 --- a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaCluster.scala +++ b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaCluster.scala @@ -15,7 +15,7 @@ * limitations under the License. */ /* - * Copyright © 2016 Databricks, Inc. + * Copyright (C) 2016 Databricks, Inc. * * Portions of this software incorporate or are derived from software contained within Apache Spark, * and this modified software differs from the Apache Spark software provided under the Apache diff --git a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSource.scala b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSource.scala index ecfdc4461d9b0..3383d67d690bf 100644 --- a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSource.scala +++ b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSource.scala @@ -15,7 +15,7 @@ * limitations under the License. */ /* - * Copyright © 2016 Databricks, Inc. + * Copyright (C) 2016 Databricks, Inc. * * Portions of this software incorporate or are derived from software contained within Apache Spark, * and this modified software differs from the Apache Spark software provided under the Apache diff --git a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceOffset.scala b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceOffset.scala index 24c76a8b202db..bea33b8845458 100644 --- a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceOffset.scala +++ b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceOffset.scala @@ -15,7 +15,7 @@ * limitations under the License. */ /* - * Copyright © 2016 Databricks, Inc. + * Copyright (C) 2016 Databricks, Inc. * * Portions of this software incorporate or are derived from software contained within Apache Spark, * and this modified software differs from the Apache Spark software provided under the Apache diff --git a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceProvider.scala b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceProvider.scala index e4035707b9508..c0469151993d2 100644 --- a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceProvider.scala +++ b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceProvider.scala @@ -15,7 +15,7 @@ * limitations under the License. */ /* - * Copyright © 2016 Databricks, Inc. + * Copyright (C) 2016 Databricks, Inc. * * Portions of this software incorporate or are derived from software contained within Apache Spark, * and this modified software differs from the Apache Spark software provided under the Apache @@ -150,6 +150,13 @@ private[kafka08] class KafkaSourceProvider extends StreamSourceProvider s"Option 'kafka.bootstrap.servers' must be specified for " + s"configuring Kafka consumer") } + + for (unsupportedOption <- UNSUPPORTED_OPTIONS) { + if (caseInsensitiveParams.contains(unsupportedOption.toLowerCase)) { + throw new IllegalArgumentException( + s"'$unsupportedOption' is not supported in Kafka 0.8 source") + } + } } override def shortName: String = "kafka08" @@ -158,4 +165,5 @@ private[kafka08] class KafkaSourceProvider extends StreamSourceProvider private[kafka08] object KafkaSourceProvider { private val STRATEGY_OPTION_KEYS = Set("subscribe", "assign") private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets" + private val UNSUPPORTED_OPTIONS = Set("subscribePattern", "failOnDataLoss") } diff --git a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceRDD.scala b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceRDD.scala index 6f09c56339ab2..d375cb4fd1b2c 100644 --- a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceRDD.scala +++ b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/KafkaSourceRDD.scala @@ -15,7 +15,7 @@ * limitations under the License. */ /* - * Copyright © 2016 Databricks, Inc. + * Copyright (C) 2016 Databricks, Inc. * * Portions of this software incorporate or are derived from software contained within Apache Spark, * and this modified software differs from the Apache Spark software provided under the Apache diff --git a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/StartingOffsets.scala b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/StartingOffsets.scala index b4ca9d541632f..0880202a7b69d 100644 --- a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/StartingOffsets.scala +++ b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/StartingOffsets.scala @@ -15,7 +15,7 @@ * limitations under the License. */ /* - * Copyright © 2016 Databricks, Inc. + * Copyright (C) 2016 Databricks, Inc. * * Portions of this software incorporate or are derived from software contained within Apache Spark, * and this modified software differs from the Apache Spark software provided under the Apache diff --git a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/package-info.java b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/package-info.java index 152e95893cbe8..ac84090d61328 100644 --- a/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/package-info.java +++ b/external/kafka-0-8-sql/src/main/scala/org/apache/spark/sql/kafka08/package-info.java @@ -15,7 +15,7 @@ * limitations under the License. */ /* - * Copyright © 2016 Databricks, Inc. + * Copyright (C) 2016 Databricks, Inc. * * Portions of this software incorporate or are derived from software contained within Apache Spark, * and this modified software differs from the Apache Spark software provided under the Apache diff --git a/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/JsonUtilsSuite.scala b/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/JsonUtilsSuite.scala index 20661a54fe3eb..c5c44fc9ec18b 100644 --- a/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/JsonUtilsSuite.scala +++ b/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/JsonUtilsSuite.scala @@ -14,6 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ package org.apache.spark.sql.kafka08 diff --git a/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaSourceOffsetSuite.scala b/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaSourceOffsetSuite.scala index d706f27f94174..19c51a8f351e1 100644 --- a/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaSourceOffsetSuite.scala +++ b/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaSourceOffsetSuite.scala @@ -14,6 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ package org.apache.spark.sql.kafka08 diff --git a/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaSourceSuite.scala b/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaSourceSuite.scala index 3887c8910d303..90c7b223cbf2f 100644 --- a/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaSourceSuite.scala +++ b/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaSourceSuite.scala @@ -14,6 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ package org.apache.spark.sql.kafka08 @@ -440,6 +448,26 @@ class KafkaSourceSuite extends KafkaSourceTest { query.stop() } + test("unsupported options") { + def testUnsupportedOption(key: String, value: String = "someValue"): Unit = { + val ex = intercept[IllegalArgumentException] { + val reader = spark + .readStream + .format("kafka08") + .option("subscribe", "topic") + .option("kafka.bootstrap.servers", "somehost") + .option(key, value) + reader.load() + } + for (msg <- Seq("0.8", "not supported", key)) { + assert(ex.getMessage.contains(msg)) + } + } + + testUnsupportedOption("subscribePattern") + testUnsupportedOption("failOnDataLoss") + } + private def testFromLatestOffsets( topic: String, addPartitions: Boolean, diff --git a/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaTestUtils.scala b/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaTestUtils.scala index 0005a52ad7449..7caf01dd1892a 100644 --- a/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaTestUtils.scala +++ b/external/kafka-0-8-sql/src/test/scala/org/apache/spark/sql/kafka08/KafkaTestUtils.scala @@ -14,6 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +/* + * Copyright (C) 2016 Databricks, Inc. + * + * Portions of this software incorporate or are derived from software contained within Apache Spark, + * and this modified software differs from the Apache Spark software provided under the Apache + * License, Version 2.0, a copy of which you may obtain at + * http://www.apache.org/licenses/LICENSE-2.0 + */ package org.apache.spark.sql.kafka08