diff --git a/Src/Coravel/Queuing/Interfaces/IQueue.cs b/Src/Coravel/Queuing/Interfaces/IQueue.cs
index a57dd74c..0d35026d 100644
--- a/Src/Coravel/Queuing/Interfaces/IQueue.cs
+++ b/Src/Coravel/Queuing/Interfaces/IQueue.cs
@@ -51,5 +51,12 @@ public interface IQueue
///
///
QueueMetrics GetMetrics();
+
+ ///
+ /// Try to cancel an invocable by its token id return from QueueInvocableWithPayload
+ ///
+ /// Token id return from QueueInvocableWithPayload
+ ///
+ bool TryCancelInvocable(Guid tokenId);
}
}
\ No newline at end of file
diff --git a/Src/Coravel/Queuing/Queue.cs b/Src/Coravel/Queuing/Queue.cs
index feaf88d2..ffc4ab0e 100644
--- a/Src/Coravel/Queuing/Queue.cs
+++ b/Src/Coravel/Queuing/Queue.cs
@@ -47,10 +47,16 @@ public Guid QueueInvocable() where T : IInvocable
public Guid QueueInvocableWithPayload(TParams payload) where T : IInvocable, IInvocableWithPayload
{
+ var tokenSource = new CancellationTokenSource();
var job = this.EnqueueInvocable(invocable => {
- IInvocableWithPayload invocableWithParams = (IInvocableWithPayload) invocable;
+ var invocableWithParams = (IInvocableWithPayload) invocable;
invocableWithParams.Payload = payload;
+ if (invocableWithParams is ICancellableTask task)
+ {
+ task.Token = tokenSource.Token;
+ }
});
+ this._tokens.TryAdd(job.Guid, tokenSource);
return job.Guid;
}
@@ -132,6 +138,14 @@ public QueueMetrics GetMetrics()
return new QueueMetrics(this._tasksRunningCount, waitingCount);
}
+ public bool TryCancelInvocable(Guid tokenId)
+ {
+ if (!_tokens.TryGetValue(tokenId, out var tokenNeedCancel)) return false; // token does not exist
+ if (tokenNeedCancel.IsCancellationRequested) return false; // token is already canceled
+ tokenNeedCancel.Cancel();
+ return true;
+ }
+
private void CancelAllTokens()
{
foreach(var kv in this._tokens.AsEnumerable())
diff --git a/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs b/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs
index 0dd98247..8a9d6e19 100644
--- a/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs
+++ b/Src/UnitTests/CoravelUnitTests/Queuing/CancellableInvocableForQueueTests.cs
@@ -12,6 +12,61 @@ namespace UnitTests.Queuing
{
public class CancellableInvocableForQueueTests
{
+ [Fact]
+ public async Task CanCancelSpecificInvocableWithPayload()
+ {
+ // init
+ var services = new ServiceCollection();
+ services.AddTransient();
+ services.AddTransient();
+ var provider = services.BuildServiceProvider();
+ var queue = new Queue(provider.GetRequiredService(), new DispatcherStub());
+
+ //exec
+ var payload = new TestPayload()
+ {
+ Code = "test"
+ };
+ var firstItem = queue.QueueInvocableWithPayload(payload);
+ var secondItem = queue.QueueInvocableWithPayload(payload);
+ var thirdItem = queue.QueueInvocableWithPayload(payload);
+
+ var cancelResultFirstItem = queue.TryCancelInvocable(firstItem);
+ var cancelResultThirdItem = queue.TryCancelInvocable(thirdItem);
+
+ //assert
+ TestCancellableInvocableWithPayload.TokensCancelled = 0;
+ await queue.ConsumeQueueAsync();
+ Assert.True(cancelResultFirstItem);
+ Assert.True(cancelResultThirdItem);
+ Assert.Equal(2, TestCancellableInvocableWithPayload.TokensCancelled);
+ }
+
+ [Fact]
+ public async Task CanCancelInvocableWithPayloadOnQueueShutdown()
+ {
+ // init
+ var services = new ServiceCollection();
+ services.AddTransient();
+ services.AddTransient();
+ var provider = services.BuildServiceProvider();
+ var queue = new Queue(provider.GetRequiredService(), new DispatcherStub());
+
+ //exec
+ var payload = new TestPayload()
+ {
+ Code = "test"
+ };
+ var firstItem = queue.QueueInvocableWithPayload(payload);
+ var secondItem = queue.QueueInvocableWithPayload(payload);
+ var thirdItem = queue.QueueInvocableWithPayload(payload);
+
+ // assert
+ TestCancellableInvocableWithPayload.TokensCancelled = 0;
+ await queue.ConsumeQueueOnShutdown();
+ Assert.Equal(3, TestCancellableInvocableWithPayload.TokensCancelled);
+ }
+
[Fact]
public async Task CanCancelInvocable()
{
@@ -82,9 +137,35 @@ public Task Invoke()
{
Interlocked.Increment(ref TokensCancelled);
}
+ return Task.CompletedTask;
+ }
+ }
+
+ private class TestCancellableInvocableWithPayload : IInvocable, IInvocableWithPayload, ICancellableTask
+ {
+ ///
+ /// Static fields keeps track of all cancelled tokens count.
+ ///
+ public static int TokensCancelled = 0;
+ public TestPayload Payload { get; set; }
+ public TestCancellableInvocableWithPayload() {}
+
+ public CancellationToken Token { get; set; }
+ public Task Invoke()
+ {
+ if(this.Token.IsCancellationRequested)
+ {
+ Interlocked.Increment(ref TokensCancelled);
+ }
return Task.CompletedTask;
}
+
+ }
+
+ private class TestPayload
+ {
+ public string Code { get; set; }
}
}
}
\ No newline at end of file