Skip to content

Commit

Permalink
SQS Message handling improvement
Browse files Browse the repository at this point in the history
Process multiple messages at the same time.  Limit the number of concurrent messages per destination to 2 with a timeout and error condition after 2 seconds.
  • Loading branch information
DavidWiseman committed Jun 23, 2024
1 parent e63cb5b commit 53a5203
Showing 1 changed file with 137 additions and 47 deletions.
184 changes: 137 additions & 47 deletions DBADash/Messaging/SQSMessageProcessing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
using Microsoft.SqlServer.TransactSql.ScriptDom;
using Polly.Retry;
using Polly;
using System.Collections.Concurrent;
using System.Threading;
using System.Reflection.Metadata;

namespace DBADash.Messaging
{
Expand All @@ -26,6 +29,10 @@ public class SQSMessageProcessing
private const int delayAfterReceivingMessageForDifferentAgent = 1000; // ms
private const int delayBetweenMessages = 1000; // ms
private AsyncRetryPolicy _retryPolicy;
private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphores = new();
private const int MaxDegreeOfParallelism = 2;
private const int SemaphoreTimeout = 2000; // ms


public SQSMessageProcessing(CollectionConfig config)
{
Expand Down Expand Up @@ -73,7 +80,7 @@ public async Task ProcessSQSQueue(string DBADashAgentIdentifier)
try
{
// Validations
if (!ValidateMessage(message, DBADashAgentIdentifier, out var reason,
if (!ValidateMessageAttributes(message, DBADashAgentIdentifier, out var reason,
out var messageType, out var handle, out var destinationConnectionHash, out var notForThisAgent, out var deleteMessage, out string replySQS, out var replyAgent))
{
Log.Warning("Invalid message: {reason}", reason);
Expand Down Expand Up @@ -104,7 +111,8 @@ await AWSTools.DeleteMessageAsync(_sqsClient, Config.ServiceSQSQueueUrl,
}
else // A message for this service to action
{
await ProcessMessage(message, DBADashAgentIdentifier, handle,
// Process on a separate thread
_= ProcessMessageAsync(message, DBADashAgentIdentifier, handle,
destinationConnectionHash, replySQS, replyAgent);
}

Expand All @@ -126,7 +134,7 @@ await ProcessMessage(message, DBADashAgentIdentifier, handle,
}
}

private bool ValidateMessage(Message message,string expectedAgent, out string reason, out string messageType, out Guid handle, out string destinationConnectionHash,out bool notForThisAgent, out bool deleteMessage, out string replySQS, out string replyAgent)
private static bool ValidateMessageAttributes(Message message,string expectedAgent, out string reason, out string messageType, out Guid handle, out string destinationConnectionHash,out bool notForThisAgent, out bool deleteMessage, out string replySQS, out string replyAgent)
{
reason = string.Empty;
messageType = string.Empty;
Expand Down Expand Up @@ -218,66 +226,148 @@ private bool ValidateMessage(Message message,string expectedAgent, out string re
/// <param name="replySQS">SQS queue to send reply messages to</param>
/// <param name="replyAgent">Identifier for agent to reply messages to</param>
/// <returns></returns>
private async Task ProcessMessage(Message message, string DBADashAgentIdentifier, Guid handle, string destinationConnectionHash,string replySQS,string replyAgent)
private async Task ProcessMessageAsync(Message message, string DBADashAgentIdentifier, Guid handle, string destinationConnectionHash,string replySQS,string replyAgent)
{
var msg = await ValidateMessage(message, DBADashAgentIdentifier, handle, destinationConnectionHash, replySQS, replyAgent);
if (msg == null) return;
byte[] payload;

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Security Scanning (csharp)

The variable 'payload' is declared but never used

Check warning on line 233 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Security Scanning (csharp)

The variable 'payload' is declared but never used

// Implementations of MessageBase will process the message and return a DataSet or null
var semaphore =
_semaphores.GetOrAdd(destinationConnectionHash,
_ => new SemaphoreSlim(MaxDegreeOfParallelism));
try
{
if (!await semaphore.WaitAsync(SemaphoreTimeout)) // Semaphore used to limit concurrent processing per connection
{
Log.Warning("Semaphore timeout for {handle}. Service is busy.", handle);
// Semaphore timed out, service is busy
await SendReplyMessage(DBADashAgentIdentifier, handle, destinationConnectionHash, replySQS, replyAgent,
ResponseMessage.ResponseTypes.Failure, "Service is busy. Try again later.").ConfigureAwait(false);

return;
}

await DoProcessMessageAsync(msg, DBADashAgentIdentifier, handle, destinationConnectionHash, replySQS,
replyAgent);

}
catch (Exception ex)
{
Log.Error(ex, "Error processing message with handle {handle}", handle);
await SendReplyMessage(DBADashAgentIdentifier, handle, destinationConnectionHash, replySQS, replyAgent,
ResponseMessage.ResponseTypes.Failure, ex.Message).ConfigureAwait(false);

}
finally
{
semaphore.Release();
}
}


/// <summary>
/// Run the processing task for the message. Send a reply message to the source agent when complete
/// </summary>
/// <param name="msg">MessageBase object. Process method is called to execute the task</param>
/// <param name="DBADashAgentIdentifier">Target DBA Dash agent for this message</param>
/// <param name="handle">Service broker conversation handle</param>
/// <param name="destinationConnectionHash">Hash of destination connection so we know where the message came from when there are multiple destinations</param>
/// <param name="replySQS">SQS queue to send reply messages to</param>
/// <param name="replyAgent">Identifier for agent to reply messages to</param>
/// <returns></returns>
private async Task DoProcessMessageAsync(MessageBase msg, string DBADashAgentIdentifier, Guid handle, string destinationConnectionHash, string replySQS, string replyAgent)
{
// Errors handled by the caller
Log.Information("Send message receipt for handle {handle} & process message", handle);
await SendReplyMessage(DBADashAgentIdentifier, handle, destinationConnectionHash, replySQS, replyAgent,
ResponseMessage.ResponseTypes.Progress, "Message Received");

var ds = await msg.Process(Config, handle);
string messageDataPath = null;
if (ds != null)
{
// DataSet might be large, so write to S3 and send the path in the reply message
var fileName = $"{handle}.message";
Log.Debug("Writing message to S3 {filename}", fileName);
messageDataPath = msg.CollectAgent.S3Path.AppendToUrl(fileName);
await DestinationHandling.WriteS3(ds, msg.CollectAgent.S3Path, fileName, Config);
}

// Send a reply message to the source agent
await SendReplyMessage(DBADashAgentIdentifier, handle, destinationConnectionHash, replySQS, replyAgent,
ResponseMessage.ResponseTypes.Success, "Completed", messageDataPath).ConfigureAwait(false);
}

// Acknowledge the message is received before we start processing
Log.Debug($"Received message: {message.MessageId}");
var payload = (new ResponseMessage()
{ Type = ResponseMessage.ResponseTypes.Progress, Message = "Message Received" })
.Serialize();
Log.Information("Send message receipt for handle {handle}", handle);
private async Task SendReplyMessage(string DBADashAgentIdentifier, Guid handle,
string destinationConnectionHash, string replySQS, string replyAgent,
ResponseMessage.ResponseTypes responseType, string message, string messageDataPath=null)
{
var payload = CreateResponsePayload(responseType, message, messageDataPath);
await AWSTools.SendSQSMessageAsync(Config, Convert.ToBase64String(payload),
DBADashAgentIdentifier, replyAgent, handle, replySQS, "REPLY", destinationConnectionHash);
}

// Deserialize the message.
var payloadBin = Convert.FromBase64String(message.Body);
var msg = MessageBase.Deserialize(payloadBin);

/// <summary>
/// Deserialize the message payload and check if it is expired
/// </summary>
/// <param name="message">SQS message</param>
/// <param name="DBADashAgentIdentifier">Target DBA Dash agent for this message</param>
/// <param name="handle">Service broker conversation handle</param>
/// <param name="destinationConnectionHash">Hash of destination connection so we know where the message came from when there are multiple destinations</param>
/// <param name="replySQS">SQS queue to send reply messages to</param>
/// <param name="replyAgent">Identifier for agent to reply messages to</param>
/// <returns>Deserialized MessageBase object. null if message is expired or couldn't be deserialized.</returns>
private async Task<MessageBase> ValidateMessage(Message message, string DBADashAgentIdentifier, Guid handle, string destinationConnectionHash, string replySQS, string replyAgent)
{
MessageBase msg;
try
{
// Acknowledge the message is received before we start processing
Log.Debug($"Received message: {message.MessageId}");
byte[] payload;

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Build (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Security Scanning (csharp)

The variable 'payload' is declared but never used

Check warning on line 329 in DBADash/Messaging/SQSMessageProcessing.cs

View workflow job for this annotation

GitHub Actions / Security Scanning (csharp)

The variable 'payload' is declared but never used
var payloadBin = Convert.FromBase64String(message.Body);
msg = MessageBase.Deserialize(payloadBin);

}
catch(Exception ex)
{
Log.Error(ex,"Message with handle {handle} couldn't be deserialized", handle);
await SendReplyMessage(DBADashAgentIdentifier, handle, destinationConnectionHash, replySQS, replyAgent,
ResponseMessage.ResponseTypes.Failure, "Message couldn't be read");
return null;
}
if (msg.IsExpired) // Check if the message is expired - if so, send a failure message
{
Log.Error("Message with handle {handle} created at {Created} is expired.", handle, msg.Created);

payload = (new ResponseMessage()
{ Type = ResponseMessage.ResponseTypes.Failure, Message = "Message is Expired." }).Serialize();
await AWSTools.SendSQSMessageAsync(Config, Convert.ToBase64String(payload),
DBADashAgentIdentifier, replyAgent, handle, replySQS, "REPLY", destinationConnectionHash);
await SendReplyMessage(DBADashAgentIdentifier, handle, destinationConnectionHash, replySQS, replyAgent,
ResponseMessage.ResponseTypes.Failure, "Message is Expired");

return null;
}
else
{
Log.Information("Processing message with handle {handle}", handle);
// Implementations of MessageBase will process the message and return a DataSet or null
DataSet ds=null;
try
{
ds = await msg.Process(Config, handle);
string messageDataPath = null;
if (ds != null)
{
// DataSet might be large, so write to S3 and send the path in the reply message
var fileName = $"{handle}.message";
Log.Debug("Writing message to S3 {filename}", fileName);
messageDataPath = msg.CollectAgent.S3Path.AppendToUrl(fileName);
await DestinationHandling.WriteS3(ds, msg.CollectAgent.S3Path, fileName, Config);
}
return msg;
}

// Send a reply message to the source agent
payload = (new ResponseMessage()
{ Type = ResponseMessage.ResponseTypes.Success, Message = "Completed", MessageDataPath = messageDataPath }).Serialize();
await AWSTools.SendSQSMessageAsync(Config, Convert.ToBase64String(payload),
DBADashAgentIdentifier, replyAgent, handle, replySQS, "REPLY", destinationConnectionHash);
}
catch(Exception ex)
{
Log.Error(ex, "Error processing message with handle {handle}", handle);
payload = (new ResponseMessage()
{ Type = ResponseMessage.ResponseTypes.Failure, Message = ex.Message }).Serialize();
await AWSTools.SendSQSMessageAsync(Config, Convert.ToBase64String(payload),
DBADashAgentIdentifier, replyAgent, handle, replySQS, "REPLY", destinationConnectionHash);
}

}
}


private static byte[] CreateResponsePayload(ResponseMessage.ResponseTypes responseType, string message, string messageDataPath = null)
{
var responseMessage = new ResponseMessage
{
Type = responseType,
Message = message,
MessageDataPath = messageDataPath
};
return responseMessage.Serialize();
}


/// <summary>
/// Process the reply message from the remote service. Send data back to the SQL repository DB & notification back to GUI via service broker
/// </summary>
Expand Down

0 comments on commit 53a5203

Please sign in to comment.