Skip to content

4. Sending Messages

Jesse Sweetland edited this page Aug 8, 2017 · 19 revisions

The following examples are based on a configuration based on a Platibus application hosted in the HTTP server with the following configuration:

<configuration>
    <configSections>
        <section name="platibus.httpserver" type="Platibus.Http.HttpServerConfigurationSection, Platibus" />
    </configSections>
    <platibus.httpserver baseUri="http://localhost:52180/platibus/">
        <timeouts replyTimeout="00:05:00" />
        <queueing provider="Filesystem" path="platibus\queues" />
        <subscriptionTracking provider="Filesystem" path="platibus\subscriptions" />
        <endpoints>
            <add name="local" address="http://localhost:52180/platibus/" />
        </endpoints>
        <sendRules>
            <add namePattern=".*" endpoint="local" />
        </sendRules>    
     </platibus.httpserver>
</configuration>

To send a message, invoke one of the overloaded Send methods on the IBus instance. In the simplest case, we can send a string:

var httpServer = HttpServer.Start();
var bus = httpServer.Bus;
var message = "Hello, world!";
await bus.Send(message);

Using the above configuration, this will result in the following message being sent back to the local application:

Platibus-MessageId: 7cab7c1f-c70a-4894-8472-1dd34a19446f
Platibus-MessageName: System.String
Platibus-Origination: http://localhost:52180/platibus/
Content-Type: text/plain
Platibus-Destination: http://localhost:52180/platibus/

Hello, world!

The above is a typical representation of a message. The first 5 lines are headers, following by a blank line, the followed by the body. The representation of the message body depends on the content-type which is in this case plain text.

A more complex message can be sent by defining a class to represent its structure:

namespace Example
{
    public class Order
    {
        public string OrderNumber { get; set; }
        public IList<OrderLine> Lines { get; set; }
        public decimal Total { get; set; }
    } 

    public class OrderLine
    {
        public string SKU { get; set; }
        public string Description { get; set; }
        public int Qty { get; set; }
        public decimal Price { get; set; }
        public decimal Subtotal { get; set; }
    }
}

To send an Order, the object is simply passed to the Send method:

var order = new Order
{
    OrderNumber = "PO-1234",
    Lines = new OrderLine[]
    {
        new OrderLine 
        {
            SKU = "SPRKT",
            Description = "Sprocket",
            Qty = 3,
            Price = 1.99m,
            Subtotal = 5.97m
        }
    },
    Total = 5.97m
};

await bus.Send(order);

By default objects are serialized using JSON, resulting in messages similar to the following:

Platibus-MessageId: 7cab7c1f-c70a-4894-8472-1dd34a19446f
Platibus-MessageName: Example.Order
Platibus-Origination: http://localhost:52180/platibus/
Content-Type: application/json
Platibus-Destination: http://localhost:52180/platibus/

{"OrderNumber":"PO-1234","Lines":[{"SKU":"SPRKT","Description":"Sprocket","Qty":3,"Price":1.99,"Subtotal":5.97}],"Total":5.97}

Send Options

Serialization and outbound queueing behavior can be influenced by specifying options explicitly when sending messages.

Content Type

The ContentType can be overridden to change the content representation depending on the configuration of the serialization service.

Example:

await bus.Send(order, new SendOptions { ContentType = "application/xml" });

Time To Live

The TTL (time to live) property specifies a time period after which the message should not be sent or processed by the recipient. This is useful for managing time-critical operations:

Example:

await bus.Send(order, new SendOptions { TTL = TimeSpan.FromMinutes(5) });

Importance

The Importance option provides an indication of how much effort should go into ensuring message delivery. Values range from Low (-1) to Critical (2). Low importance messages can be discarded if needed to relieve congestion, whereas Critical messages must be sent and/or processed at least once, which usually involves queueing on both the sending and receiving sides. Details are available in the documentation for the Platibus.MessageImportance class.

Example:

await bus.Send(order, new SendOptions { Importance = MessageImportance.Critical });

Credentials

The Credentials option enables senders to pass authentication credentials to the remote instance.

Example 1:

await bus.Send(order, new SendOptions { 
    Credentials = new BasicAuthCredentials("username", "password") 
});

Example 2:

var accessToken = ((ClaimsIdentity)principal.Identity)
    .FindFirst("access_token")
    .Value;

await bus.Send(order, new SendOptions { 
    Credentials = new BearerCredentials(accessToken)
});

Specifying Endpoints

The nominal case for sending messages is to simply pass an object instance to the IBus.Send method and allow the message naming service and send rules to dictate where the message is sent. In the event that the endpoint to which the message is sent depends on criteria that cannot be expressed in a send rule (round-robin load balancing, for example) destination endpoint names or even URIs can be explicitly specified in the IBus.Send call.

Examples:

// Send an order directly to the configured endpoint named "provisioning-system"
await bus.Send(order, "provisioning-system");

// Send an order directly to http://prov.example.com/platibus
await bus.Send(order, new Uri("http://prov.example.com/platibus"));

Replying

Replies can be sent by calling the Reply method on the IMessageContext supplied to the message handler. See section 5. Handling Messages for more information.

Observing Replies

The IBus.Send method returns an awaitable task whose result is an ISentMessage object. This object contains a single method, ObserveReplies that returns an IObservable<object> that can be used to listen and react to the content of incoming replies.

Example:

var sentMessage = await bus.Send(order);
var subscription = sentMessage
    .ObserveReplies()
    .Subscribe(r =>
    {
        Console.WriteLine("Reply received: " + r);
    }, () => Console.WriteLine("No more replies"));

If only one reply is expected, an extension method named GetReply will observe replies until the first one is received and then return its content:

var sentMessage = await bus.Send(order);
var reply = await sentMessage.GetReply();

Note: messages received after the reply timeout may not be received.

Publishing to Topics

Messages can be published to named topics via the IBus.Publish method:

var topicName = "OrderEvents";
await bus.Publish(order, topicName);

The publish message will then be pushed to all registered subscribers.