The RabbitMQ connector is one of Conduit standalone plugins. It provides both a source and a destination connector for RabbitMQ.
It uses the AMQP 0-9-1 Model to connect to RabbitMQ.
Field | Description |
---|---|
record.Position |
json object with the delivery tag and the queue name from where the record was read from. |
record.Operation |
currently fixed as "create". |
record.Metadata |
a string to string map, with keys prefixed as rabbitmq.{DELIVERY_PROPERTY} . |
record.Key |
the message id from the read message. |
record.Payload.Before |
|
record.Payload.After |
the message body |
Run make build
to compile the connector.
Execute make test
to perform all non-tls tests. Execute make test-tls
for the TLS tests. Both commands use docker files located at test/docker-compose.yml
and test/docker-compose-tls.yml
respectively.
Tests require docker-compose v2.
Name | Description | Required | Default Value |
---|---|---|---|
url |
The RabbitMQ server's URL. | Yes | |
tls.enabled |
Flag to enable or disable TLS. | false | false |
tls.clientCert |
Path to the client certificate for TLS. | No | |
tls.clientKey |
Path to the client's key for TLS. | No | |
tls.caCert |
Path to the CA (Certificate Authority) certificate for TLS. | No | |
queue.name |
The name of the RabbitMQ queue to consume messages from. | Yes | |
queue.durable |
Specifies whether the queue is durable. | No | true |
queue.autoDelete |
If the queue will auto-delete. | No | false |
queue.exclusive |
If the queue is exclusive. | No | false |
queue.noWait |
If the queue is declared without waiting for server reply. | No | false |
consumer.name |
The name of the consumer. | No | |
consumer.autoAck |
If the server should consider messages acknowledged once delivered. | No | false |
consumer.exclusive |
If the consumer should be exclusive. | No | false |
consumer.noLocal |
If the server should not deliver messages published by the same connection. | No | false |
consumer.noWait |
If the consumer should be declared without waiting for server confirmation. | No | false |
Name | Description | Required | Default Value |
---|---|---|---|
url |
The RabbitMQ server's URL. | Yes | |
tls.enabled |
Flag to enable or disable TLS. | false | false |
tls.clientCert |
Path to the client certificate for TLS. | No | |
tls.clientKey |
Path to the client's key for TLS. | No | |
tls.caCert |
Path to the CA (Certificate Authority) certificate for TLS. | No | |
queue.name |
The name of the RabbitMQ queue where messages will be published to. | Yes | |
queue.durable |
Specifies whether the queue is durable. | No | true |
queue.autoDelete |
If the queue will auto-delete. | No | false |
queue.exclusive |
If the queue is exclusive. | No | false |
queue.noWait |
If the queue is declared without waiting for server reply. | No | false |
contentType |
The MIME content type of the messages written to RabbitMQ. | No | text/plain |
delivery.contentEncoding |
The content encoding for the message. | No | |
delivery.deliveryMode |
Delivery mode of the message. Non-persistent (1) or persistent (2). | No | 2 |
delivery.priority |
The priority of the message. | No | 0 |
delivery.correlationID |
The correlation id associated with the message. | No | |
delivery.replyTo |
Address to reply to. | No | |
delivery.messageTypeName |
The type name of the message. | No | |
delivery.userID |
The user id associated with the message. | No | |
delivery.appID |
The application id associated with the message. | No | |
delivery.mandatory |
Indicates if this message is mandatory. | No | false |
delivery.immediate |
Indicates if this message should be treated as immediate. | No | false |
delivery.expiration |
Indicates the message expiration time, if any. | No | |
exchange.name |
The name of the exchange to publish to. | No | |
exchange.type |
The type of the exchange to publish to. | No | direct |
exchange.durable |
Specifies whether the exchange is durable. | No | true |
exchange.autoDelete |
If the exchange will auto-delete. | No | false |
exchange.internal |
If the exchange is internal. | No | false |
exchange.noWait |
If the exchange is declared without waiting for server reply. | No | false |
routingKey |
The routing key to use when publishing to an exchange. | No |
Here's an example of a pipeline.yml
file using file to RabbitMQ
and RabbitMQ to file
pipelines:
version: 2.0
pipelines:
- id: file-to-rabbitmq
status: running
connectors:
- id: file.in
type: source
plugin: builtin:file
name: file-destination
settings:
path: ./file.in
- id: rabbitmq.out
type: destination
plugin: standalone:rabbitmq
name: rabbitmq-source
settings:
url: amqp://guest:guest@localhost:5672/
queue.name: demo-queue
sdk.record.format: template
sdk.record.format.options: '{{ printf "%s" .Payload.After }}'
- id: rabbitmq-to-file
status: running
connectors:
- id: rabbitmq.in
type: source
plugin: standalone:rabbitmq
name: rabbitmq-source
settings:
url: amqp://guest:guest@localhost:5672/
queue.name: demo-queue
- id: file.out
type: destination
plugin: builtin:file
name: file-destination
settings:
path: ./file.out
sdk.record.format: template
sdk.record.format.options: '{{ printf "%s" .Payload.After }}'