Skip to content

Commit

Permalink
Merge pull request #112 from openedx/diana/sleep-on-replay
Browse files Browse the repository at this point in the history
fix: Added poll call to reset-offset code.
  • Loading branch information
dianakhuang authored Jan 20, 2023
2 parents 61eb562 + 857b9ba commit c4e504c
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ Unreleased
**********


[3.6.1] - 2023-01-20
********************
Fixed
=======
* Added a poll call to force resets to be processed during replay/offset-reset mode.

[3.6.0] - 2023-01-06
********************
Changed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '3.6.0'
__version__ = '3.6.1'
9 changes: 9 additions & 0 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,15 @@ def reset_offsets(consumer, partitions):
# This log message may be noisy when we are replaying, but hopefully we only see it
# once every 30 seconds.
logger.info("Offsets are being reset. Sleeping instead of consuming events.")

# We are calling poll here because we believe the offsets will not be set
# correctly until poll is called, despite the offsets being reset in a different call.
# This is because we don't believe that the partitions for the current consumer are assigned
# until the first poll happens. Because we are not trying to consume any messages in this mode,
# we are deliberately calling poll without processing the message it returns
# or commiting the new offset.
self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)

time.sleep(30)
continue

Expand Down

0 comments on commit c4e504c

Please sign in to comment.