Skip to content

Commit

Permalink
#275 Notify outbox service that a message is available for publishing
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
  • Loading branch information
EtherZa authored and zarusz committed Jun 30, 2024
1 parent 6d7e827 commit d15328c
Show file tree
Hide file tree
Showing 25 changed files with 686 additions and 425 deletions.
6 changes: 3 additions & 3 deletions docs/plugin_outbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe

- The transaction can be managed by the application, starting it either explicitly using `DbContext.Database.BeginTransactionAsync()` or creating a `TransactionScope()`.

- The plugin accounts for distributed service running in multiple instances (concurrently).
- On the interceptor being disposed (at then end of a message lifecycle), the outbox service will be notified that a message is waiting to be published if at least one messages was placed in the outbox during the message processing lifetime. Alternatively, on expiry of the `PollIdleSleep` period following outbox processing, the same process will be initiated.

- Message added to the `Outbox` table are initially owned by the respective service instance that created it, the message has a lock that expires at some point in time (driven by settings). Every service instance task attempts to publish their owned messages which happens in order of creation (this ensures order of delivery within the same process).
- If the configuration setting `MaintainSequence` is set to `true`, only one application instance will be able to lock messages for delivery. This ensure messages are delivered in the original order to the service bus at the expense of delivery throughput. If `false`, each distributed instance will place an exclusive lock on `PollBatchSize` messages for concurrent distribution. This will greatly increase throughput at the expense of the FIFO sequence.

- If a service instance where to crash or restart, the undelivered messages will be picked and locked by another instance.

- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.

- At configured intervals and after a certain time span the sent messages are removed from the outbox table.
- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
20 changes: 3 additions & 17 deletions docs/plugin_outbox.t.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,26 +137,12 @@ When applied on the (child) bus level then all consumers (or handlers) will inhe

- The transaction can be managed by the application, starting it either explicitly using `DbContext.Database.BeginTransactionAsync()` or creating a `TransactionScope()`.

- The plugin accounts for distributed service running in multiple instances (concurrently).
- On the interceptor being disposed (at then end of a message lifecycle), the outbox service will be notified that a message is waiting to be published if at least one messages was placed in the outbox during the message processing lifetime. Alternatively, on expiry of the `PollIdleSleep` period following outbox processing, the same process will be initiated.

- Message added to the `Outbox` table are initially owned by the respective service instance that created it, the message has a lock that expires at some point in time (driven by settings). Every service instance task attempts to publish their owned messages which happens in order of creation (this ensures order of delivery within the same process).
- If the configuration setting `MaintainSequence` is set to `true`, only one application instance will be able to lock messages for delivery. This ensure messages are delivered in the original order to the service bus at the expense of delivery throughput. If `false`, each distributed instance will place an exclusive lock on `PollBatchSize` messages for concurrent distribution. This will greatly increase throughput at the expense of the FIFO sequence.

- If a service instance where to crash or restart, the undelivered messages will be picked and locked by another instance.

- Once a message is picked from outbox and successfully delivered then it is marked as sent in the outbox table.

- At configured intervals and after a certain time span the sent messages are removed from the outbox table.

## Interceptors/Batch processing

