From 49c2a774df3eae115bf26e689ae5aacc710b5447 Mon Sep 17 00:00:00 2001 From: David Wiseman <33157668+DavidWiseman@users.noreply.github.com> Date: Tue, 25 Jun 2024 08:48:10 +0100 Subject: [PATCH 1/2] SQS Message handling update Ensure old messages are removed even if the target is a different service to prevent constant re-processing. e.g. Bad message or service is offline. --- DBADash/Messaging/SQSMessageProcessing.cs | 30 ++++++++++++----------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/DBADash/Messaging/SQSMessageProcessing.cs b/DBADash/Messaging/SQSMessageProcessing.cs index f7e1712e..c13b6cbc 100644 --- a/DBADash/Messaging/SQSMessageProcessing.cs +++ b/DBADash/Messaging/SQSMessageProcessing.cs @@ -145,20 +145,7 @@ private static bool ValidateMessageAttributes(Message message,string expectedAge replySQS = string.Empty; replyAgent = string.Empty; - if (!message.MessageAttributes.TryGetValue("DBADashToIdentifier", out var targetAgent)) - { - reason = "Message does not contain a DBADashToIdentifier attribute."; - deleteMessage = false; - return false; - } - if (targetAgent.StringValue != expectedAgent) - { - reason = $"Message is not intended for this agent. Expected {expectedAgent} but received {targetAgent.StringValue}. This is expected when using a shared queue."; - notForThisAgent = true; - deleteMessage = false; - return false; - } - + /* Remove old messages regardless of target service. This will prevent constant re-processing of old messages */ if (!message.Attributes.TryGetValue(MessageSystemAttributeName.SentTimestamp, out var sentTimestamp)) { reason = "Message does not contain a SentTimestamp attribute."; @@ -180,6 +167,21 @@ private static bool ValidateMessageAttributes(Message message,string expectedAge return false; } + if (!message.MessageAttributes.TryGetValue("DBADashToIdentifier", out var targetAgent)) + { + reason = "Message does not contain a DBADashToIdentifier attribute."; + deleteMessage = false; + return false; + } + /* Might be using a shared queue. Keep the message. Visibility timeout adjusted to allow other service to pick up the message. */ + if (targetAgent.StringValue != expectedAgent) + { + reason = $"Message is not intended for this agent. Expected {expectedAgent} but received {targetAgent.StringValue}. This is expected when using a shared queue."; + notForThisAgent = true; + deleteMessage = false; + return false; + } + if (!message.MessageAttributes.TryGetValue("MessageType", out var messageTypeAttribute)) { reason = "Message does not contain a MessageType attribute."; From 0224386d0d2bbf4b021bc178eb4dbbc862c4d884 Mon Sep 17 00:00:00 2001 From: David Wiseman <33157668+DavidWiseman@users.noreply.github.com> Date: Tue, 25 Jun 2024 09:03:53 +0100 Subject: [PATCH 2/2] SQS Message handling update - delay Update message processing delay. Reduce time between loops and add additional delay for error conditions. --- DBADash/Messaging/SQSMessageProcessing.cs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/DBADash/Messaging/SQSMessageProcessing.cs b/DBADash/Messaging/SQSMessageProcessing.cs index c13b6cbc..b335f28f 100644 --- a/DBADash/Messaging/SQSMessageProcessing.cs +++ b/DBADash/Messaging/SQSMessageProcessing.cs @@ -27,7 +27,8 @@ public class SQSMessageProcessing private const int clearMessageVisibilityTimeout = 0; //ms private const int messageVisibilityTimeout = 10000; //ms private const int delayAfterReceivingMessageForDifferentAgent = 1000; // ms - private const int delayBetweenMessages = 1000; // ms + private const int delayBetweenMessages = 100; // ms + private const int errorDelay= 1000; // ms private AsyncRetryPolicy _retryPolicy; private readonly ConcurrentDictionary _semaphores = new(); private const int MaxDegreeOfParallelism = 2; @@ -66,7 +67,6 @@ public async Task ProcessSQSQueue(string DBADashAgentIdentifier) MessageAttributeNames = new List { "All" }, MessageSystemAttributeNames = new List { MessageSystemAttributeName.SentTimestamp } }; - while (true) { try @@ -121,6 +121,7 @@ await AWSTools.DeleteMessageAsync(_sqsClient, Config.ServiceSQSQueueUrl, { // Handle any exceptions that occurred during processing Log.Error(ex, $"Error processing message: {message.Body}"); + await Task.Delay(errorDelay); // Extra delay if error occurs to avoid burning CPU cycles } } } @@ -128,9 +129,10 @@ await AWSTools.DeleteMessageAsync(_sqsClient, Config.ServiceSQSQueueUrl, catch (Exception ex) { Log.Error(ex, "Error receiving messages from SQS Queue"); + await Task.Delay(errorDelay); // Extra delay if error occurs to avoid burning CPU cycles } - await Task.Delay(delayBetweenMessages); + await Task.Delay(delayBetweenMessages); // Wait a small amount of time before checking for more messages to avoid burning CPU cycles (shouldn't be required) } }