Skip to content

Commit

Permalink
feat: Refactor consume-loop, adding `reset_offsets_and_sleep_indefini…
Browse files Browse the repository at this point in the history
…tely` method (#118)

The goal here is just to split out the two fairly unrelated pieces of
functionality of consuming vs. resetting, especially since the reset code
no longer does any consuming. (A linter was also starting to get testy
about how large the method was.)

This deprecates the `offset_timestamp` parameter to the existing
`consume_indefinitely` method; callers should switch to using the new
method. After course-discovery is updated to use it, can remove the
parameter.

One of the tests still uses the deprecated call, just to retain coverage
until it can be removed.

Also: Add a comment about repeated reassign.
  • Loading branch information
timmc-edx authored Jan 30, 2023
1 parent 1343b66 commit f997905
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 29 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ Change Log
Unreleased
**********

[3.7.0] - 2023-01-30
********************
Changed
=======
* Added ``reset_offsets_and_sleep_indefinitely`` method to consumer; relying code should switch to calling this when an offset timestamp is set.
* Deprecated the ``offset_timestamp`` parameter on the consumer's ``consume_indefinitely`` method, since ``reset_offsets_and_sleep_indefinitely`` should be used instead.

[3.6.3] - 2023-01-27
********************
Fixed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '3.6.3'
__version__ = '3.7.0'
83 changes: 61 additions & 22 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
import time
import warnings
from datetime import datetime

from django.conf import settings
Expand Down Expand Up @@ -143,12 +144,12 @@ def _shut_down(self):
"""
self._shut_down_loop = True

def consume_indefinitely(self, offset_timestamp=None):
def reset_offsets_and_sleep_indefinitely(self, offset_timestamp):
"""
Consume events from a topic in an infinite loop.
Reset any assigned partitions to the given offset, and sleep indefinitely.
Arguments:
offset_timestamp(datetime): reset the offsets of the consumer partitions to this timestamp before consuming.
offset_timestamp (datetime): Reset the offsets of the consumer partitions to this timestamp.
"""

def reset_offsets(consumer, partitions):
Expand Down Expand Up @@ -178,6 +179,37 @@ def reset_offsets(consumer, partitions):
# We need to commit these offsets to Kafka in order to ensure these offsets are persisted.
consumer.commit(offsets=partitions_with_offsets)

full_topic = get_full_topic(self.topic)
# Partition assignment will trigger the reset logic. This should happen on the first poll call,
# but will also happen any time the broker needs to rebalance partitions among the consumer
# group, which could happen repeatedly over the lifetime of this process.
self.consumer.subscribe([full_topic], on_assign=reset_offsets)

while True:
# Allow unit tests to break out of loop
if self._shut_down_loop:
break

# This log message may be noisy when we are replaying, but hopefully we only see it
# once every 30 seconds.
logger.info("Offsets are being reset. Sleeping instead of consuming events.")

# We are calling poll here because we believe the offsets will not be set
# correctly until poll is called, despite the offsets being reset in a different call.
# This is because we don't believe that the partitions for the current consumer are assigned
# until the first poll happens. Because we are not trying to consume any messages in this mode,
# we are deliberately calling poll without processing the message it returns
# or commiting the new offset.
self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)

time.sleep(30)
continue

def _consume_indefinitely(self):
"""
Consume events from a topic in an infinite loop.
"""

# This is already checked at the Command level, but it's possible this loop
# could get called some other way, so check it here too.
if not KAFKA_CONSUMERS_ENABLED.is_enabled():
Expand Down Expand Up @@ -205,7 +237,7 @@ def reset_offsets(consumer, partitions):
'consumer_group': self.group_id,
'expected_signal': self.signal,
}
self.consumer.subscribe([full_topic], on_assign=reset_offsets)
self.consumer.subscribe([full_topic])
logger.info(f"Running consumer for {run_context!r}")

# How many errors have we seen in a row? If this climbs too high, exit with error.
Expand All @@ -220,23 +252,6 @@ def reset_offsets(consumer, partitions):
if self._shut_down_loop:
break

# If offsets are set, do not consume events.
if offset_timestamp is not None:
# This log message may be noisy when we are replaying, but hopefully we only see it
# once every 30 seconds.
logger.info("Offsets are being reset. Sleeping instead of consuming events.")

# We are calling poll here because we believe the offsets will not be set
# correctly until poll is called, despite the offsets being reset in a different call.
# This is because we don't believe that the partitions for the current consumer are assigned
# until the first poll happens. Because we are not trying to consume any messages in this mode,
# we are deliberately calling poll without processing the message it returns
# or commiting the new offset.
self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)

time.sleep(30)
continue

# Detect probably-broken consumer and exit with error.
if CONSECUTIVE_ERRORS_LIMIT and consecutive_errors >= CONSECUTIVE_ERRORS_LIMIT:
raise Exception(f"Too many consecutive errors, exiting ({consecutive_errors} in a row)")
Expand Down Expand Up @@ -274,6 +289,27 @@ def reset_offsets(consumer, partitions):
finally:
self.consumer.close()

def consume_indefinitely(self, offset_timestamp=None):
"""
Consume events from a topic in an infinite loop.
Arguments:
offset_timestamp (datetime): Optional and deprecated; if supplied, calls
``reset_offsets_and_sleep_indefinitely`` instead. Relying code should
switch to calling that method directly.
"""
# TODO: Once this deprecated argument can be removed, just
# remove this delegation method entirely and rename
# `_consume_indefinitely` to no longer have the `_` prefix.
if offset_timestamp is None:
self._consume_indefinitely()
else:
warnings.warn(
"Calling consume_indefinitely with offset_timestamp is deprecated; "
"please call reset_offsets_and_sleep_indefinitely directly instead."
)
self.reset_offsets_and_sleep_indefinitely(offset_timestamp)

def emit_signals_from_message(self, msg):
"""
Determine the correct signal and send the event from the message.
Expand Down Expand Up @@ -540,6 +576,9 @@ def handle(self, *args, **options):
group_id=options['group_id'][0],
signal=signal,
)
event_consumer.consume_indefinitely(offset_timestamp=offset_timestamp)
if offset_timestamp is None:
event_consumer.consume_indefinitely()
else:
event_consumer.reset_offsets_and_sleep_indefinitely(offset_timestamp=offset_timestamp)
except Exception: # pylint: disable=broad-except
logger.exception("Error consuming Kafka events")
25 changes: 19 additions & 6 deletions edx_event_bus_kafka/internal/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import copy
from datetime import datetime
from typing import Optional
from unittest.mock import ANY, Mock, call, patch
from unittest.mock import Mock, call, patch

