Skip to content

Commit

Permalink
fix: Ensure ActiveMessageCount is updated in a thread safe manner
Browse files Browse the repository at this point in the history
  • Loading branch information
ashovlin committed Apr 21, 2023
1 parent 7b65e5f commit fb5857a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 16 deletions.
42 changes: 26 additions & 16 deletions src/AWS.Messaging/Services/DefaultMessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,39 @@ public DefaultMessageManager(IMessagePoller messagePoller, IHandlerInvoker handl
}

/// <inheritdoc/>
public int ActiveMessageCount {
public int ActiveMessageCount
{
get
{
lock (_activeMessageCountLock)
{
return _activeMessageCount;
}
}
set
}

/// <summary>
/// Updates <see cref="ActiveMessageCount"/> in a thread safe manner
/// </summary>
/// <param name="delta">Difference to apply to the current active message count</param>
/// <returns>Updated <see cref="ActiveMessageCount"/></returns>
private int UpdateActiveMessageCount(int delta)
{
lock (_activeMessageCountLock)
{
lock (_activeMessageCountLock)
{
_logger.LogTrace("Updating {ActiveMessageCount} from {Before} to {After}", nameof(ActiveMessageCount), ActiveMessageCount, value);
var newValue = _activeMessageCount + delta;
_logger.LogTrace("Updating {ActiveMessageCount} from {Before} to {After}", nameof(ActiveMessageCount), _activeMessageCount, newValue);

var isDecrementing = value < _activeMessageCount;
_activeMessageCount = value;
_activeMessageCount = newValue;

// If we just decremented the active message count, signal to the poller
// that there may be more capacity available again.
if (isDecrementing)
{
_activeMessageCountDecrementedEvent.Set();
}
// If we just decremented the active message count, signal to the poller
// that there may be more capacity available again.
if (delta < 0)
{
_activeMessageCountDecrementedEvent.Set();
}

return _activeMessageCount;
}
}

Expand All @@ -86,11 +95,10 @@ public Task WaitAsync(TimeSpan timeout)
return Task.CompletedTask;
}


/// <inheritdoc/>
public async Task ProcessMessageAsync(MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping, CancellationToken token = default)
{
ActiveMessageCount++;
UpdateActiveMessageCount(1);

var handlerTask = _handlerInvoker.InvokeAsync(messageEnvelope, subscriberMapping, token);

Expand Down Expand Up @@ -132,7 +140,7 @@ public async Task ProcessMessageAsync(MessageEnvelope messageEnvelope, Subscribe
_logger.LogError(handlerTask.Exception, "Message handling failed unexpectedly for message ID {MessageId}", messageEnvelope.Id);
}

ActiveMessageCount--;
UpdateActiveMessageCount(-1);
}

/// <summary>
Expand All @@ -149,6 +157,8 @@ private void StartMessageVisibilityExtensionTaskIfNotRunning(CancellationToken t
if (_visibilityTimeoutExtensionTask == null || _visibilityTimeoutExtensionTask.IsCompleted)
{
_visibilityTimeoutExtensionTask = ExtendUnfinishedMessageVisibilityTimeoutBatch(token);

_logger.LogTrace("Started task with id {id} to extend the visibility timeout of in flight messages", _visibilityTimeoutExtensionTask.Id);
}
}
}
Expand Down
37 changes: 37 additions & 0 deletions test/AWS.Messaging.UnitTests/DefaultMessageManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public async Task DefaultMessageManager_ManagesMessageSuccess()
Assert.Equal(0, manager.ActiveMessageCount);
}

/// <summary>
/// Happy path test for a failed message handling, that it is not deleted from the queue at the end
/// </summary>
[Fact]
public async Task DefaultMessageManager_ManagesMessageFailed()
{
Expand Down Expand Up @@ -93,6 +96,10 @@ public async Task DefaultMessageManager_ManagesMessageFailed()
Assert.Equal(0, manager.ActiveMessageCount);
}

/// <summary>
/// Tests that the manager extends the visibility timeout when the message handler
/// takes longer than the refresh interval
/// </summary>
[Fact]
public async Task DefaultMessageManager_RefreshesLongHandler()
{
Expand Down Expand Up @@ -135,6 +142,36 @@ public async Task DefaultMessageManager_RefreshesLongHandler()
Assert.Equal(0, manager.ActiveMessageCount);
}

/// <summary>
/// Queues many message handling tasks for a single message manager to ensure that it
/// is managing its active message count in a thread safe manner
/// </summary>
[Fact]
public async Task DefaultMessageManager_CountsActiveMessagesCorrectly()
{
var mockPoller = CreateMockPoller(messageVisibilityRefreshInterval: 1);
var mockHandlerInvoker = CreateMockHandlerInvoker(MessageProcessStatus.Success(), TimeSpan.FromSeconds(1));

var manager = new DefaultMessageManager(mockPoller.Object, mockHandlerInvoker.Object, new NullLogger<DefaultMessageManager>());
var subscriberMapping = new SubscriberMapping(typeof(ChatMessageHandler), typeof(ChatMessage));

var tasks = new List<Task>();

for (int i = 0; i < 100; i++)
{
var messsageEnvelope = new MessageEnvelope<ChatMessage>()
{
Id = i.ToString()
};
tasks.Add(manager.ProcessMessageAsync(messsageEnvelope, subscriberMapping));
}

await Task.WhenAll(tasks);

// Verify that the active message count was deprecated back to 0
Assert.Equal(0, manager.ActiveMessageCount);
}

/// <summary>
/// Mocks a message poller with the given visibility refresh interval
/// </summary>
Expand Down

0 comments on commit fb5857a

Please sign in to comment.