Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial POC of RabbitMQ transport (WIP) #60775

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def _runtests(session):

@nox.session(python=_PYTHON_VERSIONS, name="runtests-parametrized")
@nox.parametrize("coverage", [False, True])
@nox.parametrize("transport", ["zeromq", "tcp"])
@nox.parametrize("transport", ["zeromq", "tcp", "rabbitmq"])
@nox.parametrize("crypto", [None, "m2crypto", "pycryptodome"])
def runtests_parametrized(session, coverage, transport, crypto):
"""
Expand Down Expand Up @@ -511,7 +511,7 @@ def runtests_tornado(session, coverage):

@nox.session(python=_PYTHON_VERSIONS, name="pytest-parametrized")
@nox.parametrize("coverage", [False, True])
@nox.parametrize("transport", ["zeromq", "tcp"])
@nox.parametrize("transport", ["zeromq", "tcp", "rabbitmq"])
@nox.parametrize("crypto", [None, "m2crypto", "pycryptodome"])
def pytest_parametrized(session, coverage, transport, crypto):
"""
Expand Down
4 changes: 4 additions & 0 deletions requirements/rabbitmq.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-r base.txt
-r crypto.txt

pika >= 1.2.0
4 changes: 2 additions & 2 deletions salt/cli/caller.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ def factory(opts, **kwargs):
ttype = opts["pillar"]["master"]["transport"]

