Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable minion's IPC channel to aggregate results from spawned jobber processes #61468

Merged
1 change: 1 addition & 0 deletions changelog/61247.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Minimize the number of network connections from minions to the master
8 changes: 6 additions & 2 deletions doc/topics/development/architecture.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,15 @@ the received message.
4) The new minion thread is created. The _thread_return() function starts up
and actually calls out to the requested function contained in the job.
5) The requested function runs and returns a result. [Still in thread.]
6) The result of the function that's run is encrypted and returned to the
master's ReqServer (TCP 4506 on master). [Still in thread.]
6) The result of the function that's run is published on the minion's local event bus with event
tag "__master_req_channel_payload". [Still in thread.]
7) Thread exits. Because the main thread was only blocked for the time that it
took to initialize the worker thread, many other requests could have been
received and processed during this time.
8) The minion's event handler receives the event with tag "__master_req_channel_payload"
and sends the payload to the master's ReqServer (TCP 4506 on master) via the long-running async request channel
that was opened when the minion first started up.



A Note on ClearFuncs vs. AESFuncs
Expand Down
57 changes: 34 additions & 23 deletions salt/channel/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
except ImportError:
pass


log = logging.getLogger(__name__)


Expand Down Expand Up @@ -96,6 +97,7 @@ class AsyncReqChannel:
"_crypted_transfer",
"_uncrypted_transfer",
"send",
"connect",
]
close_methods = [
"close",
Expand Down Expand Up @@ -125,7 +127,7 @@ def factory(cls, opts, **kwargs):
else:
auth = None

transport = salt.transport.request_client(opts, io_loop)
transport = salt.transport.request_client(opts, io_loop=io_loop)
return cls(opts, transport, auth)

def __init__(self, opts, transport, auth, **kwargs):
Expand All @@ -152,14 +154,16 @@ def _package_load(self, load):

@salt.ext.tornado.gen.coroutine
def crypted_transfer_decode_dictentry(
self, load, dictkey=None, tries=3, timeout=60
self,
load,
dictkey=None,
timeout=60,
):
if not self.auth.authenticated:
yield self.auth.authenticate()
ret = yield self.transport.send(
self._package_load(self.auth.crypticle.dumps(load)),
timeout=timeout,
tries=tries,
)
key = self.auth.get_keys()
if "key" not in ret:
Expand All @@ -168,7 +172,6 @@ def crypted_transfer_decode_dictentry(
ret = yield self.transport.send(
self._package_load(self.auth.crypticle.dumps(load)),
timeout=timeout,
tries=tries,
)
if HAS_M2:
aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding)
Expand All @@ -181,7 +184,7 @@ def crypted_transfer_decode_dictentry(
raise salt.ext.tornado.gen.Return(data)

@salt.ext.tornado.gen.coroutine
def _crypted_transfer(self, load, tries=3, timeout=60, raw=False):
def _crypted_transfer(self, load, timeout=60, raw=False):
"""
Send a load across the wire, with encryption

Expand All @@ -192,7 +195,6 @@ def _crypted_transfer(self, load, tries=3, timeout=60, raw=False):
minion state execution call

:param dict load: A load to send across the wire
:param int tries: The number of times to make before failure
:param int timeout: The number of seconds on a response before failing
"""

Expand All @@ -202,7 +204,6 @@ def _do_transfer():
data = yield self.transport.send(
self._package_load(self.auth.crypticle.dumps(load)),
timeout=timeout,
tries=tries,
)
# we may not have always data
# as for example for saltcall ret submission, this is a blind
Expand All @@ -227,40 +228,50 @@ def _do_transfer():
raise salt.ext.tornado.gen.Return(ret)

@salt.ext.tornado.gen.coroutine
def _uncrypted_transfer(self, load, tries=3, timeout=60):
def _uncrypted_transfer(self, load, timeout=60):
"""
Send a load across the wire in cleartext

:param dict load: A load to send across the wire
:param int tries: The number of times to make before failure
:param int timeout: The number of seconds on a response before failing
"""
ret = yield self.transport.send(
self._package_load(load),
timeout=timeout,
tries=tries,
)

raise salt.ext.tornado.gen.Return(ret)

@salt.ext.tornado.gen.coroutine
def connect(self):
yield self.transport.connect()

@salt.ext.tornado.gen.coroutine
def send(self, load, tries=3, timeout=60, raw=False):
"""
Send a request, return a future which will complete when we send the message

:param dict load: A load to send across the wire
:param int tries: The number of times to make before failure
:param int timeout: The number of seconds on a response before failing
"""
try:
if self.crypt == "clear":
log.trace("ReqChannel send clear load=%r", load)
ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout)
else:
log.trace("ReqChannel send crypt load=%r", load)
ret = yield self._crypted_transfer(
load, tries=tries, timeout=timeout, raw=raw
)
except salt.ext.tornado.iostream.StreamClosedError:
# Convert to 'SaltClientError' so that clients can handle this
# exception more appropriately.
raise salt.exceptions.SaltClientError("Connection to master lost")
_try = 1
while True:
try:
if self.crypt == "clear":
log.trace("ReqChannel send clear load=%r", load)
ret = yield self._uncrypted_transfer(load, timeout=timeout)
else:
log.trace("ReqChannel send crypt load=%r", load)
ret = yield self._crypted_transfer(load, timeout=timeout, raw=raw)
break
except Exception as exc: # pylint: disable=broad-except
log.error("Failed to send msg %r", dir(exc))
if _try >= tries:
raise
else:
_try += 1
continue
raise salt.ext.tornado.gen.Return(ret)

def close(self):
Expand Down
56 changes: 41 additions & 15 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
except ImportError:
HAS_WIN_FUNCTIONS = False


log = logging.getLogger(__name__)

# To set up a minion:
Expand Down Expand Up @@ -1364,11 +1365,30 @@ def connect_master(self, failed=False):
"""
Return a future which will complete when you are connected to a master
"""
# Consider refactoring so that eval_master does not have a subtle side-effect on the contents of the opts array
dwoz marked this conversation as resolved.
Show resolved Hide resolved
master, self.pub_channel = yield self.eval_master(
self.opts, self.timeout, self.safe, failed
)

# a long-running req channel
self.req_channel = salt.transport.client.AsyncReqChannel.factory(
self.opts, io_loop=self.io_loop
)

if hasattr(
self.req_channel, "connect"
): # TODO: consider generalizing this for all channels
log.debug("Connecting minion's long-running req channel")
yield self.req_channel.connect()

yield self._post_master_init(master)

@salt.ext.tornado.gen.coroutine
def handle_payload(self, payload, reply_func):
self.payloads.append(payload)
yield reply_func(payload)
self.payload_ack.notify()

# TODO: better name...
@salt.ext.tornado.gen.coroutine
def _post_master_init(self, master):
Expand Down Expand Up @@ -1583,7 +1603,6 @@ def _load_modules(
return functions, returners, errors, executors

def _send_req_sync(self, load, timeout):

if self.opts["minion_sign_messages"]:
log.trace("Signing event to be published onto the bus.")
minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem")
Expand All @@ -1592,9 +1611,11 @@ def _send_req_sync(self, load, timeout):
)
load["sig"] = sig

with salt.channel.client.ReqChannel.factory(self.opts) as channel:
return channel.send(
load, timeout=timeout, tries=self.opts["return_retry_tries"]
with salt.utils.event.get_event(
"minion", opts=self.opts, listen=False
) as event:
return event.fire_event(
load, "__master_req_channel_payload", timeout=timeout
)

@salt.ext.tornado.gen.coroutine
Expand All @@ -1607,9 +1628,11 @@ def _send_req_async(self, load, timeout):
)
load["sig"] = sig

with salt.channel.client.AsyncReqChannel.factory(self.opts) as channel:
ret = yield channel.send(
load, timeout=timeout, tries=self.opts["return_retry_tries"]
with salt.utils.event.get_event(
"minion", opts=self.opts, listen=False
) as event:
ret = yield event.fire_event(
load, "__master_req_channel_payload", timeout=timeout
)
raise salt.ext.tornado.gen.Return(ret)

Expand Down Expand Up @@ -2023,12 +2046,10 @@ def _thread_return(cls, minion_instance, opts, data):
else:
log.warning("The metadata parameter must be a dictionary. Ignoring.")
if minion_instance.connected:
minion_instance._return_pub(
ret, timeout=minion_instance._return_retry_timer()
)
minion_instance._return_pub(ret)

# Add default returners from minion config
# Should have been coverted to comma-delimited string already
# Should have been converted to comma-delimited string already
if isinstance(opts.get("return"), str):
if data["ret"]:
data["ret"] = ",".join((data["ret"], opts["return"]))
Expand Down Expand Up @@ -2139,9 +2160,7 @@ def _thread_multi_return(cls, minion_instance, opts, data):
if "metadata" in data:
ret["metadata"] = data["metadata"]
if minion_instance.connected:
minion_instance._return_pub(
ret, timeout=minion_instance._return_retry_timer()
)
minion_instance._return_pub(ret)
if data["ret"]:
if "ret_config" in data:
ret["ret_config"] = data["ret_config"]
Expand Down Expand Up @@ -2637,6 +2656,7 @@ def _mine_send(self, tag, data):
"""
Send mine data to the master
"""
# Consider using a long-running req channel to send mine data
with salt.channel.client.ReqChannel.factory(self.opts) as channel:
data["tok"] = self.tok
try:
Expand Down Expand Up @@ -2671,6 +2691,12 @@ def handle_event(self, package):
force_refresh=data.get("force_refresh", False),
notify=data.get("notify", False),
)
elif tag.startswith("__master_req_channel_payload"):
yield self.req_channel.send(
data,
timeout=self._return_retry_timer(),
tries=self.opts["return_retry_tries"],
)
elif tag.startswith("pillar_refresh"):
yield _minion.pillar_refresh(
force_refresh=data.get("force_refresh", False),
Expand Down Expand Up @@ -3134,7 +3160,7 @@ def _handle_payload(self, payload):
if self._target_load(payload["load"]):
self._handle_decoded_payload(payload["load"])
elif self.opts["zmq_filtering"]:
# In the filtering enabled case, we'd like to know when minion sees something it shouldnt
# In the filtering enabled case, we'd like to know when minion sees something it shouldn't
log.trace(
"Broadcast message received not for this minion, Load: %s",
payload["load"],
Expand Down
2 changes: 1 addition & 1 deletion salt/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __init__(self, opts, io_loop, **kwargs):
pass

@salt.ext.tornado.gen.coroutine
def send(self, load, tries=3, timeout=60):
def send(self, load, timeout=60):
"""
Send a request message and return the reply from the server.
"""
Expand Down
5 changes: 4 additions & 1 deletion salt/transport/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ def return_message(msg):
log.error("Exception occurred while handling stream: %s", exc)

def handle_connection(self, connection, address):
log.trace("IPCServer: Handling connection to address: %s", address)
log.trace(
"IPCServer: Handling connection to address: %s",
address if address else connection,
)
try:
with salt.utils.asynchronous.current_ioloop(self.io_loop):
stream = IOStream(
Expand Down
Loading