Skip to content

Commit

Permalink
Merge pull request #53 from mycroes/hotfix/dispose
Browse files Browse the repository at this point in the history
Dispose requests on JobPool Dispose
  • Loading branch information
mycroes authored Mar 13, 2024
2 parents f2caba4 + a79f5fd commit a1606da
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 24 deletions.
15 changes: 15 additions & 0 deletions Sally7.Tests/RequestExecutor/JobPoolTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,19 @@ public void ReturnJobId_Does_Not_Throw_If_Disposed()
// Assert
sut.ReturnJobId(jobId);
}

[Fact]
public void Dispose_Calls_Dispose_On_Requests()
{
// Arrange
var sut = new JobPool(1);
var jobId = sut.RentJobIdAsync(CancellationToken.None).Result;
var request = sut.GetRequest(jobId);

// Act
sut.Dispose();

// Assert
Should.Throw<ObjectDisposedException>(() => request.GetResult());
}
}
35 changes: 35 additions & 0 deletions Sally7.Tests/RequestExecutor/RequestTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using FakeItEasy;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class RequestTests
{
[Fact]
public void Completes_On_Dispose()
{
// Arrange
var sut = new Request();
var callback = A.Fake<Action>();
sut.OnCompleted(callback);

// Act
sut.Dispose();

// Assert
A.CallTo(() => callback.Invoke()).MustHaveHappenedOnceExactly();
}

[Fact]
public async Task Throws_When_Awaited_After_Dispose()
{
// Arrange
var sut = new Request();

// Act
sut.Dispose();

// Assert
await Should.ThrowAsync<ObjectDisposedException>(async () => await sut);
}
}
19 changes: 19 additions & 0 deletions Sally7.Tests/RequestExecutor/SignalTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Threading.Channels;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class SignalTests
{
[Fact]
public async Task WaitAsync_Throws_If_Disposed()
{
// Arrange
var sut = new Signal();
sut.Dispose();

// Act
// Assert
await Should.ThrowAsync<ChannelClosedException>(() => sut.WaitAsync(CancellationToken.None).AsTask());
}
}
17 changes: 17 additions & 0 deletions Sally7/Internal/DisposableHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System;

namespace Sally7.Internal;

internal static class DisposableHelper
{
public static void ThrowIf(bool condition, object instance)
{
#if NET7_OR_GREATER
ObjectDisposedException.ThrowIf(condition, instance);
#else
void Throw() => throw new ObjectDisposedException(instance.GetType().FullName);

if (condition) Throw();
#endif
}
}
19 changes: 0 additions & 19 deletions Sally7/RequestExecutor/ConcurrentRequestExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
using System.Buffers;
using System.Diagnostics;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Sally7.Internal;

Expand Down Expand Up @@ -187,22 +185,5 @@ public async ValueTask<Memory<byte>> PerformRequest(ReadOnlyMemory<byte> request
_jobPool.ReturnJobId(jobId);
}
}

[DebuggerNonUserCode]
[DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")]
private class Signal : IDisposable
{
private readonly Channel<int> _channel = Channel.CreateBounded<int>(1);

public void Dispose() => _channel.Writer.Complete();

public bool TryInit() => _channel.Writer.TryWrite(0);

public ValueTask<int> WaitAsync(CancellationToken cancellationToken) => _channel.Reader.ReadAsync(cancellationToken);

public bool TryRelease() => _channel.Writer.TryWrite(0);

private bool NeedToWait => _channel.Reader.Count == 0;
}
}
}
13 changes: 12 additions & 1 deletion Sally7/RequestExecutor/JobPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Sally7.Internal;

namespace Sally7.RequestExecutor;

Expand Down Expand Up @@ -32,6 +33,11 @@ public void Dispose()
{
Volatile.Write(ref _disposed, true);
_jobIdPool.Writer.Complete();

foreach (var request in _requests)
{
request.Dispose();
}
}

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken);
Expand All @@ -45,7 +51,12 @@ public void ReturnJobId(int jobId)
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId) => _requests[jobId - 1];
public Request GetRequest(int jobId)
{
DisposableHelper.ThrowIf(Volatile.Read(ref _disposed), this);

return _requests[jobId - 1];
}

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Expand Down
24 changes: 20 additions & 4 deletions Sally7/RequestExecutor/Request.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using Sally7.Internal;

namespace Sally7.RequestExecutor;

internal class Request : INotifyCompletion
internal class Request : INotifyCompletion, IDisposable
{
private static readonly Action Sentinel = () => { };

private Memory<byte> _buffer;

private bool _disposed;

public bool IsCompleted { get; private set; }
private int _length;
private Action? _continuation = Sentinel;
Expand All @@ -21,12 +24,13 @@ public void Complete(int length)
this._length = length;
IsCompleted = true;

var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
InvokeContinuation();
}

public Memory<byte> GetResult()
{
DisposableHelper.ThrowIf(_disposed, this);

return _buffer.Slice(0, _length);
}

Expand All @@ -44,11 +48,23 @@ public void OnCompleted(Action continuation)
public void Reset()
{
_continuation = null;
IsCompleted = false;
IsCompleted = _disposed;
}

public void SetBuffer(Memory<byte> buffer)
{
this._buffer = buffer;
}

public void Dispose()
{
_disposed = true;
InvokeContinuation();
}

private void InvokeContinuation()
{
var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
}
}
26 changes: 26 additions & 0 deletions Sally7/RequestExecutor/Signal.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Sally7.RequestExecutor
{
[DebuggerNonUserCode]
[DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")]
internal class Signal : IDisposable
{
private readonly Channel<int> channel = Channel.CreateBounded<int>(1);

public void Dispose() => channel.Writer.Complete();

public bool TryInit() => channel.Writer.TryWrite(0);

public ValueTask<int> WaitAsync(CancellationToken cancellationToken) =>
channel.Reader.ReadAsync(cancellationToken);

public bool TryRelease() => channel.Writer.TryWrite(0);

private bool NeedToWait => channel.Reader.Count == 0;
}
}

0 comments on commit a1606da

Please sign in to comment.