[SPARK-26350][SS]Allow to override group id of the Kafka consumer #23301
Conversation
cc @tdas
Test build #100048 has finished for PR 23301 at commit
Test build #100050 has finished for PR 23301 at commit
Looks good overall. Some nits, and it might be better to have a stronger test if possible.
s"Kafka option '${ConsumerConfig.GROUP_ID_CONFIG}' is not supported as " +
s"user-specified consumer groups are not used to track offsets.")
logWarning(
s"It is not recommended to set Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}'. " +
The long string for the log message is duplicated 3 times. Could we move it out and reuse it, like KafkaSourceProvider.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE and KafkaSourceProvider.INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE?
Good point. Updated.
set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`, default value
is "spark-kafka-source".
set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`,
default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a special
nit: same here
@@ -379,7 +379,25 @@ The following configurations are optional:
<td>string</td>
<td>spark-kafka-source</td>
<td>streaming and batch</td>
<td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming queries</td>
<td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming
queries. If "kafka.group.id" is set, this option will be ignored.</td>
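The documented precedence can be sketched in Python. This is a hypothetical stand-in for the Scala logic in KafkaSourceProvider, not the actual implementation; the exact format of the generated id, and the lower-cased option keys (mirroring how DataSourceOptions exposes options), are assumptions.

```python
import uuid

def resolve_group_id(options):
    # An explicit "kafka.group.id" wins; otherwise a unique group id is
    # generated from the optional prefix, defaulting to "spark-kafka-source".
    if "kafka.group.id" in options:
        return options["kafka.group.id"]
    prefix = options.get("groupidprefix", "spark-kafka-source")
    return f"{prefix}-{uuid.uuid4()}"
```

When "kafka.group.id" is present, `groupIdPrefix` has no effect, which is what the doc change describes.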
nit: Given that the other option is wrapped with backticks, it might be better to follow the same rule for consistency.
We don't have such a rule. See the doc of failOnDataLoss.
Yup, I think I chose my words incorrectly. Many options are wrapped with backticks, so I felt we had an implicit rule on that. Please ignore this if the representation is already inconsistent.
unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the
same group id are likely to interfere with each other, causing each query to read only part of the
data. This may also occur when queries are started/restarted in quick succession. To minimize such
issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
nit: same here and below line
.as[String]
.map(_.toInt)

testStream(dsKafka)(
Looks like we just run the query and see whether it works with a fixed group id, and I guess the result is not affected by whether the option is applied or not.
Is there any way to verify whether the group.id value is properly set in the Kafka parameters? We can ignore this if there's no way to get it.
Yeah, we don't have an API to check this.
Thanks for explaining. Then looks OK to me.
testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))

val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
checkAnswer(df, (1 to 30).map(_.toString).toDF())
Same here.
LGTM
Test build #100115 has finished for PR 23301 at commit
reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
val message =
if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
s"$deletedPartitions are gone. " + KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE
nit: I would use string interpolation, though.
Please ignore this if other changes are ready. It just bugged me while reading the code.
Personally either is fine for me (using + or string interpolation), but in this case it might be better to append the error message to the former string, since the former string already uses string interpolation.
Test build #100256 has finished for PR 23301 at commit
retest this please
Test build #100367 has finished for PR 23301 at commit
@@ -534,7 +546,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
parameters: Map[String, String],
metadataPath: String): String = {
val groupIdPrefix = parameters
.getOrElse("groupIdPrefix", "spark-kafka-source")
.getOrElse(GROUP_ID_PREFIX, "spark-kafka-source")
Here the `parameters` map is not lowercased, but GROUP_ID_PREFIX is lowercase.
Yeah, this is actually a fix. org.apache.spark.sql.sources.v2.DataSourceOptions.asMap returns a map in which all keys are lower case.
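A minimal Python sketch of why the original lookup failed (hypothetical helper names; in Spark the lower-cased map comes from DataSourceOptions.asMap):

```python
def as_map(options):
    # DataSourceOptions.asMap-style behavior: all keys come back lower-cased.
    return {k.lower(): v for k, v in options.items()}

params = as_map({"groupIdPrefix": "my-prefix"})

# Before the fix: a mixed-case lookup misses the lower-cased key,
# so the user-supplied prefix is silently ignored.
broken = params.get("groupIdPrefix", "spark-kafka-source")

# After the fix: look up the lower-cased constant instead.
GROUP_ID_PREFIX = "groupidprefix"
fixed = params.get(GROUP_ID_PREFIX, "spark-kafka-source")
```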
I see now, it was not clear from the PR description.
Good point. I updated the description.
LGTM.
source has its own consumer group that does not face interference from any other consumer, and
therefore can read all of the partitions of its subscribed topics. In some scenarios (for example,
Kafka group-based authorization), you may want to use a specific authorized group id to read data.
You can optionally set the group ID. However, do this with extreme caution as it can cause
nit: ID -> id
offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
} else {
s"$deletedPartitions are gone. Some data may have been missed"
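The branching above can be sketched in Python. These are hypothetical stand-ins: the real constant is KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE, whose exact wording is not shown in this thread, so the text below is an assumption.

```python
# Stand-in text; the real message lives in KafkaSourceProvider.
CUSTOM_GROUP_ID_ERROR_MESSAGE = (
    "Some data may have been missed; a custom group id may be shared "
    "with other queries or consumers."
)

def deleted_partitions_message(deleted_partitions, has_custom_group_id):
    # With a custom group id, data loss may stem from group id sharing,
    # so the message points the user at that possibility.
    if has_custom_group_id:
        return f"{deleted_partitions} are gone. {CUSTOM_GROUP_ID_ERROR_MESSAGE}"
    return f"{deleted_partitions} are gone. Some data may have been missed."
```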
nit: add a period after "missed".
Test build #101094 has finished for PR 23301 at commit
LGTM.
Thanks! Merging to master.
Hi team, sorry for intruding into this discussion, but I have a query regarding 'groupIdPrefix'. The version I currently work with doesn't support its usage: even though the property is set during the creation of the Kafka read stream, I still see 'spark-kafka-source' as the group id prefix. Versions of the libraries I use: spark-streaming-kafka-0-10_2.11: 2.3.1. Kindly let me know which version should have this latest fix for 'groupIdPrefix' usage. Again, sorry if I misused this thread for posting the query. Thanks,
@joykrishna This PR looks like it landed only on the master branch, which will be released as 3.0.0.
@HeartSaVioR Thanks for the clarification. Any idea when we can expect this release?
@joykrishna Good question: roughly 5-6 months, but no promises. It's always faster if this PR is backported.
@gaborgsomogyi Thanks for the quick reply. Kindly let me know if there is any alternative for using a custom prefix until then.
@joykrishna I don't think it can be done without this change. I would like to help you, but backporting is really up to a committer.
@gaborgsomogyi I understand. Thanks for the response. Looking forward to seeing this fix at the earliest, as custom Kafka consumer group prefixes are needed when we work with third-party Kafka providers and want to distinguish our consumer group patterns.
@joykrishna Just to be clear: this is a new feature rather than a bug fix. We don't backport new features like this to maintenance branches. Hence, the next available version for this will be 3.0.0. If you cannot wait for the next release, you can try to backport the related patches yourself and build your own Kafka connector.
@zsxwing Ah, my bad. Understood that it is a new feature, and sure, I will take a stab at the option you mentioned. Thanks.
## What changes were proposed in this pull request?
This PR allows the user to override `kafka.group.id` for better monitoring or security. The user needs to make sure there are not multiple queries or sources using the same group id. It also fixes a bug that the `groupIdPrefix` option cannot be retrieved.
## How was this patch tested?
The newly added unit tests.
Closes apache#23301 from zsxwing/SPARK-26350.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>