Skip to content

Commit

Permalink
Merge pull request #48 from mycroes/job-pool-exception-on-dispose
Browse files Browse the repository at this point in the history
fix: Prevent exception on return of job ID to disposed pool
  • Loading branch information
mycroes authored Feb 28, 2024
2 parents ca06882 + 4b335b0 commit cc1c04a
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 93 deletions.
33 changes: 33 additions & 0 deletions Sally7.Tests/RequestExecutor/JobPoolTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.Threading.Channels;
using Sally7.RequestExecutor;

namespace Sally7.Tests.RequestExecutor;

public class JobPoolTests
{
[Fact]
public async Task RentJobIdAsync_Throws_If_Disposed_And_Depleted()
{
// Arrange
var sut = new JobPool(1);
sut.Dispose();
_ = await sut.RentJobIdAsync(CancellationToken.None); // Empty the pool

// Act
// Assert
await Should.ThrowAsync<ChannelClosedException>(() => sut.RentJobIdAsync(CancellationToken.None).AsTask());
}

[Fact]
public void ReturnJobId_Does_Not_Throw_If_Disposed()
{
// Arrange
var sut = new JobPool(1);
var jobId = sut.RentJobIdAsync(CancellationToken.None).Result;
sut.Dispose();

// Act
// Assert
sut.ReturnJobId(jobId);
}
}
93 changes: 0 additions & 93 deletions Sally7/RequestExecutor/ConcurrentRequestExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,50 +188,6 @@ public async ValueTask<Memory<byte>> PerformRequest(ReadOnlyMemory<byte> request
}
}

private class JobPool : IDisposable
{
private readonly Channel<int> _jobIdPool;
private readonly Request[] _requests;

public JobPool(int maxNumberOfConcurrentRequests)
{
_jobIdPool = Channel.CreateBounded<int>(maxNumberOfConcurrentRequests);
_requests = new Request[maxNumberOfConcurrentRequests];

for (int i = 0; i < maxNumberOfConcurrentRequests; ++i)
{
if (!_jobIdPool.Writer.TryWrite(i + 1))
{
Sally7Exception.ThrowFailedToInitJobPool();
}

_requests[i] = new Request();
}
}

public void Dispose() => _jobIdPool.Writer.Complete();

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken);

public void ReturnJobId(int jobId)
{
if (!_jobIdPool.Writer.TryWrite(jobId))
{
Sally7Exception.ThrowFailedToReturnJobIDToPool(jobId);
}
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId) => _requests[jobId - 1];

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Request req = GetRequest(jobId);
req.Reset();
req.SetBuffer(buffer);
}
}

[DebuggerNonUserCode]
[DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")]
private class Signal : IDisposable
Expand All @@ -248,54 +204,5 @@ private class Signal : IDisposable

private bool NeedToWait => _channel.Reader.Count == 0;
}

private class Request : INotifyCompletion
{
private static readonly Action Sentinel = () => { };

private Memory<byte> _buffer;

public bool IsCompleted { get; private set; }
private int _length;
private Action? _continuation = Sentinel;

public Memory<byte> Buffer => _buffer;

public void Complete(int length)
{
this._length = length;
IsCompleted = true;

var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
}

public Memory<byte> GetResult()
{
return _buffer.Slice(0, _length);
}

public Request GetAwaiter() => this;

public void OnCompleted(Action continuation)
{
if (this._continuation == Sentinel ||
Interlocked.CompareExchange(ref this._continuation, continuation, null) == Sentinel)
{
continuation.Invoke();
}
}

public void Reset()
{
_continuation = null;
IsCompleted = false;
}

public void SetBuffer(Memory<byte> buffer)
{
this._buffer = buffer;
}
}
}
}
56 changes: 56 additions & 0 deletions Sally7/RequestExecutor/JobPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Sally7.RequestExecutor;

internal class JobPool : IDisposable
{
private readonly Channel<int> _jobIdPool;
private readonly Request[] _requests;
private bool _disposed;

public JobPool(int maxNumberOfConcurrentRequests)
{
_jobIdPool = Channel.CreateBounded<int>(maxNumberOfConcurrentRequests);
_requests = new Request[maxNumberOfConcurrentRequests];

for (int i = 0; i < maxNumberOfConcurrentRequests; ++i)
{
if (!_jobIdPool.Writer.TryWrite(i + 1))
{
Sally7Exception.ThrowFailedToInitJobPool();
}

_requests[i] = new Request();
}
}

public void Dispose()
{
Volatile.Write(ref _disposed, true);
_jobIdPool.Writer.Complete();
}

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken);

public void ReturnJobId(int jobId)
{
if (!_jobIdPool.Writer.TryWrite(jobId) && !Volatile.Read(ref _disposed))
{
Sally7Exception.ThrowFailedToReturnJobIDToPool(jobId);
}
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId) => _requests[jobId - 1];

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Request req = GetRequest(jobId);
req.Reset();
req.SetBuffer(buffer);
}
}
54 changes: 54 additions & 0 deletions Sally7/RequestExecutor/Request.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;

namespace Sally7.RequestExecutor;

internal class Request : INotifyCompletion
{
private static readonly Action Sentinel = () => { };

private Memory<byte> _buffer;

public bool IsCompleted { get; private set; }
private int _length;
private Action? _continuation = Sentinel;

public Memory<byte> Buffer => _buffer;

public void Complete(int length)
{
this._length = length;
IsCompleted = true;

var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
}

public Memory<byte> GetResult()
{
return _buffer.Slice(0, _length);
}

public Request GetAwaiter() => this;

public void OnCompleted(Action continuation)
{
if (this._continuation == Sentinel ||
Interlocked.CompareExchange(ref this._continuation, continuation, null) == Sentinel)
{
continuation.Invoke();
}
}

public void Reset()
{
_continuation = null;
IsCompleted = false;
}

public void SetBuffer(Memory<byte> buffer)
{
this._buffer = buffer;
}
}

0 comments on commit cc1c04a

Please sign in to comment.