5. Handling Messages

rycornell edited this page Dec 13, 2017 · 14 revisions

Incoming messages are processed by message handlers that are programmatically registered with the bus via handling rules.

Implementing a Message Handler

The only requirement for message handlers is that they implement the Platibus.IMessageHandler interface, which specifies one method:

public Task HandleMessage(object content, IMessageContext context, 
                          CancellationToken cancellationToken);

This method receives the content of the message as an object, the context in which the message was received, and a cancellationToken that can be used by the bus to interrupt message processing (during application shutdown, for example).
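
As a rough illustration of how a handler might cooperate with the cancellation token, the sketch below checks the token between units of work and passes it to awaited calls. The `IMessageContext` and `IMessageHandler` declarations are local stand-ins that mirror the signature above so the example compiles without Platibus, and `SlowHandler` is a hypothetical name:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the Platibus types so this sketch compiles on its own.
// In a real project these come from the Platibus assembly.
public interface IMessageContext { }

public interface IMessageHandler
{
    Task HandleMessage(object content, IMessageContext context,
                       CancellationToken cancellationToken);
}

// Hypothetical handler that works in several steps and checks the
// token between steps so that processing can be interrupted.
public class SlowHandler : IMessageHandler
{
    public int StepsCompleted { get; private set; }

    public async Task HandleMessage(object content, IMessageContext context,
                                    CancellationToken cancellationToken)
    {
        for (var step = 0; step < 3; step++)
        {
            // Throws OperationCanceledException once cancellation is requested
            cancellationToken.ThrowIfCancellationRequested();

            // Passing the token lets the delay itself be interrupted
            await Task.Delay(10, cancellationToken);
            StepsCompleted++;
        }
    }
}
```

If the token is already canceled when the handler is invoked, no step runs and the returned task ends in the canceled state.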

Message Content

Provided the same message types, message naming conventions, and serialization tools exist on both the sender and the receiver, the content object provided to the handler should be equivalent to the object that was originally passed to the IBus.Send or IBus.Publish method. If the type is known, a direct cast should be possible.
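
A minimal sketch of checking the content type in an untyped handler follows. The interfaces are local stand-ins for the Platibus types, and `Order` and `UntypedOrderHandler` are hypothetical names used only for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the Platibus types so this sketch compiles on its own
public interface IMessageContext { }

public interface IMessageHandler
{
    Task HandleMessage(object content, IMessageContext context,
                       CancellationToken cancellationToken);
}

// Hypothetical message type shared by sender and receiver
public class Order
{
    public string OrderNumber { get; set; }
}

public class UntypedOrderHandler : IMessageHandler
{
    public string LastOrderNumber { get; private set; }

    public Task HandleMessage(object content, IMessageContext context,
                              CancellationToken cancellationToken)
    {
        // "as" yields null instead of throwing when the content is another type
        var order = content as Order;
        if (order == null)
        {
            throw new ArgumentException("Expected content of type Order", "content");
        }

        LastOrderNumber = order.OrderNumber;
        return Task.FromResult(true);
    }
}
```

Using `as` with a null check rather than a direct cast lets the handler report unexpected content with a clearer error than a bare InvalidCastException.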

In many cases it is more convenient to implement the generic form of the IMessageHandler interface, IMessageHandler<TContent>. The HandleMessage signature is modified slightly to provide a typed message content object:

public Task HandleMessage(TContent content, IMessageContext context, 
                          CancellationToken cancellationToken);

Because IMessageHandler<TContent> does not inherit from IMessageHandler, it must be wrapped in an adapter class in order to be registered as a handler. The GenericMessageHandlerAdapter class is provided to make this easier:

public class OrderHandler : IMessageHandler<Order> {
    public Task HandleMessage(Order order, IMessageContext context, CancellationToken cancellationToken)
    {
        Console.WriteLine("Order {0} received from {1}", order.OrderNumber, context.Headers.Origination);
        return Task.FromResult(true);
    }
}

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var orderHandler = new OrderHandler();
        var messageHandler = GenericMessageHandlerAdapter.For(orderHandler);
        configuration.AddHandlingRule("Example.Order", messageHandler);
    }
}
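
To illustrate what an adapter of this kind does, here is a simplified sketch. It is not the source of GenericMessageHandlerAdapter; `SketchAdapter` and `GreetingHandler` are hypothetical names, and the interfaces are local stand-ins that mirror the signatures shown above:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the Platibus types so this sketch compiles on its own
public interface IMessageContext { }

public interface IMessageHandler
{
    Task HandleMessage(object content, IMessageContext context,
                       CancellationToken cancellationToken);
}

public interface IMessageHandler<TContent>
{
    Task HandleMessage(TContent content, IMessageContext context,
                       CancellationToken cancellationToken);
}

// Hypothetical adapter: exposes a typed handler through the untyped interface
public class SketchAdapter<TContent> : IMessageHandler
{
    private readonly IMessageHandler<TContent> _inner;

    public SketchAdapter(IMessageHandler<TContent> inner)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        _inner = inner;
    }

    public Task HandleMessage(object content, IMessageContext context,
                              CancellationToken cancellationToken)
    {
        // Cast the untyped content to the type the wrapped handler expects.
        // An InvalidCastException here means the content has another type.
        return _inner.HandleMessage((TContent)content, context, cancellationToken);
    }
}

public static class SketchAdapter
{
    // Factory method so the compiler can infer TContent from the argument
    public static IMessageHandler For<TContent>(IMessageHandler<TContent> inner)
    {
        return new SketchAdapter<TContent>(inner);
    }
}

// Hypothetical typed handler used to exercise the adapter
public class GreetingHandler : IMessageHandler<string>
{
    public string Last { get; private set; }

    public Task HandleMessage(string content, IMessageContext context,
                              CancellationToken cancellationToken)
    {
        Last = content;
        return Task.FromResult(true);
    }
}
```

The non-generic static class exists only so that callers can write `SketchAdapter.For(handler)` without spelling out the type argument.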

Message Context

The message context provides information that is not necessarily represented in the object that was sent, such as the sender principal and the message headers computed by the sending bus instance. The message context also maintains the information necessary to route messages back to the sender or the sender's designated "reply to" address. See sending replies for more information.

Message Handler Registration

Message handlers must be registered in a configuration hook using the IPlatibusConfiguration.AddHandlingRule method:

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        var orderSpec = new MessageNamePatternSpecification("Order");
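        var orderHandler = new OrderHandler();
        // OrderHandler implements IMessageHandler<Order>, so wrap it in the adapter
        var messageHandler = GenericMessageHandlerAdapter.For(orderHandler);
        configuration.AddHandlingRule(orderSpec, messageHandler);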
    }
}

Extension Methods

The PlatibusConfigurationExtensions class features several extension methods to simplify handler registration, reducing the previous example to the following:

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        configuration.AddHandlingRule("Order", new OrderHandler());
    }
}

Additionally, method groups and lambdas can be used in place of IMessageHandler instances. The method groups or lambda expressions must match one of the following:

  • Func<TContent, IMessageContext, CancellationToken, Task>
  • Func<TContent, IMessageContext, Task>
  • Action<TContent, IMessageContext, CancellationToken>
  • Action<TContent, IMessageContext>

Here, TContent is the expected type of the deserialized message content, or object if automatic casting is not desired.

Examples:

class StaticHandlers
{
    public static async Task HandleOrderAsync(Order order, 
                                              IMessageContext messageContext, 
                                              CancellationToken cancellationToken)
    {
        cancellationToken.ThrowIfCancellationRequested();
        await Console.Out.WriteLineAsync(string.Format("Received order #{0} from {1}", 
            order.OrderNumber, messageContext.Headers.Origination));
    }

    public static void HandleOrderCancelation(OrderCancelationRequest request, 
                                              IMessageContext messageContext)
    {
        Console.WriteLine("Received cancelation request for order #{0} from {1}", 
            request.OrderNumber, messageContext.Headers.Origination);
    }
}

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
        configuration.AddHandlingRule<Order>("Order", StaticHandlers.HandleOrderAsync);

        configuration.AddHandlingRule<OrderCancelationRequest>("OrderCancelationRequest", 
             StaticHandlers.HandleOrderCancelation);

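        // The type argument tells the compiler the type of "request"
        // (assumes an OrderStatusRequest class with an OrderNumber property)
        configuration.AddHandlingRule<OrderStatusRequest>("OrderStatusRequest", 
            (request, messageContext) =>
                Console.WriteLine("Received status request for order {0}", request.OrderNumber));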
    }
}
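
One way to picture how the four delegate shapes listed above can be treated uniformly is to convert each of them to the most general form, Func<TContent, IMessageContext, CancellationToken, Task>. The sketch below is an illustration only and makes no claim about how Platibus does this internally; `HandlerDelegates.Normalize` is a hypothetical helper, and `IMessageContext` is a local stand-in:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the Platibus type so this sketch compiles on its own
public interface IMessageContext { }

// Hypothetical helper, not part of Platibus
public static class HandlerDelegates
{
    // Asynchronous handler that ignores the cancellation token
    public static Func<TContent, IMessageContext, CancellationToken, Task> Normalize<TContent>(
        Func<TContent, IMessageContext, Task> handler)
    {
        return (content, context, cancellationToken) => handler(content, context);
    }

    // Synchronous handler that accepts the cancellation token
    public static Func<TContent, IMessageContext, CancellationToken, Task> Normalize<TContent>(
        Action<TContent, IMessageContext, CancellationToken> handler)
    {
        return (content, context, cancellationToken) =>
        {
            handler(content, context, cancellationToken);
            return Task.FromResult(true); // already-completed task
        };
    }

    // Synchronous handler that ignores the cancellation token
    public static Func<TContent, IMessageContext, CancellationToken, Task> Normalize<TContent>(
        Action<TContent, IMessageContext> handler)
    {
        return (content, context, cancellationToken) =>
        {
            handler(content, context);
            return Task.FromResult(true); // already-completed task
        };
    }
}
```

In this sketch a synchronous handler runs to completion on the calling thread, and any exception it throws surfaces immediately rather than through the returned task.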

Message Handling Scope

The recommended pattern for handling messages is with a stateless singleton message handler instance. However, for various reasons it may be necessary or even advantageous to initialize a new handler instance for each message or use pools of handlers. The lambda syntax above can be combined with a factory method to produce a new handler for each request:

public class ConfigurationHook : IConfigurationHook
{
    public void Configure(PlatibusConfiguration configuration)
    {
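        // New disposable order handler per message
        // (assumes OrderHandler also implements IDisposable)
        configuration.AddHandlingRule<Order>("Order", async (order, ctx, ct) => {
            using (var orderHandler = new OrderHandler())
            {
                // Await inside the using block so the handler is not disposed
                // before message handling completes
                await orderHandler.HandleMessage(order, ctx, ct);
            }
        });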

        // e.g. using an IoC container such as Ninject
        var kernel = new StandardKernel();
        // ... Dependency registration ...
        kernel.Bind<OrderCancelationHandler>().ToSelf();
        configuration.AddHandlingRule<OrderCancelationRequest>("OrderCancelationRequest", async (request, ctx, ct) => 
        {
            using (var handler = kernel.Get<OrderCancelationHandler>())
            {
                // Await so the handler is disposed only after handling completes
                await handler.HandleMessage(request, ctx, ct);
            }
        });
    }
}
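
One detail worth keeping in mind with per-message disposable handlers is when disposal happens relative to the asynchronous work. The self-contained sketch below uses a hypothetical `DisposableHandler` and `ScopeDemo` (neither is a Platibus type) to contrast returning the task from inside a using block with awaiting it there:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical disposable handler, defined here so the sketch is self-contained
public class DisposableHandler : IDisposable
{
    public bool Disposed { get; private set; }
    public bool DisposedBeforeCompletion { get; private set; }

    public async Task HandleMessage(object content, CancellationToken cancellationToken)
    {
        await Task.Delay(20, cancellationToken);
        // Record whether Dispose ran while the work was still in flight
        DisposedBeforeCompletion = Disposed;
    }

    public void Dispose()
    {
        Disposed = true;
    }
}

public static class ScopeDemo
{
    // Returns the task without awaiting it: Dispose runs as soon as the
    // using block exits, while the handler is still working.
    public static Task ReturnInsideUsing(DisposableHandler handler, CancellationToken ct)
    {
        using (handler)
        {
            return handler.HandleMessage("order", ct);
        }
    }

    // Awaits inside the using block: Dispose runs only after handling completes.
    public static async Task AwaitInsideUsing(DisposableHandler handler, CancellationToken ct)
    {
        using (handler)
        {
            await handler.HandleMessage("order", ct);
        }
    }
}
```

With `ReturnInsideUsing` the handler observes that it was disposed mid-flight, whereas `AwaitInsideUsing` keeps it alive until the work is done. This is why an async lambda is the safer shape when the handler holds resources.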

Sending Replies

One or more replies can be sent in response to an incoming message. Replies are directed to the base URI specified in the Platibus-ReplyTo header if one is specified; otherwise, they are directed to the base URI specified in the Platibus-Origination header.
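
The routing rule above amounts to a simple fallback. As a sketch, with `ReplyRouting.ResolveReplyAddress` being a hypothetical helper rather than a Platibus API, and the two parameters standing in for the parsed header values:

```csharp
using System;

// Hypothetical helper, not part of Platibus
public static class ReplyRouting
{
    // replyTo:     value of the Platibus-ReplyTo header, or null if absent
    // origination: value of the Platibus-Origination header
    public static Uri ResolveReplyAddress(Uri replyTo, Uri origination)
    {
        // Prefer the explicit reply-to address; fall back to the sender
        return replyTo ?? origination;
    }
}
```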

Replies are sent using the IMessageContext.SendReply method:

public class MessageHandler : IMessageHandler
{
    public async Task HandleMessage(object message, IMessageContext context, 
                                    CancellationToken cancellationToken)
    {
        await context.SendReply("Order received!", default(SendOptions), cancellationToken);
    }
}

The default SendOptions can also be overridden when sending replies. See the section on send options for more details.