Skip to content

Commit

Permalink
Adds the initial DefaultMessageManager implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ashovlin committed Apr 19, 2023
1 parent a15e361 commit 26799f8
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 60 deletions.
3 changes: 2 additions & 1 deletion src/AWS.Messaging/Configuration/MessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public IMessageBusBuilder AddSQSPoller(string queueUrl, Action<SQSMessagePollerO
{
MaxNumberOfConcurrentMessages = sqsMessagePollerOptions.MaxNumberOfConcurrentMessages,
VisibilityTimeout = sqsMessagePollerOptions.VisibilityTimeout,
VisibilityTimeoutExtensionInterval = sqsMessagePollerOptions.VisibilityTimeoutExtensionInterval,
WaitTimeSeconds = sqsMessagePollerOptions.WaitTimeSeconds
};

Expand Down Expand Up @@ -147,7 +148,7 @@ internal void Build(IServiceCollection services)
if (_messageConfiguration.MessagePollerConfigurations.Any())
{
services.AddHostedService<MessagePumpService>();
services.TryAddSingleton<HandlerInvoker>();
services.TryAddSingleton<IHandlerInvoker, HandlerInvoker>();
services.TryAddSingleton<IMessagePollerFactory, DefaultMessagePollerFactory>();
services.TryAddSingleton<IMessageManagerFactory, DefaultMessageManagerFactory>();

Expand Down
21 changes: 18 additions & 3 deletions src/AWS.Messaging/Configuration/SQSMessagePollerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@ internal class SQSMessagePollerConfiguration : IMessagePollerConfiguration
/// Default value for <see cref="VisibilityTimeout"/>
/// </summary>
/// <remarks>The default value is 20 seconds.</remarks>
public const int DEFAULT_VISBILITY_TIMEOUT_SECONDS = 20;
public const int DEFAULT_VISIBILITY_TIMEOUT_SECONDS = 20;

/// <summary>
/// Default value for <see cref="VisibilityTimeoutExtensionInterval"/>
/// </summary>
/// <remarks>The default value is 18 seconds.</remarks>
public const int DEFAULT_VISIBILITY_TIMEOUT_EXTENSION_INTERVAL_SECONDS = 18;

/// <summary>
/// Default value for <see cref="WaitTimeSeconds"/>
Expand All @@ -43,10 +49,19 @@ internal class SQSMessagePollerConfiguration : IMessagePollerConfiguration
/// <inheritdoc cref="ReceiveMessageRequest.VisibilityTimeout"/>
/// </summary>
/// <remarks>
/// <inheritdoc cref="DEFAULT_VISBILITY_TIMEOUT_SECONDS" path="//remarks"/>
/// <inheritdoc cref="DEFAULT_VISIBILITY_TIMEOUT_SECONDS" path="//remarks"/>
/// The minimum is 0 seconds. The maximum is 12 hours.
/// </remarks>
public int VisibilityTimeout { get; init; } = DEFAULT_VISBILITY_TIMEOUT_SECONDS;
public int VisibilityTimeout { get; init; } = DEFAULT_VISIBILITY_TIMEOUT_SECONDS;

/// <summary>
/// How often in seconds to extend the visibility timeout of messages that have been
/// received from SQS but are still being proceessed
/// </summary>
/// <remarks>
/// <inheritdoc cref="DEFAULT_VISIBILITY_TIMEOUT_EXTENSION_INTERVAL_SECONDS" path="//remarks"/>
/// </remarks>
public int VisibilityTimeoutExtensionInterval { get; init; } = DEFAULT_VISIBILITY_TIMEOUT_EXTENSION_INTERVAL_SECONDS;

/// <summary>
/// <inheritdoc cref="ReceiveMessageRequest.WaitTimeSeconds"/>
Expand Down
17 changes: 16 additions & 1 deletion src/AWS.Messaging/Configuration/SQSMessagePollerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ public class SQSMessagePollerOptions
public int MaxNumberOfConcurrentMessages { get; set; } = SQSMessagePollerConfiguration.DEFAULT_MAX_NUMBER_OF_CONCURRENT_MESSAGES;

/// <inheritdoc cref="SQSMessagePollerConfiguration.VisibilityTimeout"/>
public int VisibilityTimeout { get; set; } = SQSMessagePollerConfiguration.DEFAULT_VISBILITY_TIMEOUT_SECONDS;
public int VisibilityTimeout { get; set; } = SQSMessagePollerConfiguration.DEFAULT_VISIBILITY_TIMEOUT_SECONDS;

/// <inheritdoc cref="SQSMessagePollerConfiguration.WaitTimeSeconds"/>
public int WaitTimeSeconds { get; set; } = SQSMessagePollerConfiguration.DEFAULT_WAIT_TIME_SECONDS;

/// <inheritdoc cref="SQSMessagePollerConfiguration.VisibilityTimeoutExtensionInterval"/>
public int VisibilityTimeoutExtensionInterval { get; set; } = SQSMessagePollerConfiguration.DEFAULT_VISIBILITY_TIMEOUT_EXTENSION_INTERVAL_SECONDS;

/// <summary>
/// Validates that the options are valid against the message framework's and/or SQS limits
/// </summary>
Expand All @@ -44,6 +47,18 @@ internal void Validate()
errorMessages.Add($"{nameof(WaitTimeSeconds)} must be between 0 seconds and 20 seconds. Current value: {WaitTimeSeconds}.");
}

if (VisibilityTimeoutExtensionInterval <= 0)
{
errorMessages.Add($"{nameof(VisibilityTimeoutExtensionInterval)} must be greater than 0. Current value: {VisibilityTimeoutExtensionInterval}.");
}

if (VisibilityTimeoutExtensionInterval >= VisibilityTimeout)
{
errorMessages.Add($"{nameof(VisibilityTimeoutExtensionInterval)} ({VisibilityTimeoutExtensionInterval} seconds) " +
$"must be less than {nameof(VisibilityTimeout)} ({VisibilityTimeout} seconds), " +
$"or else other consumers may receive the message while it is still being processed.");
}

if (errorMessages.Any())
{
throw new InvalidSQSMessagePollerOptionsException(string.Join(Environment.NewLine, errorMessages));
Expand Down
36 changes: 24 additions & 12 deletions src/AWS.Messaging/SQS/SQSMessagePoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ internal class SQSMessagePoller : IMessagePoller
private readonly IEnvelopeSerializer _envelopeSerializer;

/// <summary>
/// The number of milliseconds to pause if already processing the configured maximum number of messages
/// Maximum valid value for <see cref="ReceiveMessageRequest.MaxNumberOfMessages"/>
/// </summary>
private const int CONCURRENCY_LIMIT_PAUSE_MILLIS = 50;
private const int SQS_MAX_NUMBER_MESSAGES_TO_READ = 10;

/// <summary>
/// Maximum valid value for <see cref="ReceiveMessageRequest.MaxNumberOfMessages"/>
/// The maximum amount of time a polling iteration should pause for while waiting
/// for in flight messages to finish processing
/// </summary>
private const int SQS_MAX_NUMBER_MESSAGES_TO_READ = 10;
private static readonly TimeSpan CONCURRENT_CAPACITY_WAIT_TIMEOUT = TimeSpan.FromMinutes(1);

/// <summary>
/// Creates instance of <see cref="AWS.Messaging.SQS.SQSMessagePoller" />
Expand All @@ -54,6 +55,10 @@ public SQSMessagePoller(
_messageManager = messageManagerFactory.CreateMessageManager(this);
}

/// <inheritdoc/>
public int VisibilityTimeoutExtensionInterval => _configuration.VisibilityTimeoutExtensionInterval;


/// <inheritdoc/>
public async Task StartPollingAsync(CancellationToken token = default)
{
Expand All @@ -70,11 +75,14 @@ private async Task PollQueue(CancellationToken token)
{
var numberOfMessagesToRead = _configuration.MaxNumberOfConcurrentMessages - _messageManager.ActiveMessageCount;

// If already processing the maximum number of messages, block and then try again
// TODO: change this to a signal once DefaultMessageManager is implemented
// If already processing the maximum number of messages, wait for at least one to complete and then try again
if (numberOfMessagesToRead <= 0)
{
await Task.Delay(CONCURRENCY_LIMIT_PAUSE_MILLIS);
_logger.LogTrace("The maximum number of {max} concurrent messages is already being processed. " +
"Waiting for one or more to complete for a maximum of {timeout} seconds before attempting to poll again.",
_configuration.MaxNumberOfConcurrentMessages, CONCURRENT_CAPACITY_WAIT_TIMEOUT.TotalSeconds);

await _messageManager.WaitAsync(CONCURRENT_CAPACITY_WAIT_TIMEOUT);
continue;
}

Expand Down Expand Up @@ -108,7 +116,9 @@ private async Task PollQueue(CancellationToken token)
foreach (var message in receiveMessageResponse.Messages)
{
var messageEnvelopeResult = await _envelopeSerializer.ConvertToEnvelopeAsync(message);
_messageManager.StartProcessMessage(messageEnvelopeResult.Envelope, messageEnvelopeResult.Mapping);

// Don't await this result, we want to process multiple messages concurrently
_ = _messageManager.ProcessMessageAsync(messageEnvelopeResult.Envelope, messageEnvelopeResult.Mapping, token);
}
}
catch (AWSMessagingException)
Expand All @@ -135,7 +145,7 @@ private async Task PollQueue(CancellationToken token)
}

/// <inheritdoc/>
public async Task DeleteMessagesAsync(IEnumerable<MessageEnvelope> messages)
public async Task DeleteMessagesAsync(IEnumerable<MessageEnvelope> messages, CancellationToken token = default)
{
if (messages.Count() == 0)
{
Expand All @@ -155,6 +165,7 @@ public async Task DeleteMessagesAsync(IEnumerable<MessageEnvelope> messages)
message.Id, message.SQSMetadata.ReceiptHandle, _configuration.SubscriberEndpoint);
request.Entries.Add(new DeleteMessageBatchRequestEntry()
{
Id = message.Id,
ReceiptHandle = message.SQSMetadata.ReceiptHandle
});
}
Expand All @@ -169,7 +180,7 @@ public async Task DeleteMessagesAsync(IEnumerable<MessageEnvelope> messages)

try
{
var response = await _sqsClient.DeleteMessageBatchAsync(request);
var response = await _sqsClient.DeleteMessageBatchAsync(request, token);

foreach (var successMessage in response.Successful)
{
Expand Down Expand Up @@ -200,7 +211,7 @@ public async Task DeleteMessagesAsync(IEnumerable<MessageEnvelope> messages)
}

/// <inheritdoc/>
public async Task ExtendMessageVisiblityTimeoutAsync(IEnumerable<MessageEnvelope> messages)
public async Task ExtendMessageVisibilityTimeoutAsync(IEnumerable<MessageEnvelope> messages, CancellationToken token = default)
{
if (messages.Count() == 0)
{
Expand All @@ -220,6 +231,7 @@ public async Task ExtendMessageVisiblityTimeoutAsync(IEnumerable<MessageEnvelope
message.Id, message.SQSMetadata.ReceiptHandle, _configuration.VisibilityTimeout);
request.Entries.Add(new ChangeMessageVisibilityBatchRequestEntry
{
Id = message.Id,
ReceiptHandle = message.SQSMetadata.ReceiptHandle,
VisibilityTimeout = _configuration.VisibilityTimeout
});
Expand All @@ -235,7 +247,7 @@ public async Task ExtendMessageVisiblityTimeoutAsync(IEnumerable<MessageEnvelope

try
{
var response = await _sqsClient.ChangeMessageVisibilityBatchAsync(request);
var response = await _sqsClient.ChangeMessageVisibilityBatchAsync(request, token);

foreach (var successMessage in response.Successful)
{
Expand Down
168 changes: 159 additions & 9 deletions src/AWS.Messaging/Services/DefaultMessageManager.cs
Original file line number Diff line number Diff line change
@@ -1,30 +1,180 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Concurrent;
using AWS.Messaging.Configuration;
using Microsoft.Extensions.Logging;

namespace AWS.Messaging.Services;

/// <inheritdoc/>
/// <inheritdoc cref="IMessageManager"/>
public class DefaultMessageManager : IMessageManager
{
private readonly IMessagePoller _messagePoller;
private readonly HandlerInvoker _handlerInvoker;
/// <inheritdoc/>
public DefaultMessageManager(IMessagePoller messagePoller, HandlerInvoker handlerInvoker)
private readonly IHandlerInvoker _handlerInvoker;
private readonly ILogger<DefaultMessageManager> _logger;

private readonly object _activeMessageCountLock = new object();
private int _activeMessageCount;
private readonly ManualResetEventSlim _activeMessageCountDecrementedEvent = new ManualResetEventSlim(false);

private readonly ConcurrentDictionary<Task, MessageEnvelope> _runningHandlerTasks = new();
private readonly object _visibilityTimeoutExtensionTaskLock = new object();
private Task? _visibilityTimeoutExtensionTask;

/// <summary>
/// Constructs an instance of <see cref="DefaultMessageManager"/>
/// </summary>
/// <param name="messagePoller">The poller that this manager is managing messages for</param>
/// <param name="handlerInvoker">Used to look up and invoke the correct handler for each message</param>
/// <param name="logger">Logger for debugging information</param>
public DefaultMessageManager(IMessagePoller messagePoller, IHandlerInvoker handlerInvoker, ILogger<DefaultMessageManager> logger)
{
_messagePoller = messagePoller;
_handlerInvoker = handlerInvoker;
_logger = logger;
}

/// <inheritdoc/>
public int ActiveMessageCount {
get
{
lock (_activeMessageCountLock)
{
return _activeMessageCount;
}
}
set
{
lock (_activeMessageCountLock)
{
_logger.LogTrace("Updating {activeMessageCount} from {before} to {after}", nameof(ActiveMessageCount), ActiveMessageCount, value);

var isDecrementing = value < _activeMessageCount;
_activeMessageCount = value;

// If we just decremented the active message count, signal to the poller
// that there may be more capacity available again.
if (isDecrementing)
{
_activeMessageCountDecrementedEvent.Set();
}
}
}
}

/// <inheritdoc/>
public int ActiveMessageCount { get; set; }
public Task WaitAsync(TimeSpan timeout)
{
_logger.LogTrace("Beginning wait for {name} for a maximum of {timeout} seconds", nameof(_activeMessageCountDecrementedEvent), timeout.TotalSeconds);

// TODO: Rework to avoid this synchronous wait.
// See https://github.com/dotnet/runtime/issues/35962 for potential workarounds
var wasSet = _activeMessageCountDecrementedEvent.Wait(timeout);

// Don't reset if we hit the timeout
if (wasSet)
{
_logger.LogTrace("{name} was set, resetting the event", nameof(_activeMessageCountDecrementedEvent));
_activeMessageCountDecrementedEvent.Reset();
}
else
{
_logger.LogTrace("Timed out at {timeout} seconds while waiting for {name}, returning.", timeout.TotalSeconds, nameof(_activeMessageCountDecrementedEvent));
}

return Task.CompletedTask;
}


/// <inheritdoc/>
public void StartProcessMessage(MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping)
public async Task ProcessMessageAsync(MessageEnvelope messageEnvelope, SubscriberMapping subscriberMapping, CancellationToken token = default)
{
ActiveMessageCount++;

var handlerTask = _handlerInvoker.InvokeAsync(messageEnvelope, subscriberMapping, token);

// Add it to the dictionary of running task, used to extend the visibility timeout if necessary
_runningHandlerTasks.TryAdd(handlerTask, messageEnvelope);

StartMessageVisibilityExtensionTaskIfNotRunning(token);

// Wait for the handler to finish processing the message
try
{
await handlerTask;
}
catch (AWSMessagingException)
{
// Swallow exceptions thrown by the framework, and rely on the thrower to log
}
catch (Exception ex)
{
_logger.LogError(ex, "An unknown exception occurred while processing message ID {subscriberEndpoint}", messageEnvelope.Id);
}

_runningHandlerTasks.Remove(handlerTask, out _);

if (handlerTask.IsCompletedSuccessfully)
{
if (handlerTask.Result.IsSuccess)
{
// Delete the message from the queue if it was processed successfully
await _messagePoller.DeleteMessagesAsync(new MessageEnvelope[] { messageEnvelope });
}
else // the handler still finished, but returned MessageProcessStatus.Failed
{
_logger.LogError("Message handling completed unsuccessfully for message ID {messageId}", messageEnvelope.Id);
}
}
else if (handlerTask.IsFaulted)
{
_logger.LogError(handlerTask.Exception, "Message handling failed unexpectedly for message ID {messageId}", messageEnvelope.Id);
}

ActiveMessageCount--;
}

/// <summary>
/// Starts the task that extends the visibility timeout of in-flight messages
/// </summary>
/// <param name="token">Cancellation token to stop the visibility timeout extension task</param>
private void StartMessageVisibilityExtensionTaskIfNotRunning(CancellationToken token)
{
// It may either have been never started, or previously started and completed because there were no more in flight messages
if (_visibilityTimeoutExtensionTask == null || _visibilityTimeoutExtensionTask.IsCompleted)
{
lock(_visibilityTimeoutExtensionTaskLock)
{
if (_visibilityTimeoutExtensionTask == null || _visibilityTimeoutExtensionTask.IsCompleted)
{
_visibilityTimeoutExtensionTask = ExtendUnfinishedMessageVisibilityTimeoutBatch(token);
}
}
}
}

/// <summary>
/// Extends the visibility timeout periodically for messages whose corresponding handler task is not yet complete
/// </summary>
/// <param name="token">Cancellation token to stop the visibility timeout extension loop</param>
private async Task ExtendUnfinishedMessageVisibilityTimeoutBatch(CancellationToken token)
{
// TODO: a follow-up PR will handle managing this task, updating ActiveMessageCount, deleting the message when done.
// This commit is just getting the HandlerInvoker in place
var task = _handlerInvoker.InvokeAsync(messageEnvelope, subscriberMapping);
IEnumerable<MessageEnvelope> unfinishedMessages;

do
{
// Wait for the configured interval before extending visibility
await Task.Delay(_messagePoller.VisibilityTimeoutExtensionInterval * 1000, token);

// Select the message envelopes whose corresponding handler task is not yet complete
unfinishedMessages = _runningHandlerTasks.Values;

// TODO: The envelopes in _runningHandlerTasks may have been received at different times, we could track + extend visibility separately
// TODO: The underlying ChangeMessageVisibilityBatch only takes up to 10 messages, we may need to slice and make multiple calls
// TODO: Handle the race condition where a message could have finished handling and be deleted concurrently
await _messagePoller.ExtendMessageVisibilityTimeoutAsync(unfinishedMessages);

} while (unfinishedMessages.Count() > 0 && !token.IsCancellationRequested);
}
}
Loading

0 comments on commit 26799f8

Please sign in to comment.