diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 128b852..e4acd6e 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,11 @@ Change Log Unreleased ********** +[3.6.2] - 2023-01-26 +******************** +Fixed +===== +* Reset mode now commits the correct offsets to Kafka. [3.6.1] - 2023-01-20 ******************** diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index 2d4fde1..a655068 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -9,4 +9,4 @@ from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer -__version__ = '3.6.1' +__version__ = '3.6.2' diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py index 24ad9fc..bff3183 100644 --- a/edx_event_bus_kafka/internal/consumer.py +++ b/edx_event_bus_kafka/internal/consumer.py @@ -164,7 +164,8 @@ def reset_offsets(consumer, partitions): logger.info(f'Found offsets for timestamp {offset_timestamp}: {partitions_with_offsets}') - consumer.assign(partitions_with_offsets) + # We need to commit these offsets to Kafka in order to ensure these offsets are persisted. + consumer.commit(offsets=partitions_with_offsets) # This is already checked at the Command level, but it's possible this loop # could get called some other way, so check it here too.