From bf61d36090a3373b98c34c8c9b23a66234948cce Mon Sep 17 00:00:00 2001 From: Philippe El Asmar <53088140+philasmar@users.noreply.github.com> Date: Mon, 24 Apr 2023 11:36:02 -0400 Subject: [PATCH] fix: extending message visibility for more than 10 entries does not fail --- AWS.Messaging.sln | 9 +- src/AWS.Messaging/SQS/SQSMessagePoller.cs | 82 ++++++++++----- .../AWS.Messaging.IntegrationTests.csproj | 1 + .../Handlers/ChatMessageHandler.cs | 19 ++++ .../Models/TempStorage.cs | 4 +- .../SubscriberTests.cs | 99 ++++++++++++++++--- .../AWS.Messaging.Tests.Common.csproj | 15 +++ .../Services/InMemoryLogger.cs | 55 +++++++++++ .../AWS.Messaging.UnitTests.csproj | 1 + .../SQSMessagePollerTests.cs | 32 +++++- 10 files changed, 274 insertions(+), 43 deletions(-) create mode 100644 test/AWS.Messaging.Tests.Common/AWS.Messaging.Tests.Common.csproj create mode 100644 test/AWS.Messaging.Tests.Common/Services/InMemoryLogger.cs diff --git a/AWS.Messaging.sln b/AWS.Messaging.sln index 777b82e..78eb979 100644 --- a/AWS.Messaging.sln +++ b/AWS.Messaging.sln @@ -23,7 +23,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sampleapps", "sampleapps", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherAPI", "sampleapps\PublisherAPI\PublisherAPI.csproj", "{A7F3DA5F-0D7A-4D8A-837D-F78F4A4E3CA7}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SubscriberService", "sampleapps\SubscriberService\SubscriberService.csproj", "{D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberService", "sampleapps\SubscriberService\SubscriberService.csproj", "{D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Messaging.Tests.Common", "test\AWS.Messaging.Tests.Common\AWS.Messaging.Tests.Common.csproj", "{A174942B-AF9C-4935-AD7B-AF651BACE63C}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -51,6 +53,10 @@ Global {D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}.Debug|Any CPU.Build.0 = Debug|Any CPU {D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}.Release|Any CPU.ActiveCfg = Release|Any CPU {D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}.Release|Any CPU.Build.0 = Release|Any CPU + {A174942B-AF9C-4935-AD7B-AF651BACE63C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A174942B-AF9C-4935-AD7B-AF651BACE63C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A174942B-AF9C-4935-AD7B-AF651BACE63C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A174942B-AF9C-4935-AD7B-AF651BACE63C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -61,6 +67,7 @@ Global {A4D5F664-E3A3-411A-9327-05913794021C} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3} {A7F3DA5F-0D7A-4D8A-837D-F78F4A4E3CA7} = {1AA8985B-897C-4BD5-9735-FD8B33FEBFFB} {D398D75B-80F3-4D4F-AFC7-15CD61DD22C3} = {1AA8985B-897C-4BD5-9735-FD8B33FEBFFB} + {A174942B-AF9C-4935-AD7B-AF651BACE63C} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {7B2B759D-6455-4089-8173-3F1619567B36} diff --git a/src/AWS.Messaging/SQS/SQSMessagePoller.cs b/src/AWS.Messaging/SQS/SQSMessagePoller.cs index 45a4ac0..ab856c4 100644 --- a/src/AWS.Messaging/SQS/SQSMessagePoller.cs +++ b/src/AWS.Messaging/SQS/SQSMessagePoller.cs @@ -26,6 +26,11 @@ internal class SQSMessagePoller : IMessagePoller /// private const int SQS_MAX_NUMBER_MESSAGES_TO_READ = 10; + /// + /// Maximum valid value for number of messages in + /// + private const int SQS_MAX_MESSAGE_CHANGE_VISIBILITY = 10; + /// /// The maximum amount of time a polling iteration should pause for while waiting /// for in flight messages to finish processing @@ -216,18 +221,27 @@ public async Task ExtendMessageVisibilityTimeoutAsync(IEnumerable(); + + var currentRequest = new ChangeMessageVisibilityBatchRequest { QueueUrl = _configuration.SubscriberEndpoint }; - foreach (var message in messages) { if (!string.IsNullOrEmpty(message.SQSMetadata?.ReceiptHandle)) { _logger.LogTrace("Preparing to extend the visibility of {MessageId} with SQS receipt handle {ReceiptHandle} by {VisibilityTimeout} seconds", message.Id, message.SQSMetadata.ReceiptHandle, _configuration.VisibilityTimeout); - request.Entries.Add(new ChangeMessageVisibilityBatchRequestEntry + if (currentRequest.Entries.Count >= SQS_MAX_MESSAGE_CHANGE_VISIBILITY) + { + requestBatches.Add(currentRequest); + currentRequest = new ChangeMessageVisibilityBatchRequest + { + QueueUrl = _configuration.SubscriberEndpoint + }; + } + currentRequest.Entries.Add(new ChangeMessageVisibilityBatchRequestEntry { Id = message.Id, ReceiptHandle = message.SQSMetadata.ReceiptHandle, @@ -240,37 +254,57 @@ public async Task ExtendMessageVisibilityTimeoutAsync(IEnumerable> changeMessageVisibilityBatchTasks = + requestBatches + .Select(request => _sqsClient.ChangeMessageVisibilityBatchAsync(request, token)) + .ToList(); try { - var response = await _sqsClient.ChangeMessageVisibilityBatchAsync(request, token); + var responses = await Task.WhenAll(changeMessageVisibilityBatchTasks); + } + catch (Exception ex) + { + _logger.LogError(ex, "An unexpected exception occurred while extending message visibility on queue {SubscriberEndpoint}", _configuration.SubscriberEndpoint); + } - foreach (var successMessage in response.Successful) + foreach (var changeMessageVisibilityBatchTask in changeMessageVisibilityBatchTasks) + { + if (!changeMessageVisibilityBatchTask.IsFaulted) { - _logger.LogTrace("Extended the visibility of message {MessageId} on queue {SubscriberEndpoint} successfully", successMessage.Id, _configuration.SubscriberEndpoint); - } + var response = changeMessageVisibilityBatchTask.Result; + foreach (var successMessage in response.Successful) + { + _logger.LogTrace("Extended the visibility of message {MessageId} on queue {SubscriberEndpoint} successfully", successMessage.Id, _configuration.SubscriberEndpoint); + } - foreach (var failedMessage in response.Failed) - { - _logger.LogError("Failed to extend the visibility of message {FailedMessageId} on queue {SubscriberEndpoint}: {FailedMessage}", - failedMessage.Id, _configuration.SubscriberEndpoint, failedMessage.Message); + foreach (var failedMessage in response.Failed) + { + _logger.LogError("Failed to extend the visibility of message {FailedMessageId} on queue {SubscriberEndpoint}: {FailedMessage}", + failedMessage.Id, _configuration.SubscriberEndpoint, failedMessage.Message); + } } - } - catch (AmazonSQSException ex) - { - _logger.LogError(ex, "Failed to extend the visibility of message(s) [{MessageIds}] on queue {SubscriberEndpoint}", - string.Join(", ", messages.Select(x => x.Id)), _configuration.SubscriberEndpoint); - - // Rethrow the exception to fail fast for invalid configuration, permissioning, etc. - if (IsSQSExceptionFatal(ex)) + else { - throw; + if (changeMessageVisibilityBatchTask.Exception?.InnerException is AmazonSQSException amazonEx) + { + _logger.LogError(amazonEx, "Failed to extend the visibility of message(s) [{MessageIds}] on queue {SubscriberEndpoint}", + string.Join(", ", messages.Select(x => x.Id)), _configuration.SubscriberEndpoint); + + // Rethrow the exception to fail fast for invalid configuration, permissioning, etc. + if (IsSQSExceptionFatal(amazonEx)) + { + throw amazonEx; + } + } + else if (changeMessageVisibilityBatchTask.Exception?.InnerException is Exception ex) + { + _logger.LogError(ex, "An unexpected exception occurred while extending message visibility on queue {SubscriberEndpoint}", _configuration.SubscriberEndpoint); + } } } - catch (Exception ex) - { - _logger.LogError(ex, "An unexpected exception occurred while extending message visibility on queue {SubscriberEndpoint}", _configuration.SubscriberEndpoint); - } } /// diff --git a/test/AWS.Messaging.IntegrationTests/AWS.Messaging.IntegrationTests.csproj b/test/AWS.Messaging.IntegrationTests/AWS.Messaging.IntegrationTests.csproj index a2fc89b..689af5b 100644 --- a/test/AWS.Messaging.IntegrationTests/AWS.Messaging.IntegrationTests.csproj +++ b/test/AWS.Messaging.IntegrationTests/AWS.Messaging.IntegrationTests.csproj @@ -27,6 +27,7 @@ + diff --git a/test/AWS.Messaging.IntegrationTests/Handlers/ChatMessageHandler.cs b/test/AWS.Messaging.IntegrationTests/Handlers/ChatMessageHandler.cs index a9cd3c4..92beb09 100644 --- a/test/AWS.Messaging.IntegrationTests/Handlers/ChatMessageHandler.cs +++ b/test/AWS.Messaging.IntegrationTests/Handlers/ChatMessageHandler.cs @@ -23,3 +23,22 @@ public Task HandleAsync(MessageEnvelope messa return Task.FromResult(MessageProcessStatus.Success()); } } + +public class ChatMessageHandler_10sDelay : IMessageHandler +{ + private readonly TempStorage _tempStorage; + + public ChatMessageHandler_10sDelay(TempStorage tempStorage) + { + _tempStorage = tempStorage; + } + + public async Task HandleAsync(MessageEnvelope messageEnvelope, CancellationToken token = default) + { + await Task.Delay(10000); + + _tempStorage.Messages.Add(messageEnvelope); + + return MessageProcessStatus.Success(); + } +} \ No newline at end of file diff --git a/test/AWS.Messaging.IntegrationTests/Models/TempStorage.cs b/test/AWS.Messaging.IntegrationTests/Models/TempStorage.cs index 99e24e7..1e32b71 100644 --- a/test/AWS.Messaging.IntegrationTests/Models/TempStorage.cs +++ b/test/AWS.Messaging.IntegrationTests/Models/TempStorage.cs @@ -1,11 +1,11 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 -using System.Collections.Generic; +using System.Collections.Concurrent; namespace AWS.Messaging.IntegrationTests.Models; public class TempStorage { - public List> Messages { get; set; } = new List>(); + public ConcurrentBag> Messages { get; set; } = new ConcurrentBag>(); } diff --git a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs index 7576d08..ec8644f 100644 --- a/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs +++ b/test/AWS.Messaging.IntegrationTests/SubscriberTests.cs @@ -8,6 +8,7 @@ using Amazon.SQS; using AWS.Messaging.IntegrationTests.Handlers; using AWS.Messaging.IntegrationTests.Models; +using AWS.Messaging.Tests.Common.Services; using AWS.Messaging.Services; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -18,13 +19,15 @@ namespace AWS.Messaging.IntegrationTests; public class SubscriberTests : IAsyncLifetime { private readonly IAmazonSQS _sqsClient; - private ServiceProvider _serviceProvider; + private readonly IServiceCollection _serviceCollection; private string _sqsQueueUrl; public SubscriberTests() { _sqsClient = new AmazonSQSClient(); - _serviceProvider = default!; + _serviceCollection = new ServiceCollection(); + _serviceCollection.AddSingleton>(); + _serviceCollection.AddLogging(x => x.AddInMemoryLogger()); _sqsQueueUrl = string.Empty; } @@ -32,37 +35,37 @@ public async Task InitializeAsync() { var createQueueResponse = await _sqsClient.CreateQueueAsync($"MPFTest-{Guid.NewGuid().ToString().Split('-').Last()}"); _sqsQueueUrl = createQueueResponse.QueueUrl; + } - var serviceCollection = new ServiceCollection(); - serviceCollection.AddSingleton>(); - serviceCollection.AddLogging(); - serviceCollection.AddAWSMessageBus(builder => + [Fact] + public async Task SendAndReceive1Message() + { + _serviceCollection.AddAWSMessageBus(builder => { builder.AddSQSPublisher(_sqsQueueUrl); - builder.AddSQSPoller(_sqsQueueUrl); + builder.AddSQSPoller(_sqsQueueUrl, options => + { + options.VisibilityTimeoutExtensionInterval = 3; + }); builder.AddMessageHandler(); }); - _serviceProvider = serviceCollection.BuildServiceProvider(); - } + var serviceProvider = _serviceCollection.BuildServiceProvider(); - [Fact] - public async Task SendAndReceive1Message() - { var publishStartTime = DateTime.UtcNow; - var publisher = _serviceProvider.GetRequiredService(); + var publisher = serviceProvider.GetRequiredService(); await publisher.PublishAsync(new ChatMessage { MessageDescription = "Test1" }); var publishEndTime = DateTime.UtcNow; - var pump = _serviceProvider.GetRequiredService() as MessagePumpService; + var pump = serviceProvider.GetRequiredService() as MessagePumpService; Assert.NotNull(pump); var source = new CancellationTokenSource(); await pump.StartAsync(source.Token); - var tempStorage = _serviceProvider.GetRequiredService>(); + var tempStorage = serviceProvider.GetRequiredService>(); source.CancelAfter(60000); while (!source.IsCancellationRequested) { @@ -81,6 +84,72 @@ await publisher.PublishAsync(new ChatMessage Assert.Equal("Test1", message.Message.MessageDescription); } + [Theory] + // Tests that the visibility is extended without needing multiple batch requests + [InlineData(5, 1, 5)] + // Tests that the visibility is extended with the need for multiple batch requests + [InlineData(8, 1, 5)] + // Increasing the number of messages processed to ensure stability at load + [InlineData(15, 1, 15)] + // Increasing the number of messages processed with batching required to extend visibility + [InlineData(20, 3, 15)] + public async Task SendAndReceiveMultipleMessages(int numberOfMessages, int visibilityTimeoutExtension, int maxConcurrentMessages) + { + _serviceCollection.AddAWSMessageBus(builder => + { + builder.AddSQSPublisher(_sqsQueueUrl); + builder.AddSQSPoller(_sqsQueueUrl, options => + { + options.VisibilityTimeoutExtensionInterval = visibilityTimeoutExtension; + options.MaxNumberOfConcurrentMessages = maxConcurrentMessages; + }); + builder.AddMessageHandler(); + }); + var serviceProvider = _serviceCollection.BuildServiceProvider(); + + var publishStartTime = DateTime.UtcNow; + var publisher = serviceProvider.GetRequiredService(); + for (int i = 0; i < numberOfMessages; i++) + { + await publisher.PublishAsync(new ChatMessage + { + MessageDescription = $"Test{i + 1}" + }); + } + var publishEndTime = DateTime.UtcNow; + + var pump = serviceProvider.GetRequiredService() as MessagePumpService; + Assert.NotNull(pump); + var source = new CancellationTokenSource(); + + await pump.StartAsync(source.Token); + + var tempStorage = serviceProvider.GetRequiredService>(); + var numberOfBatches = (int)Math.Ceiling((decimal)numberOfMessages / (decimal)maxConcurrentMessages); + source.CancelAfter(numberOfBatches * 12000); + while (!source.IsCancellationRequested) + { + if (tempStorage.Messages.Count == numberOfMessages) + { + source.Cancel(); + break; + } + } + + var inMemoryLogger = serviceProvider.GetRequiredService(); + Assert.Empty(inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest"))); + Assert.Equal(numberOfMessages, tempStorage.Messages.Count); + for (int i = 0; i < numberOfMessages; i++) + { + var message = tempStorage.Messages.FirstOrDefault(x => x.Message.MessageDescription.Equals($"Test{i + 1}")); + Assert.NotNull(message); + Assert.False(string.IsNullOrEmpty(message.Id)); + Assert.Equal("/aws/messaging", message.Source.ToString()); + Assert.True(message.TimeStamp > publishStartTime); + Assert.True(message.TimeStamp < publishEndTime); + } + } + public async Task DisposeAsync() { try diff --git a/test/AWS.Messaging.Tests.Common/AWS.Messaging.Tests.Common.csproj b/test/AWS.Messaging.Tests.Common/AWS.Messaging.Tests.Common.csproj new file mode 100644 index 0000000..e6bea91 --- /dev/null +++ b/test/AWS.Messaging.Tests.Common/AWS.Messaging.Tests.Common.csproj @@ -0,0 +1,15 @@ + + + + net6.0 + enable + enable + + + + + + + + + \ No newline at end of file diff --git a/test/AWS.Messaging.Tests.Common/Services/InMemoryLogger.cs b/test/AWS.Messaging.Tests.Common/Services/InMemoryLogger.cs new file mode 100644 index 0000000..a8eb9de --- /dev/null +++ b/test/AWS.Messaging.Tests.Common/Services/InMemoryLogger.cs @@ -0,0 +1,55 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using System.Collections.Concurrent; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace AWS.Messaging.Tests.Common.Services +{ + public record LogItem(LogLevel LogLevel, string Message, Exception? Exception); + + public class InMemoryLoggerProvider : ILoggerProvider + { + private readonly InMemoryLogger logger; + + public InMemoryLoggerProvider(InMemoryLogger logger) => this.logger = logger; + + public ILogger CreateLogger(string categoryName) => logger; + + public void Dispose() { } + } + + public class InMemoryLogger : ILogger + { + private readonly LoggerExternalScopeProvider _scopeProvider; + + public InMemoryLogger(LoggerExternalScopeProvider scopeProvider) + { + _scopeProvider = scopeProvider; + } + + private readonly ConcurrentBag concurrentLogs = new ConcurrentBag(); + + public IEnumerable Logs => concurrentLogs.ToList(); + + public IDisposable BeginScope(TState state) => _scopeProvider.Push(state); + + public bool IsEnabled(LogLevel logLevel) => logLevel != LogLevel.None; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func formatter) + { + concurrentLogs.Add(new LogItem(logLevel, formatter(state, exception), exception)); + } + } + + public static class InMemoryLoggerExtensions + { + public static ILoggingBuilder AddInMemoryLogger(this ILoggingBuilder builder) + { + var logger = new InMemoryLogger(new LoggerExternalScopeProvider()); + builder.Services.AddSingleton(logger); + return builder.AddProvider(new InMemoryLoggerProvider(logger)); + } + } +} \ No newline at end of file diff --git a/test/AWS.Messaging.UnitTests/AWS.Messaging.UnitTests.csproj b/test/AWS.Messaging.UnitTests/AWS.Messaging.UnitTests.csproj index 119d337..96cd900 100644 --- a/test/AWS.Messaging.UnitTests/AWS.Messaging.UnitTests.csproj +++ b/test/AWS.Messaging.UnitTests/AWS.Messaging.UnitTests.csproj @@ -8,6 +8,7 @@ + diff --git a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs index f4a1370..c0bd9f3 100644 --- a/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs +++ b/test/AWS.Messaging.UnitTests/SQSMessagePollerTests.cs @@ -16,12 +16,14 @@ using Microsoft.Extensions.Hosting; using Moq; using Xunit; +using AWS.Messaging.Tests.Common.Services; namespace AWS.Messaging.UnitTests; public class SQSMessagePollerTests { private const string TEST_QUEUE_URL = "queueUrl"; + private InMemoryLogger? _inMemoryLogger; /// /// Tests that starting an SQS poller with default settings begins polling SQS @@ -121,6 +123,33 @@ public async Task SQSMessagePoller_ExtendMessageVisibility_Success() It.IsAny())); } + /// + /// Tests that calling calls + /// SQS's ChangeMessageVisibilityBatch with a request that has more than 10 entires. + /// is expected create multiple + /// when there are more than 10 messages since can only handle 10 entries. + /// + [Fact] + public async Task SQSMessagePoller_ExtendMessageVisibility_RequestHasMoreThan10Entries() + { + var client = new Mock(); + + client.Setup(x => x.ChangeMessageVisibilityBatchAsync(It.Is(x => x.Entries.Count > 10), It.IsAny())) + .ThrowsAsync(new AmazonSQSException("Request contains more than 10 entries.") { ErrorCode = "AWS.SimpleQueueService.TooManyEntriesInBatchRequest" }); + + client.Setup(x => x.ChangeMessageVisibilityBatchAsync(It.Is(x => x.Entries.Count <= 10), It.IsAny())) + .ReturnsAsync(new ChangeMessageVisibilityBatchResponse { Failed = new List() }, TimeSpan.FromMilliseconds(50)); + + var messagePoller = CreateSQSMessagePoller(client); + + var messageEnvelopes = Enumerable.Range(0, 15).Select(x => new MessageEnvelope { Id = $"{x + 1}", SQSMetadata = new SQSMetadata { ReceiptHandle = $"rh{x + 1}" } }).Cast().ToList(); + + await messagePoller.ExtendMessageVisibilityTimeoutAsync(messageEnvelopes); + + Assert.NotNull(_inMemoryLogger); + Assert.Empty(_inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest"))); + } + /// /// Helper function that initializes and starts a with /// a mocked SQS client, then cancels after 500ms @@ -162,7 +191,7 @@ private async Task RunSQSMessagePollerTest(Mock mockSqsClient, Actio private IMessagePoller CreateSQSMessagePoller(Mock mockSqsClient) { var serviceCollection = new ServiceCollection(); - serviceCollection.AddLogging(); + serviceCollection.AddLogging(x => x.AddInMemoryLogger()); serviceCollection.AddAWSMessageBus(builder => { @@ -174,6 +203,7 @@ private IMessagePoller CreateSQSMessagePoller(Mock mockSqsClient) var serviceProvider = serviceCollection.BuildServiceProvider(); + _inMemoryLogger = serviceProvider.GetRequiredService(); var messagePollerFactory = serviceProvider.GetService(); Assert.NotNull(messagePollerFactory);