diff --git a/src/Queues/RavenSMS.Queue.Coravel/Manager/CoravelQueueManager.cs b/src/Queues/RavenSMS.Queue.Coravel/Manager/CoravelQueueManager.cs index 19be53c..0749519 100644 --- a/src/Queues/RavenSMS.Queue.Coravel/Manager/CoravelQueueManager.cs +++ b/src/Queues/RavenSMS.Queue.Coravel/Manager/CoravelQueueManager.cs @@ -6,11 +6,8 @@ public partial class CoravelQueueManager : IQueueManager { /// - public string QueueEvent(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent + public string QueueEvent(TEvent @event) where TEvent : IEvent { - if (cancellationToken.IsCancellationRequested) - cancellationToken.ThrowIfCancellationRequested(); - var queueId = _queue .QueueInvocableWithPayload, ProcessEventInvocablePayload>( ProcessEventInvocablePayload.Create(@event)); @@ -19,11 +16,8 @@ public string QueueEvent(TEvent @event, CancellationToken cancellationTo } /// - public string QueueMessage(RavenSmsMessage message, CancellationToken cancellationToken = default) + public string QueueMessage(RavenSmsMessage message) { - if (cancellationToken.IsCancellationRequested) - cancellationToken.ThrowIfCancellationRequested(); - var queueId = _queue .QueueInvocableWithPayload( ProcessSmsMessageInvocablePayload.Create(message.Id)); @@ -32,11 +26,8 @@ public string QueueMessage(RavenSmsMessage message, CancellationToken cancellati } /// - public string QueueMessage(RavenSmsMessage message, TimeSpan delay, CancellationToken cancellationToken = default) + public string QueueMessage(RavenSmsMessage message, TimeSpan delay) { - if (cancellationToken.IsCancellationRequested) - cancellationToken.ThrowIfCancellationRequested(); - var queueId = _queue .QueueInvocableWithPayload( ProcessSmsMessageInvocablePayload.Create(message.Id, delay)); diff --git a/src/Queues/RavenSMS.Queue.Hangfire/Manager/HangfireQueueManager.cs b/src/Queues/RavenSMS.Queue.Hangfire/Manager/HangfireQueueManager.cs index b26d34b..9fa9e1c 100644 --- a/src/Queues/RavenSMS.Queue.Hangfire/Manager/HangfireQueueManager.cs +++ b/src/Queues/RavenSMS.Queue.Hangfire/Manager/HangfireQueueManager.cs @@ -6,29 +6,14 @@ public partial class HangfireQueueManager : IQueueManager { /// - public string QueueEvent(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent - { - if (cancellationToken.IsCancellationRequested) - cancellationToken.ThrowIfCancellationRequested(); - - return BackgroundJob.Enqueue(publisher => publisher.ProcessAsync(@event)); - } + public string QueueEvent(TEvent @event) where TEvent : IEvent + => BackgroundJob.Enqueue(publisher => publisher.ProcessAsync(@event)); /// - public string QueueMessage(RavenSmsMessage message, CancellationToken cancellationToken = default) - { - if (cancellationToken.IsCancellationRequested) - cancellationToken.ThrowIfCancellationRequested(); - - return BackgroundJob.Enqueue(manager => manager.ProcessAsync(message.Id, default)); - } + public string QueueMessage(RavenSmsMessage message) + => BackgroundJob.Enqueue(manager => manager.ProcessAsync(message.Id, default)); /// - public string QueueMessage(RavenSmsMessage message, TimeSpan delay, CancellationToken cancellationToken = default) - { - if (cancellationToken.IsCancellationRequested) - cancellationToken.ThrowIfCancellationRequested(); - - return BackgroundJob.Schedule(manager => manager.ProcessAsync(message.Id, default), delay); - } + public string QueueMessage(RavenSmsMessage message, TimeSpan delay) + => BackgroundJob.Schedule(manager => manager.ProcessAsync(message.Id, default), delay); } \ No newline at end of file diff --git a/src/RavenSMS/Configurations.cs b/src/RavenSMS/Configurations.cs index 5eeabff..aa76966 100644 --- a/src/RavenSMS/Configurations.cs +++ b/src/RavenSMS/Configurations.cs @@ -56,7 +56,6 @@ public static RavenSmsBuilder UseInMemoryStores(this RavenSmsBuilder builder) public static RavenSmsBuilder UseInMemoryQueue(this RavenSmsBuilder builder) { builder.ServiceCollection.AddSingleton(); - builder.ServiceCollection.AddSingleton(); builder.ServiceCollection.AddHostedService(); return builder; diff --git a/src/RavenSMS/Managers/RavenSmsManager/RavenSmsManager.cs b/src/RavenSMS/Managers/RavenSmsManager/RavenSmsManager.cs index 1d77ced..b7e2603 100644 --- a/src/RavenSMS/Managers/RavenSmsManager/RavenSmsManager.cs +++ b/src/RavenSMS/Managers/RavenSmsManager/RavenSmsManager.cs @@ -62,7 +62,7 @@ public async Task ProcessAsync(string messageId, CancellationToken cancellationT public async Task QueueMessageAsync(RavenSmsMessage message, CancellationToken cancellationToken = default) { // queue the message for future processing - message.JobQueueId = _queueManager.QueueMessage(message, cancellationToken); + message.JobQueueId = _queueManager.QueueMessage(message); message.Status = RavenSmsMessageStatus.Queued; // save the message @@ -82,7 +82,7 @@ public async Task QueueMessageAsync(RavenSmsMessage message, Cancellatio public async Task QueueMessageAsync(RavenSmsMessage message, TimeSpan delay, CancellationToken cancellationToken = default) { // queue the message for future processing - message.JobQueueId = _queueManager.QueueMessage(message, delay, cancellationToken); + message.JobQueueId = _queueManager.QueueMessage(message, delay); message.Status = RavenSmsMessageStatus.Queued; message.DeliverAt = DateTimeOffset.UtcNow.Add(delay); diff --git a/src/RavenSMS/Managers/RavenSmsQueueManager/Default/IInMemoryQueue.cs b/src/RavenSMS/Managers/RavenSmsQueueManager/Default/IInMemoryQueue.cs deleted file mode 100644 index d9ed651..0000000 --- a/src/RavenSMS/Managers/RavenSmsQueueManager/Default/IInMemoryQueue.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace RavenSMS.Internal.Queues.InMemory; - -/// -/// interface to identify the InMemory queue in DI -/// -public interface IInMemoryQueue -{ - /// - /// put the message in the queue to be processed - /// - /// the messageId - /// the delay if any - /// the queue job id - string EnqueueMessage(string messageId, TimeSpan? delay = null); - - /// - /// put the event in the queue to be processed - /// - /// the event type - /// the event instance - /// the queue job id - string EnqueueEvent(TEvent @event) where TEvent : IEvent; -} \ No newline at end of file diff --git a/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueue.cs b/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueue.cs deleted file mode 100644 index 08d967d..0000000 --- a/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueue.cs +++ /dev/null @@ -1,150 +0,0 @@ -namespace RavenSMS.Internal.Queues.InMemory; - -/// -/// partial part for -/// -public partial class InMemoryQueue : IInMemoryQueue -{ - private readonly ConcurrentDictionary _tokens = new(); - private readonly ConcurrentQueue _tasks = new(); - - private readonly IServiceScopeFactory _scopeFactory; - private int _queueIsConsuming = 0; - private int _tasksRunningCount = 0; - - public bool IsRunning => _queueIsConsuming > 0; - - public InMemoryQueue(IServiceScopeFactory scopeFactory) - { - _scopeFactory = scopeFactory; - } - - /// - public string EnqueueEvent(TEvent @event) where TEvent : IEvent - { - // create the job - var job = new InMemoryJob(async () => - { - // create the service scope - using var scope = _scopeFactory.CreateScope(); - - // get the event publisher to process the event - var publisher = scope.ServiceProvider.GetService(); - if (publisher is null) - throw new RavenSmsException($"RavenSMS is not registered add AddRavenSMS() to your service collection"); - - // process the message - await publisher.ProcessAsync(@event); - }); - - // queue the job - _tasks.Enqueue(job); - - return job.Id.ToString(); - } - - /// - public string EnqueueMessage(string messageId, TimeSpan? delay = null) - { - // create the job - var job = new InMemoryJob(async () => - { - // if we have a delay we need to wait - if (delay.HasValue) - await Task.Delay(delay.Value); - - // create the service scope - using var scope = _scopeFactory.CreateScope(); - - // get the manager to process the message - var manager = scope.ServiceProvider.GetService(); - if (manager is null) - throw new RavenSmsException($"RavenSMS is not registered add AddRavenSMS() to your service collection"); - - // process the message - await manager.ProcessAsync(messageId); - }); - - // queue the job - _tasks.Enqueue(job); - - return job.Id.ToString(); - } - - public async Task ConsumeQueueAsync() - { - try - { - Interlocked.Increment(ref _queueIsConsuming); - - var dequeuedTasks = DequeueAllTasks(); - var dequeuedGuids = dequeuedTasks.Select(job => job.Id); - - await Task.WhenAll( - dequeuedTasks - .Select(InvokeTask) - .ToArray() - ); - - CleanTokens(dequeuedGuids); - } - finally - { - Interlocked.Decrement(ref _queueIsConsuming); - } - } - - public async Task ConsumeQueueOnShutdown() - { - foreach (var cancellatinTokens in _tokens.Values) - { - if (!cancellatinTokens.IsCancellationRequested) - { - cancellatinTokens.Cancel(); - } - } - - await ConsumeQueueAsync(); - } - - private void CleanTokens(IEnumerable guidsForTokensToClean) - { - foreach (var guid in guidsForTokensToClean) - { - if (_tokens.TryRemove(guid, out var token)) - { - token.Dispose(); - } - } - } - - private List DequeueAllTasks() - { - List dequeuedTasks = new(_tasks.Count); - - while (_tasks.TryPeek(out var _)) - { - _tasks.TryDequeue(out var dequeuedTask); - - if (dequeuedTask is not null) - { - dequeuedTasks.Add(dequeuedTask); - } - } - - return dequeuedTasks; - } - - private async Task InvokeTask(InMemoryJob task) - { - try - { - Interlocked.Increment(ref _tasksRunningCount); - await task.Invoke(); - } - finally - { - Interlocked.Decrement(ref _tasksRunningCount); - } - } -} \ No newline at end of file diff --git a/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueueHost.cs b/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueueHost.cs index 33e07f3..6d573fc 100644 --- a/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueueHost.cs +++ b/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueueHost.cs @@ -44,18 +44,6 @@ public void Dispose() Dispose(true); GC.SuppressFinalize(this); } - - /// - protected virtual void Dispose(bool disposing) - { - if (!_disposed) - { - if (disposing) - _timer?.Dispose(); - - _disposed = true; - } - } } /// @@ -66,13 +54,13 @@ internal partial class InMemoryQueueHost : IHostedService, IDisposable private bool _disposed; private Timer? _timer; - private readonly InMemoryQueue _queue; + private readonly InMemoryQueueManager _queue; private readonly SemaphoreSlim _signal = new(0); private readonly CancellationTokenSource _shutdown = new(); - public InMemoryQueueHost(IInMemoryQueue queue) + public InMemoryQueueHost(IQueueManager queue) { - _queue = queue as InMemoryQueue + _queue = queue as InMemoryQueueManager ?? throw new RavenSmsException("the inMemory Queue is not registered, call UseInMemoryQueue() on RavenSmsBuilder"); } @@ -95,4 +83,16 @@ private void InitTimer() dueTime: TimeSpan.Zero, period: TimeSpan.FromSeconds(5)); } + + /// + protected virtual void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + _timer?.Dispose(); + + _disposed = true; + } + } } \ No newline at end of file diff --git a/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueueManager.cs b/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueueManager.cs index 8841a7c..42f9cd6 100644 --- a/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueueManager.cs +++ b/src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueueManager.cs @@ -5,21 +5,157 @@ /// public partial class InMemoryQueueManager : IQueueManager { - private readonly IInMemoryQueue _queue; - - public InMemoryQueueManager(IInMemoryQueue queue) - => _queue = queue; - /// - public string QueueEvent(TEvent @event, CancellationToken cancellationToken = default) + public string QueueEvent(TEvent @event) where TEvent : IEvent - => _queue.EnqueueEvent(@event); + { + // create the job + var job = new InMemoryJob(async () => + { + // create the service scope + using var scope = _scopeFactory.CreateScope(); + + // get the event publisher to process the event + var publisher = scope.ServiceProvider.GetService(); + if (publisher is null) + throw new RavenSmsException($"RavenSMS is not registered add AddRavenSMS() to your service collection"); + + // process the message + await publisher.ProcessAsync(@event); + }); + + // queue the job + _jobs.Enqueue(job); + + return job.Id.ToString(); + } /// - public string QueueMessage(RavenSmsMessage message, CancellationToken cancellationToken = default) - => _queue.EnqueueMessage(message.Id); + public string QueueMessage(RavenSmsMessage message) + => QueueMessage(message, TimeSpan.Zero); /// - public string QueueMessage(RavenSmsMessage message, TimeSpan delay, CancellationToken cancellationToken = default) - => _queue.EnqueueMessage(message.Id, delay); + public string QueueMessage(RavenSmsMessage message, TimeSpan delay) + { + var messageId = message.Id; + + // create the job + var job = new InMemoryJob(async () => + { + // if we have a delay we need to wait + if (delay != TimeSpan.Zero) + await Task.Delay(delay); + + // create the service scope + using var scope = _scopeFactory.CreateScope(); + + // get the manager to process the message + var manager = scope.ServiceProvider.GetService(); + if (manager is null) + throw new RavenSmsException($"RavenSMS is not registered add AddRavenSMS() to your service collection"); + + // process the message + await manager.ProcessAsync(messageId); + }); + + // queue the job + _jobs.Enqueue(job); + + return job.Id.ToString(); + } } + +/// +/// partial part for +/// +public partial class InMemoryQueueManager +{ + private int _queueIsConsuming = 0; + private int _tasksRunningCount = 0; + + private readonly IServiceScopeFactory _scopeFactory; + private readonly ConcurrentQueue _jobs = new(); + private readonly ConcurrentDictionary _tokens = new(); + + public bool IsRunning => _queueIsConsuming > 0; + + public InMemoryQueueManager(IServiceScopeFactory scopeFactory) + => _scopeFactory = scopeFactory; + + public async Task ConsumeQueueAsync() + { + try + { + Interlocked.Increment(ref _queueIsConsuming); + + var dequeuedTasks = DequeueAllTasks(); + var dequeuedGuids = dequeuedTasks.Select(job => job.Id); + + await Task.WhenAll( + dequeuedTasks + .Select(InvokeTask) + .ToArray() + ); + + CleanTokens(dequeuedGuids); + } + finally + { + Interlocked.Decrement(ref _queueIsConsuming); + } + } + + public async Task ConsumeQueueOnShutdown() + { + foreach (var cancellatinTokens in _tokens.Values) + { + if (!cancellatinTokens.IsCancellationRequested) + { + cancellatinTokens.Cancel(); + } + } + + await ConsumeQueueAsync(); + } + + public void CleanTokens(IEnumerable guidsForTokensToClean) + { + foreach (var guid in guidsForTokensToClean) + { + if (_tokens.TryRemove(guid, out var token)) + { + token.Dispose(); + } + } + } + + private List DequeueAllTasks() + { + List dequeuedTasks = new(_jobs.Count); + + while (_jobs.TryPeek(out var _)) + { + _jobs.TryDequeue(out var dequeuedTask); + + if (dequeuedTask is not null) + { + dequeuedTasks.Add(dequeuedTask); + } + } + + return dequeuedTasks; + } + + private async Task InvokeTask(InMemoryJob task) + { + try + { + Interlocked.Increment(ref _tasksRunningCount); + await task.Invoke(); + } + finally + { + Interlocked.Decrement(ref _tasksRunningCount); + } + } +} \ No newline at end of file diff --git a/src/RavenSMS/Managers/RavenSmsQueueManager/IQueueManager.cs b/src/RavenSMS/Managers/RavenSmsQueueManager/IQueueManager.cs index 69c5f8b..e88a431 100644 --- a/src/RavenSMS/Managers/RavenSmsQueueManager/IQueueManager.cs +++ b/src/RavenSMS/Managers/RavenSmsQueueManager/IQueueManager.cs @@ -14,7 +14,7 @@ public interface IQueueManager /// A task that represents the asynchronous operation. The task result contains the if the background job Id generated the by the underlying queue manager. /// /// If the System.Threading.CancellationToken is canceled. - string QueueMessage(RavenSmsMessage message, CancellationToken cancellationToken = default); + string QueueMessage(RavenSmsMessage message); /// /// queue the message for processing with a delay @@ -26,7 +26,7 @@ public interface IQueueManager /// A task that represents the asynchronous operation. The task result contains the if the background job Id generated the by the underlying queue manager. /// /// If the System.Threading.CancellationToken is canceled. - string QueueMessage(RavenSmsMessage message, TimeSpan delay, CancellationToken cancellationToken = default); + string QueueMessage(RavenSmsMessage message, TimeSpan delay); /// /// queue the event for processing @@ -38,5 +38,5 @@ public interface IQueueManager /// A task that represents the asynchronous operation. The task result contains the if the background job Id generated the by the underlying queue manager. /// /// If the System.Threading.CancellationToken is canceled. - string QueueEvent(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent; + string QueueEvent(TEvent @event) where TEvent : IEvent; } \ No newline at end of file