While all transports (apart from memory) can make use of the [Transactional Outbox](https://microservices.io/patterns/data/transactional-outbox.html) pattern, not all transports are able to process messages in bulk. Those that do, may have varying levels of support (no batching). If the transport provider implements the `IMessageBusBulkProducer` interface, messages that read from outbox __will not be reprocessed through the interceptor pipeline__. Providers that do not implement `IMessageBusBulkProducer` however, will run through the pipeline a second time but under a different execution context from the first.

| Transport | First Pass Interceptors | Second Pass Interceptors | Batches |
|--------------------------------------------------------------------------------------|
| Azure Event Hub | Yes | No | Yes |
| Azure Service Bus | Yes | No | Yes |
| Hybrid | Yes | Yes | No |
| Kafka | Yes | No | No |
| Memory | Yes | n/a | n/a |
| Mqtt | Yes | No | No |
| Redis | Yes | No | No |
- At configured intervals (`MessageCleanup.Interval`), and after a configured time span (`MessageCleanup.Age`), the sent messages are removed from the outbox table.
2 changes: 2 additions & 0 deletions src/Samples/Sample.OutboxWebApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
// doc:fragment:ExampleStartup
builder.Services.AddSlimMessageBus(mbb =>
{
mbb.PerMessageScopeEnabled(false);
mbb
.AddChildBus("Memory", mbb =>
{
Expand Down Expand Up @@ -73,6 +74,7 @@
.AddOutboxUsingDbContext<CustomerContext>(opts =>
{
opts.PollBatchSize = 100;
opts.PollIdleSleep = TimeSpan.FromSeconds(10);
opts.MessageCleanup.Interval = TimeSpan.FromSeconds(10);
opts.MessageCleanup.Age = TimeSpan.FromMinutes(1);
//opts.SqlSettings.TransactionIsolationLevel = System.Data.IsolationLevel.RepeatableRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class DbContextTransactionService<TDbContext>(TDbContext dbContext, ISqlS

public override SqlTransaction CurrentTransaction => (SqlTransaction)DbContext.Database.CurrentTransaction?.GetDbTransaction();

protected override Task OnBeginTransation()
protected override Task OnBeginTransaction()
{
return DbContext.Database.BeginTransactionAsync(sqlSettings.TransactionIsolationLevel);
}
Expand Down
59 changes: 59 additions & 0 deletions src/SlimMessageBus.Host.Outbox/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace SlimMessageBus.Host.Outbox;

public sealed class AsyncManualResetEvent
{
private readonly object _lock;
private TaskCompletionSource<bool> _tcs;

public AsyncManualResetEvent(bool initialState = false)
{
_lock = new object();
_tcs = new TaskCompletionSource<bool>();

if (initialState)
{
_tcs.SetResult(true);
}
}

public async Task<bool> Wait(int millisecondsDelay = -1, CancellationToken cancellationToken = default)
{
Task GetTask()
{
lock (_lock)
{
return _tcs.Task;
}
}

var resetEvent = GetTask();
var task = await Task.WhenAny(resetEvent, Task.Delay(millisecondsDelay, cancellationToken));

return task == resetEvent;
}

public Task<bool> Wait(TimeSpan delay, CancellationToken cancellationToken = default)
{
return Wait((int)delay.TotalMilliseconds, cancellationToken);
}

public void Set()
{
lock (_lock)
{
var tcs = _tcs;
tcs.TrySetResult(true);
}
}

public void Reset()
{
lock (_lock)
{
if (_tcs.Task.IsCompleted)
{
_tcs = new TaskCompletionSource<bool>();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static ProducerBuilder<T> UseOutbox<T>(this ProducerBuilder<T> builder, b
}

/// <summary>
/// Enables the cration of <see cref="TransactionScope"/> for every message consumed by this bus.
/// Enables the creation of <see cref="TransactionScope"/> for every message consumed by this bus.
/// </summary>
/// <param name="builder"></param>
/// <param name="enabled"></param>
Expand All @@ -44,7 +44,7 @@ public static MessageBusBuilder UseTransactionScope(this MessageBusBuilder build
}

/// <summary>
/// Enables the cration of <see cref="TransactionScope"/> for every message on this consumer.
/// Enables the creation of <see cref="TransactionScope"/> for every message on this consumer.
/// </summary>
/// <param name="builder"></param>
/// <param name="enabled"></param>
Expand All @@ -56,7 +56,7 @@ public static ConsumerBuilder<T> UseTransactionScope<T>(this ConsumerBuilder<T>
}

/// <summary>
/// Enables the cration of <see cref="TransactionScope"/> every messages on this handler.
/// Enables the creation of <see cref="TransactionScope"/> every messages on this handler.
/// </summary>
/// <param name="builder"></param>
/// <param name="enabled"></param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

using Microsoft.Extensions.DependencyInjection;

using SlimMessageBus.Host.Outbox.Services;

public static class MessageBusBuilderExtensions
{
public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<OutboxSettings> configure = null)
Expand Down Expand Up @@ -34,6 +36,7 @@ public static MessageBusBuilder AddOutbox(this MessageBusBuilder mbb, Action<Out
services.AddSingleton<OutboxSendingTask>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBusLifecycleInterceptor, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.TryAdd(ServiceDescriptor.Singleton<IOutboxNotificationService, OutboxSendingTask>(sp => sp.GetRequiredService<OutboxSendingTask>()));
services.TryAddSingleton<IInstanceIdProvider, DefaultInstanceIdProvider>();
services.TryAddSingleton<IOutboxLockRenewalTimerFactory, OutboxLockRenewalTimerFactory>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class OutboxSettings
/// <summary>
/// Sleep time of the outbox polling loop if there were no messages to process in previous database poll.
/// </summary>
public TimeSpan PollIdleSleep { get; set; } = TimeSpan.FromSeconds(1);
public TimeSpan PollIdleSleep { get; set; } = TimeSpan.FromMinutes(1);
/// <summary>
/// The maximum number of delivery attempts before delivery will not be attempted again.
/// </summary>
Expand Down
9 changes: 9 additions & 0 deletions src/SlimMessageBus.Host.Outbox/IOutboxNotificationService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace SlimMessageBus.Host.Outbox.Services;

public interface IOutboxNotificationService
{
/// <summary>
/// Notify outbox service that a message is waiting to be published.
/// </summary>
void Notify();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,44 @@

using Microsoft.Extensions.Logging;

using SlimMessageBus.Host.Outbox.Services;

public abstract class OutboxForwardingPublishInterceptor
{
}

public class OutboxForwardingPublishInterceptor<T>(
/// <remarks>
/// Interceptor must be registered as a transient in order for outbox notifications to be raised.
/// Notifications are raised on disposal (if required), to ensure they occur outside of a transaction scope.
/// </remarks>
public sealed class OutboxForwardingPublishInterceptor<T>(
ILogger<OutboxForwardingPublishInterceptor> logger,
IOutboxRepository outboxRepository,
IInstanceIdProvider instanceIdProvider)
: OutboxForwardingPublishInterceptor, IInterceptorWithOrder, IPublishInterceptor<T> where T : class
IInstanceIdProvider instanceIdProvider,
IOutboxNotificationService outboxNotificationService)
: OutboxForwardingPublishInterceptor, IInterceptorWithOrder, IPublishInterceptor<T>, IDisposable where T : class
{
static readonly internal string SkipOutboxHeader = "__SkipOutbox";

private readonly ILogger _logger = logger;
private readonly IOutboxRepository _outboxRepository = outboxRepository;
private readonly IInstanceIdProvider _instanceIdProvider = instanceIdProvider;
private readonly IOutboxNotificationService _outboxNotificationService = outboxNotificationService;

private bool _notifyOutbox = false;

public int Order => int.MaxValue;

public void Dispose()
{
if (_notifyOutbox)
{
_outboxNotificationService.Notify();
}

GC.SuppressFinalize(this);
}

public async Task OnHandle(T message, Func<Task> next, IProducerContext context)
{
var skipOutbox = context.Headers != null && context.Headers.ContainsKey(SkipOutboxHeader);
Expand Down Expand Up @@ -56,5 +76,8 @@ public async Task OnHandle(T message, Func<Task> next, IProducerContext context)
InstanceId = _instanceIdProvider.GetInstanceId()
};
await _outboxRepository.Save(outboxMessage, context.CancellationToken);

// a message was sent, notify outbox service to poll on dispose (post transaction)
_notifyOutbox = true;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace SlimMessageBus.Host.Outbox;
namespace SlimMessageBus.Host.Outbox.Services;

public interface IOutboxLockRenewalTimer : IDisposable
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace SlimMessageBus.Host.Outbox;
namespace SlimMessageBus.Host.Outbox.Services;

using System.Threading;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace SlimMessageBus.Host.Outbox;
namespace SlimMessageBus.Host.Outbox.Services;
using SlimMessageBus.Host.Outbox;

public class OutboxLockRenewalTimer : IOutboxLockRenewalTimer
public sealed class OutboxLockRenewalTimer : IOutboxLockRenewalTimer
{
private readonly object _lock;
private readonly Timer _timer;
Expand All @@ -11,7 +12,7 @@ public class OutboxLockRenewalTimer : IOutboxLockRenewalTimer
private bool _active;
private bool _renewingLock;

public OutboxLockRenewalTimer(ILogger<OutboxLockRenewalTimer> logger, IOutboxRepository outboxRepository, IInstanceIdProvider instanceIdProvider, TimeSpan lockDuration, TimeSpan lockRenewalInterval, CancellationToken cancellationToken, Action<Exception> lockLost)
public OutboxLockRenewalTimer(ILogger<OutboxLockRenewalTimer> logger, IOutboxRepository outboxRepository, IInstanceIdProvider instanceIdProvider, TimeSpan lockDuration, TimeSpan lockRenewalInterval, Action<Exception> lockLost, CancellationToken cancellationToken)
{

Debug.Assert(lockRenewalInterval < lockDuration);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
namespace SlimMessageBus.Host.Outbox;

namespace SlimMessageBus.Host.Outbox.Services;
public class OutboxLockRenewalTimerFactory : IOutboxLockRenewalTimerFactory, IAsyncDisposable
{
private readonly IServiceScope _scope;
Expand Down
Loading

0 comments on commit d15328c

Please sign in to comment.