From f65a8437dabd953a13f72402e0a49bf1f03da900 Mon Sep 17 00:00:00 2001 From: Alan Hamlett Date: Sat, 16 Dec 2023 11:40:28 +0100 Subject: [PATCH] fix catching out of memory errors when executing broadcast tasks --- wakaq/worker.py | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/wakaq/worker.py b/wakaq/worker.py index ca3a23d..76e6134 100644 --- a/wakaq/worker.py +++ b/wakaq/worker.py @@ -411,35 +411,35 @@ def _execute_broadcast_tasks(self): continue retry = 0 current_task.set((task, payload)) - try: - while True: - try: - self._send_ping_to_parent(task_name=task.name) - self._execute_task(task, payload) - break + while True: + try: + self._send_ping_to_parent(task_name=task.name) + self._execute_task(task, payload) + current_task.set(None) + self._send_ping_to_parent() + break - except Exception as e: - if exception_in_chain(e, SoftTimeout): - retry += 1 - max_retries = task.max_retries - if max_retries is None: - max_retries = self.wakaq.max_retries - if retry > max_retries: - log.error(traceback.format_exc()) - break - else: - log.warning(traceback.format_exc()) - else: + except (MemoryError, BlockingIOError, BrokenPipeError): + raise + + except Exception as e: + if exception_in_chain(e, SoftTimeout): + retry += 1 + max_retries = task.max_retries + if max_retries is None: + max_retries = self.wakaq.max_retries + if retry > max_retries: log.error(traceback.format_exc()) break - - except: # catch BaseException, SystemExit, KeyboardInterrupt, and GeneratorExit + else: + log.warning(traceback.format_exc()) + else: log.error(traceback.format_exc()) break - finally: - current_task.set(None) - self._send_ping_to_parent() + except: # catch BaseException, SystemExit, KeyboardInterrupt, and GeneratorExit + log.error(traceback.format_exc()) + break def _read_child_logs(self): for child in self.children: