Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConcurrencyLimiter implementation #387

Merged
merged 5 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/RateLimiting/src/AggregatedRateLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace System.Threading.RateLimiting
{
#pragma warning disable 1591
// Represent an aggregated resource (e.g. a resource limiter aggregated by IP)
public abstract class AggregatedRateLimiter<TKey>
{
Expand All @@ -19,4 +20,5 @@ public abstract class AggregatedRateLimiter<TKey>
// Wait until the requested resources are available
public abstract ValueTask<RateLimitLease> WaitAsync(TKey resourceID, int requestedCount, CancellationToken cancellationToken = default);
}
#pragma warning restore
}
141 changes: 111 additions & 30 deletions src/RateLimiting/src/ConcurrencyLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,43 @@
// Pending dotnet API review

using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;

namespace System.Threading.RateLimiting
{
/// <summary>
/// <see cref="RateLimiter"/> implementation that helps manage concurrent access to a resource.
/// </summary>
public sealed class ConcurrencyLimiter : RateLimiter
{
private int _permitCount;
private int _queueCount;

private readonly object _lock = new object();
private readonly ConcurrencyLimiterOptions _options;
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();

private static readonly ConcurrencyLease SuccessfulLease = new ConcurrencyLease(true, null, 0);
private static readonly ConcurrencyLease FailedLease = new ConcurrencyLease(false, null, 0);

// Use the queue as the lock field so we don't need to allocate another object for a lock and have another field in the object
private object Lock => _queue;

/// <summary>
/// Initializes the <see cref="ConcurrencyLimiter"/>.
/// </summary>
/// <param name="options">Options to specify the behavior of the <see cref="ConcurrencyLimiter"/>.</param>
public ConcurrencyLimiter(ConcurrencyLimiterOptions options)
{
_options = options;
_permitCount = _options.PermitLimit;
}

/// <inheritdoc/>
public override int GetAvailablePermits() => _permitCount;

/// <inheritdoc/>
protected override RateLimitLease AcquireCore(int permitCount)
{
// These amounts of resources can never be acquired
Expand All @@ -36,7 +49,7 @@ protected override RateLimitLease AcquireCore(int permitCount)
throw new InvalidOperationException($"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
}

// Return SuccessfulAcquisition or FailedAcquisition depending to indicate limiter state
// Return SuccessfulAcquisition or FailedAcquisition to indicate limiter state
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
if (permitCount == 0)
{
return GetAvailablePermits() > 0 ? SuccessfulLease : FailedLease;
Expand All @@ -45,84 +58,130 @@ protected override RateLimitLease AcquireCore(int permitCount)
// Perf: Check SemaphoreSlim implementation instead of locking
if (GetAvailablePermits() >= permitCount)
{
lock (_lock)
lock (Lock)
{
if (GetAvailablePermits() >= permitCount)
if (TryLeaseUnsynchronized(permitCount, out var lease))
{
_permitCount -= permitCount;
return new ConcurrencyLease(true, this, permitCount);
return lease;
}
}
}

return FailedLease;
}

/// <inheritdoc/>
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

// These amounts of resources can never be acquired
if (permitCount < 0 || permitCount > _options.PermitLimit)
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException();
throw new InvalidOperationException($"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
halter73 marked this conversation as resolved.
Show resolved Hide resolved

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Since I can't unresolve the previous conversation, starting a new one here)

I'm not sure I buy the above analysis (I know it came from API review). Looking at the Example of ArgumentOutOfRangeException in the docs doesn't follow this reasoning. There is no property that specifies a "min age" in that example.

Same goes for the wording on https://docs.microsoft.com/en-us/dotnet/standard/design-guidelines/using-standard-exception-types#argumentexception-argumentnullexception-and-argumentoutofrangeexception.

if bad arguments are passed to a member

Lastly, the base RateLimiter.WaitAsync xml doc documents the ArgumentOutOfRangeException. But it doesn't document InvalidOperationException. Following the same reasoning as above, a caller to RateLimiter.WaitAsync wouldn't know to catch InvalidOperationException if it passed in too big of a permitCount.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your arguments and will revert back to ArgumentOutOfRangeException with an error message

}

// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
if (permitCount == 0 && GetAvailablePermits() > 0)
{
// Perf: static failed/successful value tasks?
return new ValueTask<RateLimitLease>(SuccessfulLease);
}

// Perf: Check SemaphoreSlim implementation instead of locking
lock (_lock) // Check lock check
lock (Lock) // Check lock check
{
if (GetAvailablePermits() >= permitCount)
if (TryLeaseUnsynchronized(permitCount, out var lease))
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
{
_permitCount -= permitCount;
return new ValueTask<RateLimitLease>(new ConcurrencyLease(true, this, permitCount));
return new ValueTask<RateLimitLease>(lease);
}

// Don't queue if queue limit reached
if (_queueCount + permitCount > _options.QueueLimit)
{
// Perf: static failed/successful value tasks?
return new ValueTask<RateLimitLease>(FailedLease);
return new ValueTask<RateLimitLease>(new ConcurrencyLease(false, null, 0, "Queue limit reached"));
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
}

TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
CancellationTokenRegistration ctr;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(obj =>
{
((TaskCompletionSource<RateLimitLease>)obj).TrySetException(new OperationCanceledException(cancellationToken));
}, tcs);
}

var request = new RequestRegistration(permitCount);
RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
_queue.EnqueueTail(request);
_queueCount += permitCount;
Debug.Assert(_queueCount <= _options.QueueLimit);

// TODO: handle cancellation
return new ValueTask<RateLimitLease>(request.TCS.Task);
}
}

private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)
{
// if permitCount is 0 we want to queue it if there are no available permits
if (GetAvailablePermits() >= permitCount && GetAvailablePermits() != 0)
{
if (permitCount == 0)
{
// Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
lease = SuccessfulLease;
return true;
}

// a. if there are no items queued we can lease
// b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
{
_permitCount -= permitCount;
Debug.Assert(_permitCount >= 0);
lease = new ConcurrencyLease(true, this, permitCount);
return true;
}
}

lease = null;
return false;
}

