diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 05083bd..128b852 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -15,6 +15,12 @@ Unreleased ********** +[3.6.1] - 2023-01-20 +******************** +Fixed +======= +* Added a poll call to force resets to be processed during replay/offset-reset mode. + [3.6.0] - 2023-01-06 ******************** Changed diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index b35190a..2d4fde1 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -9,4 +9,4 @@ from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer -__version__ = '3.6.0' +__version__ = '3.6.1' diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py index 392c867..24ad9fc 100644 --- a/edx_event_bus_kafka/internal/consumer.py +++ b/edx_event_bus_kafka/internal/consumer.py @@ -213,6 +213,15 @@ def reset_offsets(consumer, partitions): # This log message may be noisy when we are replaying, but hopefully we only see it # once every 30 seconds. logger.info("Offsets are being reset. Sleeping instead of consuming events.") + + # We are calling poll here because we believe the offsets will not be set + # correctly until poll is called, despite the offsets being reset in a different call. + # This is because we don't believe that the partitions for the current consumer are assigned + # until the first poll happens. Because we are not trying to consume any messages in this mode, + # we are deliberately calling poll without processing the message it returns + # or commiting the new offset. + self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT) + time.sleep(30) continue