3.4 Message Queueing
A Platibus bus may use a message queueing service on both the sending and receiving sides to ensure that messages are delivered and handled at least once. On the sending side, messages sent with critical importance are placed in a special queue named "Outbound" before delivery to the remote instance is attempted. Each message is acknowledged upon successful delivery and retried upon error, using the configured delay and up to the configured maximum number of attempts.
On the receiving side, messages are placed in queues according to the configured handling rules and are acknowledged by the registered message handler through the supplied message context.
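
The retry behavior described above can be sketched roughly as follows. This is a minimal illustration, not Platibus code: `RetrySketch`, `DeliverWithRetryAsync`, and their parameters are hypothetical names chosen for the example, and the real service may schedule retries differently.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Hypothetical helper (not part of Platibus): invokes the delivery callback
    // until it succeeds or maxAttempts is reached, waiting retryDelay between tries.
    // Returns true if a delivery attempt succeeded (the message can be acknowledged).
    public static async Task<bool> DeliverWithRetryAsync(
        Func<Task> deliver, int maxAttempts, TimeSpan retryDelay)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await deliver();
                return true;
            }
            catch (Exception)
            {
                // Delivery failed; wait before the next attempt unless none remain.
                if (attempt < maxAttempts)
                {
                    await Task.Delay(retryDelay);
                }
            }
        }
        return false;
    }
}
```

Because a message is only acknowledged after a successful attempt, a crash between delivery and acknowledgement can cause the same message to be delivered again, which is why the guarantee is "at least once" rather than "exactly once".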
Message queueing services implement the IMessageQueueingService interface, which specifies methods for creating queues and for enqueueing messages. The base distribution packages provide a number of implementations out of the box.
The message queueing service is needed for certain types of configuration and is usually specified via a nested <queueing> configuration element or by setting the MessageQueueingService property on the configuration object via a configuration hook.
Platibus endpoints can require remote instances to authenticate when transmitting messages. (See endpoint configuration and HTTP authentication.) When messages are received, the authenticated principal is passed to the message queueing service in order to capture and preserve the security context in which the message was received. Platibus captures this information in a security token generated by a security token service.
Security token services implement the ISecurityTokenService interface, which specifies methods for issuing new security tokens from a given IPrincipal and validating previously generated tokens to produce a corresponding IPrincipal.
The JwtSecurityTokenService issues and validates JSON Web Tokens. JWTs may optionally be signed with a secret key to ensure that the security information stored in the token was not tampered with between the time a message is enqueued and dequeued.
Platibus uses unsigned JWTs by default. Implementers may specify a hex-encoded secret signing key to sign and validate tokens using the HMACSHA256 algorithm.
Declarative configuration example:
<queueing provider="Filesystem">
    <securityTokens provider="JWT" signingKey="F841B54685EF6F3C75C6565842DBE5D8"
                    defaultTtl="72:00:00"/>
</queueing>
Programmatic configuration example:
public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var options = new JwtSecurityTokenServiceOptions
        {
            SigningKey = new HexEncodedSecurityKey("F841B54685EF6F3C75C6565842DBE5D8"),
            DefaultTTL = TimeSpan.FromHours(72)
        };
        var securityTokenService = new JwtSecurityTokenService(options);
        var httpServerConfiguration = configuration as HttpServerConfiguration;
        if (httpServerConfiguration != null)
        {
            httpServerConfiguration.MessageQueueingService =
                new FilesystemMessageQueueingService(securityTokenService: securityTokenService);
        }
    }
}
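
To make the signing step more concrete, the following sketch shows how a hex-encoded key can be decoded and used to produce an HS256 (HMAC-SHA256) signed token in the compact JWT form. It uses only the .NET base class library; `Hs256Sketch` and its methods are hypothetical names for illustration and are not how JwtSecurityTokenService is necessarily implemented.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class Hs256Sketch
{
    // Decodes a hexadecimal string such as "F841B546..." into raw key bytes.
    public static byte[] FromHex(string hex)
    {
        if (hex.Length % 2 != 0)
        {
            throw new ArgumentException("Hex string must have an even length", nameof(hex));
        }
        var bytes = new byte[hex.Length / 2];
        for (var i = 0; i < bytes.Length; i++)
        {
            bytes[i] = Convert.ToByte(hex.Substring(i * 2, 2), 16);
        }
        return bytes;
    }

    // Base64url encoding without padding, as used by the JWT compact form.
    public static string Base64Url(byte[] data)
    {
        return Convert.ToBase64String(data)
            .TrimEnd('=')
            .Replace('+', '-')
            .Replace('/', '_');
    }

    // Builds "header.payload.signature", where the signature is the
    // HMAC-SHA256 of "header.payload" computed with the supplied key.
    public static string Sign(string headerJson, string payloadJson, byte[] key)
    {
        var signingInput =
            Base64Url(Encoding.UTF8.GetBytes(headerJson)) + "." +
            Base64Url(Encoding.UTF8.GetBytes(payloadJson));
        using (var hmac = new HMACSHA256(key))
        {
            var signature = hmac.ComputeHash(Encoding.ASCII.GetBytes(signingInput));
            return signingInput + "." + Base64Url(signature);
        }
    }
}
```

Anyone who alters the header or payload without knowing the key cannot produce a matching signature, which is what lets the token be trusted when the message is later dequeued.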
The JWT security token service supports a "fallback" signing key to enable signing keys to be updated without invalidating previously queued messages.
Declarative configuration example:
<queueing provider="Filesystem">
    <securityTokens provider="JWT"
                    signingKey="08ADA45915E2072B8CE8C31A42EA4C72"
                    fallbackSigningKey="F841B54685EF6F3C75C6565842DBE5D8"/>
</queueing>
Programmatic configuration example:
public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var newSigningKey = new HexEncodedSecurityKey("08ADA45915E2072B8CE8C31A42EA4C72");
        var previousSigningKey = new HexEncodedSecurityKey("F841B54685EF6F3C75C6565842DBE5D8");
        var options = new JwtSecurityTokenServiceOptions
        {
            SigningKey = newSigningKey,
            FallbackSigningKey = previousSigningKey
        };
        var securityTokenService = new JwtSecurityTokenService(options);
        var httpServerConfiguration = configuration as HttpServerConfiguration;
        if (httpServerConfiguration != null)
        {
            httpServerConfiguration.MessageQueueingService =
                new FilesystemMessageQueueingService(securityTokenService: securityTokenService);
        }
    }
}
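
The idea behind a fallback key can be illustrated with a small sketch: a signature is accepted if it matches the current signing key or, failing that, the previous one. This is an assumption about the general technique rather than a description of the Platibus internals, and `FallbackKeySketch`, `ComputeMac`, and `IsValid` are hypothetical names.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class FallbackKeySketch
{
    // Computes the HMAC-SHA256 of the data with the given key.
    public static byte[] ComputeMac(byte[] key, string data)
    {
        using (var hmac = new HMACSHA256(key))
        {
            return hmac.ComputeHash(Encoding.UTF8.GetBytes(data));
        }
    }

    // Compares two byte arrays without exiting early on the first mismatch.
    private static bool SameBytes(byte[] a, byte[] b)
    {
        if (a.Length != b.Length) return false;
        var diff = 0;
        for (var i = 0; i < a.Length; i++)
        {
            diff |= a[i] ^ b[i];
        }
        return diff == 0;
    }

    // Accepts the signature if it was produced with the current signing key
    // or, when a fallback key is supplied, with the previous signing key.
    public static bool IsValid(string data, byte[] signature, byte[] signingKey, byte[] fallbackKey)
    {
        if (SameBytes(ComputeMac(signingKey, data), signature)) return true;
        return fallbackKey != null && SameBytes(ComputeMac(fallbackKey, data), signature);
    }
}
```

With this arrangement, messages queued before a key rotation still validate against the fallback key, while newly issued tokens are signed only with the new key. Once the older messages have drained, the fallback key can be removed.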
The InMemoryMessageQueueingService queues messages in memory using TPL Dataflow blocks and does not persist them. This is a useful configuration for testing or for situations in which queued messages do not need to survive a process crash or restart.
Declarative configuration example:
<queueing provider="InMemory" />
Programmatic configuration example:
public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var httpServerConfiguration = configuration as HttpServerConfiguration;
        if (httpServerConfiguration != null)
        {
            httpServerConfiguration.MessageQueueingService = new InMemoryMessageQueueingService();
        }
    }
}
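
For readers unfamiliar with TPL Dataflow, the sketch below shows the general shape of an in-memory queue built from dataflow blocks: a BufferBlock holds queued messages and an ActionBlock handles them one at a time. It is an illustration only and assumes the System.Threading.Tasks.Dataflow library is available (a NuGet package on .NET Framework); `InMemoryQueueSketch` and `HandleAllAsync` are hypothetical names, not Platibus types.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class InMemoryQueueSketch
{
    // Posts each message to an in-memory buffer and handles them in order.
    // Nothing is written to disk, so queued messages are lost if the process exits.
    public static async Task<List<string>> HandleAllAsync(IEnumerable<string> messages)
    {
        var handled = new List<string>();

        // The buffer plays the role of the queue.
        var queue = new BufferBlock<string>();

        // The action block plays the role of the message handler; by default
        // it processes one message at a time, preserving order.
        var handler = new ActionBlock<string>(message => handled.Add(message));

        queue.LinkTo(handler, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var message in messages)
        {
            queue.Post(message);
        }

        // Signal that no more messages will arrive and wait for the handler to drain.
        queue.Complete();
        await handler.Completion;
        return handled;
    }
}
```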
The FilesystemMessageQueueingService queues messages in a directory on the filesystem so that the queue can survive a process crash or restart. It accepts a single optional configuration parameter, the path of the directory in which messages are written, which defaults to platibus/queues relative to the app domain base directory.
Declarative configuration example:
<queueing provider="Filesystem" path="C:\platibus\queues" />
Programmatic configuration example:
public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var httpServerConfiguration = configuration as HttpServerConfiguration;
        if (httpServerConfiguration != null)
        {
            var path = new DirectoryInfo(@"C:\platibus\queues");
            httpServerConfiguration.MessageQueueingService = new FilesystemMessageQueueingService(path);
        }
    }
}
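
When no path is supplied, the default location described above can be computed along the following lines. This is a sketch under the assumption that "app domain base directory" refers to AppDomain.CurrentDomain.BaseDirectory; `QueuePathSketch` and `Resolve` are hypothetical names used only for this example.

```csharp
using System;
using System.IO;

public static class QueuePathSketch
{
    // Returns the configured directory, or platibus/queues under the
    // app domain base directory when no path has been configured.
    public static DirectoryInfo Resolve(string configuredPath = null)
    {
        var path = string.IsNullOrWhiteSpace(configuredPath)
            ? Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "platibus", "queues")
            : configuredPath;
        return new DirectoryInfo(path);
    }
}
```

Because the default is relative to the application's base directory, redeploying the application to a new directory starts with an empty queue directory. An explicit path avoids that.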
The SQLMessageQueueingService queues messages in a table in a SQL database so that the queue can survive a process crash or restart. It accepts a single required configuration parameter indicating the name of the connection string used to create connections to the target database.
Declarative configuration example:
<connectionStrings>
    <add name="Platibus" connectionString="Data Source=(LocalDB)\MSSQLLocalDB; Integrated Security=true; Initial Catalog=Platibus" providerName="System.Data.SqlClient"/>
</connectionStrings>
<queueing provider="SQL" connectionName="Platibus" />
Programmatic configuration example:
public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var httpServerConfiguration = configuration as HttpServerConfiguration;
        if (httpServerConfiguration != null)
        {
            var connectionStringSettings = new ConnectionStringSettings
            {
                ProviderName = "System.Data.SqlClient",
                // Verbatim string so the backslash in the data source is not treated as an escape
                ConnectionString = @"Data Source=(LocalDB)\MSSQLLocalDB; Integrated Security=true; Initial Catalog=Platibus"
            };
            httpServerConfiguration.MessageQueueingService = new SQLMessageQueueingService(connectionStringSettings);
        }
    }
}
The Platibus.SQLite package offers a slight variation of the SQLMessageQueueingService that uses a local SQLite database file to store messages for each queue. This can be advantageous if SQL storage and query functionality are desired but local storage is needed to avoid a single point of failure (e.g. a remote SQL server instance that goes down or cannot be reached).
The SQLiteMessageQueueingService accepts a single optional parameter, the path of the directory in which the SQLite database files are created, which defaults to platibus/queues relative to the app domain base directory.
Declarative configuration example:
<queueing provider="SQLite" path="C:\platibus\queues" />
Programmatic configuration example:
public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var httpServerConfiguration = configuration as HttpServerConfiguration;
        if (httpServerConfiguration != null)
        {
            var path = new DirectoryInfo(@"C:\platibus\queues");
            httpServerConfiguration.MessageQueueingService = new SQLiteMessageQueueingService(path);
        }
    }
}
Platibus instances that do not use the RabbitMQHost can still leverage RabbitMQ to queue messages for other transports by using the RabbitMQMessageQueueingService provided in the Platibus.RabbitMQ package.
The RabbitMQMessageQueueingService accepts two optional parameters, uri and encoding, which indicate the URI of the RabbitMQ host to connect to and the text encoding to use when reading and writing messages. The uri defaults to "amqp://localhost:5672". The use of virtual hosts in RabbitMQ is required if multiple Platibus instances are hosted on the same RabbitMQ server, to avoid queue name collisions.
The encoding parameter defaults to "UTF-8", which is appropriate for most needs. Any valid .NET encoding name is supported. See https://msdn.microsoft.com/en-us/library/system.text.encoding%28v=vs.110%29.aspx for more details.
Declarative configuration example:
<queueing provider="RabbitMQ" uri="amqp://localhost:5672/Platibus" encoding="ASCII" />
Programmatic configuration example:
public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var httpServerConfiguration = configuration as HttpServerConfiguration;
        if (httpServerConfiguration != null)
        {
            var uri = new Uri("amqp://localhost:5672/Platibus");
            var encoding = Encoding.ASCII;
            httpServerConfiguration.MessageQueueingService = new RabbitMQMessageQueueingService(uri: uri, encoding: encoding);
        }
    }
}
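
As a rough illustration of how an encoding name from configuration maps to a .NET Encoding, the sketch below falls back to UTF-8 when no name is given and otherwise looks the name up with Encoding.GetEncoding. `EncodingNameSketch` and `Resolve` are hypothetical names; how the RabbitMQ queueing service parses its configuration is not shown here.

```csharp
using System;
using System.Text;

public static class EncodingNameSketch
{
    // Resolves a configured encoding name, defaulting to UTF-8 when none is given.
    // Encoding.GetEncoding throws an ArgumentException for unrecognized names.
    public static Encoding Resolve(string name = null)
    {
        return string.IsNullOrWhiteSpace(name)
            ? Encoding.UTF8
            : Encoding.GetEncoding(name);
    }
}
```

Both sides of a queue need to agree on the encoding: a message written as UTF-8 and read back as ASCII would lose any characters outside the ASCII range.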
The RabbitMQMessageQueueingService constructor also accepts an optional connectionManager argument that can be used to customize how connections to the RabbitMQ server are managed. The default ConnectionManager implementation maintains and reuses a single connection for each URI.
The MongoDBMessageQueueingService queues messages in a collection in a MongoDB database so that the queue can survive a process crash or restart. It accepts a single required configuration parameter indicating the name of the connection string used to create connections to the target database.
Declarative configuration example:
<connectionStrings>
    <add name="Platibus" connectionString="mongodb://localhost:27017/mydatabase"/>
</connectionStrings>
<queueing provider="MongoDB" connectionName="Platibus" />
Programmatic configuration example:
public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var httpServerConfiguration = configuration as HttpServerConfiguration;
        if (httpServerConfiguration != null)
        {
            var connectionStringSettings = new ConnectionStringSettings
            {
                ConnectionString = "mongodb://localhost:27017/mydatabase"
            };
            httpServerConfiguration.MessageQueueingService =
                new MongoDBMessageQueueingService(connectionStringSettings);
        }
    }
}