Skip to content

Commit

Permalink
Issue #180: Huge refactoring; attempting to support BasicAck + better…
Browse files Browse the repository at this point in the history
… support for Nack/Reject
  • Loading branch information
odalet committed Nov 12, 2023
1 parent 11303bd commit 6f990d4
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 142 deletions.
65 changes: 35 additions & 30 deletions src/AddUp.FakeRabbitMQ.Tests/FakeModelBasicTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void BasicAck_removes_message_from_queue()
var deliveryTag = model.WorkingMessagesForUnitTests.First().Key;
model.BasicAck(deliveryTag, false);

Assert.Empty(server.Queues["my_queue"].Messages);
Assert.False(server.Queues["my_queue"].HasMessages);
}
}
}
Expand Down Expand Up @@ -252,7 +252,7 @@ public void BasicGet_does_not_remove_the_message_from_the_queue_if_not_acked(boo

_ = model.BasicGet("my_queue", autoAck);

Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count);
Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount);
Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count);
}
}
Expand Down Expand Up @@ -283,42 +283,40 @@ public void BasicNack_does_not_reenqueue_a_brand_new_message(bool requeue, int e
var deliveryTag = model.WorkingMessagesForUnitTests.First().Key;
model.BasicNack(deliveryTag, false, requeue);

Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count);
Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount);
Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count);
}
}
}

[Theory]
[InlineData(true, 1)] // If requeue param to BasicNack is true, the message that is nacked should remain in Rabbit
[InlineData(false, 0)] // If requeue param to BasicNack is false, the message that is nacked should be removed from Rabbit
[InlineData(true, 1)] // If requeue param to BasicNack is true, the message that is nacked should remain in Rabbit
public void BasicReject_does_not_reenqueue_a_brand_new_message(bool requeue, int expectedMessageCount)
{
var server = new RabbitServer();
using (var model = new FakeModel(server))
{
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.QueueDeclare("my_queue");
model.ExchangeBind("my_queue", "my_exchange", null);
using var model = new FakeModel(server);

var encodedMessage = Encoding.ASCII.GetBytes("hello world!");
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.QueueDeclare("my_queue");
model.ExchangeBind("my_queue", "my_exchange", null);

var consumer = new EventingBasicConsumer(model);
using (var messageProcessed = new ManualResetEventSlim())
{
consumer.Received += (_, _) => messageProcessed.Set();
model.BasicConsume("my_queue", false, consumer);
Assert.True(consumer.IsRunning);
var encodedMessage = Encoding.ASCII.GetBytes("hello world!");
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);

messageProcessed.Wait();
var deliveryTag = model.WorkingMessagesForUnitTests.First().Key;
model.BasicReject(deliveryTag, requeue);
var consumer = new EventingBasicConsumer(model);
using var messageProcessed = new ManualResetEventSlim();

Assert.Equal(expectedMessageCount, server.Queues["my_queue"].Messages.Count);
Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count);
}
}
consumer.Received += (_, _) => messageProcessed.Set();
model.BasicConsume("my_queue", false, consumer);
Assert.True(consumer.IsRunning);

messageProcessed.Wait();
var deliveryTag = model.WorkingMessagesForUnitTests.First().Key;
model.BasicReject(deliveryTag, requeue);

Assert.Equal(expectedMessageCount, server.Queues["my_queue"].MessageCount);
Assert.Equal(expectedMessageCount, model.WorkingMessagesForUnitTests.Count);
}

[Fact]
Expand All @@ -336,8 +334,11 @@ public void BasicPublish_publishes_message()

model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);

Assert.Single(server.Queues["my_queue"].Messages);
Assert.Equal(encodedMessage, server.Queues["my_queue"].Messages.First().Body);
Assert.Equal(1, server.Queues["my_queue"].MessageCount);
if (!server.Queues["my_queue"].TryPeekForUnitTests(out var peeked))
Assert.Fail("No message in queue");
else
Assert.Equal(encodedMessage, peeked.Body);
}
}

Expand All @@ -354,8 +355,11 @@ public void BasicPublish_to_default_exchange_publishes_message()

model.BasicPublish("", "my_queue", model.CreateBasicProperties(), encodedMessage);

Assert.Single(server.Queues["my_queue"].Messages);
Assert.Equal(encodedMessage, server.Queues["my_queue"].Messages.First().Body);
Assert.Equal(1, server.Queues["my_queue"].MessageCount);
if (!server.Queues["my_queue"].TryPeekForUnitTests(out var peeked))
Assert.Fail("No message in queue");
else
Assert.Equal(encodedMessage, peeked.Body);
}
}

