Skip to content

Commit

Permalink
Fix memory leak produced by batch async find_jobs mechanism (bsc#1140912)
Browse files Browse the repository at this point in the history

Multiple fixes:

- use different JIDs per find_job
- fix bug in detection of find_job returns
- fix timeout passed from request payload
- better cleanup at the end of batching

Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com>
  • Loading branch information
Mihai Dinca and meaksh committed Sep 16, 2019
1 parent 4ce0bc5 commit 6af0703
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
60 changes: 40 additions & 20 deletions salt/cli/batch_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self, parent_opts, jid_gen, clear_load):
self.done_minions = set()
self.active = set()
self.initialized = False
self.jid_gen = jid_gen
self.ping_jid = jid_gen()
self.batch_jid = jid_gen()
self.find_job_jid = jid_gen()
Expand All @@ -89,30 +90,28 @@ def __init__(self, parent_opts, jid_gen, clear_load):
def __set_event_handler(self):
ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid)
batch_return_pattern = 'salt/job/{0}/ret/*'.format(self.batch_jid)
find_job_return_pattern = 'salt/job/{0}/ret/*'.format(self.find_job_jid)
self.event.subscribe(ping_return_pattern, match_type='glob')
self.event.subscribe(batch_return_pattern, match_type='glob')
self.event.subscribe(find_job_return_pattern, match_type='glob')
self.event.patterns = {
self.patterns = {
(ping_return_pattern, 'ping_return'),
(batch_return_pattern, 'batch_run'),
(find_job_return_pattern, 'find_job_return')
}
self.event.set_event_handler(self.__event_handler)

def __event_handler(self, raw):
if not self.event:
return
mtag, data = self.event.unpack(raw, self.event.serial)
for (pattern, op) in self.event.patterns:
for (pattern, op) in self.patterns:
if fnmatch.fnmatch(mtag, pattern):
minion = data['id']
if op == 'ping_return':
self.minions.add(minion)
if self.targeted_minions == self.minions:
self.event.io_loop.spawn_callback(self.start_batch)
elif op == 'find_job_return':
self.find_job_returned.add(minion)
if data.get("return", None):
self.find_job_returned.add(minion)
elif op == 'batch_run':
if minion in self.active:
self.active.remove(minion)
Expand All @@ -131,31 +130,46 @@ def _get_next(self):
return set(list(to_run)[:next_batch_size])

@tornado.gen.coroutine
def check_find_job(self, batch_minions):
def check_find_job(self, batch_minions, jid):
find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
self.event.unsubscribe(find_job_return_pattern, match_type='glob')
self.patterns.remove((find_job_return_pattern, "find_job_return"))

timedout_minions = batch_minions.difference(self.find_job_returned).difference(self.done_minions)
self.timedout_minions = self.timedout_minions.union(timedout_minions)
self.active = self.active.difference(self.timedout_minions)
running = batch_minions.difference(self.done_minions).difference(self.timedout_minions)

if timedout_minions:
self.schedule_next()

if running:
self.find_job_returned = self.find_job_returned.difference(running)
self.event.io_loop.add_callback(self.find_job, running)

@tornado.gen.coroutine
def find_job(self, minions):
not_done = minions.difference(self.done_minions)
ping_return = yield self.local.run_job_async(
not_done,
'saltutil.find_job',
[self.batch_jid],
'list',
gather_job_timeout=self.opts['gather_job_timeout'],
jid=self.find_job_jid,
**self.eauth)
self.event.io_loop.call_later(
self.opts['gather_job_timeout'],
self.check_find_job,
not_done)
not_done = minions.difference(self.done_minions).difference(self.timedout_minions)

if not_done:
jid = self.jid_gen()
find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
self.patterns.add((find_job_return_pattern, "find_job_return"))
self.event.subscribe(find_job_return_pattern, match_type='glob')

ret = yield self.local.run_job_async(
not_done,
'saltutil.find_job',
[self.batch_jid],
'list',
gather_job_timeout=self.opts['gather_job_timeout'],
jid=jid,
**self.eauth)
self.event.io_loop.call_later(
self.opts['gather_job_timeout'],
self.check_find_job,
not_done,
jid)

@tornado.gen.coroutine
def start(self):
Expand Down Expand Up @@ -203,6 +217,9 @@ def end_batch(self):
}
self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid))
self.event.remove_event_handler(self.__event_handler)
for (pattern, label) in self.patterns:
if label in ["ping_return", "batch_run"]:
self.event.unsubscribe(pattern, match_type='glob')

def schedule_next(self):
if not self.scheduled:
Expand All @@ -226,9 +243,12 @@ def run_next(self):
gather_job_timeout=self.opts['gather_job_timeout'],
jid=self.batch_jid,
metadata=self.metadata)

self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch))
except Exception as ex:
log.error("Error in scheduling next batch: %s", ex)
self.active = self.active.difference(next_batch)
else:
self.end_batch()
self.scheduled = False
yield
1 change: 1 addition & 0 deletions salt/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,7 @@ def _prep_pub(self,
'key': self.key,
'tgt_type': tgt_type,
'ret': ret,
'timeout': timeout,
'jid': jid}

# if kwargs are passed, pack them.
Expand Down
1 change: 0 additions & 1 deletion salt/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -2043,7 +2043,6 @@ def get_token(self, clear_load):
def publish_batch(self, clear_load, minions, missing):
batch_load = {}
batch_load.update(clear_load)
import salt.cli.batch_async
batch = salt.cli.batch_async.BatchAsync(
self.local.opts,
functools.partial(self._prep_jid, clear_load, {}),
Expand Down

0 comments on commit 6af0703

Please sign in to comment.