Offset "resets" during rebalance #2782

Closed
4 of 7 tasks
jacekchmiel opened this issue Mar 26, 2020 · 6 comments
Comments


jacekchmiel commented Mar 26, 2020

Description

Hi, before I get to the point, I'd like to note that a somewhat similar issue has already been reported a couple of times, but each time the resolution was inconclusive.

In our setup we are using the Rust bindings to librdkafka with a high-level consumer, automatic commits, and manual offset stores to achieve an at-least-once delivery guarantee. Occasionally our committed group offsets go back in time (i.e. reset to an offset committed a long time before). I'm still not entirely sure I've found the root cause, but I have found a way to reproduce the situation. I'd like to share the idea/code that reproduces this and ask for advice on whether this might be one of the following:

  • issue in librdkafka,
  • my misuse of librdkafka,
  • combination of both,
  • or maybe something completely different.

So, getting to the point: my guess was that an automatic librdkafka commit takes place after a consumer gets re-assigned a partition that was previously revoked from it. This way, old stored offsets find their way back to the broker. I couldn't find a way to reproduce this situation with auto commits, so I went with manual commits issued in a specific order between rebalance events. It goes like this:

  1. There are two consumers A and B that have the same group id. Both of them will subscribe to a single topic with single partition (topic[0]).
  2. Consumer A starts and gets assigned to topic[0].
  3. Consumer A receives the first message (with offset 1). It stores this message using rd_kafka_offset_store, but does not commit it.
  4. Consumer B starts at this point and gets assigned to topic[0]; consumer A gets assigned an empty set of partitions at the same time.
  5. Consumer B receives message with offset 1 (this is normal - it was not committed). Offset 1 is stored and committed synchronously using rd_kafka_commit(.., NULL, ..)
  6. Consumer B collects another message (offset 2), and commits it.
  7. Consumer B goes down
  8. Consumer A gets Revoke event. In the rebalance callback rd_kafka_assign(.., NULL) is called.
  9. Consumer A gets an Assign event. rd_kafka_assign is called with the topic_partition_list provided by the rebalance callback. Then rd_kafka_commit(.., NULL, ..) is called. Offset 1 gets committed a second time. If I'm not missing something, the offset store should be reset during rd_kafka_assign, which provides the RD_KAFKA_OFFSET_INVALID offset for the partition.

A workaround/fix that handles this case is to call rd_kafka_offsets_store when handling the revoke, with the previously assigned partitions and their offsets set to RD_KAFKA_OFFSET_INVALID (-1001).
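For what it's worth, the sequence above (and the effect of the workaround) can be modeled with a toy simulation of per-consumer offset stores and group commits. This is a hedged sketch, not librdkafka's actual implementation; the class and method names are made up:

```python
# Toy model: each consumer has a local offset store; an offset-less
# commit() sends whatever is in the store to the group coordinator.
INVALID = -1001  # RD_KAFKA_OFFSET_INVALID

class Broker:
    def __init__(self):
        self.committed = {}  # partition -> group committed offset

class Consumer:
    def __init__(self, broker):
        self.broker = broker
        self.stored = {}  # partition -> last stored offset

    def offset_store(self, partition, offset):
        self.stored[partition] = offset

    def commit(self):
        # Offset-less commit: push every non-INVALID stored offset.
        for p, off in self.stored.items():
            if off != INVALID:
                self.broker.committed[p] = off

broker = Broker()
a, b = Consumer(broker), Consumer(broker)

a.offset_store("topic[0]", 1)                # step 3: stored, never committed
b.offset_store("topic[0]", 1); b.commit()    # step 5
b.offset_store("topic[0]", 2); b.commit()    # step 6
a.commit()                                   # step 9: stale store wins
print(broker.committed["topic[0]"])          # 1 -- the group offset went back

# The workaround: reset the store to INVALID when handling revoke,
# so a later offset-less commit has nothing stale to send.
a.offset_store("topic[0]", INVALID)
broker.committed["topic[0]"] = 2
a.commit()
print(broker.committed["topic[0]"])          # still 2
```

The model only captures the bookkeeping, not the rebalance protocol itself, but it shows why resetting the store on revoke closes the hole.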

Am I missing something or doing something obviously wrong here? And is it possible that such situation might occur with automatic commit?

How to reproduce

You could write code that executes the steps described above, or I can create a minimal-ish example of Rust code that exposes this situation. Unfortunately I couldn't use the latest librdkafka, as it's not yet available in the rust-rdkafka crate (https://github.com/fede1024/rust-rdkafka). You can find some logs (debug=consumer,cgrp,topic) at the link below.

Checklist

  • librdkafka version (release number or git tag): librdkafka: v1.3.0, rdkafka-sys: 1.3.1, rdkafka 0.23.1
  • Apache Kafka version:
  • librdkafka client configuration: ("bootstrap.servers", "localhost:9092"), ("group.id", "foo-test"), ("enable.auto.offset.store", "false"), ("enable.auto.commit", "false"), ("auto.offset.reset", "earliest")
  • Operating system: reproduced on macOS Catalina 10.15.3
  • Provide logs (with debug=.. as necessary) from librdkafka: https://gist.github.com/jacekchmiel/d80a01aa1b9b37aa70b32b333c3d5197
  • Provide broker log excerpts
  • Critical issue

edenhill commented Apr 3, 2020

At step 8 I guess you mean Consumer B, right?

At step 9, are you calling commit() from the Assign event handler or shortly after calling Assign (but prior to receiving a new message)?
If so, I think therein lies your problem: consumer A will not yet have fetched the committed offsets after calling Assign(), so when you call Commit() it will commit the last stored offset known to A, which is the offset from step 3.

You shouldn't call commit() until you've offset_store()d a new offset, but that might be tricky to handle, so your proposed workaround sounds like a better alternative.

Should librdkafka automatically reset the stored offsets to INVALID when partitions are unassigned?

I think this might occur with auto commits too, but it is less likely since this is technically a race condition between fetching committed offsets after assign() and the next commit, and offset fetches are typically quick.
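To reason about that window, the two possible orderings of the post-assign OffsetFetch reply and the auto-commit timer can be sketched with a toy event loop. This is illustrative only (the event names are made up), under the assumption that a commit tick sends any stored offset greater than the last known committed one:

```python
INVALID = -1001  # RD_KAFKA_OFFSET_INVALID

def run(events, stale_store=1, broker_committed=2):
    # stale_store: offset left in the store from the previous assignment;
    # broker_committed: what the other consumer committed meanwhile.
    known_committed = INVALID  # committed offsets not yet fetched after assign()
    for ev in events:
        if ev == "offset_fetch_reply":
            known_committed = broker_committed
        elif ev == "auto_commit_tick":
            # Commit any stored offset above the last known committed one.
            if stale_store != INVALID and stale_store > known_committed:
                broker_committed = stale_store
                known_committed = stale_store
    return broker_committed

# Fetch wins the race: the stale store never qualifies for commit.
print(run(["offset_fetch_reply", "auto_commit_tick"]))  # 2
# Commit timer fires first: the group offset goes back in time.
print(run(["auto_commit_tick", "offset_fetch_reply"]))  # 1
```

Only the second ordering loses data, which is why a quick offset fetch usually masks the problem.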


edenhill commented Apr 3, 2020

Great report btw!

@edenhill edenhill added this to the v1.5.0 milestone Apr 3, 2020

edenhill commented Apr 3, 2020

@mhowlett Thoughts on resetting the stored offset to INVALID on assign/unassign?

@jacekchmiel

@edenhill
Thank you for looking into this.

At step 8 I guess you mean Consumer B, right?

At step 8 I did mean consumer A. Every rebalance consists of a revoke event (even if 0 partitions were assigned). You can see this in the logs I've provided too:

2020-03-26 15:19:30.630640 UTC  DEBUG librdkafka: librdkafka: ASSIGN [thrd:main]: Group "foo-test": delegating revoke of 0 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: group is rebalancing    

At step 9, are you calling commit() from the Assign event handler or shortly after calling Assign (but prior to receiving a new message)?
If so I think there lies your problem, consumer A will not yet have fetched the committed offsets after calling Assign(), so when you call Commit() it will commit the last stored offset known to A, which is the offset in step 3.

Yes, I'm calling commit shortly after assign (from another thread, but still before the offsets are fetched) only to expose the situation. The workaround is currently implemented and running in our production deployment, and so far we are not seeing offsets being reset anymore. To give you an idea about the scale: our production environment consists of 16 instances consuming a total of 3000 messages per second (on average). I've also fixed a couple of other issues that greatly reduce the number of rebalances occurring, so we'll have to wait some more time before drawing any sound conclusions on this.

I think this might occur with auto commits too, but it is less likely since this is technically a race condition between fetching committed offsets after assign() and the next commit, and offset fetches are typically quick.

In our experience a Kafka cluster can introduce up to 10 s of latency (99th percentile) in commit operations, so maybe in such conditions the fetch can sometimes be slow?

There is also a slight possibility that the issue was actually caused by a manual commit implemented for application shutdown. Invalid offsets would be committed, but it's extremely unlikely: it'd require a shutdown immediately after a rebalance, and that rebalance would need to "restore" previously assigned partitions to a consumer.

Anyway, it would be great to either confirm or reject the presence of the issue in auto-commit/manual-store use case.


edenhill commented Apr 3, 2020

At step 8 I did mean consumer A. Every rebalance consists of a revoke event (even if 0 partitions were assigned). You can see this in the logs I've provided too:

Ah, right 👍

Anyway, it would be great to either confirm or reject the presence of the issue in auto-commit/manual-store use case.

The auto commit timer will fire every commit interval regardless of the current state as long as there is a current assignment (non-empty assign()) and a commit request will be sent for all stored offsets of the assigned partitions whose offset is greater than the last known committed offset.
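That eligibility rule can be sketched roughly as follows. This is a hedged approximation of the described behavior; the function name and data layout are made up and are not librdkafka internals:

```python
INVALID = -1001  # RD_KAFKA_OFFSET_INVALID

def offsets_to_commit(assignment):
    """assignment: {partition: (stored_offset, last_known_committed)}."""
    return {
        p: stored
        for p, (stored, committed) in assignment.items()
        if stored != INVALID and stored > committed
    }

# Before the post-assign OffsetFetch completes, the last known committed
# offset is still INVALID (-1001), so a stale store left over from a
# previous assignment is eligible on the next commit timer tick:
print(offsets_to_commit({"topic[0]": (1, INVALID)}))  # {'topic[0]': 1}
# Once the real committed offset (2) is known, the stale store is skipped:
print(offsets_to_commit({"topic[0]": (1, 2)}))        # {}
```

Under this rule, a stale store only leaks through during the gap between assign() and the OffsetFetch reply, which matches the race described above.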

edenhill added a commit that referenced this issue Apr 3, 2020
…ixes #2782)

This fixes the case where a manual offset-less commit() or the auto-committer
would commit a stored offset from a previous assignment before
a new message was consumed by the application.
edenhill added a commit that referenced this issue Apr 20, 2020
…ixes #2782)

This fixes the case where a manual offset-less commit() or the auto-committer
would commit a stored offset from a previous assignment before
a new message was consumed by the application.
edenhill added a commit that referenced this issue Apr 23, 2020
…ixes #2782)

This fixes the case where a manual offset-less commit() or the auto-committer
would commit a stored offset from a previous assignment before
a new message was consumed by the application.
@chrisbeard

We may have observed this behavior (or something very similar) with a confluent-kafka-python (pinned to v1.5.0) client that manually stores offsets in the offset store and uses auto commit.

I've added an * by the nodes when interesting offsets appear.
Output of debug=cgrp, grepping for "[29]" (c4 is the coordinator).

| node | log
| c1   | 2021-06-12 21:34:30.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29137925, committed offset 29137914: setting stored offset 29137925 for commit
| c1   | 2021-06-12 21:34:35.770 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29137948, committed offset 29137925: setting stored offset 29137948 for commit
| c1   | 2021-06-12 21:34:40.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29137958, committed offset 29137948: setting stored offset 29137958 for commit
| c1   | 2021-06-12 21:34:45.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29137987, committed offset 29137958: setting stored offset 29137987 for commit
| c1   | 2021-06-12 21:34:50.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29137990, committed offset 29137987: setting stored offset 29137990 for commit
| c1   | 2021-06-12 21:34:55.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29138026, committed offset 29137990: setting stored offset 29138026 for commit
| c1   | 2021-06-12 21:35:00.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29138037, committed offset 29138026: setting stored offset 29138037 for commit
| c1   | 2021-06-12 21:35:05.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29138065, committed offset 29138037: setting stored offset 29138065 for commit
| c1   | 2021-06-12 21:35:10.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29138108, committed offset 29138065: setting stored offset 29138108 for commit
| c1   | 2021-06-12 21:35:15.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29138168, committed offset 29138108: setting stored offset 29138168 for commit
| c1   | 2021-06-12 21:35:20.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29138221, committed offset 29138168: setting stored offset 29138221 for commit
| c1   | 2021-06-12 21:35:25.771 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29138243, committed offset 29138221: setting stored offset 29138243 for commit
| c1   | 2021-06-12 21:35:29.529 |CGRPOP|kafka_consumer#consumer-1| [thrd:main]: Group "myapp_consumer_group" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v176) for my-topic [29]
| c1   | 2021-06-12 21:35:29.529 |PARTDEL|kafka_consumer#consumer-1| [thrd:main]: Group "myapp_consumer_group": delete my-topic [29]
| c1   | 2021-06-12 21:35:29.529 |CGRPOP|kafka_consumer#consumer-1| [thrd:main]: Group "myapp_consumer_group" received op PARTITION_LEAVE in state up (join state wait-unassign, v176) for my-topic [29]
| c1   | 2021-06-12 21:35:29.529 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 29138254, committed offset 29138243: setting stored offset 29138254 for commit
| c4   | 2021-06-12 21:35:29.723 |ASSIGN|kafka_consumer#consumer-1| [thrd:main]: my-topic [29]
| c4   | 2021-06-12 21:35:31.712 |ASSIGN|kafka_consumer#consumer-1| [thrd:main]: my-topic [29]
| c4   | 2021-06-12 21:35:32.712 |ASSIGN|kafka_consumer#consumer-1| [thrd:main]: my-topic [29]
| c2   | 2021-06-12 21:35:32.721 |FETCHSTART|kafka_consumer#consumer-1| [thrd:main]: my-topic [29] offset INVALID
| c2 * | 2021-06-12 21:35:32.723 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28886932, committed offset 28886931: setting stored offset 28886932 for commit
| c4   | 2021-06-12 21:35:36.676 |ASSIGN|kafka_consumer#consumer-1| [thrd:main]: my-topic [29]
| c4   | 2021-06-12 21:35:38.674 |ASSIGN|kafka_consumer#consumer-1| [thrd:main]: my-topic [29]
| c4   | 2021-06-12 21:35:39.676 |ASSIGN|kafka_consumer#consumer-1| [thrd:main]: my-topic [29]
| c4   | 2021-06-12 21:35:40.676 |ASSIGN|kafka_consumer#consumer-1| [thrd:main]: my-topic [29]
| c3   | 2021-06-12 21:35:41.681 |FETCHSTART|kafka_consumer#consumer-1| [thrd:main]: my-topic [29] offset INVALID
| c3   | 2021-06-12 21:35:41.682 |OFFSETFETCH|kafka_consumer#consumer-1| [thrd:main]: my-topic [29] offset INVALID
| c3   | 2021-06-12 21:35:41.682 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: setting default offset INVALID
| c3 * | 2021-06-12 21:35:41.682 |FETCHSTART|kafka_consumer#consumer-1| [thrd:main]: my-topic [29] offset 28886932
| c3   | 2021-06-12 21:35:41.682 |CGRPOP|kafka_consumer#consumer-1| [thrd:main]: Group "myapp_consumer_group" received op PARTITION_JOIN in state up (join state started, v247) for my-topic [29]
| c3   | 2021-06-12 21:35:41.682 |PARTADD|kafka_consumer#consumer-1| [thrd:main]: Group "myapp_consumer_group": add my-topic [29]
| c3   | 2021-06-12 21:35:41.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28886940, committed offset 28886932: setting stored offset 28886940 for commit
| c3   | 2021-06-12 21:35:46.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28887126, committed offset 28886940: setting stored offset 28887126 for commit
| c3   | 2021-06-12 21:35:51.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28887371, committed offset 28887126: setting stored offset 28887371 for commit
| c3   | 2021-06-12 21:35:56.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28887564, committed offset 28887371: setting stored offset 28887564 for commit
| c3   | 2021-06-12 21:36:01.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28887822, committed offset 28887564: setting stored offset 28887822 for commit
| c3   | 2021-06-12 21:36:06.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28888028, committed offset 28887822: setting stored offset 28888028 for commit
| c3   | 2021-06-12 21:36:11.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28888250, committed offset 28888028: setting stored offset 28888250 for commit
| c3   | 2021-06-12 21:36:16.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28888477, committed offset 28888250: setting stored offset 28888477 for commit
| c3   | 2021-06-12 21:36:21.900 |OFFSET|kafka_consumer#consumer-1| [thrd:main]: Topic my-topic [29]: stored offset 28888742, committed offset 28888477: setting stored offset 28888742 for commit

Application-level logs captured during the same time:

| node | log
| c1   | 2021-06-12 21:34:55,774 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=29138026,error=None}]
| c1   | 2021-06-12 21:35:00,774 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=29138037,error=None}]
| c1   | 2021-06-12 21:35:05,791 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=29138065,error=None}]
| c1   | 2021-06-12 21:35:10,774 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=29138108,error=None}]
| c1   | 2021-06-12 21:35:15,774 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=29138168,error=None}]
| c1   | 2021-06-12 21:35:20,774 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=29138221,error=None}]
| c1   | 2021-06-12 21:35:25,774 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=29138243,error=None}]
| c1   | 2021-06-12 21:35:29,529 INFO on_revoke Partition assignment revoked: partitions=[TopicPartition{topic=my-topic,partition=29,offset=-1001,error=None}]
| c1   | 2021-06-12 21:35:29,532 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=29138254,error=None}]
| c2   | 2021-06-12 21:35:32,720 INFO on_assign Partition assignment: partitions=[TopicPartition{topic=my-topic,partition=29,offset=-1001,error=None}]
| c2   | 2021-06-12 21:35:32,722 INFO on_revoke Partition assignment revoked: partitions=[TopicPartition{topic=my-topic,partition=29,offset=-1001,error=None}]
| c2 * | 2021-06-12 21:35:32,730 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=28886932,error=None}]
| c3   | 2021-06-12 21:35:41,681 INFO on_assign Partition assignment: partitions=[TopicPartition{topic=my-topic,partition=29,offset=-1001,error=None}]
| c3   | 2021-06-12 21:36:04,696 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=28886940,error=None}]
| c3   | 2021-06-12 21:36:04,696 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=28887126,error=None}]
| c3   | 2021-06-12 21:36:04,696 INFO on_commit Offsets committed: partitions=[TopicPartition{topic=my-topic,partition=29,offset=28887371,error=None}]

Going to update this consumer to v1.7.0, but thought I'd report this just in case.
