diff --git a/changelog/61247.fixed b/changelog/61247.fixed new file mode 100644 index 000000000000..c8737356ae15 --- /dev/null +++ b/changelog/61247.fixed @@ -0,0 +1 @@ +Minimize the number of network connections minions to the master diff --git a/doc/topics/development/architecture.rst b/doc/topics/development/architecture.rst index 1c717092f830..17400db0017c 100644 --- a/doc/topics/development/architecture.rst +++ b/doc/topics/development/architecture.rst @@ -220,11 +220,15 @@ the received message. 4) The new minion thread is created. The _thread_return() function starts up and actually calls out to the requested function contained in the job. 5) The requested function runs and returns a result. [Still in thread.] -6) The result of the function that's run is encrypted and returned to the -master's ReqServer (TCP 4506 on master). [Still in thread.] +6) The result of the function that's run is published on the minion's local event bus with event +tag "__master_req_channel_payload" [Still in thread.] 7) Thread exits. Because the main thread was only blocked for the time that it took to initialize the worker thread, many other requests could have been received and processed during this time. +8) Minion event handler gets the event with tag "__master_req_channel_payload" +and sends the payload to master's ReqServer (TCP 4506 on master), via the long-running async request channel +that was opened when minion first started up. + A Note on ClearFuncs vs. AESFuncs diff --git a/salt/channel/client.py b/salt/channel/client.py index 7de17b64f63e..ac145f9deb8a 100644 --- a/salt/channel/client.py +++ b/salt/channel/client.py @@ -36,6 +36,7 @@ except ImportError: pass + log = logging.getLogger(__name__) @@ -96,6 +97,7 @@ class AsyncReqChannel: "_crypted_transfer", "_uncrypted_transfer", "send", + "connect", ] close_methods = [ "close", @@ -125,7 +127,7 @@ def factory(cls, opts, **kwargs): else: auth = None - transport = salt.transport.request_client(opts, io_loop) + transport = salt.transport.request_client(opts, io_loop=io_loop) return cls(opts, transport, auth) def __init__(self, opts, transport, auth, **kwargs): @@ -152,14 +154,16 @@ def _package_load(self, load): @salt.ext.tornado.gen.coroutine def crypted_transfer_decode_dictentry( - self, load, dictkey=None, tries=3, timeout=60 + self, + load, + dictkey=None, + timeout=60, ): if not self.auth.authenticated: yield self.auth.authenticate() ret = yield self.transport.send( self._package_load(self.auth.crypticle.dumps(load)), timeout=timeout, - tries=tries, ) key = self.auth.get_keys() if "key" not in ret: @@ -168,7 +172,6 @@ def crypted_transfer_decode_dictentry( ret = yield self.transport.send( self._package_load(self.auth.crypticle.dumps(load)), timeout=timeout, - tries=tries, ) if HAS_M2: aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding) @@ -181,7 +184,7 @@ def crypted_transfer_decode_dictentry( raise salt.ext.tornado.gen.Return(data) @salt.ext.tornado.gen.coroutine - def _crypted_transfer(self, load, tries=3, timeout=60, raw=False): + def _crypted_transfer(self, load, timeout=60, raw=False): """ Send a load across the wire, with encryption @@ -192,7 +195,6 @@ def _crypted_transfer(self, load, tries=3, timeout=60, raw=False): minion state execution call :param dict load: A load to send across the wire - :param int tries: The number of times to make before failure :param int timeout: The number of seconds on a response before failing """ @@ -202,7 +204,6 @@ def _do_transfer(): data = yield self.transport.send( self._package_load(self.auth.crypticle.dumps(load)), timeout=timeout, - tries=tries, ) # we may not have always data # as for example for saltcall ret submission, this is a blind @@ -227,40 +228,50 @@ def _do_transfer(): raise salt.ext.tornado.gen.Return(ret) @salt.ext.tornado.gen.coroutine - def _uncrypted_transfer(self, load, tries=3, timeout=60): + def _uncrypted_transfer(self, load, timeout=60): """ Send a load across the wire in cleartext :param dict load: A load to send across the wire - :param int tries: The number of times to make before failure :param int timeout: The number of seconds on a response before failing """ ret = yield self.transport.send( self._package_load(load), timeout=timeout, - tries=tries, ) raise salt.ext.tornado.gen.Return(ret) + @salt.ext.tornado.gen.coroutine + def connect(self): + yield self.transport.connect() + @salt.ext.tornado.gen.coroutine def send(self, load, tries=3, timeout=60, raw=False): """ Send a request, return a future which will complete when we send the message + + :param dict load: A load to send across the wire + :param int tries: The number of times to make before failure + :param int timeout: The number of seconds on a response before failing """ - try: - if self.crypt == "clear": - log.trace("ReqChannel send clear load=%r", load) - ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout) - else: - log.trace("ReqChannel send crypt load=%r", load) - ret = yield self._crypted_transfer( - load, tries=tries, timeout=timeout, raw=raw - ) - except salt.ext.tornado.iostream.StreamClosedError: - # Convert to 'SaltClientError' so that clients can handle this - # exception more appropriately. - raise salt.exceptions.SaltClientError("Connection to master lost") + _try = 1 + while True: + try: + if self.crypt == "clear": + log.trace("ReqChannel send clear load=%r", load) + ret = yield self._uncrypted_transfer(load, timeout=timeout) + else: + log.trace("ReqChannel send crypt load=%r", load) + ret = yield self._crypted_transfer(load, timeout=timeout, raw=raw) + break + except Exception as exc: # pylint: disable=broad-except + log.error("Failed to send msg %r", dir(exc)) + if _try >= tries: + raise + else: + _try += 1 + continue raise salt.ext.tornado.gen.Return(ret) def close(self): diff --git a/salt/minion.py b/salt/minion.py index 1bf6221dd097..6fb2e45a02f0 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -102,6 +102,7 @@ except ImportError: HAS_WIN_FUNCTIONS = False + log = logging.getLogger(__name__) # To set up a minion: @@ -1364,11 +1365,30 @@ def connect_master(self, failed=False): """ Return a future which will complete when you are connected to a master """ + # Consider refactoring so that eval_master does not have a subtle side-effect on the contents of the opts array master, self.pub_channel = yield self.eval_master( self.opts, self.timeout, self.safe, failed ) + + # a long-running req channel + self.req_channel = salt.transport.client.AsyncReqChannel.factory( + self.opts, io_loop=self.io_loop + ) + + if hasattr( + self.req_channel, "connect" + ): # TODO: consider generalizing this for all channels + log.debug("Connecting minion's long-running req channel") + yield self.req_channel.connect() + yield self._post_master_init(master) + @salt.ext.tornado.gen.coroutine + def handle_payload(self, payload, reply_func): + self.payloads.append(payload) + yield reply_func(payload) + self.payload_ack.notify() + # TODO: better name... @salt.ext.tornado.gen.coroutine def _post_master_init(self, master): @@ -1583,7 +1603,6 @@ def _load_modules( return functions, returners, errors, executors def _send_req_sync(self, load, timeout): - if self.opts["minion_sign_messages"]: log.trace("Signing event to be published onto the bus.") minion_privkey_path = os.path.join(self.opts["pki_dir"], "minion.pem") @@ -1592,9 +1611,11 @@ def _send_req_sync(self, load, timeout): ) load["sig"] = sig - with salt.channel.client.ReqChannel.factory(self.opts) as channel: - return channel.send( - load, timeout=timeout, tries=self.opts["return_retry_tries"] + with salt.utils.event.get_event( + "minion", opts=self.opts, listen=False + ) as event: + return event.fire_event( + load, "__master_req_channel_payload", timeout=timeout ) @salt.ext.tornado.gen.coroutine @@ -1607,9 +1628,11 @@ def _send_req_async(self, load, timeout): ) load["sig"] = sig - with salt.channel.client.AsyncReqChannel.factory(self.opts) as channel: - ret = yield channel.send( - load, timeout=timeout, tries=self.opts["return_retry_tries"] + with salt.utils.event.get_event( + "minion", opts=self.opts, listen=False + ) as event: + ret = yield event.fire_event( + load, "__master_req_channel_payload", timeout=timeout ) raise salt.ext.tornado.gen.Return(ret) @@ -2023,12 +2046,10 @@ def _thread_return(cls, minion_instance, opts, data): else: log.warning("The metadata parameter must be a dictionary. Ignoring.") if minion_instance.connected: - minion_instance._return_pub( - ret, timeout=minion_instance._return_retry_timer() - ) + minion_instance._return_pub(ret) # Add default returners from minion config - # Should have been coverted to comma-delimited string already + # Should have been converted to comma-delimited string already if isinstance(opts.get("return"), str): if data["ret"]: data["ret"] = ",".join((data["ret"], opts["return"])) @@ -2139,9 +2160,7 @@ def _thread_multi_return(cls, minion_instance, opts, data): if "metadata" in data: ret["metadata"] = data["metadata"] if minion_instance.connected: - minion_instance._return_pub( - ret, timeout=minion_instance._return_retry_timer() - ) + minion_instance._return_pub(ret) if data["ret"]: if "ret_config" in data: ret["ret_config"] = data["ret_config"] @@ -2637,6 +2656,7 @@ def _mine_send(self, tag, data): """ Send mine data to the master """ + # Consider using a long-running req channel to send mine data with salt.channel.client.ReqChannel.factory(self.opts) as channel: data["tok"] = self.tok try: @@ -2671,6 +2691,12 @@ def handle_event(self, package): force_refresh=data.get("force_refresh", False), notify=data.get("notify", False), ) + elif tag.startswith("__master_req_channel_payload"): + yield self.req_channel.send( + data, + timeout=self._return_retry_timer(), + tries=self.opts["return_retry_tries"], + ) elif tag.startswith("pillar_refresh"): yield _minion.pillar_refresh( force_refresh=data.get("force_refresh", False), @@ -3134,7 +3160,7 @@ def _handle_payload(self, payload): if self._target_load(payload["load"]): self._handle_decoded_payload(payload["load"]) elif self.opts["zmq_filtering"]: - # In the filtering enabled case, we'd like to know when minion sees something it shouldnt + # In the filtering enabled case, we'd like to know when minion sees something it shouldn't log.trace( "Broadcast message received not for this minion, Load: %s", payload["load"], diff --git a/salt/transport/base.py b/salt/transport/base.py index 18a13befc886..d573cfe466a9 100644 --- a/salt/transport/base.py +++ b/salt/transport/base.py @@ -104,7 +104,7 @@ def __init__(self, opts, io_loop, **kwargs): pass @salt.ext.tornado.gen.coroutine - def send(self, load, tries=3, timeout=60): + def send(self, load, timeout=60): """ Send a request message and return the reply from the server. """ diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 90f021c3ff3d..55ed516a4c24 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -208,7 +208,10 @@ def return_message(msg): log.error("Exception occurred while handling stream: %s", exc) def handle_connection(self, connection, address): - log.trace("IPCServer: Handling connection to address: %s", address) + log.trace( + "IPCServer: Handling connection to address: %s", + address if address else connection, + ) try: with salt.utils.asynchronous.current_ioloop(self.io_loop): stream = IOStream( diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 1e6e337577a3..23cdc5704558 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -641,7 +641,6 @@ def getstream(self, **kwargs): self.backoff, ) yield salt.ext.tornado.gen.sleep(self.backoff) - # self._connecting_future.set_exception(exc) raise salt.ext.tornado.gen.Return(stream) @salt.ext.tornado.gen.coroutine @@ -660,8 +659,6 @@ def _stream_return(self): unpacker = salt.utils.msgpack.Unpacker() while not self._closing: try: - # self._read_until_future = self._stream.read_bytes(4096, partial=True) - # wire_bytes = yield self._read_until_future wire_bytes = yield self._stream.read_bytes(4096, partial=True) unpacker.feed(wire_bytes) for framed_msg in unpacker: @@ -691,18 +688,15 @@ def _stream_return(self): for future in self.send_future_map.values(): future.set_exception(e) self.send_future_map = {} - if self._closing: + if self._closing or self._closed: return if self.disconnect_callback: self.disconnect_callback() - # if the last connect finished, then we need to make a new one - # if self._connecting_future.done(): stream = self._stream self._stream = None - stream.close() + if stream: + stream.close() yield self.connect() - # self._connecting_future = self.connect() - # yield self._connecting_future except TypeError: # This is an invalid transport if "detect_mode" in self.opts: @@ -717,18 +711,15 @@ def _stream_return(self): for future in self.send_future_map.values(): future.set_exception(e) self.send_future_map = {} - if self._closing: + if self._closing or self._closed: return if self.disconnect_callback: self.disconnect_callback() stream = self._stream self._stream = None - stream.close() + if stream: + stream.close() yield self.connect() - # if the last connect finished, then we need to make a new one - # if self._connecting_future.done(): - # self._connecting_future = self.connect() - # yield self._connecting_future self._stream_return_running = False def _message_id(self): @@ -769,20 +760,17 @@ def timeout_message(self, message_id, msg): if message_id not in self.send_future_map: return future = self.send_future_map.pop(message_id) - future.set_exception(SaltReqTimeoutError("Message timed out")) + if future is not None: + future.set_exception(SaltReqTimeoutError("Message timed out")) @salt.ext.tornado.gen.coroutine - def send(self, msg, timeout=None, callback=None, raw=False, future=None, tries=3): + def send(self, msg, timeout=None, callback=None, raw=False): if self._closing: raise ClosingError() message_id = self._message_id() header = {"mid": message_id} - if future is None: - future = salt.ext.tornado.concurrent.Future() - future.tries = tries - future.attempts = 0 - future.timeout = timeout + future = salt.ext.tornado.concurrent.Future() if callback is not None: @@ -799,6 +787,7 @@ def handle_future(future): if timeout is not None: self.io_loop.call_later(timeout, self.timeout_message, message_id, msg) + item = salt.transport.frame.frame_msg(msg, header=header) yield self.connect() yield self._stream.write(item) @@ -1095,8 +1084,8 @@ def connect(self): yield self.message_client.connect() @salt.ext.tornado.gen.coroutine - def send(self, load, tries=3, timeout=60): - ret = yield self.message_client.send(load, tries=3, timeout=60) + def send(self, load, timeout=60): + ret = yield self.message_client.send(load, timeout=timeout) raise salt.ext.tornado.gen.Return(ret) def close(self): diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index 963dfd90b5e1..583347a70748 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -584,35 +584,17 @@ def timeout_message(self, message): # In a race condition the message might have been sent by the time # we're timing it out. Make sure the future is not None if future is not None: - if future.attempts < future.tries: - future.attempts += 1 - log.debug( - "SaltReqTimeoutError, retrying. (%s/%s)", - future.attempts, - future.tries, - ) - self.send( - message, - timeout=future.timeout, - tries=future.tries, - future=future, - ) - - else: - future.set_exception(SaltReqTimeoutError("Message timed out")) + future.set_exception(SaltReqTimeoutError("Message timed out")) @salt.ext.tornado.gen.coroutine - def send(self, message, timeout=None, tries=3, future=None, callback=None): + def send(self, message, timeout=None, callback=None): """ Return a future which will be completed when the message has a response """ - if future is None: - future = salt.ext.tornado.concurrent.Future() - future.tries = tries - future.attempts = 0 - future.timeout = timeout - # if a future wasn't passed in, we need to serialize the message - message = salt.payload.dumps(message) + future = salt.ext.tornado.concurrent.Future() + + message = salt.payload.dumps(message) + if callback is not None: def handle_future(future): @@ -620,6 +602,7 @@ def handle_future(future): self.io_loop.add_callback(callback, response) future.add_done_callback(handle_future) + # Add this future to the mapping self.send_future_map[message] = future @@ -927,9 +910,9 @@ def connect(self): self.message_client.connect() @salt.ext.tornado.gen.coroutine - def send(self, load, tries=3, timeout=60): + def send(self, load, timeout=60): self.connect() - ret = yield self.message_client.send(load, tries=tries, timeout=timeout) + ret = yield self.message_client.send(load, timeout=timeout) raise salt.ext.tornado.gen.Return(ret) def close(self): diff --git a/salt/utils/asynchronous.py b/salt/utils/asynchronous.py index 7a78f9f91fc8..2a858feee980 100644 --- a/salt/utils/asynchronous.py +++ b/salt/utils/asynchronous.py @@ -34,7 +34,7 @@ class SyncWrapper: This is uses as a simple wrapper, for example: asynchronous = AsyncClass() - # this method would reguarly return a future + # this method would regularly return a future future = asynchronous.async_method() sync = SyncWrapper(async_factory_method, (arg1, arg2), {'kwarg1': 'val'}) diff --git a/tests/pytests/functional/transport/server/test_req_channel.py b/tests/pytests/functional/transport/server/test_req_channel.py index 050db7565c79..6bcee0c2dba8 100644 --- a/tests/pytests/functional/transport/server/test_req_channel.py +++ b/tests/pytests/functional/transport/server/test_req_channel.py @@ -100,7 +100,7 @@ def req_channel_crypt(request): @pytest.fixture -def req_channel(req_server_channel, salt_minion, req_channel_crypt): +def push_channel(req_server_channel, salt_minion, req_channel_crypt): with salt.channel.client.ReqChannel.factory( salt_minion.config, crypt=req_channel_crypt ) as _req_channel: @@ -111,7 +111,7 @@ def req_channel(req_server_channel, salt_minion, req_channel_crypt): _req_channel.obj._refcount = 0 -def test_basic(req_channel): +def test_basic(push_channel): """ Test a variety of messages, make sure we get the expected responses """ @@ -121,11 +121,11 @@ def test_basic(req_channel): {"baz": "qux", "list": [1, 2, 3]}, ] for msg in msgs: - ret = req_channel.send(msg, timeout=5, tries=1) + ret = push_channel.send(msg, timeout=5, tries=1) assert ret["load"] == msg -def test_normalization(req_channel): +def test_normalization(push_channel): """ Since we use msgpack, we need to test that list types are converted to lists """ @@ -136,21 +136,21 @@ def test_normalization(req_channel): {"list": tuple([1, 2, 3])}, ] for msg in msgs: - ret = req_channel.send(msg, timeout=5, tries=1) + ret = push_channel.send(msg, timeout=5, tries=1) for key, value in ret["load"].items(): assert types[key] == type(value) -def test_badload(req_channel, req_channel_crypt): +def test_badload(push_channel, req_channel_crypt): """ Test a variety of bad requests, make sure that we get some sort of error """ msgs = ["", [], tuple()] if req_channel_crypt == "clear": for msg in msgs: - ret = req_channel.send(msg, timeout=5, tries=1) + ret = push_channel.send(msg, timeout=5, tries=1) assert ret == "payload and load must be a dict" else: for msg in msgs: with pytest.raises(salt.exceptions.AuthenticationError): - req_channel.send(msg, timeout=5, tries=1) + push_channel.send(msg, timeout=5, tries=1) diff --git a/tests/pytests/unit/test_minion.py b/tests/pytests/unit/test_minion.py index d85c9c785774..d8b0d390aed1 100644 --- a/tests/pytests/unit/test_minion.py +++ b/tests/pytests/unit/test_minion.py @@ -53,25 +53,27 @@ def test_minion_load_grains_default(): @pytest.mark.parametrize( - "req_channel", + "event", [ ( - "salt.channel.client.AsyncReqChannel.factory", - lambda load, timeout, tries: salt.ext.tornado.gen.maybe_future(tries), + "fire_event", + lambda data, tag, cb=None, timeout=60: True, ), ( - "salt.channel.client.ReqChannel.factory", - lambda load, timeout, tries: tries, + "fire_event_async", + lambda data, tag, cb=None, timeout=60: salt.ext.tornado.gen.maybe_future( + True + ), ), ], ) -def test_send_req_tries(req_channel): - channel_enter = MagicMock() - channel_enter.send.side_effect = req_channel[1] - channel = MagicMock() - channel.__enter__.return_value = channel_enter +def test_send_req_fires_completion_event(event): + event_enter = MagicMock() + event_enter.send.side_effect = event[1] + event = MagicMock() + event.__enter__.return_value = event_enter - with patch(req_channel[0], return_value=channel): + with patch("salt.utils.event.get_event", return_value=event): opts = salt.config.DEFAULT_MINION_OPTS.copy() opts["random_startup_delay"] = 0 opts["return_retry_tries"] = 30 @@ -82,12 +84,34 @@ def test_send_req_tries(req_channel): load = {"load": "value"} timeout = 60 - if "Async" in req_channel[0]: + if "async" in event[0]: rtn = minion._send_req_async(load, timeout).result() else: rtn = minion._send_req_sync(load, timeout) - assert rtn == 30 + # get the + for idx, call in enumerate(event.mock_calls, 1): + if "fire_event" in call[0]: + condition_event_tag = ( + len(call.args) > 1 + and call.args[1] == "__master_req_channel_payload" + ) + condition_event_tag_error = "{} != {}; Call(number={}): {}".format( + idx, call, call.args[1], "__master_req_channel_payload" + ) + condition_timeout = ( + len(call.kwargs) == 1 and call.kwargs["timeout"] == timeout + ) + condition_timeout_error = "{} != {}; Call(number={}): {}".format( + idx, call, call.kwargs["timeout"], timeout + ) + + fire_event_called = True + assert condition_event_tag, condition_event_tag_error + assert condition_timeout, condition_timeout_error + + assert fire_event_called + assert rtn @patch("salt.channel.client.ReqChannel.factory")