From 6a61863c26432214b98d15c4ec4180bab1400368 Mon Sep 17 00:00:00 2001 From: Nikita Zavadin Date: Sun, 15 Dec 2024 16:41:33 +0100 Subject: [PATCH] bug fix --- arq/worker.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/arq/worker.py b/arq/worker.py index 542401f3..880dd48d 100644 --- a/arq/worker.py +++ b/arq/worker.py @@ -488,15 +488,19 @@ async def _read_stream_iteration(self) -> None: if self.allow_pick_jobs: if self.job_counter < self.max_jobs: stream_msgs = await self._get_idle_tasks(count) - count = count - len(stream_msgs) + msgs_count = sum([len(msgs) for _, msgs in stream_msgs]) + + count -= msgs_count if count > 0: - stream_msgs = await self.pool.xreadgroup( - groupname=self.consumer_group_name, - consumername=self.worker_id, - streams={self.queue_name + stream_key_suffix: '>'}, - count=count, - block=int(max(self.stream_block_s * 1000, 1)), + stream_msgs.extend( + await self.pool.xreadgroup( + groupname=self.consumer_group_name, + consumername=self.worker_id, + streams={self.queue_name + stream_key_suffix: '>'}, + count=count, + block=int(max(self.stream_block_s * 1000, 1)), + ) ) jobs = []