diff --git a/changelog/50313.fixed b/changelog/50313.fixed new file mode 100644 index 000000000000..ab01e56197ac --- /dev/null +++ b/changelog/50313.fixed @@ -0,0 +1 @@ +Periodically restart the fileserver update process to avoid leaks diff --git a/salt/master.py b/salt/master.py index e33d76905d54..e3aa387d935e 100644 --- a/salt/master.py +++ b/salt/master.py @@ -438,52 +438,54 @@ def fill_buckets(self): (backend, update_func) ] = None - def update_fileserver(self, interval, backends): + @staticmethod + def _do_update(backends): """ - Threading target which handles all updates for a given wait interval + Perform fileserver updates """ + for backend, update_args in backends.items(): + backend_name, update_func = backend + try: + if update_args: + log.debug( + "Updating %s fileserver cache for the following " "targets: %s", + backend_name, + update_args, + ) + args = (update_args,) + else: + log.debug("Updating %s fileserver cache", backend_name) + args = () + + update_func(*args) + except Exception as exc: # pylint: disable=broad-except + log.exception( + "Uncaught exception while updating %s fileserver " "cache", + backend_name, + ) - def _do_update(): + @classmethod + def update(cls, interval, backends, timeout=300): + """ + Threading target which handles all updates for a given wait interval + """ + start = time.time() + condition = threading.Condition() + while time.time() - start < timeout: log.debug( "Performing fileserver updates for items with an update " "interval of %d", interval, ) - for backend, update_args in backends.items(): - backend_name, update_func = backend - try: - if update_args: - log.debug( - "Updating %s fileserver cache for the following " - "targets: %s", - backend_name, - update_args, - ) - args = (update_args,) - else: - log.debug("Updating %s fileserver cache", backend_name) - args = () - - update_func(*args) - except Exception as exc: # pylint: disable=broad-except - log.exception( - "Uncaught exception while updating %s fileserver " "cache", - backend_name, - ) - + cls._do_update(backends) log.debug( "Completed fileserver updates for items with an update " "interval of %d, waiting %d seconds", interval, interval, ) - - condition = threading.Condition() - _do_update() - while True: with condition: condition.wait(interval) - _do_update() def run(self): """ @@ -506,13 +508,15 @@ def run(self): for interval in self.buckets: self.update_threads[interval] = threading.Thread( - target=self.update_fileserver, args=(interval, self.buckets[interval]), + target=self.update, args=(interval, self.buckets[interval]), ) self.update_threads[interval].start() - # Keep the process alive - while True: - time.sleep(60) + while self.update_threads: + for name, thread in list(self.update_threads.items()): + thread.join(1) + if not thread.is_alive(): + self.update_threads.pop(name) class Master(SMaster): diff --git a/salt/utils/process.py b/salt/utils/process.py index 7519f40c4d8e..8ad988095139 100644 --- a/salt/utils/process.py +++ b/salt/utils/process.py @@ -585,12 +585,21 @@ def restart_process(self, pid): """ if self._restart_processes is False: return - log.info( - "Process %s (%s) died with exit status %s, restarting...", - self._process_map[pid]["tgt"], - pid, - self._process_map[pid]["Process"].exitcode, - ) + exit = self._process_map[pid]["Process"].exitcode + if exit > 0: + log.info( + "Process %s (%s) died with exit status %s, restarting...", + self._process_map[pid]["tgt"], + pid, + self._process_map[pid]["Process"].exitcode, + ) + else: + log.debug( + "Process %s (%s) died with exit status %s, restarting...", + self._process_map[pid]["tgt"], + pid, + self._process_map[pid]["Process"].exitcode, + ) # don't block, the process is already dead self._process_map[pid]["Process"].join(1) diff --git a/tests/pytests/unit/test_master.py b/tests/pytests/unit/test_master.py new file mode 100644 index 000000000000..b4af5390e320 --- /dev/null +++ b/tests/pytests/unit/test_master.py @@ -0,0 +1,17 @@ +import time + +import pytest +import salt.master +from tests.support.mock import patch + + +def test_fileserver_duration(): + with patch("salt.master.FileserverUpdate._do_update") as update: + start = time.time() + salt.master.FileserverUpdate.update(1, {}, 1) + end = time.time() + # Interval is equal to timeout so the _do_update method will be called + # one time. + update.called_once() + # Timeout is 1 second + assert 2 > end - start > 1