Skip to content

Commit

Permalink
Don't discard workers on missing heartbeats
Browse files (browse the repository at this point in the history)
(cherry picked from commit 69e044cdf368c99a2b974a1aa22d4f97cf8a461a)
  • Loading branch information
Hermann Romanek committed Dec 5, 2024
1 parent 534990a commit e07575c
Showing 1 changed file with 8 additions and 5 deletions.
13 changes: 8 additions & 5 deletions src/sniffles/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -574,14 +574,17 @@ def run_parent(self) -> bool:
# self._logger.debug(f'Worker {self.id} got heartbeat #{hb}')

if self._heartbeat < time.monotonic() - self.HEARTBEAT_TIMEOUT:
self._logger.warning(f'Worker {self.id} found dead!')
if self.task: # if we were working on a task, requeue it to have it picked up by another worker...
self.tasks.appendleft(self.task)
self._logger.debug(f'Worker {self.id} missed heartbeat!')
try:
self.process.join(0.2) # ...collect any process remains...
self.process.join(0.2) # try collecting process remains...
except: # noqa
...
self.running = False # ...and shut down
if self.process.exitcode is not None:
# if we got an exitcode, the process really was killed
self._logger.warning(f'Worker {self.id} found dead!')
if self.task: # if we were working on a task, requeue it to have it picked up by another worker...
self.tasks.appendleft(self.task)
self.running = False # ...and shut down
except:
self._logger.exception(f'Unhandled error in worker {self.id}. This may result in an orphened worker process.')
try:
Expand Down

0 comments on commit e07575c

Please sign in to comment.