Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConcurrencyLimiter implementation #387

Merged
merged 5 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/RateLimiting/src/AggregatedRateLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace System.Threading.RateLimiting
{
#pragma warning disable 1591
// Represent an aggregated resource (e.g. a resource limiter aggregated by IP)
public abstract class AggregatedRateLimiter<TKey>
{
Expand All @@ -19,4 +20,5 @@ public abstract class AggregatedRateLimiter<TKey>
// Wait until the requested resources are available
public abstract ValueTask<RateLimitLease> WaitAsync(TKey resourceID, int requestedCount, CancellationToken cancellationToken = default);
}
#pragma warning restore
}
105 changes: 85 additions & 20 deletions src/RateLimiting/src/ConcurrencyLimiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
// Pending dotnet API review

using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

namespace System.Threading.RateLimiting
{
/// <summary>
/// <see cref="RateLimiter"/> implementation that helps manage concurrent access to a resource.
/// </summary>
public sealed class ConcurrencyLimiter : RateLimiter
{
private int _permitCount;
Expand All @@ -20,14 +24,20 @@ public sealed class ConcurrencyLimiter : RateLimiter
private static readonly ConcurrencyLease SuccessfulLease = new ConcurrencyLease(true, null, 0);
private static readonly ConcurrencyLease FailedLease = new ConcurrencyLease(false, null, 0);

/// <summary>
/// Initializes the <see cref="ConcurrencyLimiter"/>.
/// </summary>
/// <param name="options"></param>
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
public ConcurrencyLimiter(ConcurrencyLimiterOptions options)
{
_options = options;
_permitCount = _options.PermitLimit;
}

/// <inheritdoc/>
public override int GetAvailablePermits() => _permitCount;

/// <inheritdoc/>
protected override RateLimitLease AcquireCore(int permitCount)
{
// These amounts of resources can never be acquired
Expand All @@ -36,7 +46,7 @@ protected override RateLimitLease AcquireCore(int permitCount)
throw new InvalidOperationException($"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
}

// Return SuccessfulAcquisition or FailedAcquisition depending to indicate limiter state
// Return SuccessfulAcquisition or FailedAcquisition to indicate limiter state
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
if (permitCount == 0)
{
return GetAvailablePermits() > 0 ? SuccessfulLease : FailedLease;
Expand All @@ -58,12 +68,15 @@ protected override RateLimitLease AcquireCore(int permitCount)
return FailedLease;
}

/// <inheritdoc/>
protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

// These amounts of resources can never be acquired
if (permitCount < 0 || permitCount > _options.PermitLimit)
if (permitCount > _options.PermitLimit)
{
throw new ArgumentOutOfRangeException();
throw new InvalidOperationException($"{permitCount} permits exceeds the permit limit of {_options.PermitLimit}.");
halter73 marked this conversation as resolved.
Show resolved Hide resolved

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Since I can't unresolve the previous conversation, starting a new one here)

I'm not sure I buy the above analysis (I know it came from API review). Looking at the Example of ArgumentOutOfRangeException in the docs doesn't follow this reasoning. There is no property that specifies a "min age" in that example.

Same goes for the wording on https://docs.microsoft.com/en-us/dotnet/standard/design-guidelines/using-standard-exception-types#argumentexception-argumentnullexception-and-argumentoutofrangeexception.

if bad arguments are passed to a member

Lastly, the base RateLimiter.WaitAsync xml doc documents the ArgumentOutOfRangeException. But it doesn't document InvalidOperationException. Following the same reasoning as above, a caller to RateLimiter.WaitAsync wouldn't know to catch InvalidOperationException if it passed in too big of a permitCount.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like your arguments and will revert back to ArgumentOutOfRangeException with an error message

}

// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
Expand All @@ -76,24 +89,37 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc
// Perf: Check SemaphoreSlim implementation instead of locking
lock (_lock) // Check lock check
{
if (GetAvailablePermits() >= permitCount)
// if permitCount is 0 we want to queue it if there are no available permits
if (GetAvailablePermits() >= permitCount && GetAvailablePermits() != 0)
{
_permitCount -= permitCount;
if (permitCount == 0)
{
// Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
return new ValueTask<RateLimitLease>(SuccessfulLease);
}
return new ValueTask<RateLimitLease>(new ConcurrencyLease(true, this, permitCount));
}

// Don't queue if queue limit reached
if (_queueCount + permitCount > _options.QueueLimit)
{
// Perf: static failed/successful value tasks?
return new ValueTask<RateLimitLease>(FailedLease);
return new ValueTask<RateLimitLease>(new ConcurrencyLease(false, null, 0, "Queue limit reached"));
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
}

var request = new RequestRegistration(permitCount);
TaskCompletionSource<RateLimitLease> tcs = new TaskCompletionSource<RateLimitLease>(TaskCreationOptions.RunContinuationsAsynchronously);
CancellationTokenRegistration ctr;
if (cancellationToken.CanBeCanceled)
{
ctr = cancellationToken.Register(obj => CancellationRequested((TaskCompletionSource<RateLimitLease>)obj, cancellationToken), tcs);
}

RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
_queue.EnqueueTail(request);
_queueCount += permitCount;
Debug.Assert(_queueCount <= _options.QueueLimit);

// TODO: handle cancellation
return new ValueTask<RateLimitLease>(request.TCS.Task);
}
}
Expand All @@ -103,26 +129,35 @@ private void Release(int releaseCount)
lock (_lock) // Check lock check
{
_permitCount += releaseCount;
Debug.Assert(_permitCount <= _options.PermitLimit);

while (_queue.Count > 0)
{
var nextPendingRequest =
RequestRegistration nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.PeekHead()
: _queue.PeekTail();

if (GetAvailablePermits() >= nextPendingRequest.Count)
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
{
var request =
nextPendingRequest =
_options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();

_permitCount -= request.Count;
_queueCount -= request.Count;

// requestToFulfill == request
request.TCS.SetResult(new ConcurrencyLease(true, this, request.Count));
_permitCount -= nextPendingRequest.Count;
_queueCount -= nextPendingRequest.Count;
Debug.Assert(_queueCount >= 0);
Debug.Assert(_permitCount >= 0);

var lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
// Check if request was canceled
if (!nextPendingRequest.TCS.TrySetResult(lease))
{
// Queued item was canceled so add count back
_permitCount += nextPendingRequest.Count;
}
nextPendingRequest.CancellationTokenRegistration.Dispose();
}
else
{
Expand All @@ -132,27 +167,53 @@ private void Release(int releaseCount)
}
}

private class ConcurrencyLease : RateLimitLease
private void CancellationRequested(TaskCompletionSource<RateLimitLease> tcs, CancellationToken token)
{
private static readonly IEnumerable<string> Empty = new string[0];
lock (_lock)
{
tcs.TrySetException(new OperationCanceledException(token));
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
}
}

private class ConcurrencyLease : RateLimitLease
{
private bool _disposed;
private readonly ConcurrencyLimiter? _limiter;
private readonly int _count;
private readonly string? _reason;

public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count)
public ConcurrencyLease(bool isAcquired, ConcurrencyLimiter? limiter, int count, string? reason = null)
{
IsAcquired = isAcquired;
_limiter = limiter;
_count = count;
_reason = reason;

// No need to set the limiter if count is 0, Dispose will noop
Debug.Assert(count == 0 ? limiter is null : true);
}

public override bool IsAcquired { get; }

public override IEnumerable<string> MetadataNames => Empty;
public override IEnumerable<string> MetadataNames => Enumerable();

private IEnumerable<string> Enumerable()
{
if (_reason is null)
{
yield break;
}

yield return MetadataName.ReasonPhrase.Name;
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
}

public override bool TryGetMetadata(string metadataName, out object? metadata)
{
if (_reason is not null && metadataName == MetadataName.ReasonPhrase.Name)
{
metadata = _reason;
return true;
}
metadata = default;
return false;
}
Expand All @@ -172,16 +233,20 @@ protected override void Dispose(bool disposing)

private struct RequestRegistration
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
{
public RequestRegistration(int requestedCount)
public RequestRegistration(int requestedCount, TaskCompletionSource<RateLimitLease> cts,
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
CancellationTokenRegistration cancellationTokenRegistration)
{
Count = requestedCount;
// Perf: Use AsyncOperation<TResult> instead
TCS = new TaskCompletionSource<RateLimitLease>();
TCS = cts;
CancellationTokenRegistration = cancellationTokenRegistration;
}

public int Count { get; }

public TaskCompletionSource<RateLimitLease> TCS { get; }
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved

public CancellationTokenRegistration CancellationTokenRegistration { get; }
}
}
}
37 changes: 32 additions & 5 deletions src/RateLimiting/src/ConcurrencyLimiterOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,49 @@

namespace System.Threading.RateLimiting
{
sealed public class ConcurrencyLimiterOptions
/// <summary>
/// Options to control the behavior of a <see cref="ConcurrencyLimiter"/>.
/// </summary>
public sealed class ConcurrencyLimiterOptions
{
/// <summary>
/// Initializes the <see cref="ConcurrencyLimiterOptions"/>.
/// </summary>
/// <param name="permitLimit"></param>
/// <param name="queueProcessingOrder"></param>
/// <param name="queueLimit"></param>
/// <exception cref="ArgumentOutOfRangeException">When <paramref name="permitLimit"/> or <paramref name="queueLimit"/> are less than 0.</exception>
public ConcurrencyLimiterOptions(int permitLimit, QueueProcessingOrder queueProcessingOrder, int queueLimit)
{
if (permitLimit < 0)
{
throw new ArgumentOutOfRangeException(nameof(permitLimit));
}
if (queueLimit < 0)
{
throw new ArgumentOutOfRangeException(nameof(queueLimit));
}
PermitLimit = permitLimit;
QueueProcessingOrder = queueProcessingOrder;
QueueLimit = queueLimit;
}

// Maximum number of permits allowed to be leased
/// <summary>
/// Maximum number of permits allowed to be leased.
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public int PermitLimit { get; }

// Behaviour of WaitAsync when not enough resources can be leased
public QueueProcessingOrder QueueProcessingOrder { get; }
/// <summary>
/// Determines the behaviour of <see cref="RateLimiter.WaitAsync"/> when not enough resources can be leased.
/// </summary>
/// <value>
/// <see cref="QueueProcessingOrder.OldestFirst"/>
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
/// </value>
public QueueProcessingOrder QueueProcessingOrder { get; } = QueueProcessingOrder.OldestFirst;

// Maximum cumulative permit count of queued acquisition requests
/// <summary>
/// Maximum cumulative permit count of queued acquisition requests.
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public int QueueLimit { get; }
}
}
31 changes: 31 additions & 0 deletions src/RateLimiting/src/MetadataName.T.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,74 @@

namespace System.Threading.RateLimiting
{
/// <summary>
/// A strongly-typed name of metadata that can be stored in a <see cref="RateLimitLease"/>.
/// </summary>
/// <typeparam name="T">The type the metadata will be.</typeparam>
public sealed class MetadataName<T> : IEquatable<MetadataName<T>>
{
private readonly string _name;

/// <summary>
/// Constructs a <see cref="MetadataName{T}"/> object with the given name.
/// </summary>
/// <param name="name">The name of the <see cref="MetadataName"/> object.</param>
public MetadataName(string name)
{
_name = name;
}

/// <summary>
/// Gets the name of the metadata.
/// </summary>
public string Name => _name;

/// <inheritdoc/>
public override string ToString()
{
return _name ?? string.Empty;
}

/// <inheritdoc/>
public override int GetHashCode()
{
return _name == null ? 0 : _name.GetHashCode();
}

/// <inheritdoc/>
public override bool Equals([NotNullWhen(true)] object? obj)
{
return obj is MetadataName<T> && Equals((MetadataName<T>)obj);
}

/// <summary>
/// Determines whether the specified <see cref="MetadataName{T}"/> is equal to the current object.
/// </summary>
/// <param name="other"></param>
/// <returns></returns>
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
public bool Equals(MetadataName<T> other)
{
// NOTE: intentionally ordinal and case sensitive, matches CNG.
return _name == other._name;
}

/// <summary>
/// Determines whether two <see cref="MetadataName{T}"/> are equal to each other.
/// </summary>
/// <param name="left"></param>
/// <param name="right"></param>
/// <returns></returns>
public static bool operator ==(MetadataName<T> left, MetadataName<T> right)
{
return left.Equals(right);
}

/// <summary>
/// Determines whether two <see cref="MetadataName{T}"/> are not equal to each other.
/// </summary>
/// <param name="left"></param>
/// <param name="right"></param>
/// <returns></returns>
public static bool operator !=(MetadataName<T> left, MetadataName<T> right)
{
return !(left == right);
Expand Down
Loading