Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RateLimiting APIs #61788

Merged
merged 9 commits into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public sealed partial class ConcurrencyLimiter : System.Threading.RateLimiting.R
{
public ConcurrencyLimiter(System.Threading.RateLimiting.ConcurrencyLimiterOptions options) { }
protected override System.Threading.RateLimiting.RateLimitLease AcquireCore(int permitCount) { throw null; }
public override void Dispose() { }
public override int GetAvailablePermits() { throw null; }
protected override System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
Expand Down Expand Up @@ -42,11 +43,12 @@ public enum QueueProcessingOrder
OldestFirst = 0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approved API called these:

ProcessOldest,
ProcessNewest

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approved API code was wrong, the comment above the code states:

We decided to rename the QueueProcessingOrder members from { ProcessOldest, ProcessNewest } to { OldestFirst, NewestFirst }

NewestFirst = 1,
}
public abstract partial class RateLimiter
public abstract partial class RateLimiter : System.IDisposable
{
protected RateLimiter() { }
public System.Threading.RateLimiting.RateLimitLease Acquire(int permitCount = 1) { throw null; }
protected abstract System.Threading.RateLimiting.RateLimitLease AcquireCore(int permitCount);
public abstract void Dispose();
BrennanConroy marked this conversation as resolved.
Show resolved Hide resolved
public abstract int GetAvailablePermits();
public System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAsync(int permitCount = 1, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
protected abstract System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken);
Expand All @@ -66,6 +68,7 @@ public sealed partial class TokenBucketRateLimiter : System.Threading.RateLimiti
{
public TokenBucketRateLimiter(System.Threading.RateLimiting.TokenBucketRateLimiterOptions options) { }
protected override System.Threading.RateLimiting.RateLimitLease AcquireCore(int tokenCount) { throw null; }
public override void Dispose() { }
public override int GetAvailablePermits() { throw null; }
public bool TryReplenish() { throw null; }
protected override System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease> WaitAsyncCore(int tokenCount, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public sealed class ConcurrencyLimiter : RateLimiter
{
private int _permitCount;
private int _queueCount;
private bool _disposed;

private readonly ConcurrencyLimiterOptions _options;
private readonly Deque<RequestRegistration> _queue = new Deque<RequestRegistration>();
Expand Down Expand Up @@ -49,7 +50,7 @@ protected override RateLimitLease AcquireCore(int permitCount)
}

// Return SuccessfulLease or FailedLease to indicate limiter state
if (permitCount == 0)
if (permitCount == 0 && !_disposed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to throw ObjectDisposedException if this has been disposed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should an ODE throw synchronously or be wrapped in a ValueTask?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the first question is:

  • Do we want to throw ODE or always return "failed" when a limiter is disposed?

Doing a quick check of other System.Threading disposable types, they seem to throw ODE: Barrier, ManualResetEventSlim, ReaderWriterLockSlim

For the next question, I would say it is similar to argument checking - so we would throw inline.

{
return _permitCount > 0 ? SuccessfulLease : FailedLease;
}
Expand Down Expand Up @@ -79,7 +80,7 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc
}

// Return SuccessfulLease if requestedCount is 0 and resources are available
if (permitCount == 0 && _permitCount > 0)
if (permitCount == 0 && _permitCount > 0 && !_disposed)
{
return new ValueTask<RateLimitLease>(SuccessfulLease);
}
Expand All @@ -95,7 +96,6 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc
// Don't queue if queue limit reached
if (_queueCount + permitCount > _options.QueueLimit)
{
// Perf: static failed/successful value tasks?
return new ValueTask<RateLimitLease>(QueueLimitLease);
}

Expand All @@ -120,6 +120,12 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, Canc

private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)
{
if (_disposed)
{
lease = FailedLease;
return true;
}

// if permitCount is 0 we want to queue it if there are no available permits
if (_permitCount >= permitCount && _permitCount != 0)
{
Expand Down Expand Up @@ -149,6 +155,11 @@ private void Release(int releaseCount)
{
lock (Lock)
{
if (_disposed)
{
return;
}

_permitCount += releaseCount;
Debug.Assert(_permitCount <= _options.PermitLimit);

Expand Down Expand Up @@ -188,6 +199,22 @@ private void Release(int releaseCount)
}
}

public override void Dispose()
{
lock (Lock)
{
_disposed = true;
while (_queue.Count > 0)
{
RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
next.CancellationTokenRegistration.Dispose();
next.Tcs.SetResult(FailedLease);
}
}
}

private sealed class ConcurrencyLease : RateLimitLease
{
private static readonly string[] s_allMetadataNames = new[] { MetadataName.ReasonPhrase.Name };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace System.Threading.RateLimiting
/// <summary>
/// Represents a limiter type that users interact with to determine if an operation can proceed.
/// </summary>
public abstract class RateLimiter
public abstract class RateLimiter : IDisposable
{
/// <summary>
/// An estimated count of available permits.
Expand Down Expand Up @@ -74,5 +74,8 @@ public ValueTask<RateLimitLease> WaitAsync(int permitCount = 1, CancellationToke
/// <param name="cancellationToken">Optional token to allow canceling a queued request for permits.</param>
/// <returns>A task that completes when the requested permits are acquired or when the requested permits are denied.</returns>
protected abstract ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken);

/// <interitdoc/>
public abstract void Dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public sealed class TokenBucketRateLimiter : RateLimiter
private int _tokenCount;
private int _queueCount;
private uint _lastReplenishmentTick = (uint)Environment.TickCount;
private bool _disposed;

private readonly Timer? _renewTimer;
private readonly TokenBucketRateLimiterOptions _options;
Expand All @@ -25,6 +26,7 @@ public sealed class TokenBucketRateLimiter : RateLimiter
private object Lock => _queue;

private static readonly RateLimitLease SuccessfulLease = new TokenBucketLease(true, null);
private static readonly RateLimitLease FailedLease = new TokenBucketLease(false, null);

/// <summary>
/// Initializes the <see cref="TokenBucketRateLimiter"/>.
Expand Down Expand Up @@ -54,7 +56,7 @@ protected override RateLimitLease AcquireCore(int tokenCount)
}

// Return SuccessfulLease or FailedLease depending to indicate limiter state
if (tokenCount == 0)
if (tokenCount == 0 && !_disposed)
{
if (_tokenCount > 0)
{
Expand Down Expand Up @@ -85,7 +87,7 @@ protected override ValueTask<RateLimitLease> WaitAsyncCore(int tokenCount, Cance
}

// Return SuccessfulAcquisition if requestedCount is 0 and resources are available
if (tokenCount == 0 && _tokenCount > 0)
if (tokenCount == 0 && _tokenCount > 0 && !_disposed)
{
return new ValueTask<RateLimitLease>(SuccessfulLease);
}
Expand Down Expand Up @@ -137,6 +139,12 @@ private RateLimitLease CreateFailedTokenLease(int tokenCount)

private bool TryLeaseUnsynchronized(int tokenCount, [NotNullWhen(true)] out RateLimitLease? lease)
{
if (_disposed)
{
lease = FailedLease;
return true;
}

// if permitCount is 0 we want to queue it if there are no available permits
if (_tokenCount >= tokenCount && _tokenCount != 0)
{
Expand Down Expand Up @@ -202,6 +210,11 @@ private void ReplenishInternal(uint nowTicks)
// method is re-entrant (from Timer), lock to avoid multiple simultaneous replenishes
lock (Lock)
{
if (_disposed)
{
return;
}

// Fix the wrapping by using a long and adding uint.MaxValue in the wrapped case
long nonWrappedTicks = wrapped ? (long)nowTicks + uint.MaxValue : nowTicks;
if (nonWrappedTicks - _lastReplenishmentTick < _options.ReplenishmentPeriod.TotalMilliseconds)
Expand Down Expand Up @@ -267,6 +280,23 @@ private void ReplenishInternal(uint nowTicks)
}
}

public override void Dispose()
{
lock (Lock)
{
_disposed = true;
_renewTimer?.Dispose();
while (_queue.Count > 0)
{
RequestRegistration next = _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
? _queue.DequeueHead()
: _queue.DequeueTail();
next.CancellationTokenRegistration.Dispose();
next.Tcs.SetResult(FailedLease);
}
}
}

private sealed class TokenBucketLease : RateLimitLease
{
private static readonly string[] s_allMetadataNames = new[] { MetadataName.RetryAfter.Name };
Expand Down Expand Up @@ -294,8 +324,6 @@ public override bool TryGetMetadata(string metadataName, out object? metadata)
metadata = default;
return false;
}

protected override void Dispose(bool disposing) { }
}

private readonly struct RequestRegistration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,8 @@ public abstract class BaseRateLimiterTests

[Fact]
public abstract void MetadataNamesContainsAllMetadata();

[Fact]
public abstract Task DisposeReleasesQueuedAcquires();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,35 @@ public override void MetadataNamesContainsAllMetadata()
Assert.Collection(lease.MetadataNames, metadataName => Assert.Equal(metadataName, MetadataName.ReasonPhrase.Name));
}

[Fact]
public override async Task DisposeReleasesQueuedAcquires()
{
var limiter = new ConcurrencyLimiter(new ConcurrencyLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3));
using var lease = limiter.Acquire(1);

var wait1 = limiter.WaitAsync(1);
var wait2 = limiter.WaitAsync(1);
var wait3 = limiter.WaitAsync(1);
Assert.False(wait1.IsCompleted);
Assert.False(wait2.IsCompleted);
Assert.False(wait3.IsCompleted);

limiter.Dispose();

var failedLease = await wait1;
Assert.False(failedLease.IsAcquired);
failedLease = await wait2;
Assert.False(failedLease.IsAcquired);
failedLease = await wait3;
Assert.False(failedLease.IsAcquired);

lease.Dispose();

// Can't acquire any leases after disposal
Assert.False(limiter.Acquire(1).IsAcquired);
Assert.False((await limiter.WaitAsync(1)).IsAcquired);
}

[Fact]
public async Task ReasonMetadataOnFailedWaitAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,33 @@ public override void MetadataNamesContainsAllMetadata()
Assert.Collection(lease.MetadataNames, metadataName => Assert.Equal(metadataName, MetadataName.RetryAfter.Name));
}

[Fact]
public override async Task DisposeReleasesQueuedAcquires()
{
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions(1, QueueProcessingOrder.OldestFirst, 3,
TimeSpan.Zero, 1, autoReplenishment: false));
var lease = limiter.Acquire(1);
var wait1 = limiter.WaitAsync(1);
var wait2 = limiter.WaitAsync(1);
var wait3 = limiter.WaitAsync(1);
Assert.False(wait1.IsCompleted);
Assert.False(wait2.IsCompleted);
Assert.False(wait3.IsCompleted);

limiter.Dispose();

lease = await wait1;
Assert.False(lease.IsAcquired);
lease = await wait2;
Assert.False(lease.IsAcquired);
lease = await wait3;
Assert.False(lease.IsAcquired);

// Can't acquire any leases after disposal
Assert.False(limiter.Acquire(1).IsAcquired);
Assert.False((await limiter.WaitAsync(1)).IsAcquired);
}

[Fact]
public async Task RetryMetadataOnFailedWaitAsync()
{
Expand Down