Initial POC of RabbitMQ transport (WIP) #60775

Draft
wants to merge 3 commits into
base: master

devkits
Contributor

@devkits devkits commented Aug 20, 2021

What does this PR do?

Initial POC of RabbitMQ transport. The goal is to test out (internally) this new transport as a candidate for RaaS/Salt on SaaS and solicit early feedback.
(work-in-progress, but looking for early high-level feedback before the review gets even larger).

The new transport can be turned on by adding these lines to the corresponding master and minion config files:
transport: rabbitmq
transport_rabbitmq_address: localhost
transport_rabbitmq_auth: {username: user, password: bitnami}

Notes:

  • RabbitMQ design considerations:

    • username/password auth for now
    • connection and channel reuse per thread
    • RPC pattern is implemented with reply_to queues. Eventually we'll probably use a single queue per consumer and use correlation_id instead of a reply_to queue.
    • fanout exchange is used for message broadcast
    • direct exchange is used when replying
  • Mark tests that test rabbitmq transport as "pytest.mark.xfail". RMQ transport is in POC, so we skip RMQ tests (for now) until RMQ dependencies are dealt with in the CI/CD pipeline.

  • there are a number of TODOs in this review; all should be addressed before review is ready for prime time

  • there are gaps in the implementation that will be addressed. This work is tracked in SSC-978

  • a number of edge cases need to be tested/addressed (e.g. connection error recovery, etc.)

  • added functional tests

  • added integration tests
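
As a rough illustration of the exchange layout in the notes above (fanout for broadcast, direct for replies), here is a minimal sketch. It assumes a pika-style channel exposing `exchange_declare`, `queue_declare` and `queue_bind`. The exchange and queue names, the `declare_topology` helper and the `RecordingChannel` stand-in are hypothetical and are not taken from this PR.

```python
class RecordingChannel:
    """Hypothetical stand-in for a pika channel; it only records calls."""

    def __init__(self):
        self.calls = []

    def exchange_declare(self, exchange, exchange_type):
        self.calls.append(("exchange_declare", exchange, exchange_type))

    def queue_declare(self, queue):
        self.calls.append(("queue_declare", queue))

    def queue_bind(self, queue, exchange, routing_key=None):
        self.calls.append(("queue_bind", queue, exchange, routing_key))


def declare_topology(channel, broadcast_exchange, reply_exchange,
                     minion_queue, reply_queue):
    # Fanout: every queue bound to the exchange gets a copy of each message.
    channel.exchange_declare(exchange=broadcast_exchange, exchange_type="fanout")
    channel.queue_declare(queue=minion_queue)
    channel.queue_bind(queue=minion_queue, exchange=broadcast_exchange)

    # Direct: a reply is routed only to the queue whose binding key matches.
    channel.exchange_declare(exchange=reply_exchange, exchange_type="direct")
    channel.queue_declare(queue=reply_queue)
    channel.queue_bind(queue=reply_queue, exchange=reply_exchange,
                       routing_key=reply_queue)


# Names below are made up for the example.
channel = RecordingChannel()
declare_topology(channel, "salt_broadcast", "salt_reply", "minion_1", "reply_1")
```

With a real broker, `channel` would come from a pika connection instead of the recording stand-in.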

Testing Done: functional tests with one master and one minion

What issues does this PR fix or reference?

Fixes: VMware SSC-978

Merge requirements satisfied?

[NOTICE] Bug fixes or features added to Salt require tests.

Commits signed with GPG?

No

Please review Salt's Contributing Guide for best practices.

See GitHub's page on GPG signing for more information about signing commits with GPG.

Contributor Author

devkits commented Aug 20, 2021

Looking for high-level drive-by comments. Thanks.

@@ -51,10 +51,10 @@ def factory(opts, **kwargs):
ttype = opts["pillar"]["master"]["transport"]

# switch on available ttypes
- if ttype in ("zeromq", "tcp", "detect"):
+ if ttype in ("zeromq", "tcp", "rabbitmq", "detect"):
Contributor

question is this the right place for this? I'm not super familiar with the transport layer, but it seems weird to have a ZeroMQCaller for rabbitmq. Or is it just poor/out-of-date naming? 🤔



@pytest.mark.slow_test
@pytest.mark.xfail(reason="RMQ is POC. Skip RMQ tests until RMQ dependencies are dealt with in the CI/CD pipeline")
Contributor

FWIW this can be moved to the file level. I think it's

pytestmark = [pytest.mark.xfail(...)]

There are other examples throughout the test code
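
For reference, pytest reads module-level marks from a variable named `pytestmark`. A minimal sketch of the file-level form, reusing the reason string from the decorator above (the test function name is a placeholder, not one from this PR):

```python
import pytest

# Applies to every test collected from this module.
pytestmark = [
    pytest.mark.xfail(
        reason="RMQ is POC. Skip RMQ tests until RMQ dependencies "
        "are dealt with in the CI/CD pipeline"
    ),
]


def test_placeholder():
    # Placeholder body; a real test would exercise the RMQ transport.
    assert True
```

Note that `xfail` still runs the tests and reports an unexpected pass, whereas `skip` would not run them at all.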

Comment on lines +50 to +69
last_msg = time.time()
serial = salt.payload.Serial(self.minion_config)
crypticle = salt.crypt.Crypticle(self.minion_config, self.aes_key)
self.started.set()
while True:
curr_time = time.time()
if time.time() > self.hard_timeout:
break
if curr_time - last_msg >= self.timeout:
break
Contributor

question I've seen this code in the test_async_pub_channel - would it be possible to refactor this somewhat?

Also, I wonder if it's possible to extract some/most/all of this functionality into the salt.transport.rabbitmq layer. That seems worthwhile if there's a lot of commonality between spinning up a pub/sub, especially if the only real difference is how the messages are handled.

Contributor Author

These tests were adapted from similar tests that test zeromq transport.
There's plenty of opportunity to refactor and eliminate some code duplication. Note: one school of thought on the Java side of things is that duplication in test code is more acceptable than product code. However, I'll try to refactor this more.

Comment on lines 18 to 20
import pika
from pika import SelectConnection, BasicProperties
from pika.exchange_type import ExchangeType
Contributor

I'm assuming this is the rabbitmq lib - if we end up making pika an optional dependency this will need an ImportError guard.

Contributor Author

Makes sense in general, but without pika this entire module is useless, so the import guard should probably be around salt.transport.rabbitmq
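
A minimal sketch of the kind of guard being discussed, assuming pika becomes optional. The `HAS_PIKA` flag and the `require_pika` helper are hypothetical names for illustration; where the guard ultimately lives (inside the module or around its import) is the open question in this thread.

```python
try:
    import pika

    HAS_PIKA = True
except ImportError:
    # pika is not installed; keep the name defined so later checks are simple.
    pika = None
    HAS_PIKA = False


def require_pika():
    """Fail with a clear message instead of an AttributeError on None."""
    if not HAS_PIKA:
        raise RuntimeError(
            "The rabbitmq transport requires the 'pika' package to be installed"
        )
    return pika
```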

def queue_name(self):
return self._queue_name

def __del__(self):
Contributor

suggestion __del__ is never guaranteed to be called in Python - if guarantees are desired then with blocks are required.
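
A sketch of the `with`-based alternative, assuming the wrapped connection exposes a `close()` method. `QueueConsumer` and `FakeConnection` are hypothetical names used only to keep the example runnable without a broker.

```python
class QueueConsumer:
    """Hypothetical wrapper; `connection` is anything with a close() method."""

    def __init__(self, connection):
        self._connection = connection
        self.closed = False

    def close(self):
        # Idempotent, so an explicit close() followed by __exit__ is harmless.
        if not self.closed:
            self._connection.close()
            self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow exceptions raised inside the block


class FakeConnection:
    """Stand-in connection that counts close() calls."""

    def __init__(self):
        self.close_calls = 0

    def close(self):
        self.close_calls += 1


conn = FakeConnection()
with QueueConsumer(conn) as consumer:
    pass  # consume messages here
```

Cleanup then happens deterministically when the block exits, including on exceptions.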

# For the IPC pattern, set up the "reply-to" header and "correlation_id"
properties = pika.BasicProperties()
properties.reply_to = reply_queue_name if reply_queue_name else None
properties.correlation_id = correlation_id if correlation_id else uuid4().hex
Contributor

suggestion same or suggestion here - ... = correlation_id or uuid4().hex. It won't call the function unless correlation_id is falsey.
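
A small sketch of the `or` form, with a hypothetical helper name. One caveat: any falsy value (for example an empty string) is treated as missing, which matches the behaviour of the original conditional expression.

```python
from uuid import uuid4


def pick_correlation_id(correlation_id=None):
    # uuid4() only runs when correlation_id is falsy (None, "", ...).
    return correlation_id or uuid4().hex


explicit = pick_correlation_id("abc123")
generated = pick_correlation_id()
```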

.format(publish_queue_name, exchange, payload, properties))

try:
# TODO: consider different queue types (durable, quorum, etc.)
Contributor

question would it also make sense to make the queue types configurable? I don't know a lot about rabbitmq so that might be a ridiculous suggestion 🙃


assert reply_queue_name

log.debug("Sending reply payload on queue [{}]: {}. Payload properties: {}".format(reply_queue_name, payload,
Contributor

If this is more than just WIP debugging, it may make sense to dump this to trace instead 🤔 Might warrant some further discussion about how noisy things should be. I'd probably check other transport implementations for guidance there.

Contributor Author

After I posted this review, I changed all log statements in rabbitmq.py to dump as INFO; pika library itself is very noisy and dumps a lot of stuff in DEBUG, to the point that it's hard to make sense of what is happening due to noise. So yes, we need to tweak the log levels before this review is ready, but for now the most useful log level for the POC seems to be INFO.
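
One possible way to keep the transport's own messages at DEBUG while quieting the library is to raise the level on pika's logger namespace only. This is a sketch using the standard `logging` module; the `salt.transport.rabbitmq` logger name is an assumption based on the module path.

```python
import logging

# pika's modules log under the "pika" namespace, so one setting covers them.
logging.getLogger("pika").setLevel(logging.WARNING)

# Assumed logger name for the transport module; adjust to the real one.
log = logging.getLogger("salt.transport.rabbitmq")
log.setLevel(logging.DEBUG)

log.debug("Sending reply payload on queue [%s]", "example_queue")
```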

Comment on lines 349 to 352
def _callback_wrapper(channel, method, properties, payload):
if self._callback:
log.debug("Calling message callback")
return self._callback(payload)
Contributor

note Not sure if it's desirable, but internal functions are (re)created each time the outer function is called, i.e. each call to _on_queue_bind will create a new copy of _callback_wrapper function. This is fine if it's only meant to be called once, but if the outer function is called 100x then you'll be creating 100x new instances of the function (that may or may not be discarded, depending on what references are kept 🙃 )
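
To make the note concrete, here is a small sketch. Each call to the outer function produces a distinct function object, although the compiled code is shared. A method defined once on the class is one alternative. `bind_with_closure` and `Consumer` are hypothetical names, not code from this PR.

```python
def bind_with_closure(callback):
    # A new `wrapper` function object is created on every call.
    def wrapper(channel, method, properties, payload):
        if callback:
            return callback(payload)

    return wrapper


first = bind_with_closure(str.upper)
second = bind_with_closure(str.upper)


class Consumer:
    """Alternative: the handler is defined once, on the class."""

    def __init__(self, callback):
        self._callback = callback

    def _on_message(self, channel, method, properties, payload):
        if self._callback:
            return self._callback(payload)


consumer = Consumer(str.upper)
result = consumer._on_message(None, None, None, "ping")
```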

"""
Encapsulate sending routines to RabbitMQ.

RMQ Channels default to 'crypt=aes'
Contributor

note I have no clue if it comes into play for RMQ, but seeing this brought it to mind - I know that there are certain times that we want FIPS compliance, and we've got some flags for that. If that's a consideration for any type of crypts that RMQ needs to blocklist, then that would be something to account for.

Comment on lines +392 to +737
io_loop = kwargs.get("io_loop")
if io_loop is None:
io_loop = tornado.ioloop.IOLoop.current()
Contributor

suggestion this could also follow the pattern.

io_loop = kwargs.get("io_loop") or tornado.ioloop.IOLoop.current()

typical dict.get fallback would have it as kwargs.get("io_loop", fallback_value) but unlike the or, or ternary statements, the arguments to the function are evaluated first, so if there's a cost associated with .current() then it will be paid every __new__.
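
The evaluation-order difference can be seen with a stand-in for `IOLoop.current()` that records when it is called. `current_loop` and the string values are placeholders for illustration.

```python
calls = []


def current_loop():
    # Stand-in for tornado.ioloop.IOLoop.current(); records each invocation.
    calls.append("called")
    return "default-loop"


kwargs = {"io_loop": "explicit-loop"}

# dict.get with a default: the fallback is evaluated before get() runs.
loop_a = kwargs.get("io_loop", current_loop())
eager_calls = len(calls)

# `or`: the fallback is evaluated only if the left side is falsy.
loop_b = kwargs.get("io_loop") or current_loop()
lazy_calls = len(calls) - eager_calls
```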

Comment on lines +396 to +741
if io_loop not in cls.instance_map:
cls.instance_map[io_loop] = weakref.WeakValueDictionary()
loop_instance_map = cls.instance_map[io_loop]
Contributor

suggestion pretty sure that this can be replaced with loop_instance_map = cls.instance_map.setdefault(io_loop, weakref.WeakValueDictionary())

see >>> help(dict.setdefault) for more info.
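
A sketch of the `setdefault` form, using a plain dict and a placeholder loop object. One trade-off worth noting: the default argument is built on every call, even when the key already exists, so a throwaway `WeakValueDictionary` is created in that case.

```python
import weakref

# Plain dict standing in for cls.instance_map.
instance_map = {}


def loop_map_for(io_loop):
    # Returns the existing per-loop map, or stores and returns a new one.
    return instance_map.setdefault(io_loop, weakref.WeakValueDictionary())


loop = object()  # placeholder for an IOLoop instance
first = loop_map_for(loop)
second = loop_map_for(loop)
```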


# an init for the singleton instance to call
def __singleton_init__(self, opts, **kwargs):
self.opts = dict(opts)
Contributor

question would copy.deepcopy(opts) be more appropriate?
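
The difference matters only when `opts` holds nested mutable values, as the auth setting from the PR description does. A short sketch with made-up values:

```python
import copy

opts = {
    "transport": "rabbitmq",
    "transport_rabbitmq_auth": {"username": "user"},
}

shallow = dict(opts)        # new outer dict, nested dicts are shared
deep = copy.deepcopy(opts)  # nested dicts are copied as well

# Mutating through the shallow copy also changes the caller's opts.
shallow["transport_rabbitmq_auth"]["username"] = "changed"
```

Whether the extra cost of `deepcopy` is justified depends on whether the channel ever mutates nested options.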

Comment on lines +491 to +839
self._io_loop = kwargs.get("io_loop")
if self._io_loop is None:
self._io_loop = tornado.ioloop.IOLoop.current()
Contributor

suggestion or suggestion applies here as well.

Contributor Author

Sure. Note that some of this implementation came from zeromq.py. One of the next steps is to refactor common transport code into a base class.

def __init__(self, opts, **kwargs):
self.opts = opts
self.ttype = "rabbitmq"
self.io_loop = kwargs.get("io_loop") or tornado.ioloop.IOLoop.instance()
Contributor

☝️ 😂 Yeah, that's exactly what I was talking about 🤣

if payload is not None:
callback(payload)

self._rmq_non_blocking_connection_wrapper.register_message_callback(callback=wrap_callback)
Contributor

note I see a lot of tornado stuff, and I don't recall seeing a lot of tornado-based testing. I don't know if that's covered by the existing tests, but especially for things like authentication we should have some tests around that 👍

except Exception as err: # pylint: disable=broad-except
del self.send_queue[0]
continue
del self.send_queue[0]
Contributor

question: do other things do anything with the send_queue? Perhaps it would make more sense to message = self.send_queue.pop(0)? (also, the collections module contains actual queues, if that would be a better approach here?)
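
A sketch of the queue-based variant, assuming nothing else needs to see the message at the head of `send_queue` while it is being sent. `drain` and `send` are hypothetical names. As in the excerpt, a message is dropped from the queue whether or not sending succeeds.

```python
from collections import deque


def drain(send_queue, send):
    """Send every queued message, collecting failures instead of raising."""
    failed = []
    while send_queue:
        message = send_queue.popleft()  # O(1), unlike list.pop(0)
        try:
            send(message)
        except Exception as err:  # broad on purpose, mirroring the excerpt
            failed.append((message, err))
    return failed


sent = []


def send(message):
    if message == "bad":
        raise ValueError(message)
    sent.append(message)


queue = deque(["a", "bad", "c"])
failed = drain(queue, send)
```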

Comment on lines +1174 to +1586
if message not in self.send_timeout_map:
return
timeout = self.send_timeout_map.pop(message, None)
Contributor

suggestion with the fallback, this if statement is unnecessary. The one just after this is already guarding for None, so... that's really all that's necessary.
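
In other words, `pop` with a default already covers the missing-key case. A minimal sketch, where `cancel_timeout` and `remove_timeout` are hypothetical names standing in for the surrounding method and the loop's timeout removal:

```python
def cancel_timeout(send_timeout_map, message, remove_timeout):
    # pop() returns None when the message is not tracked, so no
    # separate membership check is needed.
    timeout = send_timeout_map.pop(message, None)
    if timeout is not None:
        remove_timeout(timeout)


removed = []
timeouts = {"msg-1": "timeout-handle-1"}

cancel_timeout(timeouts, "msg-1", removed.append)
cancel_timeout(timeouts, "unknown", removed.append)  # silently ignored
```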

Initial POC of RabbitMQ transport (work-in-progress, but looking for early feedback before the review gets too large).
  - there are a number of TODOs in this review; all should be addressed before review is ready for prime time
  - there are gaps in the implementation that will be addressed. This work is tracked in SSC-978
  - a number of edge cases need to be tested/addressed (e.g. connection error recovery, etc.)
  - added functional tests
  - added integration tests

Testing Done: functional tests with one master and one minion
- Addressed some/most code review comments
- Installed pre-commit hook and addressed issues identified (formatting, etc.)
- Added connection recovery to RMQ*ConnectionWrapper* classes
- Removed use of blocking connection in a separate thread. Now using non-blocking connection with custom io_loop

Testing Done: functional tests with one master and one minion

Testing Done:
  - added functional tests
…this review:

- Separated RMQ topology creation and consumption so that exchanges, queues, etc. can be created out-of-band. It is controlled by this config setting:
        transport_rabbitmq_create_topology_ondemand: True

- Refactored POC so that messages are published to a "publisher exchange" and consumed from a "consumer exchange/consumer queue".

- RPC pattern (where publisher and consumer share the same connection) was refactored to use RMQ's "direct-reply-to" pattern (https://www.rabbitmq.com/direct-reply-to.html).

- Moved RMQ topology object names and declaration metadata into config file. Config settings are as follows:
       - transport_rabbitmq_publisher_exchange_name: salt_master_exchange
       - transport_rabbitmq_publisher_exchange_declare_arguments:
       - transport_rabbitmq_consumer_exchange_name: salt_master_exchange
       - transport_rabbitmq_consumer_exchange_declare_arguments:
       - transport_rabbitmq_consumer_queue_name: salt_minion_queue
       - transport_rabbitmq_consumer_queue_declare_arguments: { "x-expires": 600000, "x-max-length": 10000, "x-queue-type": "quorum", "x-queue-mode": "lazy", "x-message-ttl": 259200000}

- Started using quorum queues with expiration and message-ttl of 72 hours, e.g. configurable queue declaration looks like this: { "x-expires": 600000, "x-max-length": 10000, "x-queue-type": "quorum", "x-queue-mode": "lazy", "x-message-ttl": 259200000}

- Added additional error checking for cases when RMQ topology does not exist when Salt tries to use it.
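
For readers checking the numbers in the queue declaration arguments above: RabbitMQ expresses `x-message-ttl` and `x-expires` in milliseconds. A quick sketch converting the listed values shows that the 72 hours applies to the message TTL, while the queue expiry in this example is 10 minutes.

```python
from datetime import timedelta

# Values copied from the queue declaration arguments in this PR.
queue_arguments = {
    "x-expires": 600000,
    "x-max-length": 10000,
    "x-queue-type": "quorum",
    "x-queue-mode": "lazy",
    "x-message-ttl": 259200000,
}

message_ttl = timedelta(milliseconds=queue_arguments["x-message-ttl"])
queue_expiry = timedelta(milliseconds=queue_arguments["x-expires"])
```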

Testing Done:
  - Updated and ran functional tests. Specifically:
     - test_pub_server_channel.test_publish_to_pubserv_ipc
     - test_async_pub_channel.test_rmq_connection_recovery
     - test_async_pub_channel.test_publish_to_pubserv_ipc_async_collector
  - Ran an e2e test of master sending a ping to a minion
4 participants