Skip to content

Commit

Permalink
Address leaks in fileserver caused by git backends
Browse files Browse the repository at this point in the history
At this time we do not have the ability to fix the upstream memory leaks
in the gitfs backend providers. Work around their limitations by
periodically restarting the file server update proccess. This will at
least partially address saltstack#50313
  • Loading branch information
dwoz committed Jun 22, 2021
1 parent 5ae3bc9 commit 2817556
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 40 deletions.
1 change: 1 addition & 0 deletions changelog/50313.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Periodically restart the fileserver update process to avoid leaks
72 changes: 38 additions & 34 deletions salt/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,52 +438,54 @@ def fill_buckets(self):
(backend, update_func)
] = None

def update_fileserver(self, interval, backends):
@staticmethod
def _do_update(backends):
"""
Threading target which handles all updates for a given wait interval
Perform fileserver updates
"""
for backend, update_args in backends.items():
backend_name, update_func = backend
try:
if update_args:
log.debug(
"Updating %s fileserver cache for the following " "targets: %s",
backend_name,
update_args,
)
args = (update_args,)
else:
log.debug("Updating %s fileserver cache", backend_name)
args = ()

update_func(*args)
except Exception as exc: # pylint: disable=broad-except
log.exception(
"Uncaught exception while updating %s fileserver " "cache",
backend_name,
)

def _do_update():
@classmethod
def update(cls, interval, backends, timeout=300):
"""
Threading target which handles all updates for a given wait interval
"""
start = time.time()
condition = threading.Condition()
while time.time() - start < timeout:
log.debug(
"Performing fileserver updates for items with an update "
"interval of %d",
interval,
)
for backend, update_args in backends.items():
backend_name, update_func = backend
try:
if update_args:
log.debug(
"Updating %s fileserver cache for the following "
"targets: %s",
backend_name,
update_args,
)
args = (update_args,)
else:
log.debug("Updating %s fileserver cache", backend_name)
args = ()

update_func(*args)
except Exception as exc: # pylint: disable=broad-except
log.exception(
"Uncaught exception while updating %s fileserver " "cache",
backend_name,
)

cls._do_update(backends)
log.debug(
"Completed fileserver updates for items with an update "
"interval of %d, waiting %d seconds",
interval,
interval,
)

condition = threading.Condition()
_do_update()
while True:
with condition:
condition.wait(interval)
_do_update()

def run(self):
"""
Expand All @@ -506,13 +508,15 @@ def run(self):

for interval in self.buckets:
self.update_threads[interval] = threading.Thread(
target=self.update_fileserver, args=(interval, self.buckets[interval]),
target=self.update, args=(interval, self.buckets[interval]),
)
self.update_threads[interval].start()

# Keep the process alive
while True:
time.sleep(60)
while self.update_threads:
for name, thread in list(self.update_threads.items()):
thread.join(1)
if not thread.is_alive():
self.update_threads.pop(name)


class Master(SMaster):
Expand Down
21 changes: 15 additions & 6 deletions salt/utils/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,12 +585,21 @@ def restart_process(self, pid):
"""
if self._restart_processes is False:
return
log.info(
"Process %s (%s) died with exit status %s, restarting...",
self._process_map[pid]["tgt"],
pid,
self._process_map[pid]["Process"].exitcode,
)
exit = self._process_map[pid]["Process"].exitcode
if exit > 0:
log.info(
"Process %s (%s) died with exit status %s, restarting...",
self._process_map[pid]["tgt"],
pid,
self._process_map[pid]["Process"].exitcode,
)
else:
log.debug(
"Process %s (%s) died with exit status %s, restarting...",
self._process_map[pid]["tgt"],
pid,
self._process_map[pid]["Process"].exitcode,
)
# don't block, the process is already dead
self._process_map[pid]["Process"].join(1)

Expand Down
17 changes: 17 additions & 0 deletions tests/pytests/unit/test_master.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import time

import pytest
import salt.master
from tests.support.mock import patch


def test_fileserver_duration():
with patch("salt.master.FileserverUpdate._do_update") as update:
start = time.time()
salt.master.FileserverUpdate.update(1, {}, 1)
end = time.time()
# Interval is equal to timeout so the _do_update method will be called
# one time.
update.called_once()
# Timeout is 1 second
assert 2 > end - start > 1

0 comments on commit 2817556

Please sign in to comment.