import ddt
import pytest
Expand Down Expand Up @@ -182,11 +182,11 @@ def test_consume_loop_disabled(self, mock_logger):
self.event_consumer.consume_indefinitely() # returns at all
mock_logger.error.assert_called_once_with("Kafka consumers not enabled, exiting.")

def test_offset_time_topics(self):
def test_reset_offsets_and_sleep_indefinitely(self):
test_time = datetime.now()
self.event_consumer.consumer = Mock()
self.event_consumer._shut_down() # pylint: disable=protected-access
self.event_consumer.consume_indefinitely(offset_timestamp=test_time)
self.event_consumer.reset_offsets_and_sleep_indefinitely(offset_timestamp=test_time)
reset_offsets = self.event_consumer.consumer.subscribe.call_args[1]['on_assign']
partitions = [TopicPartition('dummy_topic', 0, 0), TopicPartition('dummy_topic', 1, 0)]
self.event_consumer.consumer.offsets_for_times.return_value = partitions
Expand Down Expand Up @@ -242,7 +242,7 @@ def raise_exception():
self.event_consumer.consume_indefinitely()

# Check that each of the mocked out methods got called as expected.
mock_consumer.subscribe.assert_called_once_with(['prod-some-topic'], on_assign=ANY)
mock_consumer.subscribe.assert_called_once_with(['prod-some-topic'])
# Check that emit was called the expected number of times
assert mock_emit.call_args_list == [call(self.normal_message)] * len(mock_emit_side_effects)

Expand Down Expand Up @@ -608,7 +608,20 @@ def test_kafka_consumers_disabled(self, mock_create_consumer, mock_logger):
@patch('edx_event_bus_kafka.internal.consumer.OpenEdxPublicSignal.get_signal_by_type')
@patch('edx_event_bus_kafka.internal.consumer.KafkaEventConsumer._create_consumer')
@patch('edx_event_bus_kafka.internal.consumer.KafkaEventConsumer.consume_indefinitely')
def test_kafka_consumers_with_timestamp(self, mock_consume_indefinitely, mock_create_consumer, _gsbt):
def test_kafka_consumers_normal(self, mock_consume, mock_create_consumer, _gsbt):
call_command(
Command(),
topic='test',
group_id='test',
signal='openedx',
)
assert mock_create_consumer.called
assert mock_consume.called

@patch('edx_event_bus_kafka.internal.consumer.OpenEdxPublicSignal.get_signal_by_type')
@patch('edx_event_bus_kafka.internal.consumer.KafkaEventConsumer._create_consumer')
@patch('edx_event_bus_kafka.internal.consumer.KafkaEventConsumer.reset_offsets_and_sleep_indefinitely')
def test_kafka_consumers_with_timestamp(self, mock_reset_offsets, mock_create_consumer, _gsbt):
call_command(
Command(),
topic='test',
Expand All @@ -617,7 +630,7 @@ def test_kafka_consumers_with_timestamp(self, mock_consume_indefinitely, mock_cr
offset_time=['2019-05-18T15:17:08.132263']
)
assert mock_create_consumer.called
assert mock_consume_indefinitely.called
assert mock_reset_offsets.called

@patch('edx_event_bus_kafka.internal.consumer.logger', autospec=True)
@patch('edx_event_bus_kafka.internal.consumer.OpenEdxPublicSignal.get_signal_by_type')
Expand Down

0 comments on commit f997905

Please sign in to comment.