Skip to content

Commit

Permalink
Merge pull request #62 from NerosoftDev/develop
Browse files Browse the repository at this point in the history
Service bus optimization.
  • Loading branch information
Codespilot authored Dec 15, 2023
2 parents 75a01ba + 3f06a80 commit e376e36
Show file tree
Hide file tree
Showing 12 changed files with 960 additions and 681 deletions.
4 changes: 2 additions & 2 deletions Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task SendAsync<TMessage>(RoutedMessage<TMessage> message, Cancellat

context.Completed += (_, _) =>
{
taskCompletion.SetResult();
taskCompletion.TrySetResult();
};

StrongReferenceMessenger.Default.UnsafeSend(pack, message.Channel);
Expand Down Expand Up @@ -88,7 +88,7 @@ public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessa

context.Responded += (_, args) =>
{
taskCompletion.SetResult((TResponse)args.Result);
taskCompletion.TrySetResult((TResponse)args.Result);
};
context.Failed += (_, exception) =>
{
Expand Down
32 changes: 29 additions & 3 deletions Source/Euonia.Bus.InMemory/InMemoryQueueConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,45 @@
namespace Nerosoft.Euonia.Bus.InMemory;

using Microsoft.Extensions.Logging;

namespace Nerosoft.Euonia.Bus.InMemory;

/// <summary>
///
/// </summary>
public class InMemoryQueueConsumer : InMemoryRecipient, IQueueConsumer
{
private readonly IHandlerContext _handler;
private readonly ILogger<InMemoryQueueConsumer> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="InMemoryQueueConsumer"/> class.
/// </summary>
/// <param name="handler"></param>
public InMemoryQueueConsumer(IHandlerContext handler)
: base(handler)
/// <param name="logger"></param>
public InMemoryQueueConsumer(IHandlerContext handler, ILoggerFactory logger)
{
_handler = handler;
_logger = logger.CreateLogger<InMemoryQueueConsumer>();
}

/// <inheritdoc />
public string Name => nameof(InMemoryQueueConsumer);

/// <inheritdoc/>
protected override async Task HandleAsync(string channel, object message, MessageContext context, CancellationToken cancellationToken = default)
{
try
{
await _handler.HandleAsync(channel, message, context, cancellationToken);
}
catch (Exception exception)
{
_logger.LogError(exception, "Message '{Id}' Handle Error: {Message}", context.MessageId, exception.Message);
context.Failure(exception);
}
finally
{
context.Complete(null);
}
}
}
23 changes: 11 additions & 12 deletions Source/Euonia.Bus.InMemory/InMemoryRecipient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@ public abstract class InMemoryRecipient : DisposableObject, IRecipient<MessagePa
/// </summary>
public event EventHandler<MessageAcknowledgedEventArgs> MessageAcknowledged;

private readonly IHandlerContext _handler;

/// <summary>
/// Initializes a new instance of the <see cref="InMemoryRecipient"/> class.
/// </summary>
/// <param name="handler"></param>
public InMemoryRecipient(IHandlerContext handler)
{
_handler = handler;
}

#region IDisposable

/// <inheritdoc />
Expand All @@ -39,7 +28,17 @@ protected override void Dispose(bool disposing)
public async void Receive(MessagePack pack)
{
MessageReceived?.Invoke(this, new MessageReceivedEventArgs(pack.Message, pack.Context));
await _handler.HandleAsync(pack.Message.Channel, pack.Message.Data, pack.Context, pack.Aborted);
await HandleAsync(pack.Message.Channel, pack.Message.Data, pack.Context, pack.Aborted);
MessageAcknowledged?.Invoke(this, new MessageAcknowledgedEventArgs(pack.Message, pack.Context));
}

/// <summary>
///
/// </summary>
/// <param name="channel"></param>
/// <param name="message"></param>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected abstract Task HandleAsync(string channel, object message, MessageContext context, CancellationToken cancellationToken = default);
}
30 changes: 27 additions & 3 deletions Source/Euonia.Bus.InMemory/InMemoryTopicSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,43 @@
namespace Nerosoft.Euonia.Bus.InMemory;

using Microsoft.Extensions.Logging;

namespace Nerosoft.Euonia.Bus.InMemory;

/// <summary>
///
/// </summary>
public class InMemoryTopicSubscriber : InMemoryRecipient, ITopicSubscriber
{
private readonly IHandlerContext _handler;
private readonly ILogger<InMemoryTopicSubscriber> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="InMemoryTopicSubscriber"/> class.
/// </summary>
/// <param name="handler"></param>
public InMemoryTopicSubscriber(IHandlerContext handler)
: base(handler)
/// <param name="logger"></param>
public InMemoryTopicSubscriber(IHandlerContext handler, ILoggerFactory logger)
{
_handler = handler;
_logger = logger.CreateLogger<InMemoryTopicSubscriber>();
}

/// <inheritdoc />
public string Name => nameof(InMemoryTopicSubscriber);

/// <inheritdoc />
protected override async Task HandleAsync(string channel, object message, MessageContext context, CancellationToken cancellationToken = default)
{
try
{
await _handler.HandleAsync(channel, message, context, cancellationToken);
}
catch (Exception exception)
{
_logger.LogError(exception, "Message '{Id}' Handle Error: {Message}", context.MessageId, exception.Message);
}
finally
{
}
}
}
37 changes: 31 additions & 6 deletions Source/Euonia.Bus.RabbitMq/RabbitMqQueueConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

Expand All @@ -10,16 +11,21 @@ namespace Nerosoft.Euonia.Bus.RabbitMq;
public class RabbitMqQueueConsumer : RabbitMqQueueRecipient, IQueueConsumer
{
private readonly IIdentityProvider _identity;
private readonly IHandlerContext _handler;
private readonly ILogger<RabbitMqQueueConsumer> _logger;

/// <summary>
/// Initializes a new instance of the <see cref="RabbitMqQueueConsumer"/> class.
/// </summary>
/// <param name="connection"></param>
/// <param name="handler"></param>
/// <param name="options"></param>
public RabbitMqQueueConsumer(IPersistentConnection connection, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options)
: base(connection, handler, options)
/// <param name="logger"></param>
public RabbitMqQueueConsumer(IPersistentConnection connection, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options, ILoggerFactory logger)
: base(connection, options)
{
_handler = handler;
_logger = logger.CreateLogger<RabbitMqQueueConsumer>();
}

/// <summary>
Expand All @@ -28,9 +34,10 @@ public RabbitMqQueueConsumer(IPersistentConnection connection, IHandlerContext h
/// <param name="connection"></param>
/// <param name="handler"></param>
/// <param name="options"></param>
/// <param name="logger"></param>
/// <param name="identity"></param>
public RabbitMqQueueConsumer(IPersistentConnection connection, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options, IIdentityProvider identity)
: this(connection, handler, options)
public RabbitMqQueueConsumer(IPersistentConnection connection, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options, ILoggerFactory logger, IIdentityProvider identity)
: this(connection, handler, options, logger)
{
_identity = identity;
}
Expand Down Expand Up @@ -96,7 +103,7 @@ protected override async void HandleMessageReceived(object sender, BasicDeliverE

RabbitMqReply<object> reply;

await Handler.HandleAsync(message.Channel, message.Data, context);
await HandleAsync(message.Channel, message.Data, context);

try
{
Expand All @@ -123,6 +130,24 @@ protected override async void HandleMessageReceived(object sender, BasicDeliverE
OnMessageAcknowledged(new MessageAcknowledgedEventArgs(message.Data, context));
}

/// <inheritdoc/>
protected override async Task HandleAsync(string channel, object message, MessageContext context, CancellationToken cancellationToken = default)
{
try
{
await _handler.HandleAsync(channel, message, context, cancellationToken);
}
catch (Exception exception)
{
_logger.LogError(exception, "Message '{Id}' Handle Error: {Message}", context.MessageId, exception.Message);
context.Failure(exception);
}
finally
{
context.Complete(null);
}
}

/// <inheritdoc />
protected override void Dispose(bool disposing)
{
Expand Down
14 changes: 9 additions & 5 deletions Source/Euonia.Bus.RabbitMq/RabbitMqQueueRecipient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ public abstract class RabbitMqQueueRecipient : DisposableObject
/// Initializes a new instance of the <see cref="RabbitMqQueueRecipient"/> class.
/// </summary>
/// <param name="factory"></param>
/// <param name="handler"></param>
/// <param name="options"></param>
protected RabbitMqQueueRecipient(IPersistentConnection factory, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options)
///
protected RabbitMqQueueRecipient(IPersistentConnection factory, IOptions<RabbitMqMessageBusOptions> options)
{
Options = options.Value;
Handler = handler;
Connection = factory;
}

Expand All @@ -43,9 +42,14 @@ protected RabbitMqQueueRecipient(IPersistentConnection factory, IHandlerContext
protected virtual RabbitMqMessageBusOptions Options { get; }

/// <summary>
/// Gets the message handler context instance.
///
/// </summary>
protected virtual IHandlerContext Handler { get; }
/// <param name="channel"></param>
/// <param name="message"></param>
/// <param name="context"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected abstract Task HandleAsync(string channel, object message, MessageContext context, CancellationToken cancellationToken = default);

// protected virtual void AcknowledgeMessage(ulong deliveryTag)
// {
Expand Down
36 changes: 29 additions & 7 deletions Source/Euonia.Bus.RabbitMq/RabbitMqTopicSubscriber.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

Expand All @@ -10,16 +11,20 @@ namespace Nerosoft.Euonia.Bus.RabbitMq;
public class RabbitMqTopicSubscriber : RabbitMqQueueRecipient, ITopicSubscriber
{
private readonly IIdentityProvider _identity;

private readonly IHandlerContext _handler;
private readonly ILogger<RabbitMqTopicSubscriber> _logger;
/// <summary>
/// Initializes a new instance of the <see cref="RabbitMqTopicSubscriber"/> class.
/// </summary>
/// <param name="connection"></param>
/// <param name="handler"></param>
/// <param name="options"></param>
public RabbitMqTopicSubscriber(IPersistentConnection connection, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options)
: base(connection, handler, options)
/// <param name="logger"></param>
public RabbitMqTopicSubscriber(IPersistentConnection connection, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options, ILoggerFactory logger)
: base(connection, options)
{
_handler = handler;
_logger = logger.CreateLogger<RabbitMqTopicSubscriber>();
}

/// <summary>
Expand All @@ -28,9 +33,10 @@ public RabbitMqTopicSubscriber(IPersistentConnection connection, IHandlerContext
/// <param name="connection"></param>
/// <param name="handler"></param>
/// <param name="options"></param>
/// <param name="logger"></param>
/// <param name="identity"></param>
public RabbitMqTopicSubscriber(IPersistentConnection connection, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options, IIdentityProvider identity)
: this(connection, handler, options)
public RabbitMqTopicSubscriber(IPersistentConnection connection, IHandlerContext handler, IOptions<RabbitMqMessageBusOptions> options, ILoggerFactory logger, IIdentityProvider identity)
: this(connection, handler, options, logger)
{
_identity = identity;
}
Expand Down Expand Up @@ -87,7 +93,7 @@ protected override async void HandleMessageReceived(object sender, BasicDeliverE

OnMessageReceived(new MessageReceivedEventArgs(message.Data, context));

await Handler.HandleAsync(message.Channel, message.Data, context);
await HandleAsync(message.Channel, message.Data, context);

if (!Options.AutoAck)
{
Expand All @@ -97,6 +103,22 @@ protected override async void HandleMessageReceived(object sender, BasicDeliverE
OnMessageAcknowledged(new MessageAcknowledgedEventArgs(message.Data, context));
}

/// <inheritdoc />
protected override async Task HandleAsync(string channel, object message, MessageContext context, CancellationToken cancellationToken = default)
{
try
{
await _handler.HandleAsync(channel, message, context, cancellationToken);
}
catch (Exception exception)
{
_logger.LogError(exception, "Message '{Id}' Handle Error: {Message}", context.MessageId, exception.Message);
}
finally
{
}
}

/// <inheritdoc />
protected override void Dispose(bool disposing)
{
Expand Down
Loading

0 comments on commit e376e36

Please sign in to comment.