Skip to content

Commit

Permalink
Merge pull request #56 from NerosoftDev/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
Codespilot authored Dec 11, 2023
2 parents e5b8b59 + 84e7f0b commit 56c80a0
Show file tree
Hide file tree
Showing 36 changed files with 787 additions and 671 deletions.
22 changes: 20 additions & 2 deletions Source/Euonia.Bus.Abstract/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public MessageContext(IRoutedMessage pack)
/// <summary>
/// Invoked while message was handled and replied to dispatcher.
/// </summary>
public event EventHandler<MessageRepliedEventArgs> OnResponse
public event EventHandler<MessageRepliedEventArgs> Responded
{
add => _events.AddEventHandler(value);
remove => _events.RemoveEventHandler(value);
Expand All @@ -59,6 +59,15 @@ public event EventHandler<MessageHandledEventArgs> Completed
remove => _events.RemoveEventHandler(value);
}

/// <summary>
/// Invoked while message handling was failed.
/// </summary>
public event EventHandler<Exception> Failed
{
add => _events.AddEventHandler(value);
remove => _events.RemoveEventHandler(value);
}

/// <inheritdoc />
public object Message { get; }

Expand Down Expand Up @@ -106,7 +115,7 @@ public string Authorization
/// <param name="message">The message to reply.</param>
public void Response(object message)
{
_events.HandleEvent(this, new MessageRepliedEventArgs(message), nameof(OnResponse));
_events.HandleEvent(this, new MessageRepliedEventArgs(message), nameof(Responded));
}

/// <summary>
Expand All @@ -119,6 +128,15 @@ public void Response<TMessage>(TMessage message)
Response((object)message);
}

/// <summary>
/// Called after the message handling was failed.
/// </summary>
/// <param name="exception"></param>
public void Failure(Exception exception)
{
_events.HandleEvent(this, exception, nameof(Failed));
}

/// <summary>
/// Called after the message has been handled.
/// This operate will raised up the <see cref="Completed"/> event.
Expand Down
13 changes: 11 additions & 2 deletions Source/Euonia.Bus.InMemory/InMemoryDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public async Task SendAsync<TMessage>(RoutedMessage<TMessage> message, Cancellat
cancellationToken.Register(() => taskCompletion.SetCanceled(cancellationToken));
}

context.Failed += (_, exception) =>
{
taskCompletion.TrySetException(exception);
};

context.Completed += (_, _) =>
{
taskCompletion.SetResult();
Expand Down Expand Up @@ -68,13 +73,17 @@ public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessa
cancellationToken.Register(() => taskCompletion.TrySetCanceled(), false);
}

context.OnResponse += (_, args) =>
context.Responded += (_, args) =>
{
taskCompletion.SetResult((TResponse)args.Result);
};
context.Failed += (_, exception) =>
{
taskCompletion.TrySetException(exception);
};
context.Completed += (_, _) =>
{
taskCompletion.TrySetResult(default);
taskCompletion.TryCompleteFromCompletedTask(Task.FromResult(default(TResponse)));
};

StrongReferenceMessenger.Default.UnsafeSend(pack, message.Channel);
Expand Down
3 changes: 2 additions & 1 deletion Source/Euonia.Bus.RabbitMq/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ internal class Constants
{
ReferenceLoopHandling = ReferenceLoopHandling.Ignore,
ConstructorHandling = ConstructorHandling.Default,
MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead
MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead,
TypeNameHandling = TypeNameHandling.Auto
};

public class MessageHeaders
Expand Down
121 changes: 76 additions & 45 deletions Source/Euonia.Bus.RabbitMq/RabbitMqDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,56 +48,88 @@ public async Task PublishAsync<TMessage>(RoutedMessage<TMessage> message, Cancel
props.Type = typeName;

await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
})
.ExecuteAsync(async () =>
{
var messageBody = await SerializeAsync(message, cancellationToken);
channel.ExchangeDeclare(_options.ExchangeName, _options.ExchangeType);
channel.BasicPublish(_options.ExchangeName, $"{_options.TopicName}${message.Channel}$", props, messageBody);
Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
});
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
})
.ExecuteAsync(async () =>
{
var messageBody = await SerializeAsync(message, cancellationToken);
channel.ExchangeDeclare(_options.ExchangeName, _options.ExchangeType);
channel.BasicPublish(_options.ExchangeName, $"{_options.TopicName}${message.Channel}$", props, messageBody);
Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
});
}

/// <inheritdoc />
public async Task SendAsync<TMessage>(RoutedMessage<TMessage> message, CancellationToken cancellationToken = default) where TMessage : class
{
using var channel = _connection.CreateChannel();
var task = new TaskCompletionSource<dynamic>();

var requestQueueName = $"{_options.QueueName}${message.Channel}$";

using var channel = _connection.CreateChannel();

CheckQueue(channel, requestQueueName);

var responseQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);

consumer.Received += OnReceived;

var typeName = message.GetTypeName();

var props = channel.CreateBasicProperties();
props.Headers ??= new Dictionary<string, object>();
props.Headers[Constants.MessageHeaders.MessageType] = typeName;
props.Type = typeName;
props.CorrelationId = message.CorrelationId;
props.ReplyTo = responseQueueName;

await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(3), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
})
.ExecuteAsync(async () =>
{
var messageBody = await SerializeAsync(message, cancellationToken);
channel.BasicPublish("", requestQueueName, props, messageBody);
Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
});
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(1), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
})
.ExecuteAsync(async () =>
{
var messageBody = await SerializeAsync(message, cancellationToken);
channel.BasicPublish("", requestQueueName, props, messageBody);
channel.BasicConsume(consumer, responseQueueName, true);
Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
});

