diff --git a/Sally7.Tests/RequestExecutor/JobPoolTests.cs b/Sally7.Tests/RequestExecutor/JobPoolTests.cs index 0d6e1ff..6ced3ef 100644 --- a/Sally7.Tests/RequestExecutor/JobPoolTests.cs +++ b/Sally7.Tests/RequestExecutor/JobPoolTests.cs @@ -30,4 +30,19 @@ public void ReturnJobId_Does_Not_Throw_If_Disposed() // Assert sut.ReturnJobId(jobId); } + + [Fact] + public void Dispose_Calls_Dispose_On_Requests() + { + // Arrange + var sut = new JobPool(1); + var jobId = sut.RentJobIdAsync(CancellationToken.None).Result; + var request = sut.GetRequest(jobId); + + // Act + sut.Dispose(); + + // Assert + Should.Throw(() => request.GetResult()); + } } \ No newline at end of file diff --git a/Sally7.Tests/RequestExecutor/RequestTests.cs b/Sally7.Tests/RequestExecutor/RequestTests.cs new file mode 100644 index 0000000..e649ddf --- /dev/null +++ b/Sally7.Tests/RequestExecutor/RequestTests.cs @@ -0,0 +1,35 @@ +using FakeItEasy; +using Sally7.RequestExecutor; + +namespace Sally7.Tests.RequestExecutor; + +public class RequestTests +{ + [Fact] + public void Completes_On_Dispose() + { + // Arrange + var sut = new Request(); + var callback = A.Fake(); + sut.OnCompleted(callback); + + // Act + sut.Dispose(); + + // Assert + A.CallTo(() => callback.Invoke()).MustHaveHappenedOnceExactly(); + } + + [Fact] + public async Task Throws_When_Awaited_After_Dispose() + { + // Arrange + var sut = new Request(); + + // Act + sut.Dispose(); + + // Assert + await Should.ThrowAsync(async () => await sut); + } +} \ No newline at end of file diff --git a/Sally7.Tests/RequestExecutor/SignalTests.cs b/Sally7.Tests/RequestExecutor/SignalTests.cs new file mode 100644 index 0000000..7517562 --- /dev/null +++ b/Sally7.Tests/RequestExecutor/SignalTests.cs @@ -0,0 +1,19 @@ +using System.Threading.Channels; +using Sally7.RequestExecutor; + +namespace Sally7.Tests.RequestExecutor; + +public class SignalTests +{ + [Fact] + public async Task WaitAsync_Throws_If_Disposed() + { + // Arrange + var sut = new Signal(); + sut.Dispose(); + + // Act + // Assert + await Should.ThrowAsync(() => sut.WaitAsync(CancellationToken.None).AsTask()); + } +} \ No newline at end of file diff --git a/Sally7/Internal/DisposableHelper.cs b/Sally7/Internal/DisposableHelper.cs new file mode 100644 index 0000000..b2beaa1 --- /dev/null +++ b/Sally7/Internal/DisposableHelper.cs @@ -0,0 +1,17 @@ +using System; + +namespace Sally7.Internal; + +internal static class DisposableHelper +{ + public static void ThrowIf(bool condition, object instance) + { +#if NET7_OR_GREATER + ObjectDisposedException.ThrowIf(condition, instance); +#else + void Throw() => throw new ObjectDisposedException(instance.GetType().FullName); + + if (condition) Throw(); +#endif + } +} \ No newline at end of file diff --git a/Sally7/RequestExecutor/ConcurrentRequestExecutor.cs b/Sally7/RequestExecutor/ConcurrentRequestExecutor.cs index 44772ce..e2e3b68 100644 --- a/Sally7/RequestExecutor/ConcurrentRequestExecutor.cs +++ b/Sally7/RequestExecutor/ConcurrentRequestExecutor.cs @@ -2,10 +2,8 @@ using System.Buffers; using System.Diagnostics; using System.Net.Sockets; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; using System.Threading; -using System.Threading.Channels; using System.Threading.Tasks; using Sally7.Internal; @@ -187,22 +185,5 @@ public async ValueTask> PerformRequest(ReadOnlyMemory request _jobPool.ReturnJobId(jobId); } } - - [DebuggerNonUserCode] - [DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")] - private class Signal : IDisposable - { - private readonly Channel _channel = Channel.CreateBounded(1); - - public void Dispose() => _channel.Writer.Complete(); - - public bool TryInit() => _channel.Writer.TryWrite(0); - - public ValueTask WaitAsync(CancellationToken cancellationToken) => _channel.Reader.ReadAsync(cancellationToken); - - public bool TryRelease() => _channel.Writer.TryWrite(0); - - private bool NeedToWait => _channel.Reader.Count == 0; - } } } diff --git a/Sally7/RequestExecutor/JobPool.cs b/Sally7/RequestExecutor/JobPool.cs index d4716f1..6fd2dd1 100644 --- a/Sally7/RequestExecutor/JobPool.cs +++ b/Sally7/RequestExecutor/JobPool.cs @@ -3,6 +3,7 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; +using Sally7.Internal; namespace Sally7.RequestExecutor; @@ -32,6 +33,11 @@ public void Dispose() { Volatile.Write(ref _disposed, true); _jobIdPool.Writer.Complete(); + + foreach (var request in _requests) + { + request.Dispose(); + } } public ValueTask RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken); @@ -45,7 +51,12 @@ public void ReturnJobId(int jobId) } [DebuggerNonUserCode] - public Request GetRequest(int jobId) => _requests[jobId - 1]; + public Request GetRequest(int jobId) + { + DisposableHelper.ThrowIf(Volatile.Read(ref _disposed), this); + + return _requests[jobId - 1]; + } public void SetBufferForRequest(int jobId, Memory buffer) { diff --git a/Sally7/RequestExecutor/Request.cs b/Sally7/RequestExecutor/Request.cs index 275b876..485cc68 100644 --- a/Sally7/RequestExecutor/Request.cs +++ b/Sally7/RequestExecutor/Request.cs @@ -1,15 +1,18 @@ using System; using System.Runtime.CompilerServices; using System.Threading; +using Sally7.Internal; namespace Sally7.RequestExecutor; -internal class Request : INotifyCompletion +internal class Request : INotifyCompletion, IDisposable { private static readonly Action Sentinel = () => { }; private Memory _buffer; + private bool _disposed; + public bool IsCompleted { get; private set; } private int _length; private Action? _continuation = Sentinel; @@ -21,12 +24,13 @@ public void Complete(int length) this._length = length; IsCompleted = true; - var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null); - prev?.Invoke(); + InvokeContinuation(); } public Memory GetResult() { + DisposableHelper.ThrowIf(_disposed, this); + return _buffer.Slice(0, _length); } @@ -44,11 +48,23 @@ public void OnCompleted(Action continuation) public void Reset() { _continuation = null; - IsCompleted = false; + IsCompleted = _disposed; } public void SetBuffer(Memory buffer) { this._buffer = buffer; } + + public void Dispose() + { + _disposed = true; + InvokeContinuation(); + } + + private void InvokeContinuation() + { + var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null); + prev?.Invoke(); + } } \ No newline at end of file diff --git a/Sally7/RequestExecutor/Signal.cs b/Sally7/RequestExecutor/Signal.cs new file mode 100644 index 0000000..81ee855 --- /dev/null +++ b/Sally7/RequestExecutor/Signal.cs @@ -0,0 +1,26 @@ +using System; +using System.Diagnostics; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace Sally7.RequestExecutor +{ + [DebuggerNonUserCode] + [DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")] + internal class Signal : IDisposable + { + private readonly Channel channel = Channel.CreateBounded(1); + + public void Dispose() => channel.Writer.Complete(); + + public bool TryInit() => channel.Writer.TryWrite(0); + + public ValueTask WaitAsync(CancellationToken cancellationToken) => + channel.Reader.ReadAsync(cancellationToken); + + public bool TryRelease() => channel.Writer.TryWrite(0); + + private bool NeedToWait => channel.Reader.Count == 0; + } +}