-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12749: Changelog topic config on suppressed KTable lost #10664
Conversation
@ableegoldman Would appreciate it if you can take a look at this PR when you get a chance. |
cc any of @cadonna @vvcephei @lct45 @wcarlson5 @mjsax @guozhangwang to review this |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR!
The implementation looks pretty good so far, but can we add some tests to make sure that configs are no longer los if either emitEarlyWhenFull
or shutDownWhenFull
is set after the logging config?
@@ -46,13 +48,13 @@ public void bufferBuilderShouldBeConsistent() { | |||
assertThat( | |||
"keys alone should be set", | |||
maxRecords(2L), | |||
is(new EagerBufferConfigImpl(2L, MAX_VALUE)) | |||
is(new EagerBufferConfigImpl(2L, MAX_VALUE, Collections.emptyMap())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vichu Can you add some tests that don't just use an empty map?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
Added a few tests to verify the logConfig
map as well.
…art of the equals, hashCode & toString methods.
…ion is still set after the calls to withMaxBytes & withMaxRecords are applied.
Sorry, this got lost in my inbox. @vichu if you can rebase I think its good to go. do you agree @ableegoldman ? |
# Conflicts: # streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Suppressed.scala # streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/SuppressedTest.scala
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
...src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java
Outdated
Show resolved
Hide resolved
…ls/suppress/EagerBufferConfigImpl.java
@vichu I took the liberty to commit the fix to the nit directly. I hope that is fine with you. |
I will merge as soon as the checks are done. |
The 7th build was green on the same commit. I do not know why a 8th build was started. |
Refactored
logConfig
to be passed appropriately when usingshutDownWhenFull
oremitEarlyWhenFull
. Removed the constructor that doesn't accept alogConfig
parameter so you're forced to specify it explicitly, whether it's empty/unspecified or not.Ticket: https://issues.apache.org/jira/browse/KAFKA-12749
Committer Checklist (excluded from commit message)