diff --git a/README.md b/README.md index 70fa318..84bb240 100644 --- a/README.md +++ b/README.md @@ -1,28 +1,28 @@ -TinyDancer is a high-level abstraction layer on top of the [Azure Service Bus client](https://www.nuget.org/packages/Microsoft.Azure.ServiceBus/) with some convenient features such as handling multiple types of messages, fault tolerance etc. +TinyDancer is a high-level abstraction layer on top of the [Azure Service Bus client](https://www.nuget.org/packages/Microsoft.Azure.ServiceBus/) with some convenient features such as handling multiple types of messages, dependency injection, decoupled fault tolerance etc. ## Install - PM> Install-Package TinyDancer - + PM> Install-Package TinyDancer + ### Consume different types of messages from the same queue/topic ```csharp var client = new QueueClient(...); // or SubscriptionClient client.Configure() - .HandleMessage(seatsReserved => { - // A user reserved one or more seats - SaveReservation(...); - LockSeats(...); - }) - .HandleMessage(async seatsDiscarded => { - // A user has discarded a reservation - await RemoveReservation(...); - await FreeUpSeats(...); - }) - .Catch(x => x.Abandon(maxTimes: 2)) // Probably network instability. Try one more time. - .OnUnrecognizedMessageType(x => x.Abandon()) // Let a different consumer handle this one - .CatchUnhandledExceptions(x => x.Deadletter(), - (msg, ex) => _logger.Error($"Error while processing message {msg.Id}", ex)) - .Subscribe(); + .HandleMessage(seatsReserved => { + // A user reserved one or more seats + SaveReservation(...); + LockSeats(...); + }) + .HandleMessage(async seatsDiscarded => { + // A user has discarded a reservation + await RemoveReservation(...); + await FreeUpSeats(...); + }) + .Catch(x => x.Abandon(maxTimes: 2)) // Probably network instability. Try one more time. + .OnUnrecognizedMessageType(x => x.Abandon()) // Let a different consumer handle this one + .CatchUnhandledExceptions(x => x.Deadletter(), + (msg, ex) => _logger.Error($"Error while processing message {msg.Id}", ex)) + .Subscribe(); ``` ### Publish a message @@ -32,12 +32,12 @@ await client.PublishAsync(myMessageObject); // ...or with all options: await client.PublishAsync( - payload: myMessageObject, - sessionId: "", // For queues/subscriptions with sessions enabled. - deduplicationIdentifier: "", // For queues/topics that support deduplication: - compress: true, // Serialize using MessagePack for smaller byte-size - correlationId: x => x.AnyString - ); + payload: myMessageObject, + sessionId: "", // For queues/subscriptions with sessions enabled. + deduplicationIdentifier: "", // For queues/topics that support deduplication: + compress: true, // Serialize using MessagePack for smaller byte-size + correlationId: x => x.AnyString + ); ``` ### Why? @@ -46,6 +46,7 @@ Unlike frameworks such as Rebus and MassTransit, TinyDancer will not create any - Serialization and deserialization (JSON and MessagePack are supported) - Prevention of partial/unacknowledged message handling - Decoupling of application logic from servicebus concepts when it comes to fault tolerance (see [exception handling](#exception-handling)) +- Dependency resolution # Documentation @@ -53,12 +54,12 @@ Unlike frameworks such as Rebus and MassTransit, TinyDancer will not create any - [Consume by type](#consume-by-type) - [Subscribe to all](#subscribe-to-all) - [Exception handling](#exception-handling) - - Retry (abandon) / Deadletter / Complete - - [Callbacks](#callbacks) + - Retry (abandon) / Deadletter / Complete + - [Callbacks](#callbacks) +- [Dependency injection](#dependency-injection) - [Sessions](#sessions) - [Handle malformed or unknown messages](#handle-malformed-or-unknown-messages) - [Preventing unacknowledged message handling](#preventing-partial-message-handling) -- [Dependency resolution](coming soon) #### Sending messages - PublishMany @@ -109,6 +110,46 @@ Use this to carry out any side-effects, like logging etc. Note that these exception handlers only will be triggered when an exception occurs in user code (or any library used below that). Exceptions thrown from the ServiceBus library will break execution, as this would indicate an unsafe state to operate in. +### Dependency injection + +TinyDancer can be integrated with `Microsoft.Extensions.DependencyInjection`. Just pass an instance of `IServiceCollection` to the `RegisterDependencies` method, and then you can add parameters to your handler functions: + +```csharp +public class Startup +{ + public void ConfigureServices(IServiceCollection services) + { + queueClient.Configure() + .RegisterDependencies(services) + // Inject dependencies like this: + .HandleMessage>(async (message, animalRepo) => + { + await animalRepo.InsertAsync(message); + }) + // Or like this: + .HandleMessage(async (Car message, IRepository carRepo, ILogger logger) => + { + await carRepo.InsertAsync(message); + logger.Info($"Saved car with id {message.CarId}") + }) + .Subscribe(); + } +} +``` + +The first parameter must always be the message, and all subsequent parameters will be resolved. + +If you need to use information from your messages as part of your service resolution, a `Message` is added to your `IServiceCollection` before the handler is called, and can be used like this: + +```csharp +services.AddScoped>(provider => +{ + // In order to resolve IRepository, we need the Tenant key from the incoming message: + var userProperties = provider.GetRequiredService().UserProperties; + return new Repository(userProperties["TenantKey"]); +}); +``` + ### Sessions Sessions are the way Azure Service Bus guarantees order of delivery. diff --git a/source/Consume/AsyncMessageHandler.cs b/source/Consume/AsyncMessageHandler.cs deleted file mode 100644 index 67985b4..0000000 --- a/source/Consume/AsyncMessageHandler.cs +++ /dev/null @@ -1,6 +0,0 @@ -using System.Threading.Tasks; - -namespace TinyDancer.Consume -{ - public delegate Task AsyncMessageHandler(TMessage message); -} \ No newline at end of file diff --git a/source/Consume/Session/DummyDisposable.cs b/source/Consume/DummyDisposable.cs similarity index 68% rename from source/Consume/Session/DummyDisposable.cs rename to source/Consume/DummyDisposable.cs index c41de73..f9885f7 100644 --- a/source/Consume/Session/DummyDisposable.cs +++ b/source/Consume/DummyDisposable.cs @@ -1,6 +1,6 @@ using System; -namespace TinyDancer.Consume.Session +namespace TinyDancer.Consume { public class DummyDisposable : IDisposable { diff --git a/source/Consume/MessageHandler.cs b/source/Consume/MessageHandler.cs deleted file mode 100644 index 83ecd2f..0000000 --- a/source/Consume/MessageHandler.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace TinyDancer.Consume -{ - public delegate void MessageHandler(TMessage message); -} \ No newline at end of file diff --git a/source/Consume/MessageHandlerBuilder.cs b/source/Consume/MessageHandlerBuilder.cs index 83bf0a1..11986bf 100644 --- a/source/Consume/MessageHandlerBuilder.cs +++ b/source/Consume/MessageHandlerBuilder.cs @@ -6,14 +6,20 @@ using MessagePack; using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus.Core; -using TinyDancer.Consume.Session; +using Microsoft.Extensions.DependencyInjection; namespace TinyDancer.Consume { public class MessageHandlerBuilder { - private readonly IReceiverClient _receiverClient; - private readonly Configuration _config; + enum ReceiverMode + { + Message, Session + } + + private readonly ReceiverClientAdapter _receiverClient; + private readonly SessionConfiguration _sessionConfiguration; + private readonly Configuration _singleMessageConfiguration; private Func _unrecognizedMessageHandler = null; private ExceptionHandler _deserializationErrorHandler = null; @@ -25,10 +31,25 @@ public class MessageHandlerBuilder private readonly Dictionary handler)> _messageHandlers = new Dictionary handler)>(); private (Type type, Func handler)? _globalHandler = null; - public MessageHandlerBuilder(IReceiverClient receiverClient, Configuration config) + private bool _anyDependenciesRegistered; + private IServiceCollection _services; + + private readonly ReceiverMode _mode; + + internal MessageHandlerBuilder(ReceiverClientAdapter receiverClient, Configuration singleMessageConfiguration) { _receiverClient = receiverClient; - _config = config; + _singleMessageConfiguration = singleMessageConfiguration; + + _mode = ReceiverMode.Message; + } + + internal MessageHandlerBuilder(ReceiverClientAdapter receiverClient, SessionConfiguration sessionConfiguration) + { + _receiverClient = receiverClient; + _sessionConfiguration = sessionConfiguration; + + _mode = ReceiverMode.Session; } public MessageHandlerBuilder Catch(Func, ExceptionHandler> handleStrategy, ExceptionCallback callback = null) where TException : Exception @@ -90,41 +111,87 @@ public MessageHandlerBuilder OnReceiverException(Action(AsyncMessageHandler action) + // Asynchronous overloads + + public MessageHandlerBuilder HandleMessage(Func action) => RegisterMessageHandler(action, typeof(TMessage)); + public MessageHandlerBuilder HandleMessage(Func action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1)); + public MessageHandlerBuilder HandleMessage(Func action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1), typeof(TDep2)); + public MessageHandlerBuilder HandleMessage(Func action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3)); + public MessageHandlerBuilder HandleMessage(Func action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3), typeof(TDep4)); + public MessageHandlerBuilder HandleMessage(Func action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3), typeof(TDep4), typeof(TDep5)); + + // Synchronous overloads + + public MessageHandlerBuilder HandleMessage(Action action) => RegisterMessageHandler(action, typeof(TMessage)); + public MessageHandlerBuilder HandleMessage(Action action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1)); + public MessageHandlerBuilder HandleMessage(Action action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1), typeof(TDep2)); + public MessageHandlerBuilder HandleMessage(Action action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3)); + public MessageHandlerBuilder HandleMessage(Action action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3), typeof(TDep4)); + public MessageHandlerBuilder HandleMessage(Action action) => RegisterMessageHandler(action, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3), typeof(TDep4), typeof(TDep5)); + + // Global overloads + + public MessageHandlerBuilder HandleAllAs(Action handler) => RegisterGlobalHandler(handler, typeof(TMessage)); + public MessageHandlerBuilder HandleAllAs(Action handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1)); + public MessageHandlerBuilder HandleAllAs(Action handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1), typeof(TDep2)); + public MessageHandlerBuilder HandleAllAs(Action handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3)); + public MessageHandlerBuilder HandleAllAs(Action handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3), typeof(TDep4)); + public MessageHandlerBuilder HandleAllAs(Action handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3), typeof(TDep4), typeof(TDep5)); + + // Global async overloads + + public MessageHandlerBuilder HandleAllAs(Func handler) => RegisterGlobalHandler(handler, typeof(TMessage)); + public MessageHandlerBuilder HandleAllAs(Func handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1)); + public MessageHandlerBuilder HandleAllAs(Func handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1), typeof(TDep2)); + public MessageHandlerBuilder HandleAllAs(Func handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3)); + public MessageHandlerBuilder HandleAllAs(Func handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3), typeof(TDep4)); + public MessageHandlerBuilder HandleAllAs(Func handler) => RegisterGlobalHandler(handler, typeof(TMessage), typeof(TDep1), typeof(TDep2), typeof(TDep3), typeof(TDep4), typeof(TDep5)); + + private MessageHandlerBuilder RegisterMessageHandler(Action<(Type type, Func handler)> registerHandler, Delegate action, Type messageType, params Type[] dependencies) { - _messageHandlers[typeof(TMessage).Name] = (typeof(TMessage), async (message) => + registerHandler((messageType, async (message) => { - await action((TMessage)message); - }); + if (dependencies.Any()) + { + using (var scope = _services.BuildServiceProvider().CreateScope()) + { + var resolvedDependencies = dependencies.Select(type => + scope.ServiceProvider.GetService(type) ?? + throw new DependencyResolutionException($"Service of type {type.FullName} could not be resolved")); - return this; - } + var returnedObject = action.DynamicInvoke(new [] { message }.Concat(resolvedDependencies).ToArray()); - public MessageHandlerBuilder HandleMessage(MessageHandler action) - { - _messageHandlers[typeof(TMessage).Name] = (typeof(TMessage), async (message) => - { - action((TMessage) message); - await Task.Yield(); - }); + await (returnedObject is Task task + ? task + : Task.CompletedTask); + } + } + else + { + await (action.DynamicInvoke(message) is Task task + ? task + : Task.CompletedTask); + } + })); + + _anyDependenciesRegistered |= dependencies.Any(); return this; } - public MessageHandlerBuilder HandleAllAs(Action handler) + private MessageHandlerBuilder RegisterMessageHandler(Delegate action, Type messageType, params Type[] dependencies) { - _globalHandler = (typeof(T), async msg => - { - handler((T) msg); - await Task.Yield(); - }); + void RegisterHandler((Type type, Func handler) handler) => _messageHandlers[messageType.FullName] = handler; + RegisterMessageHandler(RegisterHandler, action, messageType, dependencies); return this; } - public MessageHandlerBuilder HandleAllAs(Func handler) + private MessageHandlerBuilder RegisterGlobalHandler(Delegate action, Type messageType, params Type[] dependencies) { - _globalHandler = (typeof(T), async msg => await handler((T) msg)); + void RegisterHandler((Type type, Func handler) handler) => _globalHandler = handler; + RegisterMessageHandler(RegisterHandler, action, messageType, dependencies); + return this; } @@ -134,6 +201,12 @@ public void Subscribe(CancellationToken? cancelled, Func dontInterr public void Subscribe() => DoSubscribe(); + public MessageHandlerBuilder RegisterDependencies(ServiceCollection services) + { + _services = services; + return this; + } + private void DoSubscribe(CancellationToken? cancelled = null, Func dontInterrupt = null) { if (_unrecognizedMessageHandler != null && _globalHandler != null) @@ -141,87 +214,135 @@ private void DoSubscribe(CancellationToken? cancelled = null, Func throw new InvalidOperationException($"{nameof(HandleAllAs)} and {nameof(OnUnrecognizedMessageType)} cannot both be used"); } + if (_services == null && _anyDependenciesRegistered) + { + throw new InvalidOperationException($"Please use {nameof(RegisterDependencies)} in order to enable dependency resolution"); + } + // todo: vakta mot typer med samma namn var blockInterruption = dontInterrupt ?? (() => new DummyDisposable()); - var messageHandlerOptions = new MessageHandlerOptions(exceptionEventArgs => - { - _receiverActionCallback?.Invoke(exceptionEventArgs); // todo: implementera async version också - return Task.CompletedTask; - }) + if (_mode == ReceiverMode.Message) { - AutoComplete = false - }; + var messageHandlerOptions = new MessageHandlerOptions(exceptionEventArgs => + { + _receiverActionCallback?.Invoke(exceptionEventArgs); // todo: implementera async version också + return Task.CompletedTask; + }) + { + AutoComplete = false + }; - if (_config?.MaxConcurrentMessages != null) - { - messageHandlerOptions.MaxConcurrentCalls = _config.MaxConcurrentMessages.Value; - } + if (_singleMessageConfiguration?.MaxConcurrentMessages != null) + { + messageHandlerOptions.MaxConcurrentCalls = _singleMessageConfiguration.MaxConcurrentMessages.Value; + } - if (_config?.MaxAutoRenewDuration != null) - { - messageHandlerOptions.MaxAutoRenewDuration = _config.MaxAutoRenewDuration.Value; - } + if (_singleMessageConfiguration?.MaxAutoRenewDuration != null) + { + messageHandlerOptions.MaxAutoRenewDuration = _singleMessageConfiguration.MaxAutoRenewDuration.Value; + } - object Deserialize(Message message, Type type) + _receiverClient.RegisterMessageHandler(async (message, cancel) => + { + await HandleMessageReceived(cancelled, message, blockInterruption, _receiverClient.GetClient()); + }, messageHandlerOptions); + } + else { - if (message.UserProperties.ContainsKey("compressed")) + var messageHandlerOptions = new SessionHandlerOptions(exceptionEventArgs => { - return MessagePackSerializer.NonGeneric.Deserialize(type, message.Body, MessagePack.Resolvers.ContractlessStandardResolver.Instance); + _receiverActionCallback?.Invoke(exceptionEventArgs); // todo: implementera async version också + return Task.CompletedTask; + }) + { + AutoComplete = false + }; + + if (_sessionConfiguration?.MaxConcurrentSessions != null) + { + messageHandlerOptions.MaxConcurrentSessions = _sessionConfiguration.MaxConcurrentSessions.Value; } - else + + if (_sessionConfiguration?.MaxAutoRenewDuration != null) { - return message.Body.Deserialize(type); + messageHandlerOptions.MaxAutoRenewDuration = _sessionConfiguration.MaxAutoRenewDuration.Value; } + + _receiverClient.RegisterSessionHandler(async (session, message, cancel) => + { + await HandleMessageReceived(cancelled, message, blockInterruption, session); + }, messageHandlerOptions); } + } - _receiverClient.RegisterMessageHandler(async (message, cancel) => + private async Task HandleMessageReceived(CancellationToken? cancelled, Message message, Func blockInterruption, IReceiverClient client) + { + if (cancelled?.IsCancellationRequested == true) { - if (cancelled?.IsCancellationRequested == true) - { - await _receiverClient.AbandonAsync(message.SystemProperties.LockToken); - return; - } + await client.AbandonAsync(message.SystemProperties.LockToken); + return; + } - using (blockInterruption()) + using (blockInterruption()) + { + try { - try - { - if (message.UserProperties.TryGetValue("MessageType", out var messageType) && - _messageHandlers.TryGetValue((string) messageType, out var y)) - { - var deserialized = Deserialize(message, y.type); - await y.handler(deserialized); - } - else if (_globalHandler != null) - { - var deserialized = Deserialize(message, _globalHandler.Value.type); - await _globalHandler.Value.handler(deserialized); - } - else if (_unrecognizedMessageHandler != null) - { - await _unrecognizedMessageHandler(_receiverClient, message); - return; - } - - await _receiverClient.CompleteAsync(message.SystemProperties.LockToken); - } - catch (DeserializationFailedException ex) + _services?.AddScoped(provider => message); + + if (message.UserProperties.TryGetValue("MessageType", out var messageType) && + _messageHandlers.TryGetValue((string) messageType, out var y)) { - await _deserializationErrorHandler(_receiverClient, message, ex.InnerException); + var deserialized = Deserialize(message, y.type); + await y.handler(deserialized); } - catch (Exception ex) when (_exceptionHandlers.Keys.Contains(ex.GetType())) + else if (_globalHandler != null) { - await _exceptionHandlers[ex.GetType()](_receiverClient, message, ex); + var deserialized = Deserialize(message, _globalHandler.Value.type); + await _globalHandler.Value.handler(deserialized); } - catch (Exception ex) when (_unhandledExceptionHandler != null) + else if (_unrecognizedMessageHandler != null) { - await _unhandledExceptionHandler(_receiverClient, message, ex); + await _unrecognizedMessageHandler(client, message); + return; } + + await client.CompleteAsync(message.SystemProperties.LockToken); + } + catch (DeserializationFailedException ex) + { + await _deserializationErrorHandler(client, message, ex.InnerException); + } + catch (Exception ex) when (_exceptionHandlers.Keys.Contains(ex.GetType())) + { + await _exceptionHandlers[ex.GetType()](client, message, ex); + } + catch (Exception ex) when (_unhandledExceptionHandler != null) + { + await _unhandledExceptionHandler(client, message, ex); } + } + } - }, messageHandlerOptions); + object Deserialize(Message message, Type type) + { + if (message.UserProperties.ContainsKey("compressed")) + { + return MessagePackSerializer.NonGeneric.Deserialize(type, message.Body, MessagePack.Resolvers.ContractlessStandardResolver.Instance); + } + else + { + return message.Body.Deserialize(type); + } + } + } + + public class DependencyResolutionException : Exception + { + public DependencyResolutionException(string message) : base(message) + { + } } } \ No newline at end of file diff --git a/source/Consume/QueueClientExtensions.cs b/source/Consume/QueueClientExtensions.cs deleted file mode 100644 index b5cff41..0000000 --- a/source/Consume/QueueClientExtensions.cs +++ /dev/null @@ -1,10 +0,0 @@ -using Microsoft.Azure.ServiceBus; -using TinyDancer.Consume.Session; - -namespace TinyDancer.Consume -{ - public static class QueueClientExtensions - { - public static SessionMessageHandlerBuilder ConfigureSessions(this IQueueClient client, SessionConfiguration config = null) => new SessionMessageHandlerBuilder(client, config); - } -} \ No newline at end of file diff --git a/source/Consume/ReceiverClientAdapter.cs b/source/Consume/ReceiverClientAdapter.cs new file mode 100644 index 0000000..32dd116 --- /dev/null +++ b/source/Consume/ReceiverClientAdapter.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; + +namespace TinyDancer.Consume +{ + internal class ReceiverClientAdapter + { + private readonly IReceiverClient _receiverClient; + + internal ReceiverClientAdapter(IReceiverClient receiverClient) + { + _receiverClient = receiverClient; + } + + internal void RegisterMessageHandler(Func handler, MessageHandlerOptions messageHandlerOptions) + { + _receiverClient.RegisterMessageHandler(handler, messageHandlerOptions); + } + + internal void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions) + { + if (_receiverClient is IQueueClient queueClient) + { + queueClient.RegisterSessionHandler(handler, sessionHandlerOptions); + } + else if (_receiverClient is ISubscriptionClient subscriptionClient) + { + subscriptionClient.RegisterSessionHandler(handler, sessionHandlerOptions); + } + } + + public IReceiverClient GetClient() + { + return _receiverClient; + } + } +} \ No newline at end of file diff --git a/source/Consume/ReceiverClientExtensions.cs b/source/Consume/ReceiverClientExtensions.cs index e0f25a5..2150fc0 100644 --- a/source/Consume/ReceiverClientExtensions.cs +++ b/source/Consume/ReceiverClientExtensions.cs @@ -5,10 +5,13 @@ namespace TinyDancer.Consume { public delegate Task ExceptionHandler(IReceiverClient client, Message message, TException exception); - public delegate Task SessionExceptionHandler(IMessageSession session, Message message, TException exception); public static class ReceiverClientExtensions { - public static MessageHandlerBuilder Configure(this IReceiverClient client, Configuration config = null) => new MessageHandlerBuilder(client, config); + public static MessageHandlerBuilder Configure(this IReceiverClient client, Configuration config = null) => + new MessageHandlerBuilder(new ReceiverClientAdapter(client), config); + + public static MessageHandlerBuilder ConfigureSessions(this IReceiverClient client, SessionConfiguration config = null) => + new MessageHandlerBuilder(new ReceiverClientAdapter(client), config); } } \ No newline at end of file diff --git a/source/Consume/Session/AsyncSessionMessageHandler.cs b/source/Consume/Session/AsyncSessionMessageHandler.cs deleted file mode 100644 index 550f3e3..0000000 --- a/source/Consume/Session/AsyncSessionMessageHandler.cs +++ /dev/null @@ -1,6 +0,0 @@ -using System.Threading.Tasks; - -namespace TinyDancer.Consume.Session -{ - public delegate Task AsyncSessionMessageHandler(TMessage message, string sessionId); -} \ No newline at end of file diff --git a/source/Consume/Session/SessionClientWrapper.cs b/source/Consume/Session/SessionClientWrapper.cs deleted file mode 100644 index 47bb30f..0000000 --- a/source/Consume/Session/SessionClientWrapper.cs +++ /dev/null @@ -1,35 +0,0 @@ -using System; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Azure.ServiceBus; - -namespace TinyDancer.Consume.Session -{ - public class SessionClientWrapper - { - private readonly IQueueClient _queueClient; - private readonly ISubscriptionClient _subscriptionClient; - - public SessionClientWrapper(ISubscriptionClient subscriptionClient) - { - _subscriptionClient = subscriptionClient; - } - - public SessionClientWrapper(IQueueClient queueClient) - { - _queueClient = queueClient; - } - - public void RegisterSessionHandler(Func handler, SessionHandlerOptions sessionHandlerOptions) - { - if (_queueClient != null) - { - _queueClient.RegisterSessionHandler(handler, sessionHandlerOptions); - } - else - { - _subscriptionClient.RegisterSessionHandler(handler, sessionHandlerOptions); - } - } - } -} \ No newline at end of file diff --git a/source/Consume/Session/SessionExceptionCallback.cs b/source/Consume/Session/SessionExceptionCallback.cs deleted file mode 100644 index e92e90e..0000000 --- a/source/Consume/Session/SessionExceptionCallback.cs +++ /dev/null @@ -1,6 +0,0 @@ -using Microsoft.Azure.ServiceBus; - -namespace TinyDancer.Consume.Session -{ - public delegate void SessionExceptionCallback(Message message, TException exception, string sessionId); -} \ No newline at end of file diff --git a/source/Consume/Session/SessionExceptionHandler.cs b/source/Consume/Session/SessionExceptionHandler.cs deleted file mode 100644 index b5888cd..0000000 --- a/source/Consume/Session/SessionExceptionHandler.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; -using System.Threading.Tasks; -using Microsoft.Azure.ServiceBus; - -namespace TinyDancer.Consume.Session -{ - public delegate Task SessionExceptionHandler(IMessageSession session, Message message, Exception exception); -} \ No newline at end of file diff --git a/source/Consume/Session/SessionExceptionHandlingBuilder.cs b/source/Consume/Session/SessionExceptionHandlingBuilder.cs deleted file mode 100644 index 32cea79..0000000 --- a/source/Consume/Session/SessionExceptionHandlingBuilder.cs +++ /dev/null @@ -1,49 +0,0 @@ -using System; - -namespace TinyDancer.Consume.Session -{ - public class SessionExceptionHandlingBuilder where TException : Exception - { - public SessionExceptionHandler Deadletter() - { - return Deadletter(null); - } - - public SessionExceptionHandler Deadletter(DeadletterReasonProvider reason) - { - return async (session, message, exception) => - { - await session.DeadLetterAsync(message.SystemProperties.LockToken, reason?.Invoke(message, exception) ?? string.Empty); - }; - } - - /// - /// Note that maxTimes is enforced at best effort. The DeliveryCount property of the message will be examined, - /// but if there is another consumer of this queue or subscription that does not implement .............. - /// - /// If defined, the message will only be abandoned maxTimes times, and then deadlettered - /// - public SessionExceptionHandler Abandon(int maxTimes = 0) - { - return async (session, message, exception) => - { - if (message.SystemProperties.DeliveryCount <= maxTimes) - { - await session.AbandonAsync(message.SystemProperties.LockToken); - } - else - { - await session.DeadLetterAsync(message.SystemProperties.LockToken); - } - }; - } - - public SessionExceptionHandler Complete() - { - return async (session, message, exception) => - { - await session.CompleteAsync(message.SystemProperties.LockToken); - }; - } - } -} \ No newline at end of file diff --git a/source/Consume/Session/SessionMessageHandler.cs b/source/Consume/Session/SessionMessageHandler.cs deleted file mode 100644 index 4631980..0000000 --- a/source/Consume/Session/SessionMessageHandler.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace TinyDancer.Consume.Session -{ - public delegate void SessionMessageHandler(TMessage message, string sessionId); -} \ No newline at end of file diff --git a/source/Consume/Session/SessionMessageHandlerBuilder.cs b/source/Consume/Session/SessionMessageHandlerBuilder.cs deleted file mode 100644 index 4a463be..0000000 --- a/source/Consume/Session/SessionMessageHandlerBuilder.cs +++ /dev/null @@ -1,231 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using MessagePack; -using Microsoft.Azure.ServiceBus; - -namespace TinyDancer.Consume.Session -{ - public class SessionMessageHandlerBuilder - { - private readonly SessionConfiguration _config; - private readonly SessionClientWrapper _client; - private readonly Dictionary _exceptionHandlers = new Dictionary(); - private SessionExceptionHandler _unhandledExceptionHandler = null; - private SessionExceptionHandler _deserializationErrorHandler = null; - - private Func _unrecognizedMessageHandler = null; - - private readonly Dictionary handler)> _messageHandlers = new Dictionary handler)>(); - - private Action _receiverActionCallback = null; - private (Type type, Func handler)? _globalHandler = null; - - public SessionMessageHandlerBuilder(ISubscriptionClient client, SessionConfiguration config) - { - _config = config; - _client = new SessionClientWrapper(client); - } - - public SessionMessageHandlerBuilder(IQueueClient client, SessionConfiguration config) - { - _config = config; - _client = new SessionClientWrapper(client); - } - - public SessionMessageHandlerBuilder Catch(Func, SessionExceptionHandler> handleStrategy, SessionExceptionCallback callback = null) where TException : Exception - { - var handler = handleStrategy(new SessionExceptionHandlingBuilder()); - - _exceptionHandlers[typeof(TException)] = async (session, message, ex) => - { - await handler(session, message, (TException)ex); - callback?.Invoke(message, (TException)ex, session.SessionId); - }; - - return this; - } - - public SessionMessageHandlerBuilder CatchUnhandledExceptions(Func, SessionExceptionHandler> handleStrategy, SessionExceptionCallback callback = null) - { - var handler = handleStrategy(new SessionExceptionHandlingBuilder()); - - _unhandledExceptionHandler = async (session, message, ex) => - { - await handler(session, message, ex); - callback?.Invoke(message, ex, session.SessionId); - }; - - return this; - } - - public SessionMessageHandlerBuilder OnDeserializationFailed(Func, SessionExceptionHandler> handleStrategy, SessionExceptionCallback callback = null) - { - var handler = handleStrategy(new SessionExceptionHandlingBuilder()); - - _deserializationErrorHandler = async (session, message, ex) => - { - await handler(session, message, ex); - callback?.Invoke(message, ex, session.SessionId); - }; - - return this; - } - - public SessionMessageHandlerBuilder OnUnrecognizedMessageType(Func handleStrategy, Action callback = null) - { - var handler = handleStrategy(new UnrecognizedSessionMessageHandlingBuilder()); - - _unrecognizedMessageHandler = async (session, message) => - { - await handler(session, message); - callback?.Invoke(message); - }; - - return this; - } - - public SessionMessageHandlerBuilder OnReceiverException(Action callback) - { - _receiverActionCallback = callback; - - return this; - } - - public SessionMessageHandlerBuilder HandleMessage(AsyncSessionMessageHandler action) - { - _messageHandlers[typeof(TMessage).Name] = (typeof(TMessage), async (message, sessionId) => - { - await action((TMessage)message, sessionId); - }); - - return this; - } - - public SessionMessageHandlerBuilder HandleMessage(SessionMessageHandler action) - { - _messageHandlers[typeof(TMessage).Name] = (typeof(TMessage), async (message, sessionId) => - { - action((TMessage) message, sessionId); - await Task.Yield(); - }); - - return this; - } - - public SessionMessageHandlerBuilder HandleAllAs(Action handler) - { - _globalHandler = (typeof(T), async msg => - { - handler((T) msg); - await Task.Yield(); - }); - - return this; - } - - public SessionMessageHandlerBuilder HandleAllAs(Func handler) - { - _globalHandler = (typeof(T), async msg => await handler((T) msg)); - return this; - } - - public void Subscribe(CancellationToken? cancelled, Func dontInterrupt) - => DoSubscribe(cancelled, dontInterrupt); - - public void Subscribe() - => DoSubscribe(); - - private void DoSubscribe(CancellationToken? cancelled = null, Func dontInterrupt = null) - { - if (_unrecognizedMessageHandler != null && _globalHandler != null) - { - throw new InvalidOperationException($"{nameof(HandleAllAs)} and {nameof(OnUnrecognizedMessageType)} cannot both be used"); - } - - // todo: vakta mot typer med samma namn - - var blockInterruption = dontInterrupt ?? (() => new DummyDisposable()); - - var messageHandlerOptions = new SessionHandlerOptions(exceptionEventArgs => - { - _receiverActionCallback?.Invoke(exceptionEventArgs); // todo: implementera async version också - return Task.CompletedTask; - }) - { - AutoComplete = false - }; - - if (_config?.MaxConcurrentSessions != null) - { - messageHandlerOptions.MaxConcurrentSessions = _config.MaxConcurrentSessions.Value; - } - - if (_config?.MaxAutoRenewDuration != null) - { - messageHandlerOptions.MaxAutoRenewDuration = _config.MaxAutoRenewDuration.Value; - } - - object Deserialize(Message message, Type type) - { - if (message.UserProperties.ContainsKey("compressed")) - { - return MessagePackSerializer.NonGeneric.Deserialize(type, message.Body, MessagePack.Resolvers.ContractlessStandardResolver.Instance); - } - else - { - return message.Body.Deserialize(type); - } - } - - _client.RegisterSessionHandler(async (session, message, cancel) => - { - if (cancelled?.IsCancellationRequested == true) - { - await session.AbandonAsync(message.SystemProperties.LockToken); - return; - } - - using (blockInterruption()) - { - try - { - if (message.UserProperties.TryGetValue("MessageType", out var messageType) && - _messageHandlers.TryGetValue((string) messageType, out var y)) - { - var deserialized = Deserialize(message, y.type); - await y.handler(deserialized, session.SessionId); - } - else if (_globalHandler != null) - { - var deserialized = Deserialize(message, _globalHandler.Value.type); - await _globalHandler.Value.handler(deserialized); - } - else if (_unrecognizedMessageHandler != null) - { - await _unrecognizedMessageHandler(session, message); - return; - } - - await session.CompleteAsync(message.SystemProperties.LockToken); - } - catch (DeserializationFailedException ex) - { - await _deserializationErrorHandler(session, message, ex.InnerException); - } - catch (Exception ex) when (_exceptionHandlers.Keys.Contains(ex.GetType())) - { - await _exceptionHandlers[ex.GetType()](session, message, ex); - } - catch (Exception ex) when (_unhandledExceptionHandler != null) - { - await _unhandledExceptionHandler(session, message, ex); - } - } - - }, messageHandlerOptions); - } - } -} \ No newline at end of file diff --git a/source/Consume/Session/UnrecognizedSessionMessageHandler.cs b/source/Consume/Session/UnrecognizedSessionMessageHandler.cs deleted file mode 100644 index fa51ff4..0000000 --- a/source/Consume/Session/UnrecognizedSessionMessageHandler.cs +++ /dev/null @@ -1,7 +0,0 @@ -using System.Threading.Tasks; -using Microsoft.Azure.ServiceBus; - -namespace TinyDancer.Consume.Session -{ - public delegate Task UnrecognizedSessionMessageHandler(IMessageSession session, Message message); -} \ No newline at end of file diff --git a/source/Consume/Session/UnrecognizedSessionMessageHandlingBuilder.cs b/source/Consume/Session/UnrecognizedSessionMessageHandlingBuilder.cs deleted file mode 100644 index a33394c..0000000 --- a/source/Consume/Session/UnrecognizedSessionMessageHandlingBuilder.cs +++ /dev/null @@ -1,44 +0,0 @@ -using System; -using Microsoft.Azure.ServiceBus; - -namespace TinyDancer.Consume.Session -{ - public class UnrecognizedSessionMessageHandlingBuilder - { - public UnrecognizedSessionMessageHandler Deadletter(Func reason) - { - return async (session, message) => - { - await session.DeadLetterAsync(message.SystemProperties.LockToken, reason(message)); - }; - } - - /// - /// - /// - /// If defined, the message will only be abandoned maxTimes times, and then deadlettered - /// - public UnrecognizedSessionMessageHandler Abandon(int maxTimes = 0) - { - return async (session, message) => - { - if (message.SystemProperties.DeliveryCount <= maxTimes) - { - await session.AbandonAsync(message.SystemProperties.LockToken); - } - else - { - await session.DeadLetterAsync(message.SystemProperties.LockToken); - } - }; - } - - public UnrecognizedSessionMessageHandler Complete() - { - return async (session, message) => - { - await session.CompleteAsync(message.SystemProperties.LockToken); - }; - } - } -} \ No newline at end of file diff --git a/source/Consume/SubscriptionClientExtensions.cs b/source/Consume/SubscriptionClientExtensions.cs deleted file mode 100644 index 36aaabc..0000000 --- a/source/Consume/SubscriptionClientExtensions.cs +++ /dev/null @@ -1,11 +0,0 @@ -using Microsoft.Azure.ServiceBus; -using TinyDancer.Consume.Session; - -namespace TinyDancer.Consume -{ - public static class SubscriptionClientExtensions - { - public static SessionMessageHandlerBuilder ConfigureSessions(this ISubscriptionClient client, SessionConfiguration config = null) => - new SessionMessageHandlerBuilder(client, config); - } -} \ No newline at end of file diff --git a/source/Publish/SenderClientExtensions.cs b/source/Publish/SenderClientExtensions.cs index 664e42d..9131d6b 100644 --- a/source/Publish/SenderClientExtensions.cs +++ b/source/Publish/SenderClientExtensions.cs @@ -20,7 +20,7 @@ public static async Task PublishAsync(this ISenderClient client, TMess { SessionId = sessionId, CorrelationId = correlationId?.Invoke(payload), - UserProperties = { ["MessageType"] = payload.GetType().Name } + UserProperties = { ["MessageType"] = payload.GetType().FullName } }; if (compress) @@ -53,7 +53,7 @@ public static async Task PublishAllAsync(this ISenderClient client, IL { SessionId = sessionId, CorrelationId = correlationId?.Invoke(payload), - UserProperties = { ["MessageType"] = payload.GetType().Name } + UserProperties = { ["MessageType"] = payload.GetType().FullName } }; if (compress) diff --git a/source/TinyDancer.csproj b/source/TinyDancer.csproj index db2a3b8..de21f64 100644 --- a/source/TinyDancer.csproj +++ b/source/TinyDancer.csproj @@ -8,7 +8,7 @@ azure servicebus service bus message queue https://github.com/johnknoop/TinyDancer https://github.com/johnknoop/TinyDancer - 1.0.1 + 2.0.0-beta @@ -16,7 +16,14 @@ + + + + ..\..\..\Users\john\.nuget\packages\microsoft.extensions.dependencyinjection.abstractions\2.2.0\lib\netstandard2.0\Microsoft.Extensions.DependencyInjection.Abstractions.dll + + +