-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#30870]: support consumer polling timeout in KafkaIO expansion service #30915
Conversation
Assigning reviewers. If you would like to opt out of this review, comment R: @damccorm for label python. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! One small comment regarding docs.
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Outdated
Show resolved
Hide resolved
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Outdated
Show resolved
Hide resolved
Co-authored-by: Jonathan Sabbagh <108473809+jbsabbagh@users.noreply.github.com>
…KafkaIO.java Co-authored-by: Jonathan Sabbagh <108473809+jbsabbagh@users.noreply.github.com>
…KafkaIO.java Co-authored-by: Jonathan Sabbagh <108473809+jbsabbagh@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! (and thanks @jbsabbagh for the reviews!)
Could you please fix the lint/spotless issues? Otherwise this LGTM
./gradlew :sdks:java:io:kafka:spotlessApply
should fix the java spotless issues. https://cwiki.apache.org/confluence/display/BEAM/Python+Tips#PythonTips-LintandFormattingChecks describes how to fix the python linting issues
@xianhualiu any update here? |
@damccorm please merge. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
Looks like Also there is report of KafkaIO added option breaking upgrade compatibility: #30197 (comment) |
@@ -75,6 +75,7 @@ public class KafkaIOTranslationTest { | |||
READ_TRANSFORM_SCHEMA_MAPPING.put( | |||
"getValueDeserializerProvider", "value_deserializer_provider"); | |||
READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn", "check_stop_reading_fn"); | |||
READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout", "consumer_polling_timeout"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fix is not complete. Previously, the error was
java.lang.AssertionError: Method getConsumerPollingTimeout will not be tracked when upgrading the 'KafkaIO.Read' transform. Please update 'KafkaIOTranslation.KafkaIOReadWithMetadataTranslator' to track the new method and update this test.
adding this line, the error became
java.lang.AssertionError: Field name consumer_polling_timeout was not found in the read transform schema defined in KafkaIOReadWithMetadataTranslator.
…xpansion service (apache#30915)" This reverts commit a44c4f1.
addresses #30870. Previous PR (#30877) made consumer polling timeout configurable for KafkaIO.Read in Java SDK. This PR contains changes to make the same configurable for Python SDK with the new configuration variable: consumerPollingTimeoutSeconds., which must be greater than 0. If not specified, the default will be 2 second.