Skip to content

Commit

Permalink
Merge pull request #115 from openedx/diana/commit-offsets
Browse files Browse the repository at this point in the history
fix: Commit offsets instead of assigning them.
  • Loading branch information
dianakhuang authored Jan 26, 2023
2 parents c4e504c + 9bde0eb commit 3995766
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ Change Log
Unreleased
**********

[3.6.2] - 2023-01-26
********************
Fixed
=====
* Reset mode now commits the correct offsets to Kafka.

[3.6.1] - 2023-01-20
********************
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '3.6.1'
__version__ = '3.6.2'
3 changes: 2 additions & 1 deletion edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ def reset_offsets(consumer, partitions):

logger.info(f'Found offsets for timestamp {offset_timestamp}: {partitions_with_offsets}')

consumer.assign(partitions_with_offsets)
# We need to commit these offsets to Kafka in order to ensure these offsets are persisted.
consumer.commit(offsets=partitions_with_offsets)

# This is already checked at the Command level, but it's possible this loop
# could get called some other way, so check it here too.
Expand Down

0 comments on commit 3995766

Please sign in to comment.