Skip to content

Commit

Permalink
Improve batch_async to release consumed memory (bsc#1140912)
Browse files Browse the repository at this point in the history
  • Loading branch information
meaksh committed Sep 26, 2019
1 parent 6af0703 commit 002543d
Showing 1 changed file with 45 additions and 28 deletions.
73 changes: 45 additions & 28 deletions salt/cli/batch_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

# Import python libs
from __future__ import absolute_import, print_function, unicode_literals
import gc
import tornado

# Import salt libs
Expand Down Expand Up @@ -77,6 +78,7 @@ def __init__(self, parent_opts, jid_gen, clear_load):
self.batch_jid = jid_gen()
self.find_job_jid = jid_gen()
self.find_job_returned = set()
self.ended = False
self.event = salt.utils.event.get_event(
'master',
self.opts['sock_dir'],
Expand All @@ -86,6 +88,7 @@ def __init__(self, parent_opts, jid_gen, clear_load):
io_loop=ioloop,
keep_loop=True)
self.scheduled = False
self.patterns = {}

def __set_event_handler(self):
ping_return_pattern = 'salt/job/{0}/ret/*'.format(self.ping_jid)
Expand Down Expand Up @@ -116,7 +119,7 @@ def __event_handler(self, raw):
if minion in self.active:
self.active.remove(minion)
self.done_minions.add(minion)
self.schedule_next()
self.event.io_loop.spawn_callback(self.schedule_next)

def _get_next(self):
to_run = self.minions.difference(
Expand All @@ -129,23 +132,23 @@ def _get_next(self):
)
return set(list(to_run)[:next_batch_size])

@tornado.gen.coroutine
def check_find_job(self, batch_minions, jid):
find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
self.event.unsubscribe(find_job_return_pattern, match_type='glob')
self.patterns.remove((find_job_return_pattern, "find_job_return"))
if self.event:
find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid)
self.event.unsubscribe(find_job_return_pattern, match_type='glob')
self.patterns.remove((find_job_return_pattern, "find_job_return"))

timedout_minions = batch_minions.difference(self.find_job_returned).difference(self.done_minions)
self.timedout_minions = self.timedout_minions.union(timedout_minions)
self.active = self.active.difference(self.timedout_minions)
running = batch_minions.difference(self.done_minions).difference(self.timedout_minions)
timedout_minions = batch_minions.difference(self.find_job_returned).difference(self.done_minions)
self.timedout_minions = self.timedout_minions.union(timedout_minions)
self.active = self.active.difference(self.timedout_minions)
running = batch_minions.difference(self.done_minions).difference(self.timedout_minions)

if timedout_minions:
self.schedule_next()
if timedout_minions:
self.schedule_next()

if running:
self.find_job_returned = self.find_job_returned.difference(running)
self.event.io_loop.add_callback(self.find_job, running)
if running:
self.find_job_returned = self.find_job_returned.difference(running)
self.event.io_loop.spawn_callback(self.find_job, running)

@tornado.gen.coroutine
def find_job(self, minions):
Expand All @@ -165,19 +168,15 @@ def find_job(self, minions):
gather_job_timeout=self.opts['gather_job_timeout'],
jid=jid,
**self.eauth)
self.event.io_loop.call_later(
self.opts['gather_job_timeout'],
yield tornado.gen.sleep(self.opts['gather_job_timeout'])
self.event.io_loop.spawn_callback(
self.check_find_job,
not_done,
jid)

@tornado.gen.coroutine
def start(self):
self.__set_event_handler()
#start batching even if not all minions respond to ping
self.event.io_loop.call_later(
self.batch_presence_ping_timeout or self.opts['gather_job_timeout'],
self.start_batch)
ping_return = yield self.local.run_job_async(
self.opts['tgt'],
'test.ping',
Expand All @@ -191,6 +190,10 @@ def start(self):
metadata=self.metadata,
**self.eauth)
self.targeted_minions = set(ping_return['minions'])
#start batching even if not all minions respond to ping
yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout'])
self.event.io_loop.spawn_callback(self.start_batch)


@tornado.gen.coroutine
def start_batch(self):
Expand All @@ -202,12 +205,14 @@ def start_batch(self):
"down_minions": self.targeted_minions.difference(self.minions),
"metadata": self.metadata
}
self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid))
yield self.run_next()
ret = self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid))
self.event.io_loop.spawn_callback(self.run_next)

@tornado.gen.coroutine
def end_batch(self):
left = self.minions.symmetric_difference(self.done_minions.union(self.timedout_minions))
if not left:
if not left and not self.ended:
self.ended = True
data = {
"available_minions": self.minions,
"down_minions": self.targeted_minions.difference(self.minions),
Expand All @@ -220,20 +225,26 @@ def end_batch(self):
for (pattern, label) in self.patterns:
if label in ["ping_return", "batch_run"]:
self.event.unsubscribe(pattern, match_type='glob')
del self
gc.collect()
yield

@tornado.gen.coroutine
def schedule_next(self):
if not self.scheduled:
self.scheduled = True
# call later so that we maybe gather more returns
self.event.io_loop.call_later(self.batch_delay, self.run_next)
yield tornado.gen.sleep(self.batch_delay)
self.event.io_loop.spawn_callback(self.run_next)

@tornado.gen.coroutine
def run_next(self):
self.scheduled = False
next_batch = self._get_next()
if next_batch:
self.active = self.active.union(next_batch)
try:
yield self.local.run_job_async(
ret = yield self.local.run_job_async(
next_batch,
self.opts['fun'],
self.opts['arg'],
Expand All @@ -244,11 +255,17 @@ def run_next(self):
jid=self.batch_jid,
metadata=self.metadata)

self.event.io_loop.call_later(self.opts['timeout'], self.find_job, set(next_batch))
yield tornado.gen.sleep(self.opts['timeout'])
self.event.io_loop.spawn_callback(self.find_job, set(next_batch))
except Exception as ex:
log.error("Error in scheduling next batch: %s", ex)
self.active = self.active.difference(next_batch)
else:
self.end_batch()
self.scheduled = False
yield self.end_batch()
gc.collect()
yield

def __del__(self):
self.event = None
self.ioloop = None
gc.collect()

0 comments on commit 002543d

Please sign in to comment.