Expand Down Expand Up @@ -505,10 +509,11 @@ public void BasicPublishBatch_publishes_messages()
batch.Add("my_exchange", null, true, model.CreateBasicProperties(), encodedMessages[1]);
batch.Publish();

Assert.Equal(2, server.Queues["my_queue"].Messages.Count);
Assert.Equal(2, server.Queues["my_queue"].MessageCount);

var index = 0;
foreach (var item in server.Queues["my_queue"].Messages)
var items = server.Queues["my_queue"].GetAllMessagesForUnitTests();
foreach (var item in items)
{
Assert.Equal(encodedMessages[index].ToArray(), item.Body);
index++;
Expand Down
103 changes: 75 additions & 28 deletions src/AddUp.FakeRabbitMQ.Tests/FakeModelMiscTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Xunit;
Expand Down Expand Up @@ -56,43 +57,89 @@ public void MessageCount_returns_the_number_of_messages_in_the_queue()
public void MessageCount_returns_the_number_of_non_consumed_messages_in_the_queue()
{
var server = new RabbitServer();
using (var model = new FakeModel(server))
using var model = new FakeModel(server);

const string queueName = "myQueue";
model.QueueDeclare(queueName);
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.ExchangeBind(queueName, "my_exchange", null);

for (var i = 0; i < 10; i++)
{
const string queueName = "myQueue";
model.QueueDeclare(queueName);
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.ExchangeBind(queueName, "my_exchange", null);
var message = $"hello world: {i}";
var encodedMessage = Encoding.ASCII.GetBytes(message);
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);
}

for (var i = 0; i < 10; i++)
{
// Consume 4 messages
const string consumerTag = "consumer-tag";
var consumer = new EventingBasicConsumer(model);
var consumptionCount = 0;
using var messagesProcessed = new ManualResetEventSlim();

consumer.Received += (s, e) =>
{
consumptionCount++;
if (consumptionCount > 4) return;
model.BasicAck(e.DeliveryTag, false);
if (consumptionCount == 4)
messagesProcessed.Set();
};

model.BasicConsume(queueName, false, consumerTag, consumer);

messagesProcessed.Wait();
Assert.Equal(6u, model.MessageCount(queueName));
}

[Fact]
public async Task MessageCount_returns_the_number_of_non_consumed_messages_in_the_queue_autoAck_mode()
{
var server = new RabbitServer();
using var model = new FakeModel(server);

const string queueName = "myQueue";
model.QueueDeclare(queueName);
model.ExchangeDeclare("my_exchange", ExchangeType.Direct);
model.ExchangeBind(queueName, "my_exchange", null);

void publishMessages(int startIndex, int count)
{
for (var i = startIndex; i < startIndex + count; i++)
{
var message = $"hello world: {i}";
var encodedMessage = Encoding.ASCII.GetBytes(message);
model.BasicPublish("my_exchange", null, model.CreateBasicProperties(), encodedMessage);
}
}

// Consume 4 messages
var consumer = new EventingBasicConsumer(model);
var consumptionCount = 0;
using (var messagesProcessed = new ManualResetEventSlim())
{
consumer.Received += (s, e) =>
{
if (consumptionCount >= 4)
{
messagesProcessed.Set();
return;
}
model.BasicAck(e.DeliveryTag, false);
consumptionCount++;
};

model.BasicConsume(queueName, true, consumer);
messagesProcessed.Wait();
Assert.Equal(6u, model.MessageCount(queueName));
}
publishMessages(0, 4);

// Consume 4 messages
const string consumerTag = "consumer-tag";
var consumer = new EventingBasicConsumer(model);
var consumptionCount = 0;
using var messagesProcessed = new ManualResetEventSlim();

void consume(object sender, BasicDeliverEventArgs e)
{
consumptionCount++;
if (consumptionCount >= 4)
messagesProcessed.Set();
}

consumer.Received += consume;

model.BasicConsume(queueName, true, consumerTag, consumer);
messagesProcessed.Wait();
model.BasicCancel(consumerTag);

publishMessages(4, 6); // Publish another 6 messages
await Task.Delay(1000); // They will never be consumed

Assert.Equal(4, consumptionCount);
Assert.Equal(6u, model.MessageCount(queueName));
}

[Fact]
Expand Down
39 changes: 25 additions & 14 deletions src/AddUp.FakeRabbitMQ.Tests/FakeModelQueueTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Collections.Generic;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
using Xunit;
Expand All @@ -10,6 +12,8 @@ namespace AddUp.RabbitMQ.Fakes;
[ExcludeFromCodeCoverage]
public class FakeModelQueueTests
{
private long lastDeliveryTag; // USed to simulate generation of the delivery tag by FakeModel

[Fact]
public void QueueBind_binds_an_exchange_to_a_queue()
{
Expand Down Expand Up @@ -202,20 +206,20 @@ public void QueuePurge_removes_all_messages_from_specified_queue()
using (var model = new FakeModel(node))
{
model.QueueDeclare("my_other_queue");
node.Queues["my_other_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_other_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_other_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_other_queue"].Enqueue(MakeRabbitMessage());

model.QueueDeclare("my_queue");
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());

var count = model.QueuePurge("my_queue");
Assert.Equal(4u, count);

Assert.True(node.Queues["my_queue"].Messages.IsEmpty);
Assert.False(node.Queues["my_other_queue"].Messages.IsEmpty);
Assert.False(node.Queues["my_queue"].HasMessages);
Assert.True(node.Queues["my_other_queue"].HasMessages);
}
}

Expand All @@ -226,15 +230,22 @@ public void QueuePurge_returns_0_if_queue_does_not_exist()
using (var model = new FakeModel(node))
{
model.QueueDeclare("my_queue");
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Messages.Enqueue(new RabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());
node.Queues["my_queue"].Enqueue(MakeRabbitMessage());

var count = model.QueuePurge("my_other_queue");
Assert.Equal(0u, count);

Assert.False(node.Queues["my_queue"].Messages.IsEmpty);
Assert.True(node.Queues["my_queue"].HasMessages);
}
}

