Skip to content

Commit

Permalink
ConcurrencyLimiter implementation (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennanConroy authored Oct 26, 2021
1 parent 5868ec0 commit b1312a2
Show file tree
Hide file tree
Showing 13 changed files with 744 additions and 61 deletions.
2 changes: 2 additions & 0 deletions src/RateLimiting/src/AggregatedRateLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace System.Threading.RateLimiting
{
#pragma warning disable 1591
// Represent an aggregated resource (e.g. a resource limiter aggregated by IP)
public abstract class AggregatedRateLimiter<TKey>
{
Expand All @@ -19,4 +20,5 @@ public abstract class AggregatedRateLimiter<TKey>
// Wait until the requested resources are available
public abstract ValueTask<RateLimitLease> WaitAsync(TKey resourceID, int requestedCount, CancellationToken cancellationToken = default);
}
#pragma warning restore
}
156 changes: 118 additions & 38 deletions src/RateLimiting/src/ConcurrencyLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,125 +4,185 @@
// Pending dotnet API review

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;

namespace System.Threading.RateLimiting
{
/// <summary>
/// <see cref="RateLimiter"/> implementation that helps manage concurrent access to a resource.
/// </summary>
public sealed class ConcurrencyLimiter : RateLimiter
{
private int _permitCount;
private int _queueCount;

private readonly object _lock = new object();
private readonly ConcurrencyLimiterOptions _options;
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();

private static readonly ConcurrencyLease SuccessfulLease = new ConcurrencyLease(true, null, 0);
private static readonly ConcurrencyLease FailedLease = new ConcurrencyLease(false, null, 0);
private static readonly ConcurrencyLease QueueLimitLease = new ConcurrencyLease(false, null, 0, "Queue limit reached");

// Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
private object Lock => _queue;

/// <summary>
/// Initializes the <see cref="ConcurrencyLimiter"/>.
/// </summary>
/// <param name="options">Options to specify the behavior of the <see cref="ConcurrencyLimiter"/>.</param>
public ConcurrencyLimiter(ConcurrencyLimiterOptions options)
{
_options = options;
_permitCount = _options.PermitLimit;
}

/// <inheritdoc/>
public override int GetAvailablePermits() => _permitCount;

/// <inheritdoc/>
protected override RateLimitLease AcquireCore(int permitCount)
{
// These amounts of resources can never be acquired
if (permitCount > _options.PermitLimit)
{
throw new InvalidOperationException($"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
throw new ArgumentOutOfRangeException(nameof(permitCount), $"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
}

// Return SuccessfulAcquisition or FailedAcquisition depending to indicate limiter state
// Return SuccessfulLease or FailedLease to indicate limiter state
if (permitCount == 0)
{
return GetAvailablePermits() > 0 ? SuccessfulLease : FailedLease;
return _permitCount > 0 ? SuccessfulLease : FailedLease;
}

// Perf: Check SemaphoreSlim implementation instead of locking
if (GetAvailablePermits() >= permitCount)
if (_permitCount >= permitCount)
{
lock (_lock)
lock (Lock)
{
if (GetAvailablePermits() >= permitCount)
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
{
_permitCount -= permitCount;
return new ConcurrencyLease(true, this, permitCount);
return lease;
}
}
}

return FailedLease;
}

/// <inheritdoc/>
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

// These amounts of resources can never be acquired
if (permitCount < 0 || permitCount > _options.PermitLimit)
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException();
throw new ArgumentOutOfRangeException(nameof(permitCount), $"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
}

// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
if (permitCount == 0 && GetAvailablePermits() > 0)
if (permitCount == 0 && _permitCount > 0)
{
// Perf: static failed/successful value tasks?
return new ValueTask<RateLimitLease>(SuccessfulLease);
}

// Perf: Check SemaphoreSlim implementation instead of locking
lock (_lock) // Check lock check
lock (Lock)
{
if (GetAvailablePermits() >= permitCount)
if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
{
_permitCount -= permitCount;
return new ValueTask<RateLimitLease>(new ConcurrencyLease(true, this, permitCount));
return new ValueTask<RateLimitLease>(lease);
}

// Don't queue if queue limit reached
if (_queueCount + permitCount > _options.QueueLimit)
{
// Perf: static failed/successful value tasks?
return new ValueTask<RateLimitLease>(FailedLease);
return new ValueTask<RateLimitLease>(QueueLimitLease);
}

TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
CancellationTokenRegistration ctr;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(obj =>
{
((TaskCompletionSource<RateLimitLease>)obj).TrySetException(new OperationCanceledException(cancellationToken));
}, tcs);
}

var request = new RequestRegistration(permitCount);
RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
_queue.EnqueueTail(request);
_queueCount += permitCount;
Debug.Assert(_queueCount <= _options.QueueLimit);

// TODO: handle cancellation
return new ValueTask<RateLimitLease>(request.TCS.Task);
return new ValueTask<RateLimitLease>(request.Tcs.Task);
}
}

private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)
{
// if permitCount is 0 we want to queue it if there are no available permits
if (_permitCount >= permitCount && _permitCount != 0)
{
if (permitCount == 0)
{
// Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
lease = SuccessfulLease;
return true;
}

// a. if there are no items queued we can lease
// b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
{
_permitCount -= permitCount;
Debug.Assert(_permitCount >= 0);
lease = new ConcurrencyLease(true, this, permitCount);
return true;
}
}

lease = null;
return false;
}

private void Release(int releaseCount)
{
lock (_lock) // Check lock check
lock (Lock)
{
_permitCount += releaseCount;
Debug.Assert(_permitCount <= _options.PermitLimit);

while (_queue.Count > 0)
{
var nextPendingRequest =
RequestRegistration nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.PeekHead()
: _queue.PeekTail();

if (GetAvailablePermits() >= nextPendingRequest.Count)
if (_permitCount >= nextPendingRequest.Count)
{
var request =
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();

_permitCount -= request.Count;
_queueCount -= request.Count;

// requestToFulfill == request
request.TCS.SetResult(new ConcurrencyLease(true, this, request.Count));
_permitCount -= nextPendingRequest.Count;
_queueCount -= nextPendingRequest.Count;
Debug.Assert(_queueCount >= 0);
Debug.Assert(_permitCount >= 0);

ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
// Check if request was canceled
if (!nextPendingRequest.Tcs.TrySetResult(lease))
{
// Queued item was canceled so add count back
_permitCount += nextPendingRequest.Count;
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else
{
Expand All @@ -134,25 +194,41 @@ private void Release(int releaseCount)

private class ConcurrencyLease : RateLimitLease
{
private static readonly IEnumerable<string> Empty = new string[0];

private bool _disposed;
private readonly ConcurrencyLimiter? _limiter;
private readonly int _count;
private readonly string? _reason;

public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count)
public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count, string? reason = null)
{
IsAcquired = isAcquired;
_limiter = limiter;
_count = count;
_reason = reason;

// No need to set the limiter if count is 0, Dispose will noop
Debug.Assert(count == 0 ? limiter is null : true);
}

public override bool IsAcquired { get; }

public override IEnumerable<string> MetadataNames => Empty;
public override IEnumerable<string> MetadataNames => Enumerable();

private IEnumerable<string> Enumerable()
{
if (_reason is not null)
{
yield return MetadataName.ReasonPhrase.Name;
}
}

public override bool TryGetMetadata(string metadataName, out object? metadata)
{
if (_reason is not null && metadataName == MetadataName.ReasonPhrase.Name)
{
metadata = _reason;
return true;
}
metadata = default;
return false;
}
Expand All @@ -170,18 +246,22 @@ protected override void Dispose(bool disposing)
}
}

private struct RequestRegistration
private readonly struct RequestRegistration
{
public RequestRegistration(int requestedCount)
public RequestRegistration(int requestedCount, TaskCompletionSource<RateLimitLease> tcs,
CancellationTokenRegistration cancellationTokenRegistration)
{
Count = requestedCount;
// Perf: Use AsyncOperation<TResult> instead
TCS = new TaskCompletionSource<RateLimitLease>();
Tcs = tcs;
CancellationTokenRegistration = cancellationTokenRegistration;
}

public int Count { get; }

public TaskCompletionSource<RateLimitLease> TCS { get; }
public TaskCompletionSource<RateLimitLease> Tcs { get; }

public CancellationTokenRegistration CancellationTokenRegistration { get; }
}
}
}
37 changes: 32 additions & 5 deletions src/RateLimiting/src/ConcurrencyLimiterOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,49 @@

namespace System.Threading.RateLimiting
{
sealed public class ConcurrencyLimiterOptions
/// <summary>
/// Options to specify the behavior of a <see cref="ConcurrencyLimiter"/>.
/// </summary>
public sealed class ConcurrencyLimiterOptions
{
/// <summary>
/// Initializes the <see cref="ConcurrencyLimiterOptions"/>.
/// </summary>
/// <param name="permitLimit">Maximum number of permits that can be leased concurrently.</param>
/// <param name="queueProcessingOrder">Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.</param>
/// <param name="queueLimit">Maximum number of permits that can be queued concurrently.</param>
/// <exception cref="ArgumentOutOfRangeException">When <paramref name="permitLimit"/> or <paramref name="queueLimit"/> are less than 0.</exception>
public ConcurrencyLimiterOptions(int permitLimit, QueueProcessingOrder queueProcessingOrder, int queueLimit)
{
if (permitLimit < 0)
{
throw new ArgumentOutOfRangeException(nameof(permitLimit));
}
if (queueLimit < 0)
{
throw new ArgumentOutOfRangeException(nameof(queueLimit));
}
PermitLimit = permitLimit;
QueueProcessingOrder = queueProcessingOrder;
QueueLimit = queueLimit;
}

// Maximum number of permits allowed to be leased
/// <summary>
/// Maximum number of permits that can be leased concurrently.
/// </summary>
public int PermitLimit { get; }

// Behaviour of WaitAsync when not enough resources can be leased
public QueueProcessingOrder QueueProcessingOrder { get; }
/// <summary>
/// Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.
/// </summary>
/// <value>
/// <see cref="QueueProcessingOrder.OldestFirst"/> by default.
/// </value>
public QueueProcessingOrder QueueProcessingOrder { get; } = QueueProcessingOrder.OldestFirst;

// Maximum cumulative permit count of queued acquisition requests
/// <summary>
/// Maximum number of permits that can be queued concurrently.
/// </summary>
public int QueueLimit { get; }
}
}
Loading

0 comments on commit b1312a2

Please sign in to comment.