[WIP] RateLimitPolicy sketch #666

Conversation

reisenberger
Member

The issue or feature being addressed

#260 Rate limiting

Confirm the following

  • I started this PR by branching from the head of the latest dev vX.Y branch, or I have rebased on the latest dev vX.Y branch, or I have merged the latest changes from the dev vX.Y branch
  • I have targeted the PR to merge into the latest dev vX.Y branch as the base branch
  • I have included unit tests for the issue/feature
  • I have successfully run a local build

@reisenberger
Member Author

Hi @georgiosd . Can you describe the use case/real world scenario for "should retry" in more detail, so that I can better understand? Thanks!

@georgiosd

Of course @reisenberger.

Think of a trading system. The exchange API has a rate limit and it's possible I will have to wait to send my order.

But if the prices have changed by the time I get my turn and my order is no longer relevant (I calculate that I would make a loss), then I should discard the order.

@reisenberger
Member Author

reisenberger commented Jul 22, 2019

Thanks for raising this and elaborating, @georgiosd. The goal of my question was to understand whether the scenario should be handled by CancellationToken, which your example answers. (The commonest reason I could/can envisage for not executing after being rate-limited - a simple "discard this, it's too late now" - should be handled by CancellationToken via a wrapping TimeoutPolicy.)
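
For concreteness, a minimal sketch of that wrapping approach (not this PR's final API: CreateRateLimitPolicyAsync and SendOrderAsync are placeholders for the policy construction and the caller's work; Policy.TimeoutAsync and Policy.WrapAsync are existing Polly APIs):

// Sketch only. The outer timeout supplies the CancellationToken: if the execution
// has not completed within 2 seconds (e.g. it has been waiting to be permitted),
// it is cancelled and the now-stale order is simply never sent.
IAsyncPolicy rateLimitPolicy = CreateRateLimitPolicyAsync();   // placeholder for this PR's policy
IAsyncPolicy timeoutPolicy = Policy.TimeoutAsync(TimeSpan.FromSeconds(2), TimeoutStrategy.Optimistic);
IAsyncPolicy discardIfTooLate = Policy.WrapAsync(timeoutPolicy, rateLimitPolicy);

await discardIfTooLate.ExecuteAsync(
    ct => SendOrderAsync(ct),          // hypothetical caller method; must honour the token
    CancellationToken.None);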

We'll come back to this when we have a policy closer to fruition.

My first reaction to adding shouldExecute: as a delegate was (and remains) against it, mainly because the same effect is easily achievable by placing that go/no-go check as the first statements of the delegate executed through the policy. New API has to meet quite a high bar for inclusion, because APIs tend to proliferate over time - every API piece becomes extra to maintain, and extra for users of the product to grok. So keeping things simple - especially at the outset of a policy - is a good principle.
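
To illustrate that alternative concretely (a sketch only; IsOrderStillProfitable and SendOrder are hypothetical caller-side methods):

// The go/no-go check is simply the first statement of the delegate executed
// through the policy, rather than a new shouldExecute: parameter on the policy.
rateLimitPolicy.Execute(() =>
{
    if (!IsOrderStillProfitable())   // hypothetical check against current prices
    {
        return;                      // discard the order - no new Polly API needed
    }

    SendOrder();                     // hypothetical
});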

That perspective can evolve though as we learn more about how we and everybody would want to use the policy - more ideas/thoughts, anyone?

@reisenberger reisenberger changed the base branch from v711-or-v720 to v712-or-v720 September 8, 2019 09:46
@reisenberger reisenberger removed this from the v7.1.2 or v7.2.0 milestone Nov 25, 2019
@reisenberger reisenberger changed the base branch from v712-or-v720 to v721-or-v730 December 22, 2019 10:41
@ghost

ghost commented Jan 3, 2020

Hi @reisenberger,
while I am delighted to learn that the Polly library - which I hold in high regard even as it is - will hopefully be extended with a very useful rate limiting feature set in the near future, I must report that I may have found a glitch in the implementation of LockFreeTokenBucketRateLimiter.PermitExecution().

The problem manifests itself in multi-threading environments where

  • the current token contingent has been exhausted,
  • the timespan for adding new tokens to the RateLimiter has passed, and
  • multiple threads contend for a token.

It appears to me that there is a window of uncertainty in LockFreeTokenBucketRateLimiter.cs between the call to
if (Interlocked.CompareExchange(ref addNextTokenAtTicks, newAddNextTokenAtTicks, currentAddNextTokenAtTicks) == currentAddNextTokenAtTicks) (line 87)
and the accompanying call to
Interlocked.Exchange(ref currentTokens, tokensToAdd - 1); (line 99).
A contending thread that passes through
long tokensAfterGrabOne = Interlocked.Decrement(ref currentTokens); (line 50)
and
long currentAddNextTokenAtTicks = Interlocked.Read(ref addNextTokenAtTicks); (line 60)
inside that window sees the already-advanced addNextTokenAtTicks but a still-empty bucket, and therefore hits an inappropriate
return (false, TimeSpan.FromTicks(ticksTillAddNextToken)); (line 66)
just when a new token contingent is being added.

/// <summary>
/// Returns whether the execution is permitted; if not, returns what <see cref="TimeSpan"/> should be waited before retrying.
/// </summary>
public (bool permitExecution, TimeSpan retryAfter) PermitExecution()
{
    while (true)
    {
        // Try to get a token.
L50-->  long tokensAfterGrabOne = Interlocked.Decrement(ref currentTokens);

        if (tokensAfterGrabOne >= 0)
        {
            // We got a token: permit execution!
            return (true, TimeSpan.Zero);
        }

        // No tokens! We're rate-limited - unless we can refill the bucket.
        long now = SystemClock.DateTimeOffsetUtcNow().Ticks;
L60-->  long currentAddNextTokenAtTicks = Interlocked.Read(ref addNextTokenAtTicks);
        long ticksTillAddNextToken = currentAddNextTokenAtTicks - now;

        if (ticksTillAddNextToken > 0)
        {
            // Not time to add tokens yet: we're rate-limited!
L66-->      return (false, TimeSpan.FromTicks(ticksTillAddNextToken));
        }

        // Time to add tokens to the bucket!

        // We definitely need to add one token.  In fact, if we haven't hit this bit of code for a while, we might be due to add a bunch of tokens.
        long tokensMissedAdding =
            // Passing addNextTokenAtTicks merits one token
            1 +
            // And any whole token tick intervals further each merit another.
            (-ticksTillAddNextToken / addTokenTickInterval);

        // We mustn't exceed bucket capacity though.
        long tokensToAdd = Math.Min(bucketCapacity, tokensMissedAdding);

        // Work out when tokens would next be due to be added, if we add these tokens.
        long newAddNextTokenAtTicks = currentAddNextTokenAtTicks + tokensToAdd * addTokenTickInterval;
        // But if we were way overdue refilling the bucket (there was inactivity for a while), that value would be out-of-date: the next time we add tokens must be at least addTokenTickInterval from now.
        newAddNextTokenAtTicks = Math.Max(newAddNextTokenAtTicks, now + addTokenTickInterval);

        // Now see if we win the race to add these tokens.  Other threads might be racing through this code at the same time: only one thread must add the tokens!
L87-->  if (Interlocked.CompareExchange(ref addNextTokenAtTicks, newAddNextTokenAtTicks, currentAddNextTokenAtTicks) == currentAddNextTokenAtTicks)
        {
            // We won the race to add the tokens!

            // Theoretically we want to add tokensToAdd tokens.  But in fact we don't do that.
            // We want to claim one of those tokens for ourselves - there's no way we're going to add it but let another thread snatch it from under our nose.
            // (Doing that could leave this thread looping round adding tokens for ever which other threads just snatch - would lead to odd observed behaviour.)

            // So in fact we add (tokensToAdd - 1) tokens (ie we consume one), and return, permitting this execution.

            // The advantage of only adding tokens when the bucket is empty is that we can now hard set the new amount of tokens (Interlocked.Exchange) without caring if other threads have simultaneously been taking or adding tokens.
            // (If we added a token per addTokenTickInterval to a non-empty bucket, the reasoning about not overflowing the bucket seems harder.)
L99-->     Interlocked.Exchange(ref currentTokens, tokensToAdd - 1);
            return (true, TimeSpan.Zero);
        }
        else
        {
            // We didn't win the race to add the tokens. BUT because it _was_ time to add tokens, another thread must have won that race and have added/be adding tokens, so there _may_ be more tokens, so loop and try again.

            // We want any thread refilling the bucket to have a chance to do so before we try to grab the next token.
#if NETSTANDARD2_0
          Thread.Sleep(0);
#else
            spinner.SpinOnce();
#endif
        }
    }
}

I'm afraid I cannot come up with a fix off the top of my head, but at least I can offer a unit test that will expose the problem.

Please modify TokenBucketRateLimiterTestsBase.cs to include the following:

[Theory]
[InlineData(3, 2)]
[InlineData(3, 3)]
[InlineData(5, 3)]
public void Given_parallel_contention_with_multiple_new_tokens_available_ratelimiter_permits_all_expected(int parallelContention, int tokens)
{
    tokens.Should().BeGreaterThan(1);

    for (int repetition = 0; repetition < 20; repetition++)
    {
        FixClock();

        // Arrange
        TimeSpan onePer = TimeSpan.FromSeconds(1);
        var rateLimiter = GetRateLimiter(onePer, tokens);

        // Arrange - rate limiter with tokens exhausted and SystemClock advanced by (tokens * onePer), parallel tasks all waiting on a manual reset event.
        for (int i = 0; i < tokens; i++)
            rateLimiter.PermitExecution();
        AdvanceClock(tokens * onePer.Ticks);

        ManualResetEventSlim gate = new ManualResetEventSlim();
        Task<(bool permitExecution, TimeSpan retryAfter)>[] tasks = new Task<(bool, TimeSpan)>[parallelContention];
        for (int i = 0; i < parallelContention; i++)
        {
            tasks[i] = Task.Run(async () =>
            {
                await Task.Delay(TimeSpan.FromMilliseconds(10));
                gate.Wait();
                return rateLimiter.PermitExecution();
            });
        }

        // Act - release gate.
        Task.Delay(TimeSpan.FromMilliseconds(100)).Wait();
        gate.Set();
        Within(TimeSpan.FromSeconds(10 /* high to allow for slow-running on time-slicing CI servers */), () => tasks.All(t => t.IsCompleted).Should().BeTrue());

        // Assert - #tokens should have permitted execution, n-#tokens not.
        var results = tasks.Select(t => t.Result).ToList();
        results.Count(r => r.permitExecution).Should().Be(tokens);
        results.Count(r => !r.permitExecution).Should().Be(parallelContention - tokens);
    }
}

The test will fail every so often when using LockFreeTokenBucketRateLimiter - at least when executed in the command line test runner along the lines of vstest.console.exe C:\GIT\Polly\src\Polly.Specs\bin\Release\net472\Polly.Specs.dll --Tests:Given_parallel_contention_with_multiple_new_tokens_available_ratelimiter_permits_all_expected and on .NET Framework 4.7.2 or 4.6.1, or .NET Core 3.0.
In Visual Studio 2019's integrated test runner I am able to produce the error in every single iteration by setting breakpoints on lines 66 and 99 in LockFreeTokenBucketRateLimiter.cs, but only very rarely without those breakpoints. Go figure.
It will always succeed when using LockBasedTokenBucketRateLimiter.

A possible workaround for the time being might be to revert the hard-coded IRateLimiter implementation inside RateLimiterFactory from LockFreeTokenBucketRateLimiter back to LockBasedTokenBucketRateLimiter.

Hope this helps.

Cheers
DJö

@reisenberger
Member Author

Thanks @djoe47441. Yes, I agree. I'd also independently spotted a potential race condition in LockFreeTokenBucketRateLimiter (I need to check my notes to see whether it is the same one), and had planned to remove LockFreeTokenBucketRateLimiter in favour of LockBasedTokenBucketRateLimiter for that reason. Thanks for taking the time to put together your detailed report.

@ghost

ghost commented Jan 6, 2020

Hi @reisenberger,
good to see that I'm not entirely mistaken.
I really appreciate your work on Polly in general and on the RateLimitPolicy extension in particular, and I'll be waiting eagerly to see the feature integrated into Polly proper.

BTW, could I ask you to extend the RateLimitRejectedException to include the originating RateLimiter's PolicyKey? I have a scenario where I need to chain multiple RateLimiter instances into a hierarchy - which basically works well via a PolicyWrap - but once an exception is thrown there is no way to know at which hierarchy level the rate limit was exceeded.

Before:

    internal static class RateLimitEngine
    {
        internal static TResult Implementation<TResult>(IRateLimiter rateLimiter, Func<TimeSpan, Context, TResult> retryAfterFactory, Func<Context, CancellationToken, TResult> action, Context context, CancellationToken cancellationToken)
        {
            (bool permit, TimeSpan retryAfter) = rateLimiter.PermitExecution();
            if (permit)
            {
                return action(context, cancellationToken);
            }
            if (retryAfterFactory != null)
            {
                return retryAfterFactory(retryAfter, context);
            }
            throw new RateLimitRejectedException(retryAfter);
        }
    }

After:

    internal static class RateLimitEngine
    {
        internal static TResult Implementation<TResult>(IRateLimiter rateLimiter, Func<TimeSpan, Context, TResult> retryAfterFactory, Func<Context, CancellationToken, TResult> action, Context context, CancellationToken cancellationToken)
        {
            (bool permit, TimeSpan retryAfter) = rateLimiter.PermitExecution();
            if (permit)
            {
                return action(context, cancellationToken);
            }
            if (retryAfterFactory != null)
            {
                return retryAfterFactory(retryAfter, context);
            }
            throw new RateLimitRejectedException(context.PolicyKey, retryAfter);
        }
    }
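
For context, a rough sketch of how the hierarchy scenario above could then surface which level rejected the call, assuming the exception gains a PolicyKey property as in the "After" snippet (CreateRateLimitPolicy and CallDownstream are placeholders; WithPolicyKey and Policy.Wrap are existing Polly APIs):

// Each level of the PolicyWrap hierarchy gets its own PolicyKey.
ISyncPolicy perUserLimit = CreateRateLimitPolicy(/* per-user limit */).WithPolicyKey("per-user");
ISyncPolicy globalLimit  = CreateRateLimitPolicy(/* global limit */).WithPolicyKey("global");
ISyncPolicy hierarchy = Policy.Wrap(globalLimit, perUserLimit);

try
{
    hierarchy.Execute(() => CallDownstream());   // placeholder action
}
catch (RateLimitRejectedException ex)
{
    // With the proposed change, the exception reports which level rejected the call.
    Console.WriteLine($"Rate limit exceeded by '{ex.PolicyKey}'; retry after {ex.RetryAfter}.");
}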

@reisenberger
Member Author

Hi @djoe47441

extend the RateLimitRejectedException to include the originating RateLimiter's PolicyKey

That's a great idea! 👍 A near-identical suggestion came up on Polly slack yesterday too. We should generalise this so that all Polly-defined execution context is included, not only PolicyKey, and so that it applies to all custom exceptions Polly throws: RateLimitRejectedException, TimeoutRejectedException, BrokenCircuitException and BulkheadRejectedException.

@martincostello
Member

I've just stumbled across this PR, and assuming I've understood it correctly, the proposal does not allow for a use case I'm interested in (apologies if it does and I've misunderstood).

This appears to apply a global rate limit for the policy across all requests, but in some use cases (such as mine) you'd want to rate-limit per user (e.g. by their ID, an access token, IP address, etc.). While this is possible using this implementation as-is, it requires the implementer to maintain a policy per user.

For a system with thousands of distinct users, this would consume a non-trivial amount of additional memory, as well as require a mechanism to evict the policies over time when they're no longer in use (you wouldn't want a Policy lying around in memory for every user who's ever made a request to the app).

Could the functionality be extended to allow an arbitrary key to be used to shard the buckets inside the policy, making this a built-in feature rather than a layer that many consumers would have to build on top themselves in a very similar manner?

RetryAfter = retryAfter;
}

private static string DefaultMessage(TimeSpan retryAfter) => $"The operation has been rate-limited and should be retried after ${retryAfter}";

Suggested change (the stray $ inside the interpolated string would otherwise appear literally in the message):
private static string DefaultMessage(TimeSpan retryAfter) => $"The operation has been rate-limited and should be retried after ${retryAfter}";
private static string DefaultMessage(TimeSpan retryAfter) => $"The operation has been rate-limited and should be retried after {retryAfter}";

@reisenberger
Member Author

@martincostello Yes, great use case, and I have also been thinking about this. I'll come back to this (and involve you) when we circle back to the rate-limit policy! Thanks again.

@martincostello
Member

@reisenberger No problem, thanks!

FYI, I took this PR as-is as source (excluding two tweaks, which I've left code comments on above) and incorporated it into a production service using a MemoryCache with a sliding expiry to handle the per-user sharding part, and it works quite nicely 😃
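
For anyone wanting to take the same interim route, a rough sketch of that layer (a sketch only, not part of this PR: it uses Microsoft.Extensions.Caching.Memory, CreateRateLimitPolicy stands in for this PR's policy construction, and the sliding-expiry value is arbitrary):

using System;
using Microsoft.Extensions.Caching.Memory;
using Polly;

internal static class PerUserRateLimiting
{
    // One policy instance per user, cached with a sliding expiry so that policies
    // for inactive users are evicted rather than accumulating indefinitely.
    private static readonly MemoryCache PolicyCache = new MemoryCache(new MemoryCacheOptions());

    public static ISyncPolicy GetPolicyForUser(string userId) =>
        PolicyCache.GetOrCreate(userId, entry =>
        {
            entry.SlidingExpiration = TimeSpan.FromMinutes(10);   // arbitrary value
            return CreateRateLimitPolicy();                       // placeholder for this PR's policy
        });

    private static ISyncPolicy CreateRateLimitPolicy() =>
        throw new NotImplementedException("Stand-in for the rate-limit policy from this PR.");
}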

@martincostello martincostello changed the base branch from v721-or-v730 to v722-or-v730 January 23, 2021 14:33
@madelson

This looks great; I'd love to see this merged in! I like the smoothing offered by the token bucket approach.

Two thoughts:

  • We'd definitely leverage the "keyed" version of this that @martincostello suggests (although we have use-cases for the non-keyed version as well).

  • One of our use-cases requires a "compound" policy where we have a limit of X requests/user/time but also a global policy of Y requests/time across all users. The idea is to avoid one user hogging all the requests. I haven't thought this through fully, but I think that with the current code I would run into issues where, for example, a user successfully gets a token from their per-user bucket but then fails to get one from the overall bucket. In that case the token they took from the per-user bucket is effectively wasted and so it would be ideal if it were returned to that bucket. Something like this:

// potentially contains a mix of sharded and unsharded buckets with different rates
private TokenBucketRateLimiter[] _rateLimiters;
private readonly object _lock = new object();

public bool TryTakeToken()
{
    lock (_lock)
    {
        for (var i = 0; i < _rateLimiters.Length; ++i)
        {
            if (!_rateLimiters[i].TryTakeToken())
            {
                // return all tokens taken so far
                for (var j = 0; j < i; ++j) { _rateLimiters[j].ReturnToken(); }
                return false;
            }
        }

        return true;
    }
}

This would also allow for specifying compound policies like "no more than 10 requests/second, and also no more than 100 requests/minute". These can be useful when you expect requests to come in bursts (and want to accommodate that) but don't want to sustain the burst-level rate constantly (I've worked with APIs that have such compound policies).
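
For illustration, a possible configuration of that compound approach (a sketch only, continuing the hypothetical TryTakeToken/ReturnToken API above; CompoundRateLimiter stands in for a class containing that method, and the constructor shape - interval per token plus bucket capacity - is an assumption based on the tests earlier in this thread):

// "10 requests/second and 100 requests/minute" as two token buckets checked together.
var compound = new CompoundRateLimiter(new[]
{
    new TokenBucketRateLimiter(onePer: TimeSpan.FromMilliseconds(100), bucketCapacity: 10),  // 10 per second
    new TokenBucketRateLimiter(onePer: TimeSpan.FromSeconds(0.6), bucketCapacity: 100),      // 100 per minute
});

if (!compound.TryTakeToken())
{
    // Rejected by at least one bucket; any tokens already taken were returned inside TryTakeToken.
}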

@martincostello
Member

For the compound key scenario, would wrapping the rate limit in a bulkhead policy (or vice-versa) suit your use case?

@madelson

@martincostello I'm not sure it would. If I understand correctly, bulkhead manages the number of concurrently executing actions, which is different from the rate of actions. For example, a third-party API might be able to process requests very quickly but still block you from submitting requests faster than a certain rate. Another scenario is the one I mentioned here, where we are using a rate limit to protect our storage layer but the actions being executed always complete very quickly.

@martincostello martincostello changed the base branch from v722-or-v730 to v723-or-v730 April 11, 2021 16:37
@djjeane

djjeane commented Aug 2, 2021

Hey there, would love this feature to get merged in. Is this still being looked into?

@martincostello
Member

This feature is currently on hold for an indefinite period because the author's personal commitments leave them unable to pursue it further.

I've had success using this from source in production applications for my own use cases, so you might find the same approach an acceptable way to benefit from it until work on this PR resumes.

@martincostello
Member

I have taken the changes from this Pull Request and incorporated them into a new one - #903. If you are an interested party to this PR, please direct any comments there.