private RabbitMessage MakeRabbitMessage()
{
_ = Interlocked.Increment(ref lastDeliveryTag);
var deliveryTag = Convert.ToUInt64(lastDeliveryTag);
return new(deliveryTag);
}
}
42 changes: 35 additions & 7 deletions src/AddUp.FakeRabbitMQ.Tests/Repros/BasicAckSemanticTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public void Ack_DeliveryTagsAreRespected()

// setup a basic consumer that stores delivery tags to the deliveryTags list
var consumer = new EventingBasicConsumer(model);
consumer.Received += (_, args) =>
consumer.Received += (_, e) =>
{
deliveryTags.Add(args.DeliveryTag);
deliveryTags.Add(e.DeliveryTag);
if (deliveryTags.Count == 2)
allMessagesDelivered.Set();
};
Expand All @@ -68,10 +68,10 @@ public void Ack_DeliveryTagsAreRespected()
model.BasicAck(deliveryTags[1], false);

// asserts that only one message is still queued
Assert.True(rabbitServer.Queues[queueName].Messages.Count == 1, "Only one message is still queued");
Assert.True(rabbitServer.Queues[queueName].MessageCount == 1, "Only one message is still queued");

// asserts that the remaining message in queue is the first one, since we acked the second
rabbitServer.Queues[queueName].Messages.TryPeek(out var pendingMessage);
rabbitServer.Queues[queueName].TryPeekForUnitTests(out var pendingMessage);
Assert.True(Encoding.UTF8.GetString(pendingMessage.Body) == "first", "The remaining message in queue is the first one");
}

Expand All @@ -83,9 +83,9 @@ public void Ack_Multiple()

// setup a basic consumer that stores delivery tags to the deliveryTags list
var consumer = new EventingBasicConsumer(model);
consumer.Received += (_, args) =>
consumer.Received += (_, e) =>
{
deliveryTags.Add(args.DeliveryTag);
deliveryTags.Add(e.DeliveryTag);
if (deliveryTags.Count == 2)
allMessagesDelivered.Set();
};
Expand All @@ -103,6 +103,34 @@ public void Ack_Multiple()
model.BasicAck(deliveryTags[1], true);

// asserts queue is empty
Assert.Empty(rabbitServer.Queues[queueName].Messages);
Assert.False(rabbitServer.Queues[queueName].HasMessages);
}

[Fact]
public void AutoAck_is_honored_by_BasicConsume()
{
using var allMessagesDelivered = new ManualResetEventSlim();
var deliveryTags = new List<ulong>();

// setup a basic consumer that stores delivery tags to the deliveryTags list
var consumer = new EventingBasicConsumer(model);
consumer.Received += (_, e) =>
{
deliveryTags.Add(e.DeliveryTag);
if (deliveryTags.Count == 2)
allMessagesDelivered.Set();
};

model.BasicConsume(queueName, true, consumer);

// publish two messages
model.BasicPublish(exchange, routingKey, true, null, Encoding.UTF8.GetBytes("first"));
model.BasicPublish(exchange, routingKey, true, null, Encoding.UTF8.GetBytes("second"));

// wait for both messages to be delivered
allMessagesDelivered.Wait(timeout * 1000000);

// asserts queue is empty
Assert.False(rabbitServer.Queues[queueName].HasMessages);
}
}
Loading

0 comments on commit 6f990d4

Please sign in to comment.