diff --git a/src/AWS.Messaging/Services/DefaultMessageManager.cs b/src/AWS.Messaging/Services/DefaultMessageManager.cs
index 8fb6948..1788bd3 100644
--- a/src/AWS.Messaging/Services/DefaultMessageManager.cs
+++ b/src/AWS.Messaging/Services/DefaultMessageManager.cs
@@ -36,7 +36,8 @@ public DefaultMessageManager(IMessagePoller messagePoller, IHandlerInvoker handl
}
///
- public int ActiveMessageCount {
+ public int ActiveMessageCount
+ {
get
{
lock (_activeMessageCountLock)
@@ -44,22 +45,30 @@ public int ActiveMessageCount {
return _activeMessageCount;
}
}
- set
+ }
+
+ ///
+ /// Updates in a threadsafe manner
+ ///
+ /// Difference to apply to the current active message count
+ /// Updated
+ private int UpdateActiveMessageCount(int delta)
+ {
+ lock (_activeMessageCountLock)
{
- lock (_activeMessageCountLock)
- {
- _logger.LogTrace("Updating {activeMessageCount} from {before} to {after}", nameof(ActiveMessageCount), ActiveMessageCount, value);
+ var newValue = _activeMessageCount + delta;
+ _logger.LogTrace("Updating {activeMessageCount} from {before} to {after}", nameof(ActiveMessageCount), _activeMessageCount, newValue);
- var isDecrementing = value < _activeMessageCount;
- _activeMessageCount = value;
+ _activeMessageCount = newValue;
- // If we just decremented the active message count, signal to the poller
- // that there may be more capacity available again.
- if (isDecrementing)
- {
- _activeMessageCountDecrementedEvent.Set();
- }
+ // If we just decremented the active message count, signal to the poller
+ // that there may be more capacity available again.
+ if (delta < 0)
+ {
+ _activeMessageCountDecrementedEvent.Set();
}
+
+ return _activeMessageCount;
}
}
@@ -86,11 +95,10 @@ public Task WaitAsync(TimeSpan timeout)
return Task.CompletedTask;
}
-
///
public async Task ProcessMessageAsync(MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping, CancellationToken token = default)
{
- ActiveMessageCount++;
+ UpdateActiveMessageCount(1);
var handlerTask = _handlerInvoker.InvokeAsync(messageEnvelope, subscriberMapping, token);
@@ -132,7 +140,7 @@ public async Task ProcessMessageAsync(MessageEnvelope messageEnvelope, Subscribe
_logger.LogError(handlerTask.Exception, "Message handling failed unexpectedly for message ID {messageId}", messageEnvelope.Id);
}
- ActiveMessageCount--;
+ UpdateActiveMessageCount(-1);
}
///
@@ -149,6 +157,8 @@ private void StartMessageVisibilityExtensionTaskIfNotRunning(CancellationToken t
if (_visibilityTimeoutExtensionTask == null || _visibilityTimeoutExtensionTask.IsCompleted)
{
_visibilityTimeoutExtensionTask = ExtendUnfinishedMessageVisibilityTimeoutBatch(token);
+
+ _logger.LogTrace("Started task with id {id} to extend the visibility timeout of in flight messages", _visibilityTimeoutExtensionTask.Id);
}
}
}
diff --git a/test/AWS.Messaging.UnitTests/DefaultMessageManagerTests.cs b/test/AWS.Messaging.UnitTests/DefaultMessageManagerTests.cs
index 6612929..a9de73b 100644
--- a/test/AWS.Messaging.UnitTests/DefaultMessageManagerTests.cs
+++ b/test/AWS.Messaging.UnitTests/DefaultMessageManagerTests.cs
@@ -57,6 +57,9 @@ public async Task DefaultMessageManager_ManagesMessageSuccess()
Assert.Equal(0, manager.ActiveMessageCount);
}
+ ///
+ /// Happy path test for a failed message handling, that it is not deleted from the queue at the end
+ ///
[Fact]
public async Task DefaultMessageManager_ManagesMessageFailed()
{
@@ -93,6 +96,10 @@ public async Task DefaultMessageManager_ManagesMessageFailed()
Assert.Equal(0, manager.ActiveMessageCount);
}
+ ///
+ /// Tests that the manager extends the visibility timeout when the message handler
+ /// takes longer than the refresh interval
+ ///
[Fact]
public async Task DefaultMessageManager_RefreshesLongHandler()
{
@@ -135,6 +142,36 @@ public async Task DefaultMessageManager_RefreshesLongHandler()
Assert.Equal(0, manager.ActiveMessageCount);
}
+ ///
+ /// Queues many message handling tasks for a single message manager to ensure that it
+ /// is managing its active message count in a threadsafe manner
+ ///
+ [Fact]
+ public async Task DefaultMessageManager_CountsActiveMessagesCorrectly()
+ {
+ var mockPoller = CreateMockPoller(messageVisibilityRefreshInterval: 1);
+ var mockHandlerInvoker = CreateMockHandlerInvoker(MessageProcessStatus.Success(), TimeSpan.FromSeconds(1));
+
+ var manager = new DefaultMessageManager(mockPoller.Object, mockHandlerInvoker.Object, new NullLogger());
+ var subscriberMapping = new SubscriberMapping(typeof(ChatMessageHandler), typeof(ChatMessage));
+
+ var tasks = new List();
+
+ for (int i = 0; i < 100; i++)
+ {
+ var messsageEnvelope = new MessageEnvelope()
+ {
+ Id = i.ToString()
+ };
+ tasks.Add(manager.ProcessMessageAsync(messsageEnvelope, subscriberMapping));
+ }
+
+ await Task.WhenAll(tasks);
+
+ // Verify that the active message count was deprecated back to 0
+ Assert.Equal(0, manager.ActiveMessageCount);
+ }
+
///
/// Mocks a message poller with the given visibility refresh interval
///