await task.Task;
consumer.Received -= OnReceived;

void OnReceived(object sender, BasicDeliverEventArgs args)
{
if (args.BasicProperties.CorrelationId != message.CorrelationId)
{
return;
}

var body = args.Body.ToArray();
var response = JsonConvert.DeserializeObject<RabbitMqReply<object>>(Encoding.UTF8.GetString(body), Constants.SerializerSettings);
if (response.IsSuccess)
{
task.SetResult(response.Result);
}
else
{
task.SetException(response.Error);
}
}
}

/// <inheritdoc />
public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessage, TResponse> message, CancellationToken cancellationToken = default) where TMessage : class
public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessage, TResponse> message, CancellationToken cancellationToken = default)
where TMessage : class
{
var task = new TaskCompletionSource<TResponse>();

Expand All @@ -122,19 +154,19 @@ public async Task<TResponse> SendAsync<TMessage, TResponse>(RoutedMessage<TMessa
props.ReplyTo = responseQueueName;

await Policy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(1), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
})
.ExecuteAsync(async () =>
{
var messageBody = await SerializeAsync(message, cancellationToken);
channel.BasicPublish("", requestQueueName, props, messageBody);
channel.BasicConsume(consumer, responseQueueName, true);
Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
});
.Or<BrokerUnreachableException>()
.WaitAndRetryAsync(_options.MaxFailureRetries, _ => TimeSpan.FromSeconds(1), (exception, _, retryCount, _) =>
{
_logger.LogError(exception, "Retry:{RetryCount}, {Message}", retryCount, exception.Message);
})
.ExecuteAsync(async () =>
{
var messageBody = await SerializeAsync(message, cancellationToken);
channel.BasicPublish("", requestQueueName, props, messageBody);
channel.BasicConsume(consumer, responseQueueName, true);
Delivered?.Invoke(this, new MessageDispatchedEventArgs(message.Data, null));
});

var result = await task.Task;
consumer.Received -= OnReceived;
Expand All @@ -148,9 +180,8 @@ void OnReceived(object sender, BasicDeliverEventArgs args)
}

var body = args.Body.ToArray();
var response = JsonConvert.DeserializeObject<TResponse>(Encoding.UTF8.GetString(body), Constants.SerializerSettings);

task.SetResult(response);
var response = JsonConvert.DeserializeObject<RabbitMqReply<TResponse>>(Encoding.UTF8.GetString(body), Constants.SerializerSettings);
task.SetResult(response.Result);
}
}

Expand Down
22 changes: 18 additions & 4 deletions Source/Euonia.Bus.RabbitMq/RabbitMqQueueConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ internal override void Start(string channel)
{
Connection.TryConnect();
}

Channel = Connection.CreateChannel();

Channel.QueueDeclare(queueName, true, false, false, null);
Expand All @@ -65,27 +66,40 @@ protected override async void HandleMessageReceived(object sender, BasicDeliverE
OnMessageReceived(new MessageReceivedEventArgs(message.Data, context));

var taskCompletion = new TaskCompletionSource<object>();
context.OnResponse += (_, a) =>
context.Responded += (_, a) =>
{
taskCompletion.TrySetResult(a.Result);
};
context.Failed += (_, exception) =>
{
taskCompletion.TrySetException(exception);
};
context.Completed += (_, _) =>
{
taskCompletion.TryCompleteFromCompletedTask(Task.FromResult(default(object)));
};

RabbitMqReply<object> reply;

await Handler.HandleAsync(message.Channel, message.Data, context);

var result = await taskCompletion.Task;
try
{
var result = await taskCompletion.Task;
reply = RabbitMqReply<object>.Success(result);
}
catch (Exception exception)
{
reply = RabbitMqReply<object>.Failure(exception);
}

if (!string.IsNullOrEmpty(props.CorrelationId) || !string.IsNullOrWhiteSpace(props.ReplyTo))
{
var replyProps = Channel.CreateBasicProperties();
replyProps.Headers ??= new Dictionary<string, object>();
replyProps.Headers.Add(Constants.MessageHeaders.MessageType, result.GetType().GetFullNameWithAssemblyName());
replyProps.CorrelationId = props.CorrelationId;

var response = SerializeMessage(result);
var response = SerializeMessage(reply);
Channel.BasicPublish(string.Empty, props.ReplyTo, replyProps, response);
}

Expand Down
45 changes: 45 additions & 0 deletions Source/Euonia.Bus.RabbitMq/RabbitMqReply.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
namespace Nerosoft.Euonia.Bus.RabbitMq;

internal class RabbitMqReply<TResult>
{
/// <summary>
/// Gets or sets the result.
/// </summary>
public TResult Result { get; set; }

/// <summary>
/// Gets or sets the error.
/// </summary>
public Exception Error { get; set; }

/// <summary>
/// Gets a value indicating whether this message handing is success.
/// </summary>
public bool IsSuccess => Error == null;

/// <summary>
///
/// </summary>
/// <param name="result"></param>
/// <returns></returns>
public static RabbitMqReply<TResult> Success(TResult result)
{
return new RabbitMqReply<TResult>
{
Result = result
};
}

/// <summary>
///
/// </summary>
/// <param name="error"></param>
/// <returns></returns>
public static RabbitMqReply<TResult> Failure(Exception error)
{
return new RabbitMqReply<TResult>
{
Error = error
};
}
}
Loading

0 comments on commit 56c80a0

Please sign in to comment.