Skip to content

Commit

Permalink
fix catching out of memory errors when executing broadcast tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
alanhamlett committed Dec 16, 2023
1 parent 6bc9648 commit f65a843
Showing 1 changed file with 23 additions and 23 deletions.
46 changes: 23 additions & 23 deletions wakaq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,35 +411,35 @@ def _execute_broadcast_tasks(self):
continue
retry = 0
current_task.set((task, payload))
try:
while True:
try:
self._send_ping_to_parent(task_name=task.name)
self._execute_task(task, payload)
break
while True:
try:
self._send_ping_to_parent(task_name=task.name)
self._execute_task(task, payload)
current_task.set(None)
self._send_ping_to_parent()
break

except Exception as e:
if exception_in_chain(e, SoftTimeout):
retry += 1
max_retries = task.max_retries
if max_retries is None:
max_retries = self.wakaq.max_retries
if retry > max_retries:
log.error(traceback.format_exc())
break
else:
log.warning(traceback.format_exc())
else:
except (MemoryError, BlockingIOError, BrokenPipeError):
raise

except Exception as e:
if exception_in_chain(e, SoftTimeout):
retry += 1
max_retries = task.max_retries
if max_retries is None:
max_retries = self.wakaq.max_retries
if retry > max_retries:
log.error(traceback.format_exc())
break

except: # catch BaseException, SystemExit, KeyboardInterrupt, and GeneratorExit
else:
log.warning(traceback.format_exc())
else:
log.error(traceback.format_exc())
break

finally:
current_task.set(None)
self._send_ping_to_parent()
except: # catch BaseException, SystemExit, KeyboardInterrupt, and GeneratorExit
log.error(traceback.format_exc())
break

def _read_child_logs(self):
for child in self.children:
Expand Down

0 comments on commit f65a843

Please sign in to comment.