[SPARK-26350][SS]Allow to override group id of the Kafka consumer #23301

Status: Closed · wants to merge 5 commits (changes shown from 4 commits)
25 changes: 22 additions & 3 deletions docs/structured-streaming-kafka-integration.md
@@ -379,7 +379,25 @@ The following configurations are optional:
<td>string</td>
<td>spark-kafka-source</td>
<td>streaming and batch</td>
<td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries</td>
<td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming
queries. If "kafka.group.id" is set, this option will be ignored. </td>
Contributor: nit: Given that the other option is wrapped with `, it might be better to follow the same rule for consistency.

Member (Author): We don't have such a rule. See the doc of failOnDataLoss.

Contributor: Yup, I think I chose the word incorrectly. Many options are wrapped with `, so it felt like we have an implicit rule on that. Please ignore this if the representation is already not consistent.

</tr>
<tr>
<td>kafka.group.id</td>
<td>string</td>
<td>none</td>
<td>streaming and batch</td>
<td>The Kafka group id to use in Kafka consumer while reading from Kafka. Use this with caution.
By default, each query generates a unique group id for reading data. This ensures that each Kafka
source has its own consumer group that does not face interference from any other consumer, and
therefore can read all of the partitions of its subscribed topics. In some scenarios (for example,
Kafka group-based authorization), you may want to use a specific authorized group id to read data.
You can optionally set the group ID. However, do this with extreme caution as it can cause
Contributor: nit: ID -> id

unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the
same group id are likely to interfere with each other, causing each query to read only part of the
data. This may also occur when queries are started/restarted in quick succession. To minimize such
issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
Contributor: nit: same here and below line

be very small. When this is set, option "groupIdPrefix" will be ignored. </td>
</tr>
</table>

@@ -592,8 +610,9 @@ for parameters related to writing data.
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:

- **group.id**: Kafka source will create a unique group id for each query automatically. The user can
set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, default value
is "spark-kafka-source".
set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`,
default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a special
Contributor: nit: same here

group id; however, please read the warnings for this option and use it with caution.
- **auto.offset.reset**: Set the source option `startingOffsets` to specify
where to start instead. Structured Streaming manages which offsets are consumed internally, rather
than relying on the Kafka consumer to do it. This will ensure that no data is missed when new
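
To make the documented behavior concrete, here is a minimal usage sketch; it is not part of this change, and the broker addresses, topic name, and group id are placeholders. Setting "kafka.group.id" passes the given group id straight to the Kafka consumers, and "groupIdPrefix" is then ignored, as described in the table above.

val authorizedStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")  // placeholder brokers
  .option("subscribe", "events")                                // placeholder topic
  // Authorized group id (e.g. for Kafka group-based authorization); use with caution,
  // since concurrent queries or sources sharing this id will interfere with each other.
  .option("kafka.group.id", "authorized-group")
  // Ignored because "kafka.group.id" is set:
  .option("groupIdPrefix", "my-app")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
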
[next changed file]
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import java.{util => ju}
import java.util.concurrent.TimeoutException

import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, OffsetOutOfRangeException}
import org.apache.kafka.common.TopicPartition

import org.apache.spark.TaskContext
@@ -167,7 +167,13 @@ class KafkaContinuousScanConfigBuilder(

val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
if (deletedPartitions.nonEmpty) {
reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
val message = if (
offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
} else {
s"$deletedPartitions are gone. Some data may have been missed"
Contributor: nit: missed . (add period)

}
reportDataLoss(message)
}

val startOffsets = newPartitionOffsets ++
[next changed file]
@@ -22,6 +22,7 @@ import java.io._
import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils
import org.apache.kafka.clients.consumer.ConsumerConfig

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
@@ -122,7 +123,13 @@ private[kafka010] class KafkaMicroBatchReadSupport(
// Find deleted partitions, and report data loss if required
val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
if (deletedPartitions.nonEmpty) {
reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
val message =
if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
} else {
s"$deletedPartitions are gone. Some data may have been missed"
}
reportDataLoss(message)
}

// Use the end partitions to calculate offset ranges to ignore partitions that have
[next changed file]
@@ -45,7 +45,7 @@ import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
*/
private[kafka010] class KafkaOffsetReader(
consumerStrategy: ConsumerStrategy,
driverKafkaParams: ju.Map[String, Object],
val driverKafkaParams: ju.Map[String, Object],
readerOptions: Map[String, String],
driverGroupIdPrefix: String) extends Logging {
/**
@@ -81,7 +81,9 @@ private[kafka010] class KafkaOffsetReader(
assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
if (_consumer == null) {
val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
if (driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) == null) {
newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
}
_consumer = consumerStrategy.createConsumer(newKafkaParams)
}
_consumer
[next changed file]
@@ -22,6 +22,7 @@ import java.io._
import java.nio.charset.StandardCharsets

import org.apache.commons.io.IOUtils
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkContext
@@ -241,7 +242,12 @@ private[kafka010] class KafkaSource(

val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
if (deletedPartitions.nonEmpty) {
reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
val message = if (kafkaReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
} else {
s"$deletedPartitions are gone. Some data may have been missed"
}
reportDataLoss(message)
}

// Use the until partitions to calculate offset ranges to ignore partitions that have
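
The deleted-partition handling above is repeated in the continuous, micro-batch, and KafkaSource paths. As a sketch only, not part of this change, the shared logic could be expressed as a single helper; the helper name is hypothetical, while driverKafkaParams, reportDataLoss, and KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE are the names used elsewhere in this diff.

import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.TopicPartition

// Hypothetical helper illustrating the message selection used in all three sources above.
def reportDeletedPartitions(
    deletedPartitions: Set[TopicPartition],
    driverKafkaParams: ju.Map[String, Object],
    reportDataLoss: String => Unit): Unit = {
  if (deletedPartitions.nonEmpty) {
    val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
      // A user-supplied group id makes interference from other consumers in the same
      // group a likely cause, so point the user at the custom-group-id warning.
      s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
    } else {
      s"$deletedPartitions are gone. Some data may have been missed"
    }
    reportDataLoss(message)
  }
}
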
[next changed file]
@@ -340,9 +340,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
// Validate user-specified Kafka options

if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.GROUP_ID_CONFIG}")) {
throw new IllegalArgumentException(
s"Kafka option '${ConsumerConfig.GROUP_ID_CONFIG}' is not supported as " +
s"user-specified consumer groups are not used to track offsets.")
logWarning(CUSTOM_GROUP_ID_ERROR_MESSAGE)
if (caseInsensitiveParams.contains(GROUP_ID_PREFIX)) {
logWarning("Option 'groupIdPrefix' will be ignored as " +
s"option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set.")
}
}

if (caseInsensitiveParams.contains(s"kafka.${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG}")) {
@@ -445,6 +447,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
private val MIN_PARTITIONS_OPTION_KEY = "minpartitions"
private val GROUP_ID_PREFIX = "groupidprefix"

val TOPIC_OPTION_KEY = "topic"

@@ -464,7 +467,16 @@
| source option "failOnDataLoss" to "false".
""".stripMargin


val CUSTOM_GROUP_ID_ERROR_MESSAGE =
s"""Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set on this query, it is
| not recommended to set this option. This option is unsafe to use since multiple concurrent
| queries or sources using the same group id will interfere with each other as they are part
| of the same consumer group. Restarted queries may also suffer interference from the
| previous run having the same group id. The user should have only one query per group id,
| and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka
| consumers from the previous query are marked dead by the Kafka group coordinator before the
| restarted query starts running.
""".stripMargin

private val serClassName = classOf[ByteArraySerializer].getName
private val deserClassName = classOf[ByteArrayDeserializer].getName
@@ -515,7 +527,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

// So that consumers in executors do not mess with any existing group id
.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
.setIfUnset(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")

// So that consumers in executors does not commit offsets unnecessarily
.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
@@ -534,7 +546,7 @@
parameters: Map[String, String],
metadataPath: String): String = {
val groupIdPrefix = parameters
.getOrElse("groupIdPrefix", "spark-kafka-source")
.getOrElse(GROUP_ID_PREFIX, "spark-kafka-source")
Contributor: Here the parameters map is not lowercased but GROUP_ID_PREFIX is lower.

Member (Author): Yeah, this is actually a fix. org.apache.spark.sql.sources.v2.DataSourceOptions.asMap returns a map whose keys are all lower case.

Contributor: I see now; it was not clear from the PR description.

Member (Author): Good point. I updated the description.

s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
}

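
As a side note to the thread above, a small sketch, not part of this change, of why the lookup key must be lower case: DataSourceOptions in the DataSource V2 API lower-cases option keys, so an option passed as "groupIdPrefix" arrives here as "groupidprefix". The checkpoint path below is a placeholder.

import java.util.UUID

// Sketch of the generated-group-id composition shown above, assuming the parameters
// map has lower-cased keys (matching DataSourceOptions.asMap).
def batchUniqueGroupId(parameters: Map[String, String], metadataPath: String): String = {
  val groupIdPrefix = parameters.getOrElse("groupidprefix", "spark-kafka-source")
  s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
}

// .option("groupIdPrefix", "my-app") is seen here as Map("groupidprefix" -> "my-app"),
// producing ids like "my-app-<uuid>-<hash>". When "kafka.group.id" is set, the user's
// group id is used for the driver and executor consumers instead of this generated id.
val exampleId = batchUniqueGroupId(Map("groupidprefix" -> "my-app"), "/tmp/checkpoint/sources/0")
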
[next changed file]
@@ -581,6 +581,33 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
)
}

test("allow group.id override") {
// Tests code path KafkaSourceProvider.{sourceSchema(.), createSource(.)}
// as well as KafkaOffsetReader.createConsumer(.)
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray, Some(0))
testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))

val dsKafka = spark
.readStream
.format("kafka")
.option("kafka.group.id", "id-" + Random.nextInt())
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.map(_.toInt)

testStream(dsKafka)(
Contributor: Looks like we just run the query and see whether it works with a fixed group id, and I guess the result is not actually affected by whether the option is applied or not. Is there any way to verify that the group.id value is properly set in the Kafka parameters? We can ignore this if there's no way to get it.

Member (Author): Yeah, we don't have an API to check this.

Contributor: Thanks for explaining. Then it looks OK to me.

makeSureGetOffsetCalled,
CheckAnswer(1 to 30: _*)
)
}

test("ensure stream-stream self-join generates only one offset in log and correct metrics") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 2)
@@ -1185,7 +1212,6 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported"))
}

testUnsupportedConfig("kafka.group.id")
testUnsupportedConfig("kafka.auto.offset.reset")
testUnsupportedConfig("kafka.enable.auto.commit")
testUnsupportedConfig("kafka.interceptor.classes")
[next changed file]
@@ -239,6 +239,18 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
}

test("allow group.id overriding") {
// Tests code path KafkaSourceProvider.createRelation(.)
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray, Some(0))
testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))

val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
checkAnswer(df, (1 to 30).map(_.toString).toDF())
Contributor: Same here.

}

test("read Kafka transactional messages: read_committed") {
val topic = newTopic()
testUtils.createTopic(topic)