Re-ordering of offset commit requests can cause committed offset to move "backwards". #2940

Closed
prestona opened this issue Jul 19, 2024 · 1 comment
@prestona
Member

Description

Offsets committed by multiple consumer group sessions can be re-ordered by the time they arrive at Kafka, potentially resulting in duplicate message delivery, should the offset be used to resume consumption.

If a single sarama.ConsumerGroup is consuming from more than one partition, then reordering of offset commit requests can occur.
This is because of a race condition between the goroutines used to run the ConsumerClaim method for different topic partitions.

When ConsumerGroupSession.Commit() is called, this ends up calling into offsetManager.flushToBroker().
This in turn:

  1. Builds an offset commit request, using any so-far uncommitted offsets across all of the partitions belonging to the group
  2. Finds the group's coordinator (if not already known)
  3. Sends the offset commit request to the broker that is the group coordinator.

However, there is no locking to prevent two (or more) goroutines from interleaving between building an offset commit request and sending that request to Kafka.
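The gap between building and sending can be illustrated with a minimal, single-threaded sketch. The `manager` type and its `build` and `send` methods are hypothetical stand-ins invented for this illustration, not Sarama's actual types, and the interleaving is written out by hand rather than left to the scheduler.

```go
package main

import "fmt"

// offsets maps a partition number to the next offset to consume.
type offsets map[int32]int64

// manager is a hypothetical stand-in for an offset manager. It is not
// Sarama's real type; it only models "build a request, then send it".
type manager struct {
	uncommitted offsets   // offsets marked but not yet committed
	sent        []offsets // requests in the order the "broker" received them
}

// build snapshots the uncommitted offsets into a new request.
func (m *manager) build() offsets {
	req := make(offsets, len(m.uncommitted))
	for p, o := range m.uncommitted {
		req[p] = o
	}
	return req
}

// send delivers a previously built request to the broker.
func (m *manager) send(req offsets) {
	m.sent = append(m.sent, req)
}

// interleave hand-schedules two callers, A and B, so that B builds and
// sends inside the window between A's build and A's send.
func interleave() []offsets {
	m := &manager{uncommitted: offsets{}}
	m.uncommitted[0] = 11
	stale := m.build() // A builds its request, then is descheduled
	m.uncommitted[0] = 12
	m.send(m.build()) // B builds and sends while A is paused
	m.send(stale)     // A resumes and sends the older snapshot last
	return m.sent
}

func main() {
	fmt.Println(interleave())
}
```

Because nothing ties a snapshot to its send, the older snapshot reaches the broker after the newer one.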

For example, consider the following interactions between two goroutines P0 and P1 - each belonging to the same consumer group, but consuming from different partitions of a topic:

  1. P0 marks offset 10 and calls commit (the uncommitted offsets tracked by the session are: {P0 -> 11})
  2. P0 builds the offset commit request (containing {P0 -> 11}).
  3. P0 goroutine yields, and P1 goroutine is scheduled
  4. P1 marks offset 20, calls commit, and builds its offset commit request (the uncommitted offsets tracked by the session, and so the contents of the request, are: {P0 -> 11, P1 -> 21})
  5. P1 goroutine yields, and P0 goroutine is scheduled
  6. P0 sends the offset commit request ({P0 -> 11}), gets the response, and resumes running code in the ConsumerClaim method
  7. P0 marks offset 11 and calls commit (the uncommitted offsets tracked by the session are: {P0 -> 12, P1 -> 21})
  8. P0 builds the offset commit request (containing {P0 -> 12, P1 -> 21}).
  9. P0 sends its offset commit request (containing {P0 -> 12, P1 -> 21})
  10. P0 goroutine yields, and P1 goroutine is scheduled
  11. P1 sends its offset commit request (containing {P0 -> 11, P1 -> 21}).

From the Kafka broker's perspective, it receives:

  1. Offset commit request (step 6): {P0 -> 11}
  2. Offset commit request (step 9): {P0 -> 12, P1 -> 21}
  3. Offset commit request (step 11): {P0 -> 11, P1 -> 21}

This causes the offset committed for P0 to move "backwards" from 12 to 11.
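The broker's view can be replayed with a short sketch. It assumes, for illustration only, that the broker simply stores the offset from each request it receives, in arrival order. The `request` type and the `history` and `movedBackwards` helpers are hypothetical names, not part of Sarama or Kafka.

```go
package main

import "fmt"

// request maps a partition name to the offset being committed for it.
type request map[string]int64

// history replays requests in arrival order and returns the sequence of
// offsets committed for one partition.
func history(partition string, reqs []request) []int64 {
	var h []int64
	for _, r := range reqs {
		if o, ok := r[partition]; ok {
			h = append(h, o)
		}
	}
	return h
}

// movedBackwards reports whether any committed offset is lower than the
// one committed immediately before it.
func movedBackwards(h []int64) bool {
	for i := 1; i < len(h); i++ {
		if h[i] < h[i-1] {
			return true
		}
	}
	return false
}

func main() {
	// The three requests in the order the broker receives them.
	arrivals := []request{
		{"P0": 11},           // sent by P0
		{"P0": 12, "P1": 21}, // sent by P0
		{"P0": 11, "P1": 21}, // built earlier by P1, sent last
	}
	h := history("P0", arrivals)
	fmt.Println(h, movedBackwards(h)) // prints "[11 12 11] true"
}
```

The committed offset for P1 stays at 21 throughout; only P0 regresses, because P1's request carried a stale copy of P0's offset.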

Due to the nature of this race condition, it is more likely to occur if:

  • A single Sarama ConsumerGroup is consuming from many partitions (at least 2 are required to trigger this bug)
  • The application frequently commits offsets manually (e.g. on each message that it consumes)
  • Enough messages are present on the partitions for multiple messages to be delivered to a single call of the ConsumerClaim method
  • Non-zero latency between client and broker (e.g. not both running on the same laptop).

Versions

Sarama: main (d2246cc)
Kafka: 3.6 and 3.8
Go: 1.21.10

Configuration

	config := sarama.NewConfig()
	config.Version = sarama.V3_3_0_0
	config.Consumer.Return.Errors = true
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = false
Logs

Not applicable. Sarama does not emit any logging relating to this problem.

Additional Context
@prestona prestona self-assigned this Jul 19, 2024
prestona added a commit to prestona/sarama that referenced this issue Jul 19, 2024
Add locking to prevent concurrent calls to commit from potentially being
re-ordered between determining which offsets to include in the offset
commit request, and sending the request to Kafka.

Fixes: IBM#2940
Co-authored-by: Michael Burgess <michburg@uk.ibm.com>
Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
prestona added a commit to prestona/sarama that referenced this issue Jul 19, 2024
Add locking to prevent concurrent calls to commit from potentially being
re-ordered between determining which offsets to include in the offset
commit request, and sending the request to Kafka.

Move finding the coordinator before creating the request to avoid
holding the lock in the case the coordinator is unknown. This requires a
change to the "new offset manager" test, which previously didn't expect
the mock broker to receive a find coordinator request.

Fixes: IBM#2940
Co-authored-by: Michael Burgess <michburg@uk.ibm.com>
Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
prestona added a commit to prestona/sarama that referenced this issue Jul 23, 2024
Hold broker lock to prevent concurrent calls to commit from potentially
being re-ordered between determining which offsets to include in the
offset commit request, and sending the request to Kafka.

Move finding the coordinator before creating the request to avoid
holding the lock in the case the coordinator is unknown. This requires a
change to the "new offset manager" test, which previously didn't expect
the mock broker to receive a find coordinator request.

Fixes: IBM#2940
Co-authored-by: Michael Burgess <michburg@uk.ibm.com>
Signed-off-by: Adrian Preston <PRESTONA@uk.ibm.com>
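
The idea described in these commit messages, holding a lock from the point the offsets are chosen until the request has been sent, can be sketched as follows. This is a simplified model and not the code from the linked commits: it uses a plain `sync.Mutex` (named `flushMu` here) rather than the broker lock, and the `manager` type, its methods, and the in-memory "broker" map are all hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in for an offset manager.
type manager struct {
	mu     sync.Mutex      // guards marked
	marked map[int32]int64 // next offset to consume, per partition

	flushMu     sync.Mutex      // held across build AND send
	committed   map[int32]int64 // what the simulated broker has stored
	regressions int             // times a stored offset moved backwards
}

// mark records the next offset to consume for a partition.
func (m *manager) mark(p int32, next int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if next > m.marked[p] {
		m.marked[p] = next
	}
}

// commit builds and sends a request as one critical section, so no other
// commit can slip in between the snapshot and the send.
func (m *manager) commit() {
	m.flushMu.Lock()
	defer m.flushMu.Unlock()

	// Build: snapshot the marked offsets.
	m.mu.Lock()
	req := make(map[int32]int64, len(m.marked))
	for p, o := range m.marked {
		req[p] = o
	}
	m.mu.Unlock()

	// Send: the simulated broker stores whatever it receives.
	for p, o := range req {
		if o < m.committed[p] {
			m.regressions++
		}
		m.committed[p] = o
	}
}

// run starts one goroutine per partition, each committing after every message.
func run(partitions int32, messages int64) (map[int32]int64, int) {
	m := &manager{marked: map[int32]int64{}, committed: map[int32]int64{}}
	var wg sync.WaitGroup
	for p := int32(0); p < partitions; p++ {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			for off := int64(0); off < messages; off++ {
				m.mark(p, off+1)
				m.commit()
			}
		}(p)
	}
	wg.Wait()
	return m.committed, m.regressions
}

func main() {
	committed, regressions := run(4, 1000)
	fmt.Println(committed, regressions)
}
```

In this model every snapshot is applied before the next one is taken, so the stored offsets can only move forwards. The cost is that commits are serialised, which is one reason the later commits look up the coordinator before taking the lock.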
@prestona
Member Author

I've opened a couple of PRs for this one.

I think I favor #2941, but either should fix this issue.

@dnwe dnwe closed this as completed in 6b4f9be Aug 7, 2024