Skip to content

Commit

Permalink
Batch message processing. Closes GH-936
Browse files Browse the repository at this point in the history
Docs on batch messaging

End to end tests with tenancy

First successful end to end of the batching

Can build executor and handler for batched messages

Moved batching around, basic configuration is in place

Minor changes leading up to more batching work
  • Loading branch information
jeremydmiller committed Sep 3, 2024
1 parent ad1e720 commit 3eed33b
Show file tree
Hide file tree
Showing 36 changed files with 870 additions and 46 deletions.
3 changes: 2 additions & 1 deletion docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ export default withMermaid( {
{text: 'Multi-Tenancy', link: '/guide/handlers/multi-tenancy'},
{text: 'Execution Timeouts', link: '/guide/handlers/timeout'},
{text: 'Fluent Validation Middleware', link: '/guide/handlers/fluent-validation'},
{text: 'Sticky Handler to Endpoint Assignments', link: '/guide/handlers/sticky'}
{text: 'Sticky Handler to Endpoint Assignments', link: '/guide/handlers/sticky'},
{text: 'Message Batching', link: '/guide/handlers/batching'}
]
},
]
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/durability/dead-letter-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ app.MapDeadLettersEndpoints()
;
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L133-L143' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_register_dead_letter_endpoints' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L140-L150' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_register_dead_letter_endpoints' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

### Using the Dead Letters REST API
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ var app = builder.Build();
// you will need to explicitly call this *before* MapWolverineEndpoints()
await app.Services.ApplyAsyncWolverineExtensions();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L95-L103' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_calling_applyasyncwolverineextensions' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L102-L110' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_calling_applyasyncwolverineextensions' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Wolverine Plugin Modules
Expand Down
141 changes: 141 additions & 0 deletions docs/guide/handlers/batching.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Batch Message Processing <Badge type="tip" text="3.0" />