private void Release(int releaseCount)
{
lock (_lock) // Check lock check
lock (Lock) // Check lock check
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
{
_permitCount += releaseCount;
Debug.Assert(_permitCount <= _options.PermitLimit);

while (_queue.Count > 0)
{
var nextPendingRequest =
RequestRegistration nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.PeekHead()
: _queue.PeekTail();

if (GetAvailablePermits() >= nextPendingRequest.Count)
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
{
var request =
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();

_permitCount -= request.Count;
_queueCount -= request.Count;

// requestToFulfill == request
request.TCS.SetResult(new ConcurrencyLease(true, this, request.Count));
_permitCount -= nextPendingRequest.Count;
_queueCount -= nextPendingRequest.Count;
Debug.Assert(_queueCount >= 0);
Debug.Assert(_permitCount >= 0);

var lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
// Check if request was canceled
if (!nextPendingRequest.TCS.TrySetResult(lease))
{
// Queued item was canceled so add count back
_permitCount += nextPendingRequest.Count;
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else
{
Expand All @@ -134,25 +193,43 @@ private void Release(int releaseCount)

private class ConcurrencyLease : RateLimitLease
{
private static readonly IEnumerable<string> Empty = new string[0];

private bool _disposed;
private readonly ConcurrencyLimiter? _limiter;
private readonly int _count;
private readonly string? _reason;

public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count)
public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count, string? reason = null)
{
IsAcquired = isAcquired;
_limiter = limiter;
_count = count;
_reason = reason;

// No need to set the limiter if count is 0, Dispose will noop
Debug.Assert(count == 0 ? limiter is null : true);
}

public override bool IsAcquired { get; }

public override IEnumerable<string> MetadataNames => Empty;
public override IEnumerable<string> MetadataNames => Enumerable();

private IEnumerable<string> Enumerable()
{
if (_reason is null)
{
yield break;
}

yield return MetadataName.ReasonPhrase.Name;
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
}

public override bool TryGetMetadata(string metadataName, out object? metadata)
{
if (_reason is not null && metadataName == MetadataName.ReasonPhrase.Name)
{
metadata = _reason;
return true;
}
metadata = default;
return false;
}
Expand All @@ -172,16 +249,20 @@ protected override void Dispose(bool disposing)

private struct RequestRegistration
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
{
public RequestRegistration(int requestedCount)
public RequestRegistration(int requestedCount, TaskCompletionSource<RateLimitLease> tcs,
CancellationTokenRegistration cancellationTokenRegistration)
{
Count = requestedCount;
// Perf: Use AsyncOperation<TResult> instead
TCS = new TaskCompletionSource<RateLimitLease>();
TCS = tcs;
CancellationTokenRegistration = cancellationTokenRegistration;
}

public int Count { get; }

public TaskCompletionSource<RateLimitLease> TCS { get; }
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved

public CancellationTokenRegistration CancellationTokenRegistration { get; }
}
}
}
37 changes: 32 additions & 5 deletions src/RateLimiting/src/ConcurrencyLimiterOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,49 @@

namespace System.Threading.RateLimiting
{
sealed public class ConcurrencyLimiterOptions
/// <summary>
/// Options to specify the behavior of a <see cref="ConcurrencyLimiter"/>.
/// </summary>
public sealed class ConcurrencyLimiterOptions
{
/// <summary>
/// Initializes the <see cref="ConcurrencyLimiterOptions"/>.
/// </summary>
/// <param name="permitLimit">Maximum number of permits that can be leased concurrently.</param>
/// <param name="queueProcessingOrder">Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.</param>
/// <param name="queueLimit">Maximum number of permits that can be queued concurrently.</param>
/// <exception cref="ArgumentOutOfRangeException">When <paramref name="permitLimit"/> or <paramref name="queueLimit"/> are less than 0.</exception>
public ConcurrencyLimiterOptions(int permitLimit, QueueProcessingOrder queueProcessingOrder, int queueLimit)
{
if (permitLimit < 0)
{
throw new ArgumentOutOfRangeException(nameof(permitLimit));
}
if (queueLimit < 0)
{
throw new ArgumentOutOfRangeException(nameof(queueLimit));
}
PermitLimit = permitLimit;
QueueProcessingOrder = queueProcessingOrder;
QueueLimit = queueLimit;
}

// Maximum number of permits allowed to be leased
/// <summary>
/// Maximum number of permits that can be leased concurrently.
/// </summary>
public int PermitLimit { get; }

// Behaviour of WaitAsync when not enough resources can be leased
public QueueProcessingOrder QueueProcessingOrder { get; }
/// <summary>
/// Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.
/// </summary>
/// <value>
/// <see cref="QueueProcessingOrder.OldestFirst"/> by default.
/// </value>
public QueueProcessingOrder QueueProcessingOrder { get; } = QueueProcessingOrder.OldestFirst;

// Maximum cumulative permit count of queued acquisition requests
/// <summary>
/// Maximum number of permits that can be queued concurrently.
/// </summary>
public int QueueLimit { get; }
}
}
Loading