Skip to content

Commit

Permalink
simplified in memory queue
Browse files Browse the repository at this point in the history
  • Loading branch information
YoussefSell committed Jul 31, 2022
1 parent ff9d005 commit fb285d7
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 238 deletions.
15 changes: 3 additions & 12 deletions src/Queues/RavenSMS.Queue.Coravel/Manager/CoravelQueueManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@
public partial class CoravelQueueManager : IQueueManager
{
/// <inheritdoc/>
public string QueueEvent<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent
public string QueueEvent<TEvent>(TEvent @event) where TEvent : IEvent
{
if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

var queueId = _queue
.QueueInvocableWithPayload<ProcessEventInvocable<TEvent>, ProcessEventInvocablePayload<TEvent>>(
ProcessEventInvocablePayload<TEvent>.Create(@event));
Expand All @@ -19,11 +16,8 @@ public string QueueEvent<TEvent>(TEvent @event, CancellationToken cancellationTo
}

/// <inheritdoc/>
public string QueueMessage(RavenSmsMessage message, CancellationToken cancellationToken = default)
public string QueueMessage(RavenSmsMessage message)
{
if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

var queueId = _queue
.QueueInvocableWithPayload<ProcessSmsMessageInvocable, ProcessSmsMessageInvocablePayload>(
ProcessSmsMessageInvocablePayload.Create(message.Id));
Expand All @@ -32,11 +26,8 @@ public string QueueMessage(RavenSmsMessage message, CancellationToken cancellati
}

/// <inheritdoc/>
public string QueueMessage(RavenSmsMessage message, TimeSpan delay, CancellationToken cancellationToken = default)
public string QueueMessage(RavenSmsMessage message, TimeSpan delay)
{
if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

var queueId = _queue
.QueueInvocableWithPayload<ProcessSmsMessageInvocable, ProcessSmsMessageInvocablePayload>(
ProcessSmsMessageInvocablePayload.Create(message.Id, delay));
Expand Down
27 changes: 6 additions & 21 deletions src/Queues/RavenSMS.Queue.Hangfire/Manager/HangfireQueueManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,14 @@
public partial class HangfireQueueManager : IQueueManager
{
/// <inheritdoc/>
public string QueueEvent<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent
{
if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

return BackgroundJob.Enqueue<EventsPublisher>(publisher => publisher.ProcessAsync(@event));
}
public string QueueEvent<TEvent>(TEvent @event) where TEvent : IEvent
=> BackgroundJob.Enqueue<EventsPublisher>(publisher => publisher.ProcessAsync(@event));

/// <inheritdoc/>
public string QueueMessage(RavenSmsMessage message, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

return BackgroundJob.Enqueue<IRavenSmsManager>(manager => manager.ProcessAsync(message.Id, default));
}
public string QueueMessage(RavenSmsMessage message)
=> BackgroundJob.Enqueue<IRavenSmsManager>(manager => manager.ProcessAsync(message.Id, default));

/// <inheritdoc/>
public string QueueMessage(RavenSmsMessage message, TimeSpan delay, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
cancellationToken.ThrowIfCancellationRequested();

return BackgroundJob.Schedule<IRavenSmsManager>(manager => manager.ProcessAsync(message.Id, default), delay);
}
public string QueueMessage(RavenSmsMessage message, TimeSpan delay)
=> BackgroundJob.Schedule<IRavenSmsManager>(manager => manager.ProcessAsync(message.Id, default), delay);
}
1 change: 0 additions & 1 deletion src/RavenSMS/Configurations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public static RavenSmsBuilder UseInMemoryStores(this RavenSmsBuilder builder)
public static RavenSmsBuilder UseInMemoryQueue(this RavenSmsBuilder builder)
{
builder.ServiceCollection.AddSingleton<IQueueManager, InMemoryQueueManager>();
builder.ServiceCollection.AddSingleton<IInMemoryQueue, InMemoryQueue>();
builder.ServiceCollection.AddHostedService<InMemoryQueueHost>();

return builder;
Expand Down
4 changes: 2 additions & 2 deletions src/RavenSMS/Managers/RavenSmsManager/RavenSmsManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task ProcessAsync(string messageId, CancellationToken cancellationT
public async Task<Result> QueueMessageAsync(RavenSmsMessage message, CancellationToken cancellationToken = default)
{
// queue the message for future processing
message.JobQueueId = _queueManager.QueueMessage(message, cancellationToken);
message.JobQueueId = _queueManager.QueueMessage(message);
message.Status = RavenSmsMessageStatus.Queued;

// save the message
Expand All @@ -82,7 +82,7 @@ public async Task<Result> QueueMessageAsync(RavenSmsMessage message, Cancellatio
public async Task<Result> QueueMessageAsync(RavenSmsMessage message, TimeSpan delay, CancellationToken cancellationToken = default)
{
// queue the message for future processing
message.JobQueueId = _queueManager.QueueMessage(message, delay, cancellationToken);
message.JobQueueId = _queueManager.QueueMessage(message, delay);
message.Status = RavenSmsMessageStatus.Queued;
message.DeliverAt = DateTimeOffset.UtcNow.Add(delay);

Expand Down

This file was deleted.

150 changes: 0 additions & 150 deletions src/RavenSMS/Managers/RavenSmsQueueManager/Default/InMemoryQueue.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,6 @@ public void Dispose()
Dispose(true);
GC.SuppressFinalize(this);
}

/// <inheritdoc/>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
_timer?.Dispose();

_disposed = true;
}
}
}

/// <summary>
Expand All @@ -66,13 +54,13 @@ internal partial class InMemoryQueueHost : IHostedService, IDisposable
private bool _disposed;

private Timer? _timer;
private readonly InMemoryQueue _queue;
private readonly InMemoryQueueManager _queue;
private readonly SemaphoreSlim _signal = new(0);
private readonly CancellationTokenSource _shutdown = new();

public InMemoryQueueHost(IInMemoryQueue queue)
public InMemoryQueueHost(IQueueManager queue)
{
_queue = queue as InMemoryQueue
_queue = queue as InMemoryQueueManager
?? throw new RavenSmsException("the inMemory Queue is not registered, call UseInMemoryQueue() on RavenSmsBuilder");
}

Expand All @@ -95,4 +83,16 @@ private void InitTimer()
dueTime: TimeSpan.Zero,
period: TimeSpan.FromSeconds(5));
}

/// <inheritdoc/>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
_timer?.Dispose();

_disposed = true;
}
}
}
Loading

0 comments on commit fb285d7

Please sign in to comment.