diff --git a/src/AddUp.FakeRabbitMQ.Tests/FakeModelBasicTests.cs b/src/AddUp.FakeRabbitMQ.Tests/FakeModelBasicTests.cs index f01cbde..fa0ee46 100644 --- a/src/AddUp.FakeRabbitMQ.Tests/FakeModelBasicTests.cs +++ b/src/AddUp.FakeRabbitMQ.Tests/FakeModelBasicTests.cs @@ -41,7 +41,7 @@ public void BasicAck_removes_message_from_queue() var deliveryTag = model.WorkingMessagesForUnitTests.First().Key; model.BasicAck(deliveryTag, false); - Assert.Empty(server.Queues["my_queue"].Messages); + Assert.False(server.Queues["my_queue"].HasMessages); } } } @@ -252,7 +252,7 @@ public void BasicGet_does_not_remove_the_message_from_the_queue_if_not_acked(boo _ = model.BasicGet("my_queue", autoAck); - Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count); + Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount); Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count); } } @@ -283,42 +283,40 @@ public void BasicNack_does_not_reenqueue_a_brand_new_message(bool requeue, int e var deliveryTag = model.WorkingMessagesForUnitTests.First().Key; model.BasicNack(deliveryTag, false, requeue); - Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count); + Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount); Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count); } } } [Theory] - [InlineData(true, 1)] // If requeue param to BasicNack is true, the message that is nacked should remain in Rabbit [InlineData(false, 0)] // If requeue param to BasicNack is false, the message that is nacked should be removed from Rabbit + [InlineData(true, 1)] // If requeue param to BasicNack is true, the message that is nacked should remain in Rabbit public void BasicReject_does_not_reenqueue_a_brand_new_message(bool requeue, int expectedMessageCount) { var server = new RabbitServer(); - using (var model = new FakeModel(server)) - { - model.ExchangeDeclare("my_exchange", ExchangeType.Direct); - model.QueueDeclare("my_queue"); - model.ExchangeBind("my_queue", "my_exchange", null); + using var model = new FakeModel(server); - var encodedMessage = Encoding.ASCII.GetBytes("hello world!"); - model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage); + model.ExchangeDeclare("my_exchange", ExchangeType.Direct); + model.QueueDeclare("my_queue"); + model.ExchangeBind("my_queue", "my_exchange", null); - var consumer = new EventingBasicConsumer(model); - using (var messageProcessed = new ManualResetEventSlim()) - { - consumer.Received += (_, _) => messageProcessed.Set(); - model.BasicConsume("my_queue", false, consumer); - Assert.True(consumer.IsRunning); + var encodedMessage = Encoding.ASCII.GetBytes("hello world!"); + model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage); - messageProcessed.Wait(); - var deliveryTag = model.WorkingMessagesForUnitTests.First().Key; - model.BasicReject(deliveryTag, requeue); + var consumer = new EventingBasicConsumer(model); + using var messageProcessed = new ManualResetEventSlim(); - Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count); - Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count); - } - } + consumer.Received += (_, _) => messageProcessed.Set(); + model.BasicConsume("my_queue", false, consumer); + Assert.True(consumer.IsRunning); + + messageProcessed.Wait(); + var deliveryTag = model.WorkingMessagesForUnitTests.First().Key; + model.BasicReject(deliveryTag, requeue); + + Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount); + Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count); } [Fact] @@ -336,8 +334,11 @@ public void BasicPublish_publishes_message() model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage); - Assert.Single(server.Queues["my_queue"].Messages); - Assert.Equal(encodedMessage, server.Queues["my_queue"].Messages.First().Body); + Assert.Equal(1, server.Queues["my_queue"].MessageCount); + if (!server.Queues["my_queue"].TryPeekForUnitTests(out var peeked)) + Assert.Fail("No message in queue"); + else + Assert.Equal(encodedMessage, peeked.Body); } } @@ -354,8 +355,11 @@ public void BasicPublish_to_default_exchange_publishes_message() model.BasicPublish("", "my_queue", model.CreateBasicProperties(), encodedMessage); - Assert.Single(server.Queues["my_queue"].Messages); - Assert.Equal(encodedMessage, server.Queues["my_queue"].Messages.First().Body); + Assert.Equal(1, server.Queues["my_queue"].MessageCount); + if (!server.Queues["my_queue"].TryPeekForUnitTests(out var peeked)) + Assert.Fail("No message in queue"); + else + Assert.Equal(encodedMessage, peeked.Body); } } @@ -505,10 +509,11 @@ public void BasicPublishBatch_publishes_messages() batch.Add("my_exchange", null, true, model.CreateBasicProperties(), encodedMessages[1]); batch.Publish(); - Assert.Equal(2, server.Queues["my_queue"].Messages.Count); + Assert.Equal(2, server.Queues["my_queue"].MessageCount); var index = 0; - foreach (var item in server.Queues["my_queue"].Messages) + var items = server.Queues["my_queue"].GetAllMessagesForUnitTests(); + foreach (var item in items) { Assert.Equal(encodedMessages[index].ToArray(), item.Body); index++; diff --git a/src/AddUp.FakeRabbitMQ.Tests/FakeModelMiscTests.cs b/src/AddUp.FakeRabbitMQ.Tests/FakeModelMiscTests.cs index 06a280a..97a5423 100644 --- a/src/AddUp.FakeRabbitMQ.Tests/FakeModelMiscTests.cs +++ b/src/AddUp.FakeRabbitMQ.Tests/FakeModelMiscTests.cs @@ -1,6 +1,7 @@ using System.Diagnostics.CodeAnalysis; using System.Text; using System.Threading; +using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; using Xunit; @@ -56,43 +57,89 @@ public void MessageCount_returns_the_number_of_messages_in_the_queue() public void MessageCount_returns_the_number_of_non_consumed_messages_in_the_queue() { var server = new RabbitServer(); - using (var model = new FakeModel(server)) + using var model = new FakeModel(server); + + const string queueName = "myQueue"; + model.QueueDeclare(queueName); + model.ExchangeDeclare("my_exchange", ExchangeType.Direct); + model.ExchangeBind(queueName, "my_exchange", null); + + for (var i = 0; i < 10; i++) { - const string queueName = "myQueue"; - model.QueueDeclare(queueName); - model.ExchangeDeclare("my_exchange", ExchangeType.Direct); - model.ExchangeBind(queueName, "my_exchange", null); + var message = $"hello world: {i}"; + var encodedMessage = Encoding.ASCII.GetBytes(message); + model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage); + } - for (var i = 0; i < 10; i++) - { + // Consume 4 messages + const string consumerTag = "consumer-tag"; + var consumer = new EventingBasicConsumer(model); + var consumptionCount = 0; + using var messagesProcessed = new ManualResetEventSlim(); + + consumer.Received += (s, e) => + { + consumptionCount++; + if (consumptionCount > 4) return; + + model.BasicAck(e.DeliveryTag, false); + if (consumptionCount == 4) + messagesProcessed.Set(); + }; + + model.BasicConsume(queueName, false, consumerTag, consumer); + + messagesProcessed.Wait(); + Assert.Equal(6u, model.MessageCount(queueName)); + } + [Fact] + public async Task MessageCount_returns_the_number_of_non_consumed_messages_in_the_queue_autoAck_mode() + { + var server = new RabbitServer(); + using var model = new FakeModel(server); + + const string queueName = "myQueue"; + model.QueueDeclare(queueName); + model.ExchangeDeclare("my_exchange", ExchangeType.Direct); + model.ExchangeBind(queueName, "my_exchange", null); + + void publishMessages(int startIndex, int count) + { + for (var i = startIndex; i < startIndex + count; i++) + { var message = $"hello world: {i}"; var encodedMessage = Encoding.ASCII.GetBytes(message); model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage); } + } - // Consume 4 messages - var consumer = new EventingBasicConsumer(model); - var consumptionCount = 0; - using (var messagesProcessed = new ManualResetEventSlim()) - { - consumer.Received += (s, e) => - { - if (consumptionCount >= 4) - { - messagesProcessed.Set(); - return; - } - - model.BasicAck(e.DeliveryTag, false); - consumptionCount++; - }; - - model.BasicConsume(queueName, true, consumer); - messagesProcessed.Wait(); - Assert.Equal(6u, model.MessageCount(queueName)); - } + publishMessages(0, 4); + + // Consume 4 messages + const string consumerTag = "consumer-tag"; + var consumer = new EventingBasicConsumer(model); + var consumptionCount = 0; + using var messagesProcessed = new ManualResetEventSlim(); + + void consume(object sender, BasicDeliverEventArgs e) + { + consumptionCount++; + if (consumptionCount >= 4) + messagesProcessed.Set(); } + + consumer.Received += consume; + + model.BasicConsume(queueName, true, consumerTag, consumer); + messagesProcessed.Wait(); + model.BasicCancel(consumerTag); + + publishMessages(4, 6); // Publish another 6 messages + await Task.Delay(1000); // They will never be consumed + + Assert.Equal(4, consumptionCount); + Assert.Equal(6u, model.MessageCount(queueName)); } [Fact] diff --git a/src/AddUp.FakeRabbitMQ.Tests/FakeModelQueueTests.cs b/src/AddUp.FakeRabbitMQ.Tests/FakeModelQueueTests.cs index 89942a9..ac20cba 100644 --- a/src/AddUp.FakeRabbitMQ.Tests/FakeModelQueueTests.cs +++ b/src/AddUp.FakeRabbitMQ.Tests/FakeModelQueueTests.cs @@ -1,6 +1,8 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; +using System.Threading; using RabbitMQ.Client; using RabbitMQ.Client.Exceptions; using Xunit; @@ -10,6 +12,8 @@ namespace AddUp.RabbitMQ.Fakes; [ExcludeFromCodeCoverage] public class FakeModelQueueTests { + private long lastDeliveryTag; // USed to simulate generation of the delivery tag by FakeModel + [Fact] public void QueueBind_binds_an_exchange_to_a_queue() { @@ -202,20 +206,20 @@ public void QueuePurge_removes_all_messages_from_specified_queue() using (var model = new FakeModel(node)) { model.QueueDeclare("my_other_queue"); - node.Queues["my_other_queue"].Messages.Enqueue(new RabbitMessage()); - node.Queues["my_other_queue"].Messages.Enqueue(new RabbitMessage()); + node.Queues["my_other_queue"].Enqueue(MakeRabbitMessage()); + node.Queues["my_other_queue"].Enqueue(MakeRabbitMessage()); model.QueueDeclare("my_queue"); - node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage()); - node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage()); - node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage()); - node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage()); + node.Queues["my_queue"].Enqueue(MakeRabbitMessage()); + node.Queues["my_queue"].Enqueue(MakeRabbitMessage()); + node.Queues["my_queue"].Enqueue(MakeRabbitMessage()); + node.Queues["my_queue"].Enqueue(MakeRabbitMessage()); var count = model.QueuePurge("my_queue"); Assert.Equal(4u, count); - Assert.True(node.Queues["my_queue"].Messages.IsEmpty); - Assert.False(node.Queues["my_other_queue"].Messages.IsEmpty); + Assert.False(node.Queues["my_queue"].HasMessages); + Assert.True(node.Queues["my_other_queue"].HasMessages); } } @@ -226,15 +230,22 @@ public void QueuePurge_returns_0_if_queue_does_not_exist() using (var model = new FakeModel(node)) { model.QueueDeclare("my_queue"); - node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage()); - node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage()); - node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage()); - node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage()); + node.Queues["my_queue"].Enqueue(MakeRabbitMessage()); + node.Queues["my_queue"].Enqueue(MakeRabbitMessage()); + node.Queues["my_queue"].Enqueue(MakeRabbitMessage()); + node.Queues["my_queue"].Enqueue(MakeRabbitMessage()); var count = model.QueuePurge("my_other_queue"); Assert.Equal(0u, count); - Assert.False(node.Queues["my_queue"].Messages.IsEmpty); + Assert.True(node.Queues["my_queue"].HasMessages); } } + + private RabbitMessage MakeRabbitMessage() + { + _ = Interlocked.Increment(ref lastDeliveryTag); + var deliveryTag = Convert.ToUInt64(lastDeliveryTag); + return new(deliveryTag); + } } diff --git a/src/AddUp.FakeRabbitMQ.Tests/Repros/BasicAckSemanticTests.cs b/src/AddUp.FakeRabbitMQ.Tests/Repros/BasicAckSemanticTests.cs index e09d587..e8530d9 100644 --- a/src/AddUp.FakeRabbitMQ.Tests/Repros/BasicAckSemanticTests.cs +++ b/src/AddUp.FakeRabbitMQ.Tests/Repros/BasicAckSemanticTests.cs @@ -48,9 +48,9 @@ public void Ack_DeliveryTagsAreRespected() // setup a basic consumer that stores delivery tags to the deliveryTags list var consumer = new EventingBasicConsumer(model); - consumer.Received += (_, args) => + consumer.Received += (_, e) => { - deliveryTags.Add(args.DeliveryTag); + deliveryTags.Add(e.DeliveryTag); if (deliveryTags.Count == 2) allMessagesDelivered.Set(); }; @@ -68,10 +68,10 @@ public void Ack_DeliveryTagsAreRespected() model.BasicAck(deliveryTags[1], false); // asserts that only one message is still queued - Assert.True(rabbitServer.Queues[queueName].Messages.Count == 1, "Only one message is still queued"); + Assert.True(rabbitServer.Queues[queueName].MessageCount == 1, "Only one message is still queued"); // asserts that the remaining message in queue is the first one, since we acked the second - rabbitServer.Queues[queueName].Messages.TryPeek(out var pendingMessage); + rabbitServer.Queues[queueName].TryPeekForUnitTests(out var pendingMessage); Assert.True(Encoding.UTF8.GetString(pendingMessage.Body) == "first", "The remaining message in queue is the first one"); } @@ -83,9 +83,9 @@ public void Ack_Multiple() // setup a basic consumer that stores delivery tags to the deliveryTags list var consumer = new EventingBasicConsumer(model); - consumer.Received += (_, args) => + consumer.Received += (_, e) => { - deliveryTags.Add(args.DeliveryTag); + deliveryTags.Add(e.DeliveryTag); if (deliveryTags.Count == 2) allMessagesDelivered.Set(); }; @@ -103,6 +103,34 @@ public void Ack_Multiple() model.BasicAck(deliveryTags[1], true); // asserts queue is empty - Assert.Empty(rabbitServer.Queues[queueName].Messages); + Assert.False(rabbitServer.Queues[queueName].HasMessages); + } + + [Fact] + public void AutoAck_is_honored_by_BasicConsume() + { + using var allMessagesDelivered = new ManualResetEventSlim(); + var deliveryTags = new List(); + + // setup a basic consumer that stores delivery tags to the deliveryTags list + var consumer = new EventingBasicConsumer(model); + consumer.Received += (_, e) => + { + deliveryTags.Add(e.DeliveryTag); + if (deliveryTags.Count == 2) + allMessagesDelivered.Set(); + }; + + model.BasicConsume(queueName, true, consumer); + + // publish two messages + model.BasicPublish(exchange, routingKey, true, null, Encoding.UTF8.GetBytes("first")); + model.BasicPublish(exchange, routingKey, true, null, Encoding.UTF8.GetBytes("second")); + + // wait for both messages to be delivered + allMessagesDelivered.Wait(timeout * 1000000); + + // asserts queue is empty + Assert.False(rabbitServer.Queues[queueName].HasMessages); } } diff --git a/src/AddUp.FakeRabbitMQ.Tests/UseCases/SendMessages.cs b/src/AddUp.FakeRabbitMQ.Tests/UseCases/SendMessages.cs index 22e0b21..694dc7a 100644 --- a/src/AddUp.FakeRabbitMQ.Tests/UseCases/SendMessages.cs +++ b/src/AddUp.FakeRabbitMQ.Tests/UseCases/SendMessages.cs @@ -42,7 +42,7 @@ public void SendToExchangeWithBoundQueue() channel.BasicPublish(exchange: "my_exchange", routingKey: null, mandatory: false, basicProperties: null, body: messageBody); } - Assert.Single(rabbitServer.Queues["some_queue"].Messages); + Assert.Equal(1, rabbitServer.Queues["some_queue"].MessageCount); } [Fact] @@ -62,8 +62,8 @@ public void SendToExchangeWithMultipleBoundQueues() channel.BasicPublish(exchange: "my_exchange", routingKey: null, mandatory: false, basicProperties: null, body: messageBody); } - Assert.Single(rabbitServer.Queues["some_queue"].Messages); - Assert.Single(rabbitServer.Queues["some_other_queue"].Messages); + Assert.Equal(1, rabbitServer.Queues["some_queue"].MessageCount); + Assert.Equal(1, rabbitServer.Queues["some_other_queue"].MessageCount); } [Fact] @@ -97,8 +97,8 @@ public void SendToExchangeWithAlternate() Assert.Equal(2, rabbitServer.Exchanges["my_exchange"].Messages.Count); Assert.Single(rabbitServer.Exchanges["my_alternate_exchange"].Messages); - Assert.Single(rabbitServer.Queues["fallback_queue"].Messages); - Assert.Single(rabbitServer.Queues["main_queue"].Messages); + Assert.Equal(1, rabbitServer.Queues["fallback_queue"].MessageCount); + Assert.Equal(1, rabbitServer.Queues["main_queue"].MessageCount); } private static void ConfigureQueueBinding(RabbitServer rabbitServer, string exchangeName, string queueName) diff --git a/src/AddUp.FakeRabbitMQ/FakeModel.cs b/src/AddUp.FakeRabbitMQ/FakeModel.cs index aa0a9e8..5e7f2b3 100644 --- a/src/AddUp.FakeRabbitMQ/FakeModel.cs +++ b/src/AddUp.FakeRabbitMQ/FakeModel.cs @@ -53,15 +53,62 @@ public FakeModel(RabbitServer rabbitServer) public void BasicAck(ulong deliveryTag, bool multiple) { - _ = workingMessages.TryRemove(deliveryTag, out var message); - if (message != null) + void ack(ulong dtag) { + _ = workingMessages.TryRemove(dtag, out var message); + if (message == null) return; + + _ = server.Queues.TryGetValue(message.Queue, out var queue); + if (queue == null) return; + + queue.Ack(dtag); + } + + if (multiple) + { + var dtag = deliveryTag; + while (dtag > 0) + { + ack(dtag); + dtag--; + } + } + else ack(deliveryTag); + } + + public void BasicNack(ulong deliveryTag, bool multiple, bool requeue) + { + void nack(ulong dtag) + { + RabbitMessage message; + if (requeue) + _ = workingMessages.TryGetValue(dtag, out message); + else + _ = workingMessages.TryRemove(dtag, out message); + + if (message == null) return; + _ = server.Queues.TryGetValue(message.Queue, out var queue); - if (queue != null) - _ = queue.Messages.TryDequeue(out _); + if (queue == null) return; + + if (!requeue) queue.Ack(deliveryTag); } + + if (multiple) + { + var dtag = deliveryTag; + while (dtag > 0) + { + nack(dtag); + dtag--; + } + } + else nack(deliveryTag); } + // BasicReject is simply BasicNack with no possibility for multiple rejections + public void BasicReject(ulong deliveryTag, bool requeue) => BasicNack(deliveryTag, false, requeue); + public void BasicCancel(string consumerTag) { _ = consumers.TryRemove(consumerTag, out var consumerData); @@ -87,9 +134,8 @@ public string BasicConsume(string queue, bool autoAck, string consumerTag, bool { void notifyConsumerOfMessage(RabbitMessage message) { - _ = Interlocked.Increment(ref lastDeliveryTag); + var deliveryTag = message.DeliveryTag; - var deliveryTag = Convert.ToUInt64(lastDeliveryTag); const bool redelivered = false; var exchange = message.Exchange; var routingKey = message.RoutingKey; @@ -109,6 +155,9 @@ void notifyConsumerOfMessage(RabbitMessage message) .GetResult(); else consumer .HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body); + + if (autoAck) + BasicAck(deliveryTag, false); } // Deliberately check for empty string here, latest RabbitMQ client accepts "" @@ -136,7 +185,7 @@ ConsumerData updateFunction(string s, ConsumerData _) => new ShutdownEventArgs(ShutdownInitiator.Peer, 530, $"NOT_ALLOWED - attempt to reuse consumer tag '{s}'")); _ = consumers.AddOrUpdate(consumerTag, consumerData, updateFunction); - foreach (var message in queueInstance.Messages) + foreach (var message in queueInstance.GetMessages()) consumerData.QueueMessagePublished(this, message); queueInstance.MessagePublished += consumerData.QueueMessagePublished; @@ -155,18 +204,15 @@ public BasicGetResult BasicGet(string queue, bool autoAck) _ = server.Queues.TryGetValue(queue, out var queueInstance); if (queueInstance == null) return null; - _ = autoAck ? - queueInstance.Messages.TryDequeue(out var message) : - queueInstance.Messages.TryPeek(out message); - + queueInstance.TryGet(out var message, autoAck); if (message == null) return null; - _ = Interlocked.Increment(ref lastDeliveryTag); - var deliveryTag = Convert.ToUInt64(lastDeliveryTag); + var deliveryTag = message.DeliveryTag; + const bool redelivered = false; var exchange = message.Exchange; var routingKey = message.RoutingKey; - var messageCount = Convert.ToUInt32(queueInstance.Messages.Count); + var messageCount = Convert.ToUInt32(queueInstance.MessageCount); var basicProperties = message.BasicProperties ?? CreateBasicProperties(); var body = message.Body; @@ -181,29 +227,13 @@ public BasicGetResult BasicGet(string queue, bool autoAck) return new BasicGetResult(deliveryTag, redelivered, exchange, routingKey, messageCount, basicProperties, body); } - public void BasicNack(ulong deliveryTag, bool multiple, bool requeue) - { - if (requeue) return; - - foreach (var queue in workingMessages.Select(m => m.Value.Queue)) - { - _ = server.Queues.TryGetValue(queue, out var queueInstance); - queueInstance?.ClearMessages(); - } - - _ = workingMessages.TryRemove(deliveryTag, out var message); - if (message == null) return; - - foreach (var workingMessage in workingMessages.Select(m => m.Value)) - { - _ = server.Queues.TryGetValue(workingMessage.Queue, out var queueInstance); - queueInstance?.PublishMessage(workingMessage); - } - } - public void BasicPublish(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory body) { - var parameters = new RabbitMessage + // Let's create the delivery tag as soon as publishing so that we can find it in the queues later on + _ = Interlocked.Increment(ref lastDeliveryTag); + var deliveryTag = Convert.ToUInt64(lastDeliveryTag); + + var message = new RabbitMessage(deliveryTag) { Exchange = exchange, RoutingKey = routingKey, @@ -222,13 +252,13 @@ RabbitExchange addExchange(string s) IsDurable = false }; - newExchange.PublishMessage(parameters); + newExchange.PublishMessage(message); return newExchange; } RabbitExchange updateExchange(string s, RabbitExchange existingExchange) { - existingExchange.PublishMessage(parameters); + existingExchange.PublishMessage(message); return existingExchange; } @@ -259,9 +289,6 @@ public void BasicRecover(bool requeue) public void BasicRecoverAsync(bool requeue) => BasicRecover(requeue); - public void BasicReject(ulong deliveryTag, bool requeue) => - BasicNack(deliveryTag, false, requeue); - public void Close() => Close(200, "Goodbye"); public void Close(ushort replyCode, string replyText) => Close(replyCode, replyText, abort: false); private void Close(ushort replyCode, string replyText, bool abort) @@ -414,8 +441,8 @@ public QueueDeclareOk QueueDeclarePassive(string queue) { if (server.Queues.TryGetValue(queue, out var rabbitQueue)) { - var result = new QueueDeclareOk(queue, - (uint)unchecked(rabbitQueue.Messages.Count), + var result = new QueueDeclareOk(queue, + (uint)unchecked(rabbitQueue.MessageCount), (uint)unchecked(rabbitQueue.ConsumerCount)); CurrentQueue = result.QueueName; @@ -445,14 +472,7 @@ public uint QueuePurge(string queue) if (instance == null) return 0u; - var count = 0u; - while (!instance.Messages.IsEmpty) - { - _ = instance.Messages.TryDequeue(out _); - count++; - } - - return count; + return instance.ClearMessages(); } public void QueueUnbind(string queue, string exchange, string routingKey, IDictionary arguments) => diff --git a/src/AddUp.FakeRabbitMQ/RabbitMessage.cs b/src/AddUp.FakeRabbitMQ/RabbitMessage.cs index b1d6bec..abaf0c0 100644 --- a/src/AddUp.FakeRabbitMQ/RabbitMessage.cs +++ b/src/AddUp.FakeRabbitMQ/RabbitMessage.cs @@ -4,6 +4,8 @@ namespace AddUp.RabbitMQ.Fakes { public sealed class RabbitMessage { + public RabbitMessage(ulong deliveryTag) => DeliveryTag = deliveryTag; + public string Exchange { get; set; } public string RoutingKey { get; set; } public string Queue { get; set; } @@ -11,7 +13,9 @@ public sealed class RabbitMessage public IBasicProperties BasicProperties { get; set; } public byte[] Body { get; set; } - public RabbitMessage Copy() => new RabbitMessage + internal ulong DeliveryTag { get; } + + public RabbitMessage Copy() => new RabbitMessage(DeliveryTag) { Exchange = Exchange, RoutingKey = RoutingKey, diff --git a/src/AddUp.FakeRabbitMQ/RabbitQueue.cs b/src/AddUp.FakeRabbitMQ/RabbitQueue.cs index 031abdd..902e61f 100644 --- a/src/AddUp.FakeRabbitMQ/RabbitQueue.cs +++ b/src/AddUp.FakeRabbitMQ/RabbitQueue.cs @@ -1,17 +1,20 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; namespace AddUp.RabbitMQ.Fakes { public sealed class RabbitQueue { + private readonly object me = new object(); + private readonly ConcurrentDictionary messages = new ConcurrentDictionary(); + private readonly ConcurrentQueue deliveryTags = new ConcurrentQueue(); private readonly HashSet> messagePublishedEventHandlers; public RabbitQueue() { messagePublishedEventHandlers = new HashSet>(); - Messages = new ConcurrentQueue(); Bindings = new ConcurrentDictionary(); Arguments = new Dictionary(); } @@ -22,29 +25,83 @@ public event EventHandler MessagePublished remove => messagePublishedEventHandlers.Remove(value); } - public ConcurrentQueue Messages { get; } public ConcurrentDictionary Bindings { get; } public IDictionary Arguments { get; set; } public string Name { get; set; } public bool IsDurable { get; set; } public bool IsExclusive { get; set; } public bool IsAutoDelete { get; set; } - + public int MessageCount => messages.Count; + public bool HasMessages => !messages.IsEmpty; public int ConsumerCount => messagePublishedEventHandlers.Count; + public void Ack(ulong deliveryTag) + { + lock (me) + _ = messages.TryRemove(deliveryTag, out _); + } + + public void Enqueue(RabbitMessage message) + { + var deliveryTag = message.DeliveryTag; + if (deliveryTag == 0) throw new InvalidOperationException("No Delivery Tag"); + + RabbitMessage updateFunction(ulong key, RabbitMessage existingMessage) => existingMessage; + lock (me) + { + _ = messages.AddOrUpdate(deliveryTag, message, updateFunction); + deliveryTags.Enqueue(deliveryTag); + } + } + + public IEnumerable GetMessages() + { + while (deliveryTags.Count > 0) + { + if (TryGet(out var m, false)) + yield return m; + } + } + + public bool TryGet(out RabbitMessage result, bool remove) + { + result = null; + lock (me) + { + var found = deliveryTags.TryDequeue(out var deliveryTag); + return found && (remove + ? messages.TryRemove(deliveryTag, out result) + : messages.TryGetValue(deliveryTag, out result)); + } + } + public void PublishMessage(RabbitMessage message) { var queueMessage = message.Copy(); queueMessage.Queue = Name; - Messages.Enqueue(queueMessage); + Enqueue(queueMessage); foreach (var handler in messagePublishedEventHandlers) handler(this, queueMessage); } - public void ClearMessages() + public uint ClearMessages() { - while (Messages.TryDequeue(out _)) - ; + var count = 0u; + while (TryGet(out _, true)) + count++; + return count; } + + internal bool TryPeekForUnitTests(out RabbitMessage result) + { + result = null; + lock (me) + { + var found = deliveryTags.TryPeek(out var deliveryTag); + return found && messages.TryGetValue(deliveryTag, out result); + } + } + + internal RabbitMessage[] GetAllMessagesForUnitTests() => messages.Values.ToArray(); } } \ No newline at end of file