Skip to content

Commit

Permalink
refactor: Extract JobPool, Request from ConcurrentRequestExecutor to …
Browse files Browse the repository at this point in the history
…facilitate testing
  • Loading branch information
mycroes committed Feb 28, 2024
1 parent 69db2ce commit d58463d
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 98 deletions.
98 changes: 0 additions & 98 deletions Sally7/RequestExecutor/ConcurrentRequestExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,55 +188,6 @@ public async ValueTask<Memory<byte>> PerformRequest(ReadOnlyMemory<byte> request
}
}

private class JobPool : IDisposable
{
private readonly Channel<int> _jobIdPool;
private readonly Request[] _requests;
private bool _disposed;

public JobPool(int maxNumberOfConcurrentRequests)
{
_jobIdPool = Channel.CreateBounded<int>(maxNumberOfConcurrentRequests);
_requests = new Request[maxNumberOfConcurrentRequests];

for (int i = 0; i < maxNumberOfConcurrentRequests; ++i)
{
if (!_jobIdPool.Writer.TryWrite(i + 1))
{
Sally7Exception.ThrowFailedToInitJobPool();
}

_requests[i] = new Request();
}
}

public void Dispose()
{
Volatile.Write(ref _disposed, true);
_jobIdPool.Writer.Complete();
}

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken);

public void ReturnJobId(int jobId)
{
if (!_jobIdPool.Writer.TryWrite(jobId) && !Volatile.Read(ref _disposed))
{
Sally7Exception.ThrowFailedToReturnJobIDToPool(jobId);
}
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId) => _requests[jobId - 1];

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Request req = GetRequest(jobId);
req.Reset();
req.SetBuffer(buffer);
}
}

[DebuggerNonUserCode]
[DebuggerDisplay(nameof(NeedToWait) + ": {" + nameof(NeedToWait) + ",nq}")]
private class Signal : IDisposable
Expand All @@ -253,54 +204,5 @@ private class Signal : IDisposable

private bool NeedToWait => _channel.Reader.Count == 0;
}

private class Request : INotifyCompletion
{
private static readonly Action Sentinel = () => { };

private Memory<byte> _buffer;

public bool IsCompleted { get; private set; }
private int _length;
private Action? _continuation = Sentinel;

public Memory<byte> Buffer => _buffer;

public void Complete(int length)
{
this._length = length;
IsCompleted = true;

var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
}

public Memory<byte> GetResult()
{
return _buffer.Slice(0, _length);
}

public Request GetAwaiter() => this;

public void OnCompleted(Action continuation)
{
if (this._continuation == Sentinel ||
Interlocked.CompareExchange(ref this._continuation, continuation, null) == Sentinel)
{
continuation.Invoke();
}
}

public void Reset()
{
_continuation = null;
IsCompleted = false;
}

public void SetBuffer(Memory<byte> buffer)
{
this._buffer = buffer;
}
}
}
}
56 changes: 56 additions & 0 deletions Sally7/RequestExecutor/JobPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Sally7.RequestExecutor;

internal class JobPool : IDisposable
{
private readonly Channel<int> _jobIdPool;
private readonly Request[] _requests;
private bool _disposed;

public JobPool(int maxNumberOfConcurrentRequests)
{
_jobIdPool = Channel.CreateBounded<int>(maxNumberOfConcurrentRequests);
_requests = new Request[maxNumberOfConcurrentRequests];

for (int i = 0; i < maxNumberOfConcurrentRequests; ++i)
{
if (!_jobIdPool.Writer.TryWrite(i + 1))
{
Sally7Exception.ThrowFailedToInitJobPool();
}

_requests[i] = new Request();
}
}

public void Dispose()
{
Volatile.Write(ref _disposed, true);
_jobIdPool.Writer.Complete();
}

public ValueTask<int> RentJobIdAsync(CancellationToken cancellationToken) => _jobIdPool.Reader.ReadAsync(cancellationToken);

public void ReturnJobId(int jobId)
{
if (!_jobIdPool.Writer.TryWrite(jobId) && !Volatile.Read(ref _disposed))
{
Sally7Exception.ThrowFailedToReturnJobIDToPool(jobId);
}
}

[DebuggerNonUserCode]
public Request GetRequest(int jobId) => _requests[jobId - 1];

public void SetBufferForRequest(int jobId, Memory<byte> buffer)
{
Request req = GetRequest(jobId);
req.Reset();
req.SetBuffer(buffer);
}
}
54 changes: 54 additions & 0 deletions Sally7/RequestExecutor/Request.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;

namespace Sally7.RequestExecutor;

internal class Request : INotifyCompletion
{
private static readonly Action Sentinel = () => { };

private Memory<byte> _buffer;

public bool IsCompleted { get; private set; }
private int _length;
private Action? _continuation = Sentinel;

public Memory<byte> Buffer => _buffer;

public void Complete(int length)
{
this._length = length;
IsCompleted = true;

var prev = _continuation ?? Interlocked.CompareExchange(ref _continuation, Sentinel, null);
prev?.Invoke();
}

public Memory<byte> GetResult()
{
return _buffer.Slice(0, _length);
}

public Request GetAwaiter() => this;

public void OnCompleted(Action continuation)
{
if (this._continuation == Sentinel ||
Interlocked.CompareExchange(ref this._continuation, continuation, null) == Sentinel)
{
continuation.Invoke();
}
}

public void Reset()
{
_continuation = null;
IsCompleted = false;
}

public void SetBuffer(Memory<byte> buffer)
{
this._buffer = buffer;
}
}

0 comments on commit d58463d

Please sign in to comment.