Sometimes you might want to process a stream of incoming messages in batches rather than one at a time. This might
be for performance reasons, or maybe there's some kind of business logic that makes more sense to calculate for batches,
or maybe you want a logical ["debounce"](https://medium.com/@jamischarles/what-is-debouncing-2505c0648ff1) in how your system responds to the incoming messages.

::: info
The batching is supported both for messages published in process to local queues and from incoming messages from
external transports.
:::

Regardless, Wolverine has a mechanism to locally batch incoming messages and forward them to a batch handler. First,
let's say that you have a message type called `Item`:

<!-- snippet: sample_batch_processing_item -->
<a id='snippet-sample_batch_processing_item'></a>
```cs
public record Item(string Name);
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/batch_processing.cs#L141-L145' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_batch_processing_item' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And for whatever reason, we need to process these messages in batches. To do that, we first need to have
a message handler for an array of `Item` like so:

<!-- snippet: sample_batch_processing_handler -->
<a id='snippet-sample_batch_processing_handler'></a>
```cs
public static class ItemHandler
{
public static void Handle(Item[] items)
{
// Handle this just like a normal message handler,
// just that the message type is Item[]
}
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/batch_processing.cs#L147-L158' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_batch_processing_handler' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

::: warning
At this point, Wolverine **only** supports an array of the message type for the batched handler
:::

::: tip
Batch message handlers are just like any other message handler and have no special rules about their
capabilities
:::

With that in our system, now we need to tell Wolverine to group `Item` messages, and we do that with the following
syntax:

<!-- snippet: sample_configuring_batch_processing -->
<a id='snippet-sample_configuring_batch_processing'></a>
```cs
theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.BatchMessagesOf<Item>(batching =>
{
// Really the maximum batch size
batching.BatchSize = 500;

// You can alternatively override the local queue
// for the batch publishing.
batching.LocalExecutionQueueName = "items";

// We can tell Wolverine to wait longer for incoming
// messages before kicking out a batch if there
// are fewer waiting messages than the maximum
// batch size
batching.TriggerTime = 1.Seconds();

})

// The object returned here is the local queue configuration that
// will handle the batched messages. This may be useful for fine
// tuning the behavior of the batch processing
.Sequential();
}).StartAsync();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/batch_processing.cs#L19-L47' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_batch_processing' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And that's that! Just to bring this a little more into focus, here's an end to end test from the Wolverine
codebase:

<!-- snippet: sample_send_end_to_end_with_batch -->
<a id='snippet-sample_send_end_to_end_with_batch'></a>
```cs
[Fact]
public async Task send_end_to_end_with_batch()
{
// Items to publish
var item1 = new Item("one");
var item2 = new Item("two");
var item3 = new Item("three");
var item4 = new Item("four");

Func<IMessageContext, Task> publish = async c =>
{
// I'm publishing the 4 items in sequence
await c.PublishAsync(item1);
await c.PublishAsync(item2);
await c.PublishAsync(item3);
await c.PublishAsync(item4);
};

// This is the "act" part of the test
var session = await theHost.TrackActivity()

// Wolverine testing helper to "wait" until
// the tracking receives a message of Item[]
.WaitForMessageToBeReceivedAt<Item[]>(theHost)
.ExecuteAndWaitAsync(publish);

// The four Item messages should be processed as a single
// batch message
var items = session.Executed.SingleMessage<Item[]>();

items.Length.ShouldBe(4);
items.ShouldContain(item1);
items.ShouldContain(item2);
items.ShouldContain(item3);
items.ShouldContain(item4);
}
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Testing/CoreTests/Acceptance/batch_processing.cs#L97-L136' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_send_end_to_end_with_batch' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Alright, with all that being said, here's a few more facts about the batch messaging support:

1. There is absolutely no need to create a specific message handler for the `Item` message, and in fact, you should
not do so
2. The message batching is able to group the message batches by tenant id *if* your Wolverine system uses multi-tenancy

## What about durable messaging ("inbox")?

The durable inbox behaves just a little bit differently for message batching. Wolverine will technically
"handle" the individual messages, but does not mark them as handled in the message store until a batch message
that refers to the original message is completely processed.
2 changes: 1 addition & 1 deletion docs/guide/http/endpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ and register that strategy within our `MapWolverineEndpoints()` set up like so:
// Customizing parameter handling
opts.AddParameterHandlingStrategy<NowParameterStrategy>();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L195-L200' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_adding_custom_parameter_handling' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L202-L207' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_adding_custom_parameter_handling' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And lastly, here's the application within an HTTP endpoint for extra context:
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/http/fluentvalidation.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ app.MapWolverineEndpoints(opts =>
// Wolverine.Http.FluentValidation
opts.UseFluentValidationProblemDetailMiddleware();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L145-L166' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_configure_endpoints' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L152-L173' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_configure_endpoints' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
2 changes: 1 addition & 1 deletion docs/guide/http/marten.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ Register it in `WolverineHttpOptions` like this:
```cs
opts.UseMartenCompiledQueryResultPolicy();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L172-L174' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_user_marten_compiled_query_policy' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L179-L181' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_user_marten_compiled_query_policy' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

If you now return a compiled query from an Endpoint the result will get directly streamed to the client as JSON. Short circuiting JSON deserialization.
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/http/mediator.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ app.MapPostToWolverine<CustomRequest, CustomResponse>("/wolverine/request");
app.MapDeleteToWolverine<CustomRequest, CustomResponse>("/wolverine/request");
app.MapPutToWolverine<CustomRequest, CustomResponse>("/wolverine/request");
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L206-L218' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_optimized_mediator_usage' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L213-L225' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_optimized_mediator_usage' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

With this mechanism, Wolverine is able to optimize the runtime function for Minimal API by eliminating IoC service locations
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/http/metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ builder.Services.AddSwaggerGen(x =>
x.OperationFilter<WolverineOperationFilter>();
});
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L36-L43' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_register_custom_swashbuckle_filter' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L43-L50' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_register_custom_swashbuckle_filter' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Operation Id
Expand Down
2 changes: 1 addition & 1 deletion docs/guide/http/middleware.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Which is registered like this (or as described in [`Registering Middleware by Me
opts.AddMiddlewareByMessageType(typeof(FakeAuthenticationMiddleware));
opts.AddMiddlewareByMessageType(typeof(CanShipOrderMiddleWare));
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L176-L179' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_register_http_middleware_by_type' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L183-L186' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_register_http_middleware_by_type' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The key point to notice there is that `IResult` is a "return value" of the middleware. In the case of an HTTP endpoint,
Expand Down
6 changes: 3 additions & 3 deletions docs/guide/http/policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ app.MapWolverineEndpoints(opts =>
// Wolverine.Http.FluentValidation
opts.UseFluentValidationProblemDetailMiddleware();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L145-L166' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_configure_endpoints' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L152-L173' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_configure_endpoints' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The `HttpChain` model is a configuration time structure that Wolverine.Http will use at runtime to create the full
Expand Down Expand Up @@ -97,7 +97,7 @@ app.MapWolverineEndpoints(opts =>
// Wolverine.Http.FluentValidation
opts.UseFluentValidationProblemDetailMiddleware();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L145-L166' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_configure_endpoints' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L152-L173' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_configure_endpoints' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Resource Writer Policies
Expand Down Expand Up @@ -132,7 +132,7 @@ If you need special handling of a primary return type you can implement `IResour
```cs
opts.AddResourceWriterPolicy<CustomResourceWriterPolicy>();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L181-L183' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_register_resource_writer_policy' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L188-L190' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_register_resource_writer_policy' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Resource writer policies registered this way will be applied in order before all built in policies.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public class SpecialMapper : IRabbitMqEnvelopeMapper
}
}

public void MapIncomingToEnvelope(Envelope envelope, ReadOnlyBasicProperties incoming)
public void MapIncomingToEnvelope(Envelope envelope, IReadOnlyBasicProperties incoming)
{
envelope.CorrelationId = incoming.CorrelationId;
envelope.ContentType = "application/json";
Expand Down
15 changes: 14 additions & 1 deletion docs/guide/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,17 @@ For [Wolverine.Http usage](/guide/http/), the Wolverine 3.0 usage of the less ca
mandated [Lamar](https://jasperfx.github.io/lamar) library necessitates the usage of this API to register necessary
services for Wolverine.HTTP in addition to adding the Wolverine endpoints:

snippet: sample_adding_http_services
<!-- snippet: sample_adding_http_services -->
<a id='snippet-sample_adding_http_services'></a>
```cs
var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
// Necessary services for Wolverine HTTP
// And don't worry, if you forget this, Wolverine
// will assert this is missing on startup:(
builder.Services.AddWolverineHttp();
```
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Http/WolverineWebApi/Program.cs#L26-L37' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_adding_http_services' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
Loading

0 comments on commit 3eed33b

Please sign in to comment.