rabbit_force is a Salesforce Streaming API to RabbitMQ adapter service. It listens for event messages from Salesforce's Streaming API and forwards them to a RabbitMQ broker for you, so you don't have to.
The Streaming API is useful when you want notifications pushed from the server to the client based on criteria that you define with PushTopics, or when you want to receive generic streaming messages. RabbitMQ, meanwhile, is one of the most popular options for implementing inter-service communication in a microservice architecture.
While there are many great RabbitMQ/AMQP client implementations for various languages, there are far fewer Streaming API clients, and many of them are poorly maintained. Furthermore, RabbitMQ offers much more flexible message consumption techniques.
rabbit_force aims to fix these problems by providing an adapter between the Streaming API and RabbitMQ, so interconnected services can consume Streaming API event messages through RabbitMQ. It even supports connecting to multiple Salesforce orgs and multiple RabbitMQ brokers, and routing messages between them.
- Forward Streaming API messages from one or more Salesforce orgs to one or more RabbitMQ brokers
- Route incoming messages to a specific broker and exchange with the specified routing key and properties, with the help of routing rules defined as JSONPath expressions
- Support for Salesforce's replay extension for message reliability and durability by storing replay markers in a Redis database
- Configurable error handling behavior: either fail instantly or try to recover from network and service outages
- Message sources, sinks and routing configurable with JSON or YAML configuration files
- Implemented using Python asyncio for efficient handling of I/O-intensive operations
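To make the replay feature listed above more concrete, here is a minimal sketch of the idea, not rabbit_force's actual implementation. Salesforce attaches a replayId to every event, and a client that remembers the last one per channel can ask to resume from it after a restart. The class and method names are hypothetical, an in-memory dict stands in for the Redis database, and the key layout and the fallback to -1 are assumptions.

```python
from typing import Dict

# Special replay ids defined by Salesforce's replay extension
REPLAY_NEW_EVENTS = -1  # only events sent after the client subscribes
REPLAY_ALL_EVENTS = -2  # every retained event that is still available


class ReplayMarkerStore:
    """Hypothetical in-memory stand-in for a Redis-backed marker storage."""

    def __init__(self, key_prefix: str) -> None:
        self._key_prefix = key_prefix
        self._markers: Dict[str, int] = {}

    def _key(self, channel: str) -> str:
        # Assumed key layout: "<prefix>:<channel>"
        return f"{self._key_prefix}:{channel}"

    def remember(self, message: dict) -> None:
        """Store the replay id carried by a Streaming API event message."""
        replay_id = message["data"]["event"]["replayId"]
        self._markers[self._key(message["channel"])] = replay_id

    def resume_from(self, channel: str) -> int:
        """Return the replay id to subscribe with for the given channel."""
        return self._markers.get(self._key(channel), REPLAY_NEW_EVENTS)
```

With a persistent backend in place of the dict, the stored marker survives restarts, so events published while the service was down can still be requested from Salesforce, as long as they are within its retention window.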
A relatively simple use case is illustrated in the image below. rabbit_force
is connected to a single message source and a single message sink, or in other
words, to a single Salesforce org named my_org and a single RabbitMQ broker
named my_broker. It listens for messages from two PushTopics (lead_changes and
contact_changes) and a StreamingChannel (my_channel), and forwards the messages
into the exchange my_exchange with different routing keys. A Redis database is
used to store the replay markers sent by Salesforce, to take advantage of
message durability.
The configuration file below sets up rabbit_force to forward messages from
lead_changes, contact_changes and my_channel with the routing keys
lead_change_message, contact_change_message and my_channel_message,
respectively.
config.yaml:
# message source definition
source:
  # mapping of Salesforce orgs to use as message sources
  orgs:
    # a Salesforce org named "my_org"
    my_org:
      # authentication credentials
      consumer_key: "<consumer_key>"
      consumer_secret: "<consumer_secret>"
      username: "<username>"
      password: "<password>"
      # list of resources that the service can listen to for messages
      # if they don't exist, they'll be created on application startup
      resources:
        # a PushTopic resource
        - type: PushTopic
          # the definition of the PushTopic
          spec:
            Name: lead_changes
            ApiVersion: 42.0
            NotifyForFields: Referenced
            NotifyForOperationCreate: true
            NotifyForOperationUpdate: true
            NotifyForOperationDelete: true
            NotifyForOperationUndelete: true
            Query: SELECT Id, Email, Name, Phone, MobilePhone, Status, LeadSource FROM Lead
          # optional durable flag, if false then the resource will be removed on application shutdown
          durable: false
        # a PushTopic resource
        - type: PushTopic
          # the definition of the PushTopic
          spec:
            Name: contact_changes
            ApiVersion: 42.0
            NotifyForFields: Referenced
            NotifyForOperationCreate: true
            NotifyForOperationUpdate: true
            NotifyForOperationDelete: true
            NotifyForOperationUndelete: true
            Query: SELECT Id, Email, Name, Phone, MobilePhone FROM Contact
          # optional durable flag, if false then the resource will be removed on application shutdown
          durable: false
        # a StreamingChannel resource
        - type: StreamingChannel
          # the definition of the StreamingChannel
          spec:
            Name: /u/my_channel
            Description: Streaming channel for notifications
  # optional replay storage definition. if defined, it'll be used to store replay
  # markers sent by Salesforce in order to support message durability
  replay:
    # redis server address
    address: "redis://localhost:6389"
    # key prefix
    key_prefix: replay
# message sink definition
sink:
  # mapping of RabbitMQ brokers to use as message sinks
  brokers:
    # a RabbitMQ broker named "my_broker"
    my_broker:
      # host name of the broker
      host: localhost
      # definition of the exchange where the messages should be forwarded
      exchanges:
        - exchange_name: my_exchange
          type_name: topic
          durable: true
# message router definition
router:
  # optional default route to use if no routing rule matches a given message
  default_route:
    broker_name: my_broker
    exchange_name: my_exchange
    routing_key: my_channel_message
  # list of routing rules
  rules:
    # JSONPath filter expression as the condition
    - condition: "$[?(@.message.channel ~ '.*/lead_changes')]"
      # the route to use if the condition produces a non-empty match
      route:
        broker_name: my_broker
        exchange_name: my_exchange
        routing_key: lead_change_message
    # JSONPath filter expression as the condition
    - condition: "$[?(@.message.channel ~ '.*/contact_changes')]"
      # the route to use if the condition produces a non-empty match
      route:
        broker_name: my_broker
        exchange_name: my_exchange
        routing_key: contact_change_message
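The effect of the router section can be approximated with a short sketch. It does not use a JSONPath engine and is not rabbit_force's code. It applies the regular expressions from the two conditions directly to the channel name, assuming that the routed document exposes the channel at message.channel (as the conditions suggest), that the pattern has to match the whole channel name, and that the first matching rule wins. The names RULES and pick_routing_key are hypothetical.

```python
import re

# (channel pattern, routing key) pairs taken from the rules in config.yaml
RULES = [
    (r".*/lead_changes", "lead_change_message"),
    (r".*/contact_changes", "contact_change_message"),
]

# Routing key of the default_route, used when no rule matches
DEFAULT_ROUTING_KEY = "my_channel_message"


def pick_routing_key(document: dict) -> str:
    """Return the routing key for a document shaped like {"message": {...}}."""
    channel = document["message"]["channel"]
    for pattern, routing_key in RULES:
        # Assumption: the pattern must match the whole channel name
        if re.fullmatch(pattern, channel):
            return routing_key
    return DEFAULT_ROUTING_KEY
```

For example, a document whose channel is /topic/lead_changes gets the routing key lead_change_message, while one from /u/my_channel matches neither rule and falls back to my_channel_message.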
A sample run of rabbit_force with the above configuration file:
$ python -m rabbit_force config.yaml
2018-06-19 16:23:07,909:INFO: Starting up ...
2018-06-19 16:23:07,996:INFO: Configuration loaded from 'config.yaml'
2018-06-19 16:23:07,999:INFO: Configuring application ...
2018-06-19 16:23:10,619:INFO: Using message broker AmqpBroker(host='localhost', port=None, login='guest', password='guest', virtualhost='/', ssl=False, login_method='AMQPLAIN', insist=False, verify_ssl=True)
2018-06-19 16:23:12,128:INFO: Listening for messages from Salesforce org 'my_org':
* from PushTopic 'lead_changes' on channel '/topic/lead_changes'
* from PushTopic 'contact_changes' on channel '/topic/contact_changes'
* from StreamingChannel '/u/my_channel' on channel '/u/my_channel'
With replay storage RedisReplayStorage(address='redis://localhost:6389', key_prefix='replay:my_org', additional_params={}, ignore_network_errors=False).
2018-06-19 16:23:48,119:INFO: Forwarded message 1 on channel '/topic/lead_changes' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='lead_change_message', properties=None).
2018-06-19 16:24:03,039:INFO: Forwarded message 1 on channel '/topic/contact_changes' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='contact_change_message', properties=None).
2018-06-19 16:24:20,180:INFO: Forwarded message 1 on channel '/u/my_channel' from 'my_org' to Route(broker_name='my_broker', exchange_name='my_exchange', routing_key='my_channel_message', properties=None).
2018-06-19 16:24:27,097:INFO: Shutting down ...
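On the consuming side, any AMQP client can pick up the forwarded messages. The following is a minimal sketch using the third-party pika library, which is not part of rabbit_force. It assumes the broker and exchange from config.yaml and a message body encoded as UTF-8 JSON, which this README does not state. The function names are hypothetical.

```python
import json


def decode_event(body: bytes) -> dict:
    """Decode a forwarded message body, assumed here to be UTF-8 JSON."""
    return json.loads(body.decode("utf-8"))


def consume_lead_changes(host: str = "localhost") -> None:
    """Print every message that arrives with the lead_change_message key."""
    # Imported lazily so decode_event stays usable without pika installed
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    # Same exchange settings as in config.yaml
    channel.exchange_declare(
        exchange="my_exchange", exchange_type="topic", durable=True
    )
    # Server-named queue that is deleted when this consumer disconnects
    queue_name = channel.queue_declare(queue="", exclusive=True).method.queue
    channel.queue_bind(
        exchange="my_exchange",
        queue=queue_name,
        routing_key="lead_change_message",
    )

    def on_message(ch, method, properties, body):
        print(method.routing_key, decode_event(body))

    channel.basic_consume(
        queue=queue_name, on_message_callback=on_message, auto_ack=True
    )
    # Blocks until the connection is closed or the process is interrupted
    channel.start_consuming()
```

Calling consume_lead_changes() while rabbit_force is running should print lead change events as they are forwarded. Because the queue is exclusive and server-named, messages forwarded while no consumer is connected are not kept; a named durable queue would be the usual choice when that matters.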
More advanced examples can be found in the examples section of the documentation.
https://rabbit-force.readthedocs.io
Requires Python 3.7.
The public Docker image can be found on Docker Hub.
$ docker pull robertmrk/rabbit_force
rabbit_force is offered under the MIT license.