Skip to content

Commit

Permalink
Merge pull request #20 from open-craft/navin/claim-msgs
Browse files Browse the repository at this point in the history
feat: claim pending messages from other consumers
  • Loading branch information
bmtcril authored May 12, 2023
2 parents abdc42b + 2745ee1 commit 86c7f12
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 12 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ Unreleased

*

[0.1.1] - 2023-05-12
************************************************

Added
=====

* Option to claim messages from other consumers based on idle time.

Changed
=======

* Setting ``check_backlog`` will read messages that were not read by this consumer group.

[0.1.0] - 2023-05-04
************************************************

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
from edx_event_bus_redis.internal.consumer import RedisEventConsumer
from edx_event_bus_redis.internal.producer import create_producer

__version__ = '0.1.0'
__version__ = '0.1.1'

default_app_config = 'edx_event_bus_redis.apps.EdxEventBusRedisConfig' # pylint: disable=invalid-name
29 changes: 20 additions & 9 deletions edx_event_bus_redis/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
REDIS_CONSUMERS_ENABLED = SettingToggle('EVENT_BUS_REDIS_CONSUMERS_ENABLED', default=True)

# .. setting_name: EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT
# .. setting_default: 1.0
# .. setting_default: 1
# .. setting_description: How long the consumer should wait, in seconds, for the Redis broker
# to respond to a poll() call.
CONSUMER_POLL_TIMEOUT = getattr(settings, 'EVENT_BUS_REDIS_CONSUMER_POLL_TIMEOUT', 1)
Expand Down Expand Up @@ -93,18 +93,25 @@ class RedisEventConsumer(EventBusConsumer):
consumer_name: unique name for consumer within a group.
last_read_msg_id: Start reading msgs from a specific redis msg id.
check_backlog: flag to process all messages that were not read by this consumer group.
claim_msgs_older_than: claim pending msgs from other consumers in the group with idle time older
than a specific time (in milliseconds).
has_pending_msgs: flag to process pending msgs first.
db: Walrus object for redis connection.
full_topic: topic prefixed with environment name.
consumer: consumer instance.
"""

def __init__(self, topic, group_id, signal, consumer_name, last_read_msg_id=None, check_backlog=False):
def __init__(self, topic, group_id, signal, consumer_name, last_read_msg_id=None, check_backlog=False,
claim_msgs_older_than=None):
self.topic = topic
self.group_id = group_id
self.signal = signal
self.consumer_name = consumer_name
self.last_read_msg_id = last_read_msg_id
self.check_backlog = check_backlog
# always process read but pending msgs first for the consumer in the group.
self.has_pending_msgs = True
self.claim_msgs_older_than = claim_msgs_older_than
self.db = self._create_db()
self.full_topic = get_full_topic(self.topic)
self.consumer = self._create_consumer(self.db, self.full_topic)
Expand Down Expand Up @@ -150,15 +157,16 @@ def _shut_down(self):

def _read_pending_msgs(self) -> Optional[tuple]:
"""
Read pending messages, if no messages found set check_backlog to False.
Read pending messages, if no messages found return None.
"""
logger.debug("Consuming pending msgs first.")
if self.claim_msgs_older_than is not None:
self.consumer.autoclaim(self.consumer_name, min_idle_time=self.claim_msgs_older_than, count=1)
msg_meta = self.consumer.pending(count=1, consumer=self.consumer_name)
if not msg_meta:
logger.debug("No more pending messages.")
self.check_backlog = False
return None
return self.consumer[msg_meta[0]['message_id']]
if msg_meta:
return self.consumer[msg_meta[0]['message_id']]
logger.debug("No more pending messages.")
return None

def _consume_indefinitely(self):
"""
Expand Down Expand Up @@ -215,8 +223,11 @@ def _consume_indefinitely(self):
try:
# The first time we want to read our pending messages, in case we crashed and are recovering.
# Once we consumed our history, we can start getting new messages.
if self.check_backlog:
if self.has_pending_msgs:
redis_raw_msg = self._read_pending_msgs()
if redis_raw_msg is None:
self.has_pending_msgs = False
self.claim_msgs_older_than = None
else:
# poll for msg
redis_raw_msg = self.consumer.read(count=1, block=CONSUMER_POLL_TIMEOUT * 1000)
Expand Down
8 changes: 6 additions & 2 deletions edx_event_bus_redis/internal/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def setUp(self):
'test_group_id',
self.signal,
consumer_name='test_group_id.c1',
check_backlog=True
check_backlog=True,
claim_msgs_older_than=10,
)

def tearDown(self):
Expand Down Expand Up @@ -182,6 +183,7 @@ def test_consume_loop(self, is_pending, mock_sleep, mock_logger, mock_set_custom
self.event_consumer.consume_indefinitely()

# Check that each of the mocked out methods got called as expected.
mock_consumer.autoclaim.assert_called_with('test_group_id.c1', count=1, min_idle_time=10)
mock_consumer.pending.assert_called_with(count=1, consumer='test_group_id.c1')
if not is_pending:
mock_consumer.read.assert_called()
Expand Down Expand Up @@ -229,6 +231,7 @@ def test_consume_loop_with_no_events(self, mock_sleep, mock_set_custom_attribute
"""
mock_pending_value = None
side_effect_method = 'emit_signals_from_message'
self.event_consumer.claim_msgs_older_than = None

with patch.object(
self.event_consumer, side_effect_method,
Expand All @@ -248,7 +251,8 @@ def test_consume_loop_with_no_events(self, mock_sleep, mock_set_custom_attribute
self.event_consumer.consume_indefinitely()

# Check that each of the mocked out methods got called as expected.
mock_consumer.pending.assert_not_called()
mock_consumer.autoclaim.assert_not_called()
mock_consumer.pending.assert_called_once()
mock_consumer.read.assert_called()
mock_consumer.ack.assert_called_once()
mock_method.assert_called_once_with(self.normal_message)
Expand Down

0 comments on commit 86c7f12

Please sign in to comment.