Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS Handling updates #919

Merged
merged 2 commits into from
Jun 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions DBADash/Messaging/SQSMessageProcessing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public class SQSMessageProcessing
private const int clearMessageVisibilityTimeout = 0; //ms
private const int messageVisibilityTimeout = 10000; //ms
private const int delayAfterReceivingMessageForDifferentAgent = 1000; // ms
private const int delayBetweenMessages = 1000; // ms
private const int delayBetweenMessages = 100; // ms
private const int errorDelay= 1000; // ms
private AsyncRetryPolicy _retryPolicy;
private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphores = new();
private const int MaxDegreeOfParallelism = 2;
Expand Down Expand Up @@ -66,7 +67,6 @@ public async Task ProcessSQSQueue(string DBADashAgentIdentifier)
MessageAttributeNames = new List<string> { "All" },
MessageSystemAttributeNames = new List<string> { MessageSystemAttributeName.SentTimestamp }
};

while (true)
{
try
Expand Down Expand Up @@ -121,16 +121,18 @@ public async Task ProcessSQSQueue(string DBADashAgentIdentifier)
{
// Handle any exceptions that occurred during processing
Log.Error(ex, $"Error processing message: {message.Body}");
await Task.Delay(errorDelay); // Extra delay if error occurs to avoid burning CPU cycles
}
}
}
}
catch (Exception ex)
{
Log.Error(ex, "Error receiving messages from SQS Queue");
await Task.Delay(errorDelay); // Extra delay if error occurs to avoid burning CPU cycles
}

await Task.Delay(delayBetweenMessages);
await Task.Delay(delayBetweenMessages); // Wait a small amount of time before checking for more messages to avoid burning CPU cycles (shouldn't be required)
}
}

Expand All @@ -145,20 +147,7 @@ private static bool ValidateMessageAttributes(Message message,string expectedAge
replySQS = string.Empty;
replyAgent = string.Empty;

if (!message.MessageAttributes.TryGetValue("DBADashToIdentifier", out var targetAgent))
{
reason = "Message does not contain a DBADashToIdentifier attribute.";
deleteMessage = false;
return false;
}
if (targetAgent.StringValue != expectedAgent)
{
reason = $"Message is not intended for this agent. Expected {expectedAgent} but received {targetAgent.StringValue}. This is expected when using a shared queue.";
notForThisAgent = true;
deleteMessage = false;
return false;
}

/* Remove old messages regardless of target service. This will prevent constant re-processing of old messages */
if (!message.Attributes.TryGetValue(MessageSystemAttributeName.SentTimestamp, out var sentTimestamp))
{
reason = "Message does not contain a SentTimestamp attribute.";
Expand All @@ -180,6 +169,21 @@ private static bool ValidateMessageAttributes(Message message,string expectedAge
return false;
}

if (!message.MessageAttributes.TryGetValue("DBADashToIdentifier", out var targetAgent))
{
reason = "Message does not contain a DBADashToIdentifier attribute.";
deleteMessage = false;
return false;
}
/* Might be using a shared queue. Keep the message. Visibility timeout adjusted to allow other service to pick up the message. */
if (targetAgent.StringValue != expectedAgent)
{
reason = $"Message is not intended for this agent. Expected {expectedAgent} but received {targetAgent.StringValue}. This is expected when using a shared queue.";
notForThisAgent = true;
deleteMessage = false;
return false;
}

if (!message.MessageAttributes.TryGetValue("MessageType", out var messageTypeAttribute))
{
reason = "Message does not contain a MessageType attribute.";
Expand Down