Fix broken error handling around Idempotent producer + Ensure strict ordering when Net.MaxOpenRequests = 1 #2943
base: main
Conversation
Signed-off-by: Richard Artoul <richardartoul@gmail.com>
Hmm... I think it may still be possible for concurrent requests to be issued: sendResponse writes to a channel, and once the response is picked up off that channel there could be an in-flight request from the next request as well as an in-flight retry. I'm not sure how to resolve that other than somehow "waiting" for a response to be fully processed (either failed as an error or retried).
I’m somewhat concerned by the number of panic()s in the code. Are these really so serious that they need to potentially crash the whole program, and/or kill off a processing goroutine without any information up the chain that processing has terminated?
@@ -249,6 +250,19 @@ func (pe ProducerError) Unwrap() error {
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	if len(pe) > 0 {
Should this ever actually be produced with zero messages? If it’s unlikely to ever happen with len(pe) == 0, then that should be the guard condition, and the complex error message should be the unindented path.
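For illustration, a minimal sketch of the guard-clause shape being suggested (the message text is an assumption for the example, not the PR's actual wording; assumes fmt is imported):

func (pe ProducerErrors) Error() string {
	// Guard the unlikely empty case and return early...
	if len(pe) == 0 {
		return "kafka: no producer errors"
	}
	// ...so the detailed message stays on the unindented path.
	return fmt.Sprintf("kafka: failed to deliver %d messages; first error: %v", len(pe), pe[0].Err)
}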
@@ -695,6 +709,9 @@ func (pp *partitionProducer) dispatch() {
	// All messages being retried (sent or not) have already had their retry count updated
	// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
	if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
		if msg.hasSequence {
			panic("assertion failure: reassigning producer epoch and sequence number to message that already has them")
https://go.dev/wiki/CodeReviewComments#dont-panic
Is the condition here so bad that we need to panic? (That is, is it entirely unrecoverable?)
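For comparison, a sketch of one non-panicking shape inside the dispatch loop. It assumes the parent producer's returnError helper; errReassignedSequence is a hypothetical sentinel error, and whether the invariant is actually recoverable is exactly the open question here.

if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
	if msg.hasSequence {
		// Report the broken invariant to the caller and drop the message,
		// instead of crashing the whole process.
		pp.parent.returnError(msg, errReassignedSequence)
		continue
	}
	// ... normal epoch/sequence assignment would continue here ...
}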
Logger.Println(
	"assertion failed: message out of sequence added to batch",
	"producer_id",
	ps.producerID,
	set.recordsToSend.RecordBatch.ProducerID,
	"producer_epoch",
	ps.producerEpoch,
	set.recordsToSend.RecordBatch.ProducerEpoch,
	"sequence_number",
	msg.sequenceNumber,
	set.recordsToSend.RecordBatch.FirstSequence,
	"buffer_count",
	ps.bufferCount,
	"msg_has_sequence",
	msg.hasSequence)
I would recommend leaving the log message on the same line as the method call, so that it’s easily findable via grep, and otherwise it one-line-isolates well.
Additionally, if the line is long enough to break up, then the possibility of adding even more fields is high, so each entry should end with a comma and a newline; that way, adding new fields to the end of the call doesn’t produce unnecessary line changes where the only change is punctuation due to syntax requirements.
Then I would pair up each log field name with the log field value, all together:
Logger.Println("assertion failed: message out of sequence added to batch",
"producer_id", ps.producerID, set.recordsToSend.RecordBatch.ProducerID,
"producer_epoch", ps.producerEpoch, set.recordsToSend.RecordBatch.ProducerEpoch,
"sequence_number", msg.sequenceNumber, set.recordsToSend.RecordBatch.FirstSequence,
"buffer_count", ps.bufferCount,
"msg_has_sequence", msg.hasSequence,
)
if !succeeded {
	Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader, no more retries\n", topic, partition)
[nitpick] Trailing newlines should be unnecessary for loggers? (That is generally the case, but I don’t know whether it is specifically true here.)
Three % verbs are specified, but only two arguments are given.
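A sketch of the argument fix; the trailing err value is an assumption about what the third %v was meant to print:

// Match the three format verbs with three arguments (err is assumed to be
// the lookup error in scope at this point).
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader, no more retries\n",
	topic, partition, err)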
// as expected. This retry loop is very important since prematurely (and unnecessarily) failing
// an idempotent batch is ~equivalent to data loss.
succeeded := false
for i := 0; i < p.conf.Producer.Retry.Max; i++ {
Suggest using a different variable name if we’re counting retries/tries rather than indices.
[off-by-one smell] Are we counting retries, or tries? That is, if I’ve asked for 5 retries max, then that’s 6 total tries.
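A small standalone sketch of the distinction (not the PR's code): naming the counter after attempts and bounding it with <= makes the "1 initial try + N retries" count explicit.

maxRetries := p.conf.Producer.Retry.Max // e.g. 5 retries
succeeded := false
for attempt := 0; attempt <= maxRetries && !succeeded; attempt++ {
	// attempt 0 is the initial try; attempts 1..maxRetries are the retries,
	// giving maxRetries+1 total tries (6 when Retry.Max is 5).
	// ... issue the request; set succeeded = true on success ...
}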
Any progress on this? I think this bug is very urgent to fix.
As we are also seeing a lot of
I guess @richardartoul has not responded to @puellanivis’ review comments yet? @richardartoul: Do you think you can continue with this PR, or is it abandoned?
We are waiting for this MR to be continued.
Hey guys, I'm really sorry, but I wasn't able to convince myself this PR was correct or materially improved the situation. There is definitely a bug in the library, but I suspect there is more than one issue. I timeboxed myself to two days on this and don't have more time to continue trying to fix it.
There have been numerous issues filed lately that:
1. Demonstrate that error handling around the idempotent producer is broken.
2. Show that strict ordering is not maintained even when Net.MaxOpenRequests is set to 1.

I was able to reproduce these issues both locally and in a staging environment and have fixed both of them. The PR has many comments explaining the changes.
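For context, a minimal sketch of the producer configuration being exercised here (import path, broker address, Kafka version, and retry count are placeholders, not taken from the PR):

package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_6_0_0                 // idempotence needs Kafka >= 0.11
	cfg.Producer.Idempotent = true                // the idempotent producer under discussion
	cfg.Net.MaxOpenRequests = 1                   // required for idempotence; the strict-ordering case in this PR
	cfg.Producer.RequiredAcks = sarama.WaitForAll // acks=all is required for idempotence
	cfg.Producer.Retry.Max = 5
	cfg.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	// ... send messages via producer.Input() and read producer.Errors() ...
}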
Relevant issues: