storage: remove kafka producer limits in sinks #24784
Conversation
options.insert("queue.buffering.max.kbytes", "2147483647".into());
// Disable the default buffer limit of 100k messages. We don't want to impose any limit
// here as the operator has nothing better to do with the data than to buffer them.
options.insert("queue.buffering.max.messages", "0".into());
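For context, here is a hedged sketch of how options like these might be applied when building a producer with rust-rdkafka's `ClientConfig`; the function name and broker parameter are illustrative assumptions, not taken from this diff. It assumes librdkafka v2.4.0+, where `queue.buffering.max.messages = 0` disables the message-count limit.

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::BaseProducer;

// Sketch: build a producer whose local queues are effectively unbounded,
// mirroring the options set in the diff above.
fn unbounded_producer(brokers: &str) -> BaseProducer {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        // Raise the byte-based queue limit to its maximum (2147483647 kbytes, ~2 TiB).
        .set("queue.buffering.max.kbytes", "2147483647")
        // Disable the default 100k message-count limit entirely.
        .set("queue.buffering.max.messages", "0")
        .create()
        .expect("failed to create Kafka producer")
}
```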
Do we not have the fix for confluentinc/librdkafka#4018 in our fork? That might explain the test failures.
We don't :(
The Kafka sink operator has nothing better to do with incoming data than to buffer it, which provides no additional value over sending it straight to `librdkafka` and letting that library buffer it instead. The operator is already set up to never buffer messages that are ready to send, but the current limits are too conservative. If a large snapshot arrives fast enough it is possible to reach the 10M message limit and cause the sink to restart; I have observed this happening locally during benchmarking. For this reason this PR disables the rdkafka message count and size limits.

Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
Force-pushed from ccd34d2 to da3d237.
This looks fine as long as CI passes! We might want to come back and add LD flags for this.
Force-pushed from da3d237 to a8f4ee4.
Force-pushed from b29320f to 79a5814.
Signed-off-by: Petros Angelatos <petrosagg@gmail.com>
Force-pushed from 79a5814 to 83e5b88.
Now that we're on librdkafka v2.4.0, we don't need to catch and retry QueueFull errors, but can instead disable the queue limit. This commit is a combination of:

* Reverting the QueueFull workaround from MaterializeInc#24871
* Reapplying Petros's original implementation in MaterializeInc#24784

Co-authored-by: Petros Angelatos <petrosagg@gmail.com>
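For illustration, here is a minimal sketch (not the actual Materialize code) of the kind of catch-and-retry QueueFull workaround this commit describes reverting, using rust-rdkafka's `BaseProducer`; the function name, backoff interval, and error handling are assumptions made for the example.

```rust
use std::time::Duration;

use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::producer::{BaseProducer, BaseRecord};

// Sketch: if librdkafka's local queue is full, poll the producer to let it
// drain and retry the send instead of surfacing the error.
fn send_with_retry(producer: &BaseProducer, topic: &str, payload: &[u8]) {
    loop {
        match producer.send(BaseRecord::<(), _>::to(topic).payload(payload)) {
            Ok(()) => return,
            Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
                // Local queue is full; give librdkafka time to deliver messages.
                producer.poll(Duration::from_millis(100));
            }
            Err((e, _)) => panic!("unexpected produce error: {e}"),
        }
    }
}
```

With the queue limits disabled as in this PR, librdkafka itself buffers all pending messages, so a retry loop like this is no longer needed.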
Motivation
The Kafka sink operator has nothing better to do with incoming data than to buffer it, which provides no additional value over sending it straight to `librdkafka` and letting that library buffer it instead. The operator is already set up to never buffer messages that are ready to send, but the current limits are too conservative. If a large snapshot arrives fast enough it is possible to reach the 10M message limit and cause the sink to restart; I have observed this happening locally during benchmarking.

For this reason, this PR disables the rdkafka message count and size limits.
Tips for reviewer
Checklist
- If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.