Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: extending message visibility for more than 10 entries does not fail #34

Merged
merged 1 commit into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion AWS.Messaging.sln
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sampleapps", "sampleapps",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherAPI", "sampleapps\PublisherAPI\PublisherAPI.csproj", "{A7F3DA5F-0D7A-4D8A-837D-F78F4A4E3CA7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SubscriberService", "sampleapps\SubscriberService\SubscriberService.csproj", "{D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubscriberService", "sampleapps\SubscriberService\SubscriberService.csproj", "{D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AWS.Messaging.Tests.Common", "test\AWS.Messaging.Tests.Common\AWS.Messaging.Tests.Common.csproj", "{A174942B-AF9C-4935-AD7B-AF651BACE63C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -51,6 +53,10 @@ Global
{D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D398D75B-80F3-4D4F-AFC7-15CD61DD22C3}.Release|Any CPU.Build.0 = Release|Any CPU
{A174942B-AF9C-4935-AD7B-AF651BACE63C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A174942B-AF9C-4935-AD7B-AF651BACE63C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A174942B-AF9C-4935-AD7B-AF651BACE63C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A174942B-AF9C-4935-AD7B-AF651BACE63C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -61,6 +67,7 @@ Global
{A4D5F664-E3A3-411A-9327-05913794021C} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3}
{A7F3DA5F-0D7A-4D8A-837D-F78F4A4E3CA7} = {1AA8985B-897C-4BD5-9735-FD8B33FEBFFB}
{D398D75B-80F3-4D4F-AFC7-15CD61DD22C3} = {1AA8985B-897C-4BD5-9735-FD8B33FEBFFB}
{A174942B-AF9C-4935-AD7B-AF651BACE63C} = {80DB2C77-6ADD-4A60-B27D-763BDF9659D3}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7B2B759D-6455-4089-8173-3F1619567B36}
Expand Down
82 changes: 58 additions & 24 deletions src/AWS.Messaging/SQS/SQSMessagePoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ internal class SQSMessagePoller : IMessagePoller
/// </summary>
private const int SQS_MAX_NUMBER_MESSAGES_TO_READ = 10;

/// <summary>
/// Maximum valid value for number of messages in <see cref="ChangeMessageVisibilityBatchRequest"/>
/// </summary>
private const int SQS_MAX_MESSAGE_CHANGE_VISIBILITY = 10;

/// <summary>
/// The maximum amount of time a polling iteration should pause for while waiting
/// for in flight messages to finish processing
Expand Down Expand Up @@ -216,18 +221,27 @@ public async Task ExtendMessageVisibilityTimeoutAsync(IEnumerable<MessageEnvelop
return;
}

var request = new ChangeMessageVisibilityBatchRequest
var requestBatches = new List<ChangeMessageVisibilityBatchRequest>();

var currentRequest = new ChangeMessageVisibilityBatchRequest
{
QueueUrl = _configuration.SubscriberEndpoint
};