# switch on available ttypes
if ttype in ("zeromq", "tcp", "detect"):
if ttype in ("zeromq", "tcp", "rabbitmq", "detect"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question is this the right place for this? I'm not super familiar with the transport layer, but it seems weird to have a ZeroMQCaller for rabbitmq. Or is it just poor/out-of-date naming? 🤔

return ZeroMQCaller(opts, **kwargs)
else:
raise Exception("Callers are only defined for ZeroMQ and TCP")
raise Exception("Callers are only defined for ZeroMQ, TCP and RabbitMQ")
# return NewKindOfCaller(opts, **kwargs)


Expand Down
6 changes: 3 additions & 3 deletions salt/cli/daemons.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,13 +298,13 @@ def prepare(self):
if (HAS_PSUTIL and not self.claim_process_responsibility()) or (
not HAS_PSUTIL and self.check_running()
):
self.action_log_info("An instance is already running. Exiting")
self.action_log_info("An instance is already running. Exiting.")
self.shutdown(1)

transport = self.config.get("transport").lower()

# TODO: AIO core is separate from transport
if transport in ("zeromq", "tcp", "detect"):
if transport in ("zeromq", "tcp", "rabbitmq", "detect"):
# Late import so logging works correctly
import salt.minion

Expand All @@ -320,7 +320,7 @@ def prepare(self):
else:
log.error(
"The transport '%s' is not supported. Please use one of "
"the following: tcp, zeromq, or detect.",
"the following: tcp, zeromq, rabbitmq or detect.",
transport,
)
self.shutdown(1)
Expand Down
3 changes: 2 additions & 1 deletion salt/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ def run_job(
raise
except Exception as general_exception: # pylint: disable=broad-except
# Convert to generic client error and pass along message
log.exception("Error publishing command to minions")
raise SaltClientError(general_exception)

return self._check_pub_data(pub_data, listen=listen)
Expand Down Expand Up @@ -1874,7 +1875,7 @@ def pub(
A set, the targets that the tgt passed should match.
"""
# Make sure the publisher is running by checking the unix socket
if self.opts.get("ipc_mode", "") != "tcp" and not os.path.exists(
if self.opts.get("ipc_mode", "") == "zeromq" and not os.path.exists(
os.path.join(self.opts["sock_dir"], "publish_pull.ipc")
):
log.error(
Expand Down
2 changes: 1 addition & 1 deletion salt/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ def _gather_buffer_space():
"sign_pub_messages": bool,
# The size of key that should be generated when creating new keys
"keysize": int,
# The transport system for this daemon. (i.e. zeromq, tcp, detect, etc)
# The transport system for this daemon. (i.e. zeromq, tcp, rabbitmq, detect, etc.)
"transport": str,
# The number of seconds to wait when the client is requesting information about running jobs
"gather_job_timeout": int,
Expand Down
34 changes: 18 additions & 16 deletions salt/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ def _post_fork_init(self):
if self.opts.get("presence_events", False):
tcp_only = True
for transport, _ in iter_transport_opts(self.opts):
if transport != "tcp":
if transport not in ("tcp", "rabbitmq"): # TODO: RMQ
tcp_only = False
if not tcp_only:
# For a TCP only transport, the presence events will be
# For a TCP and rabbitmq-only transport, the presence events will be
# handled in the transport code.
self.presence_events = True

Expand Down Expand Up @@ -237,7 +237,7 @@ def handle_key_cache(self):
if self.opts["key_cache"] == "sched":
keys = []
# TODO DRY from CKMinions
if self.opts["transport"] in ("zeromq", "tcp"):
if self.opts["transport"] in ("zeromq", "tcp", "rabbitmq"):
acc = "minions"
else:
acc = "accepted"
Expand Down Expand Up @@ -852,13 +852,10 @@ def __bind(self):
)

req_channels = []
tcp_only = True
for transport, opts in iter_transport_opts(self.opts):
chan = salt.transport.server.ReqServerChannel.factory(opts)
chan.pre_fork(self.process_manager)
req_channels.append(chan)
if transport != "tcp":
tcp_only = False
devkits marked this conversation as resolved.
Show resolved Hide resolved

kwargs = {}
if salt.utils.platform.is_windows():
Expand Down Expand Up @@ -972,7 +969,7 @@ def __bind(self):
pass

@salt.ext.tornado.gen.coroutine
def _handle_payload(self, payload):
def _handle_payload(self, payload, **optional_message_properties):
"""
The _handle_payload method is the key method used to figure out what
needs to be done with communication to the server
Expand All @@ -992,10 +989,13 @@ def _handle_payload(self, payload):
'user': 'root'}}

:param dict payload: The payload route to the appropriate handler
:param optional_message_properties additional metadata that accompanies the payload, e.g. transport-specific stuff
"""
key = payload["enc"]
load = payload["load"]
ret = {"aes": self._handle_aes, "clear": self._handle_clear}[key](load)
ret = {"aes": self._handle_aes, "clear": self._handle_clear}[key](
load, **optional_message_properties
)
raise salt.ext.tornado.gen.Return(ret)

def _post_stats(self, start, cmd):
Expand All @@ -1020,11 +1020,12 @@ def _post_stats(self, start, cmd):
self.stats = collections.defaultdict(lambda: {"mean": 0, "runs": 0})
self.stat_clock = end

def _handle_clear(self, load):
def _handle_clear(self, load, **optional_load_properties):
"""
Process a cleartext command

:param dict load: Cleartext payload
:param optional additional metadata that accompanies the payload, e.g. transport-specific stuff
:return: The result of passing the load to a function in ClearFuncs corresponding to
the command specified in the load's 'cmd' key.
"""
Expand All @@ -1036,16 +1037,17 @@ def _handle_clear(self, load):
if self.opts["master_stats"]:
start = time.time()
self.stats[cmd]["runs"] += 1
ret = method(load), {"fun": "send_clear"}
ret = method(load, **optional_load_properties), {"fun": "send_clear"}
if self.opts["master_stats"]:
self._post_stats(start, cmd)
return ret

def _handle_aes(self, data):
def _handle_aes(self, data, **optional_load_properties):
"""
Process a command sent via an AES key

:param str load: Encrypted payload
:param optional additional metadata that accompanies the payload, e.g. transport-specific stuff
:return: The result of passing the load to a function in AESFuncs corresponding to
the command specified in the load's 'cmd' key.
"""
Expand Down Expand Up @@ -2112,7 +2114,7 @@ def get_token(self, clear_load):
return False
return self.loadauth.get_tok(clear_load["token"])

def publish(self, clear_load):
def publish(self, clear_load, **optional_transport_args):
"""
This method sends out publications to the minions, it can only be used
by the LocalClient.
Expand Down Expand Up @@ -2170,7 +2172,7 @@ def publish(self, clear_load):
}
}

