3.5 RabbitMQ Configuration

Jesse Sweetland edited this page Nov 12, 2017 · 2 revisions

The Platibus.RabbitMQ package includes a standalone host that listens for new messages from RabbitMQ queues. It can be started via the static Start method of the Platibus.RabbitMQ.RabbitMQHost class.

The RabbitMQHost.Start method has two overloads. The first accepts an optional string parameter naming the Platibus.RabbitMQ.RabbitMQHostConfigurationSection to load from app.config (the default section name is "platibus.rabbitmq"). With this overload, the RabbitMQ host is configured from the declarative configuration in the named section, which is then passed through every concrete public IConfigurationHook implementation found in the assemblies in the app domain base directory.

Declarative configuration example:

<configuration>
    <configSections>
        <section name="platibus.rabbitmq" type="Platibus.RabbitMQ.RabbitMQHostConfigurationSection, Platibus.RabbitMQ"/>
    </configSections>
    <platibus.rabbitmq baseUri="amqp://crmuser:crmpass@rabbitmq.example.com:5672/crm"
                       replyTimeout="00:00:30"
                       encoding="UTF-8" 
                       concurrencyLimit="2" 
                       autoAcknowledge="false"
                       maxAttempts="10"
                       retryDelay="00:00:10">
        <endpoints>
            <add name="provisioning" address="amqp://prvuser:prvpass@rabbitmq.example.com:5672/prv" />
        </endpoints>
        <topics>
            <add name="CustomerEvents" />
            <add name="OrderEvents" />
        </topics>
        <sendRules>
            <add namePattern=".*Provision.*" endpoint="provisioning" />
        </sendRules>
        <subscriptions>
            <add endpoint="provisioning" topic="ProvisioningEvents" />
        </subscriptions>
    </platibus.rabbitmq>
</configuration>

The second overload accepts a Platibus.RabbitMQ.IRabbitMQHostConfiguration containing all of the configuration data needed by the RabbitMQ host. This overload bypasses declarative configuration entirely and uses the supplied configuration object as-is; configuration hooks are not called. Use it to specify the entire RabbitMQ host configuration programmatically.

In addition to the common bus configuration elements, the RabbitMQ host configuration features some options specific to RabbitMQ.

Encoding

Messages are stored as streams of bytes in RabbitMQ queues. To ensure that messages are encoded and decoded consistently, the desired text encoding can be specified in the RabbitMQ host configuration. Any valid .NET encoding name is supported. See https://msdn.microsoft.com/en-us/library/system.text.encoding%28v=vs.110%29.aspx for more details.

Declarative example:

    <platibus.rabbitmq encoding="UTF-8">
    </platibus.rabbitmq>

Programmatic example:

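using System.Text;        // Encoding
using Platibus.Config;    // IConfigurationHook, PlatibusConfiguration (namespace assumed)
using Platibus.RabbitMQ;  // IRabbitMQHostConfiguration

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        // Apply the setting only when the RabbitMQ host is being configured.
        var rmqConfiguration = configuration as IRabbitMQHostConfiguration;
        if (rmqConfiguration != null)
        {
            rmqConfiguration.Encoding = Encoding.ASCII;
        }
    }
}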
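To illustrate why the sender and the receiver need to agree on the encoding, here is a minimal sketch that uses only the .NET base class library. It does not call Platibus; the EncodingRoundTrip class and its method names are hypothetical. It encodes a string to bytes, as a message body would be stored in a queue, and decodes the bytes again with either the same or a different encoding.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration only; not part of Platibus.
public static class EncodingRoundTrip
{
    // Encodes text with one encoding and decodes the bytes with another,
    // mimicking a sender and a receiver that are configured independently.
    public static string RoundTrip(string text, Encoding writer, Encoding reader)
    {
        byte[] body = writer.GetBytes(text); // bytes as they would sit in the queue
        return reader.GetString(body);       // text as the consumer would see it
    }

    // True when the text survives the trip unchanged.
    public static bool Survives(string text, Encoding writer, Encoding reader)
    {
        return RoundTrip(text, writer, reader) == text;
    }
}
```

For example, EncodingRoundTrip.Survives("caf\u00e9", Encoding.UTF8, Encoding.GetEncoding("UTF-8")) is true, while decoding the same bytes with Encoding.ASCII is not, because the non-ASCII character cannot be recovered.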

Concurrency Limit

The authors of the RabbitMQ client recommend that deliveries be acknowledged on the same thread on which they were received. The easiest way to ensure this is to create one consumer per thread, with each consumer processing one message at a time. The concurrency limit specifies how many messages from the inbound message queue can be processed concurrently; in other words, it is the number of threads (and consumers) created for the inbound queue. The default value is 1.

Declarative example:

    <platibus.rabbitmq concurrencyLimit="2">
    </platibus.rabbitmq>

Programmatic example:

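using Platibus.Config;    // IConfigurationHook, PlatibusConfiguration (namespace assumed)
using Platibus.RabbitMQ;  // IRabbitMQHostConfiguration

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        // Apply the setting only when the RabbitMQ host is being configured.
        var rmqConfiguration = configuration as IRabbitMQHostConfiguration;
        if (rmqConfiguration != null)
        {
            // Two threads, and therefore two consumers, on the inbound queue.
            rmqConfiguration.ConcurrencyLimit = 2;
        }
    }
}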
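The consumer-per-thread idea described above can be sketched with the base class library alone. This is not the Platibus implementation and does not use the RabbitMQ client; the ConsumerPerThreadSketch class is hypothetical, and an in-memory BlockingCollection stands in for the inbound queue. Each worker thread takes one message at a time and finishes handling it before taking the next, so a message is always completed on the thread that received it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration; an in-memory collection stands in for the inbound queue.
public static class ConsumerPerThreadSketch
{
    // Starts concurrencyLimit threads, each acting as a single consumer.
    // Returns, for every message, the managed thread ID that handled it.
    public static IDictionary<string, int> Process(
        IEnumerable<string> messages, int concurrencyLimit, Action<string> handler)
    {
        var inbound = new BlockingCollection<string>();
        foreach (var message in messages) inbound.Add(message);
        inbound.CompleteAdding(); // no more messages will arrive in this sketch

        var handledOn = new ConcurrentDictionary<string, int>();
        var workers = new List<Thread>();
        for (var i = 0; i < concurrencyLimit; i++)
        {
            var worker = new Thread(() =>
            {
                // Each item is handed to exactly one thread, one at a time.
                foreach (var received in inbound.GetConsumingEnumerable())
                {
                    handler(received);
                    handledOn[received] = Thread.CurrentThread.ManagedThreadId;
                }
            });
            workers.Add(worker);
            worker.Start();
        }
        foreach (var started in workers) started.Join();
        return handledOn;
    }
}
```

With a concurrency limit of 2, at most two messages are in flight at any moment, and the returned dictionary never contains more than two distinct thread IDs.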

Auto Acknowledge

Indicates whether messages consumed from the primary inbound queue should be automatically acknowledged. The default is false.

Declarative example:

    <platibus.rabbitmq autoAcknowledge="true">
    </platibus.rabbitmq>

Programmatic example:

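using Platibus.Config;    // IConfigurationHook, PlatibusConfiguration (namespace assumed)
using Platibus.RabbitMQ;  // IRabbitMQHostConfiguration

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        // Apply the setting only when the RabbitMQ host is being configured.
        var rmqConfiguration = configuration as IRabbitMQHostConfiguration;
        if (rmqConfiguration != null)
        {
            rmqConfiguration.AutoAcknowledge = true;
        }
    }
}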

Max Attempts

Indicates the maximum number of attempts that should be made to process messages from the inbound queue before moving them to a Dead Letter Queue (DLQ). The default is 10.

Declarative example:

    <platibus.rabbitmq maxAttempts="10">
    </platibus.rabbitmq>

Programmatic example:

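using Platibus.Config;    // IConfigurationHook, PlatibusConfiguration (namespace assumed)
using Platibus.RabbitMQ;  // IRabbitMQHostConfiguration

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        // Apply the setting only when the RabbitMQ host is being configured.
        var rmqConfiguration = configuration as IRabbitMQHostConfiguration;
        if (rmqConfiguration != null)
        {
            rmqConfiguration.MaxAttempts = 10;
        }
    }
}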
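The effect of a maximum attempt count can be sketched as a bounded retry loop. This is a simplified illustration under stated assumptions, not the Platibus implementation: the MaxAttemptsSketch class is hypothetical, an in-memory collection stands in for the dead letter queue, and no delay is applied between attempts.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; an in-memory collection stands in for the DLQ.
public static class MaxAttemptsSketch
{
    // Calls the handler until it reports success or maxAttempts is reached.
    // On exhaustion the message is added to deadLetters and false is returned.
    public static bool TryProcess(
        string message, int maxAttempts, Func<string, bool> handler,
        ICollection<string> deadLetters, out int attempts)
    {
        for (attempts = 1; attempts <= maxAttempts; attempts++)
        {
            if (handler(message))
            {
                return true; // processed; no further attempts needed
            }
        }
        attempts = maxAttempts;   // the loop overshoots by one on exit
        deadLetters.Add(message); // attempts exhausted: move to the DLQ
        return false;
    }
}
```

With maxAttempts set to 10, a handler that succeeds on its third call leaves the dead letter collection untouched, while a handler that always fails is called ten times before the message is dead-lettered.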

Retry Delay

The amount of time to wait before attempting to reprocess a message from the inbound message queue after a failed processing attempt (i.e. the timeout for the inbound retry queue). The default value is "00:00:05" (5 seconds).

Declarative example:

    <platibus.rabbitmq retryDelay="00:00:10">
    </platibus.rabbitmq>

Programmatic example:

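using System;             // TimeSpan
using Platibus.Config;    // IConfigurationHook, PlatibusConfiguration (namespace assumed)
using Platibus.RabbitMQ;  // IRabbitMQHostConfiguration

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        // Apply the setting only when the RabbitMQ host is being configured.
        var rmqConfiguration = configuration as IRabbitMQHostConfiguration;
        if (rmqConfiguration != null)
        {
            rmqConfiguration.RetryDelay = TimeSpan.FromSeconds(10);
        }
    }
}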
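The declarative value "00:00:10" and the programmatic value TimeSpan.FromSeconds(10) are intended to describe the same delay. The sketch below checks that correspondence with the standard .NET TimeSpan parser, assuming the configuration section reads the attribute in the usual hh:mm:ss format (this page does not state how the attribute is parsed). The RetryDelayFormatSketch class is hypothetical.

```csharp
using System;
using System.Globalization;

// Hypothetical illustration of the hh:mm:ss attribute format.
public static class RetryDelayFormatSketch
{
    // Parses a value such as "00:00:10" using culture-invariant rules.
    public static TimeSpan Parse(string value)
    {
        return TimeSpan.Parse(value, CultureInfo.InvariantCulture);
    }
}
```

RetryDelayFormatSketch.Parse("00:00:10") equals TimeSpan.FromSeconds(10). Note that the standard parser reads a bare number as days, so "10" would mean ten days rather than ten seconds; writing the full hh:mm:ss form avoids that ambiguity.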