foreach (var message in messages)
{
if (!string.IsNullOrEmpty(message.SQSMetadata?.ReceiptHandle))
{
_logger.LogTrace("Preparing to extend the visibility of {MessageId} with SQS receipt handle {ReceiptHandle} by {VisibilityTimeout} seconds",
message.Id, message.SQSMetadata.ReceiptHandle, _configuration.VisibilityTimeout);
request.Entries.Add(new ChangeMessageVisibilityBatchRequestEntry
if (currentRequest.Entries.Count >= SQS_MAX_MESSAGE_CHANGE_VISIBILITY)
{
requestBatches.Add(currentRequest);
currentRequest = new ChangeMessageVisibilityBatchRequest
{
QueueUrl = _configuration.SubscriberEndpoint
};
}
currentRequest.Entries.Add(new ChangeMessageVisibilityBatchRequestEntry
{
Id = message.Id,
ReceiptHandle = message.SQSMetadata.ReceiptHandle,
Expand All @@ -240,37 +254,57 @@ public async Task ExtendMessageVisibilityTimeoutAsync(IEnumerable<MessageEnvelop
throw new MissingSQSReceiptHandleException($"Attempted to change the visibility of message {message.Id} from {_configuration.SubscriberEndpoint} without an SQS receipt handle.");
}
}
requestBatches.Add(currentRequest);

List<Task<ChangeMessageVisibilityBatchResponse>> changeMessageVisibilityBatchTasks =
requestBatches
.Select(request => _sqsClient.ChangeMessageVisibilityBatchAsync(request, token))
.ToList();

try
{
var response = await _sqsClient.ChangeMessageVisibilityBatchAsync(request, token);
var responses = await Task.WhenAll(changeMessageVisibilityBatchTasks);
}
catch (Exception ex)
{
_logger.LogError(ex, "An unexpected exception occurred while extending message visibility on queue {SubscriberEndpoint}", _configuration.SubscriberEndpoint);
}

foreach (var successMessage in response.Successful)
foreach (var changeMessageVisibilityBatchTask in changeMessageVisibilityBatchTasks)
{
if (!changeMessageVisibilityBatchTask.IsFaulted)
{
_logger.LogTrace("Extended the visibility of message {MessageId} on queue {SubscriberEndpoint} successfully", successMessage.Id, _configuration.SubscriberEndpoint);
}
var response = changeMessageVisibilityBatchTask.Result;
foreach (var successMessage in response.Successful)
{
_logger.LogTrace("Extended the visibility of message {MessageId} on queue {SubscriberEndpoint} successfully", successMessage.Id, _configuration.SubscriberEndpoint);
}

foreach (var failedMessage in response.Failed)
{
_logger.LogError("Failed to extend the visibility of message {FailedMessageId} on queue {SubscriberEndpoint}: {FailedMessage}",
failedMessage.Id, _configuration.SubscriberEndpoint, failedMessage.Message);
foreach (var failedMessage in response.Failed)
{
_logger.LogError("Failed to extend the visibility of message {FailedMessageId} on queue {SubscriberEndpoint}: {FailedMessage}",
failedMessage.Id, _configuration.SubscriberEndpoint, failedMessage.Message);
}
}
}
catch (AmazonSQSException ex)
{
_logger.LogError(ex, "Failed to extend the visibility of message(s) [{MessageIds}] on queue {SubscriberEndpoint}",
string.Join(", ", messages.Select(x => x.Id)), _configuration.SubscriberEndpoint);

// Rethrow the exception to fail fast for invalid configuration, permissioning, etc.
if (IsSQSExceptionFatal(ex))
else
{
throw;
if (changeMessageVisibilityBatchTask.Exception?.InnerException is AmazonSQSException amazonEx)
{
_logger.LogError(amazonEx, "Failed to extend the visibility of message(s) [{MessageIds}] on queue {SubscriberEndpoint}",
string.Join(", ", messages.Select(x => x.Id)), _configuration.SubscriberEndpoint);

// Rethrow the exception to fail fast for invalid configuration, permissioning, etc.
if (IsSQSExceptionFatal(amazonEx))
{
throw amazonEx;
philasmar marked this conversation as resolved.
Show resolved Hide resolved
}
}
else if (changeMessageVisibilityBatchTask.Exception?.InnerException is Exception ex)
{
_logger.LogError(ex, "An unexpected exception occurred while extending message visibility on queue {SubscriberEndpoint}", _configuration.SubscriberEndpoint);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "An unexpected exception occurred while extending message visibility on queue {SubscriberEndpoint}", _configuration.SubscriberEndpoint);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

<ItemGroup>
<ProjectReference Include="..\..\src\AWS.Messaging\AWS.Messaging.csproj" />
<ProjectReference Include="..\AWS.Messaging.Tests.Common\AWS.Messaging.Tests.Common.csproj" />
</ItemGroup>

</Project>
19 changes: 19 additions & 0 deletions test/AWS.Messaging.IntegrationTests/Handlers/ChatMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,22 @@ public Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messa
return Task.FromResult(MessageProcessStatus.Success());
}
}

public class ChatMessageHandler_10sDelay : IMessageHandler<ChatMessage>
{
private readonly TempStorage<ChatMessage> _tempStorage;

public ChatMessageHandler_10sDelay(TempStorage<ChatMessage> tempStorage)
{
_tempStorage = tempStorage;
}

public async Task<MessageProcessStatus> HandleAsync(MessageEnvelope<ChatMessage> messageEnvelope, CancellationToken token = default)
{
await Task.Delay(10000);

_tempStorage.Messages.Add(messageEnvelope);

return MessageProcessStatus.Success();
}
}
4 changes: 2 additions & 2 deletions test/AWS.Messaging.IntegrationTests/Models/TempStorage.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using System.Collections.Generic;
using System.Collections.Concurrent;

namespace AWS.Messaging.IntegrationTests.Models;

public class TempStorage<T>
{
public List<MessageEnvelope<T>> Messages { get; set; } = new List<MessageEnvelope<T>>();
public ConcurrentBag<MessageEnvelope<T>> Messages { get; set; } = new ConcurrentBag<MessageEnvelope<T>>();
}
99 changes: 84 additions & 15 deletions test/AWS.Messaging.IntegrationTests/SubscriberTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Amazon.SQS;
using AWS.Messaging.IntegrationTests.Handlers;
using AWS.Messaging.IntegrationTests.Models;
using AWS.Messaging.Tests.Common.Services;
using AWS.Messaging.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand All @@ -18,51 +19,53 @@ namespace AWS.Messaging.IntegrationTests;
public class SubscriberTests : IAsyncLifetime
{
private readonly IAmazonSQS _sqsClient;
private ServiceProvider _serviceProvider;
private readonly IServiceCollection _serviceCollection;
private string _sqsQueueUrl;

public SubscriberTests()
{
_sqsClient = new AmazonSQSClient();
_serviceProvider = default!;
_serviceCollection = new ServiceCollection();
_serviceCollection.AddSingleton<TempStorage<ChatMessage>>();
_serviceCollection.AddLogging(x => x.AddInMemoryLogger());
_sqsQueueUrl = string.Empty;
}

public async Task InitializeAsync()
{
var createQueueResponse = await _sqsClient.CreateQueueAsync($"MPFTest-{Guid.NewGuid().ToString().Split('-').Last()}");
_sqsQueueUrl = createQueueResponse.QueueUrl;
}

var serviceCollection = new ServiceCollection();
serviceCollection.AddSingleton<TempStorage<ChatMessage>>();
serviceCollection.AddLogging();
serviceCollection.AddAWSMessageBus(builder =>
[Fact]
public async Task SendAndReceive1Message()
{
_serviceCollection.AddAWSMessageBus(builder =>
{
builder.AddSQSPublisher<ChatMessage>(_sqsQueueUrl);
builder.AddSQSPoller(_sqsQueueUrl);
builder.AddSQSPoller(_sqsQueueUrl, options =>
{
options.VisibilityTimeoutExtensionInterval = 3;
});
builder.AddMessageHandler<ChatMessageHandler, ChatMessage>();
});
_serviceProvider = serviceCollection.BuildServiceProvider();
}
var serviceProvider = _serviceCollection.BuildServiceProvider();

[Fact]
public async Task SendAndReceive1Message()
{
var publishStartTime = DateTime.UtcNow;
var publisher = _serviceProvider.GetRequiredService<IMessagePublisher>();
var publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
await publisher.PublishAsync(new ChatMessage
{
MessageDescription = "Test1"
});
var publishEndTime = DateTime.UtcNow;

var pump = _serviceProvider.GetRequiredService<IHostedService>() as MessagePumpService;
var pump = serviceProvider.GetRequiredService<IHostedService>() as MessagePumpService;
Assert.NotNull(pump);
var source = new CancellationTokenSource();

await pump.StartAsync(source.Token);

var tempStorage = _serviceProvider.GetRequiredService<TempStorage<ChatMessage>>();
var tempStorage = serviceProvider.GetRequiredService<TempStorage<ChatMessage>>();
source.CancelAfter(60000);
while (!source.IsCancellationRequested)
{
Expand All @@ -81,6 +84,72 @@ await publisher.PublishAsync(new ChatMessage
Assert.Equal("Test1", message.Message.MessageDescription);
}

[Theory]
// Tests that the visibility is extended without needing multiple batch requests
[InlineData(5, 1, 5)]
// Tests that the visibility is extended with the need for multiple batch requests
[InlineData(8, 1, 5)]
// Increasing the number of messages processed to ensure stability at load
[InlineData(15, 1, 15)]
// Increasing the number of messages processed with batching required to extend visibility
[InlineData(20, 3, 15)]
public async Task SendAndReceiveMultipleMessages(int numberOfMessages, int visibilityTimeoutExtension, int maxConcurrentMessages)
{
_serviceCollection.AddAWSMessageBus(builder =>
{
builder.AddSQSPublisher<ChatMessage>(_sqsQueueUrl);
builder.AddSQSPoller(_sqsQueueUrl, options =>
{
options.VisibilityTimeoutExtensionInterval = visibilityTimeoutExtension;
options.MaxNumberOfConcurrentMessages = maxConcurrentMessages;
});
builder.AddMessageHandler<ChatMessageHandler_10sDelay, ChatMessage>();
});
var serviceProvider = _serviceCollection.BuildServiceProvider();

var publishStartTime = DateTime.UtcNow;
var publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
for (int i = 0; i < numberOfMessages; i++)
{
await publisher.PublishAsync(new ChatMessage
{
MessageDescription = $"Test{i + 1}"
});
}
var publishEndTime = DateTime.UtcNow;

var pump = serviceProvider.GetRequiredService<IHostedService>() as MessagePumpService;
Assert.NotNull(pump);
var source = new CancellationTokenSource();

await pump.StartAsync(source.Token);

var tempStorage = serviceProvider.GetRequiredService<TempStorage<ChatMessage>>();
var numberOfBatches = (int)Math.Ceiling((decimal)numberOfMessages / (decimal)maxConcurrentMessages);
source.CancelAfter(numberOfBatches * 12000);
while (!source.IsCancellationRequested)
{
if (tempStorage.Messages.Count == numberOfMessages)
{
source.Cancel();
break;
}
}

var inMemoryLogger = serviceProvider.GetRequiredService<InMemoryLogger>();
Assert.Empty(inMemoryLogger.Logs.Where(x => x.Exception is AmazonSQSException ex && ex.ErrorCode.Equals("AWS.SimpleQueueService.TooManyEntriesInBatchRequest")));
Assert.Equal(numberOfMessages, tempStorage.Messages.Count);
for (int i = 0; i < numberOfMessages; i++)
{
var message = tempStorage.Messages.FirstOrDefault(x => x.Message.MessageDescription.Equals($"Test{i + 1}"));
Assert.NotNull(message);
Assert.False(string.IsNullOrEmpty(message.Id));
Assert.Equal("/aws/messaging", message.Source.ToString());
Assert.True(message.TimeStamp > publishStartTime);
Assert.True(message.TimeStamp < publishEndTime);
}
}

public async Task DisposeAsync()
{
try
Expand Down
15 changes: 15 additions & 0 deletions test/AWS.Messaging.Tests.Common/AWS.Messaging.Tests.Common.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" />
</ItemGroup>

</Project>
Loading