# All Token, Eauth, and non-root users must pass the authorization check
# All Token, Auth, and non-root users must pass the authorization check
if auth_type != "user" or (auth_type == "user" and auth_list):
# Authorize the request
authorized = self.ckminions.auth_check(
Expand Down Expand Up @@ -2238,7 +2240,7 @@ def publish(self, clear_load):

# Send it!
self._send_ssh_pub(payload, ssh_minions=ssh_minions)
self._send_pub(payload)
self._send_pub(payload, **optional_transport_args)

return {
"enc": "clear",
Expand Down Expand Up @@ -2287,13 +2289,13 @@ def _prep_jid(self, clear_load, extra):
return {"error": msg}
return jid

def _send_pub(self, load):
def _send_pub(self, load, **optional_transport_args):
"""
Take a load and send it across the network to connected minions
"""
for transport, opts in iter_transport_opts(self.opts):
chan = salt.transport.server.PubServerChannel.factory(opts)
chan.publish(load)
chan.publish(load, **optional_transport_args)

@property
def ssh_client(self):
Expand Down
2 changes: 1 addition & 1 deletion salt/metaproxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def post_master_init(self, master):
self.schedule.delete_job("__mine_interval", persist=True)

# add master_alive job if enabled
if self.opts["transport"] != "tcp" and self.opts["master_alive_interval"] > 0:
if self.opts["transport"] not in ("tcp", "rabbitmq") and self.opts["master_alive_interval"] > 0: # TODO: RMQ
self.schedule.add_job(
{
salt.minion.master_event(type="alive", master=self.opts["master"]): {
Expand Down
12 changes: 6 additions & 6 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ def eval_master(self, opts, timeout=60, safe=True, failed=False, failback=False)
try:
if self.opts["transport"] == "detect":
self.opts["detect_mode"] = True
for trans in ("zeromq", "tcp"):
for trans in ("zeromq", "tcp", "rabbitmq"):
if trans == "zeromq" and not zmq:
continue
self.opts["transport"] = trans
Expand Down Expand Up @@ -1445,7 +1445,7 @@ def _post_master_init(self, master):

# add master_alive job if enabled
if (
self.opts["transport"] != "tcp"
self.opts["transport"] not in ("tcp", "rabbitmq") # TODO: RMQ
and self.opts["master_alive_interval"] > 0
and self.connected
):
Expand Down Expand Up @@ -2736,7 +2736,7 @@ def handle_event(self, package):

if self.opts["master_type"] != "failover":
# modify the scheduled job to fire on reconnect
if self.opts["transport"] != "tcp":
if self.opts["transport"] not in ("tcp", "rabbitmq"): # TODO: RMQ
schedule = {
"function": "status.master",
"seconds": self.opts["master_alive_interval"],
Expand All @@ -2754,7 +2754,7 @@ def handle_event(self, package):
)
else:
# delete the scheduled job to don't interfere with the failover process
if self.opts["transport"] != "tcp":
if self.opts["transport"] not in ("tcp", "rabbitmq"): # TODO: RMQ
self.schedule.delete_job(name=master_event(type="alive"))

log.info("Trying to tune in to next master from master-list")
Expand Down Expand Up @@ -2801,7 +2801,7 @@ def handle_event(self, package):
log.info("Minion is ready to receive requests!")

# update scheduled job to run with the new master addr
if self.opts["transport"] != "tcp":
if self.opts["transport"] not in ("tcp", "rabbitmq"): # TODO: RMQ
schedule = {
"function": "status.master",
"seconds": self.opts["master_alive_interval"],
Expand Down Expand Up @@ -2859,7 +2859,7 @@ def handle_event(self, package):
self.connected = True
# modify the __master_alive job to only fire,
# if the connection is lost again
if self.opts["transport"] != "tcp":
if self.opts["transport"] not in ("tcp", "rabbitmq"): # TODO: RMQ
schedule = {
"function": "status.master",
"seconds": self.opts["master_alive_interval"],
Expand Down
2 changes: 1 addition & 1 deletion salt/modules/mine.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def _mine_send(load, opts):


def _mine_get(load, opts):
if opts.get("transport", "") in ("zeromq", "tcp"):
if opts.get("transport", "") in ("zeromq", "tcp", "rabbitmq"):
try:
load["tok"] = _auth().gen_token(b"salt")
except AttributeError:
Expand Down
2 changes: 1 addition & 1 deletion salt/modules/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

def __virtual__():
return (
__virtualname__ if __opts__.get("transport", "") in ("zeromq", "tcp") else False
__virtualname__ if __opts__.get("transport", "") in ("zeromq", "tcp", "rabbitmq") else False
)


Expand Down
2 changes: 1 addition & 1 deletion salt/netapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def _is_master_running(self):
if salt.utils.platform.is_windows():
return True

if self.opts["transport"] == "tcp":
if self.opts["transport"] in ("tcp", "rabbitmq"): # TODO: RMQ
ipc_file = "publish_pull.ipc"
else:
ipc_file = "workers.ipc"
Expand Down
2 changes: 1 addition & 1 deletion salt/output/key.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def output(data, **kwargs): # pylint: disable=unused-argument
ident = 0
if __opts__.get("__multi_key"):
ident = 4
if __opts__["transport"] in ("zeromq", "tcp"):
if __opts__["transport"] in ("zeromq", "tcp", "rabbitmq"):
acc = "minions"
pend = "minions_pre"
den = "minions_denied"
Expand Down
12 changes: 10 additions & 2 deletions salt/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,16 @@ def factory(cls, opts, **kwargs):
import salt.transport.tcp

return salt.transport.tcp.AsyncTCPReqChannel(opts, **kwargs)
elif ttype == "rabbitmq":
import salt.transport.rabbitmq

return salt.transport.rabbitmq.AsyncRabbitMQReqChannel(opts, **kwargs)
elif ttype == "local":
raise Exception("There's no AsyncLocalChannel implementation yet")
# import salt.transport.local
# return salt.transport.local.AsyncLocalChannel(opts, **kwargs)
else:
raise Exception("Channels are only defined for tcp, zeromq, and local")
raise Exception("Channels are only defined for tcp, zeromq, rabbitmq and local")
# return NewKindOfChannel(opts, **kwargs)

def send(self, load, tries=3, timeout=60, raw=False):
Expand Down Expand Up @@ -210,12 +214,16 @@ def factory(cls, opts, **kwargs):
import salt.transport.tcp

return salt.transport.tcp.AsyncTCPPubChannel(opts, **kwargs)
elif ttype == "rabbitmq":
import salt.transport.rabbitmq

return salt.transport.rabbitmq.AsyncRabbitMQPubChannel(opts, **kwargs)
elif ttype == "local": # TODO:
raise Exception("There's no AsyncLocalPubChannel implementation yet")
# import salt.transport.local
# return salt.transport.local.AsyncLocalPubChannel(opts, **kwargs)
else:
raise Exception("Channels are only defined for tcp, zeromq, and local")
raise Exception("Channels are only defined for tcp, zeromq, rabbitmq and local")
# return NewKindOfChannel(opts, **kwargs)

def connect(self):
Expand Down
2 changes: 1 addition & 1 deletion salt/transport/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _decode_embedded_dict(src):

def decode_embedded_strs(src):
"""
Convert enbedded bytes to strings if possible.
Convert embedded bytes to strings if possible.
This is necessary because Python 3 makes a distinction
between these types.

Expand Down
Loading