Add multiple leaky bucket rate limiting #118

Closed
ben-pr-p opened this issue Jun 27, 2020 · 10 comments

@ben-pr-p
Contributor

I have a send-message job that sends a text message. We have a global 3,000 messages / minute rate limit, a limit of 6 messages / second per sending phone number, and many different clients that send messages. Each client has hundreds of phone numbers.

Our goals are:
a) To stay compliant with our global rate limit
b) To stay compliant with our per-phone number rate limit
c) To prevent any client from clogging the queue for all other clients - for example, one client sending 6,000 messages in a minute shouldn't mean that every other client's messages are delayed by two minutes. Something like 1,000 / minute would probably be sensible here, given that not all clients are going to send their maximum volume at once.

One way to do this is a Postgres friendly simplification of the leaky bucket algorithm, where you would have buckets:

buckets (
    bucket_type text,
    interval text, -- 'second' or 'minute'
    capacity integer
)

And bucket_intervals:

bucket_intervals (
    bucket_type text,
    bucket_name text,
    bucket_time timestamp, -- truncated to the second or minute based on the bucket interval
    tokens_used integer, -- number of jobs in this bucket
    primary key (bucket_time, bucket_type, bucket_name)
)

Whenever run_at is computed, if the job has specified buckets (via a buckets text[] column / parameter), run_at would be set to the max of the user-specified run_at (or now) and the next available slot that doesn't overflow any bucket interval.
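
To make that concrete, here's a rough sketch of the slot search in TypeScript (purely illustrative - the names and in-memory shapes stand in for the bucket_intervals table; the real implementation would be SQL):

type BucketDef = { interval: "second" | "minute"; capacity: number };

// Walk forward from the desired run_at until every bucket has a free token in
// the slot containing the candidate time. `tokensUsed` stands in for the
// bucket_intervals table; keys are `${bucketKey}|${slotStartMs}`.
function nextAvailableRunAt(
  desiredRunAt: Date, // already max(user-specified run_at, now)
  buckets: { key: string; def: BucketDef }[],
  tokensUsed: Map<string, number>,
): Date {
  const slotMs = (def: BucketDef) => (def.interval === "second" ? 1000 : 60000);
  let candidate = desiredRunAt.getTime();
  for (;;) {
    const full = buckets.find(({ key, def }) => {
      const slotStart = Math.floor(candidate / slotMs(def)) * slotMs(def);
      return (tokensUsed.get(`${key}|${slotStart}`) ?? 0) >= def.capacity;
    });
    if (!full) return new Date(candidate); // every bucket has room in this slot
    // Jump to the start of the full bucket's next slot and re-check all buckets.
    candidate = (Math.floor(candidate / slotMs(full.def)) + 1) * slotMs(full.def);
  }
}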

For our use case, our buckets table would be:

insert into buckets (bucket_type, interval, capacity)
values 
    ('send-message-global', 'minute', 3000),
    ('send-message-client', 'minute', 1000),
    ('send-message-phone', 'second', 6);

And each send-message job would be queued with three buckets:

  1. send-message-global (which would map to bucket_type = 'send-message-global', bucket_name = null)
  2. send-message-client|<client-id> (bucket_type = 'send-message-client', bucket_name = <client-id>)
  3. send-message-phone|<phone-number> (bucket_type = 'send-message-phone', bucket_name = <phone-number>)
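
For illustration, queuing such a job from another task might look roughly like this (the buckets option is the hypothetical parameter proposed below, not something addJob accepts today, hence the cast):

import type { Task } from "graphile-worker";

// Hypothetical: `buckets` is the proposed optional add_job parameter.
const enqueueSend: Task = async (payload, helpers) => {
  const { clientId, number, message } = payload as {
    clientId: string;
    number: string;
    message: string;
  };
  await helpers.addJob(
    "send-message",
    { number, message },
    {
      buckets: [
        "send-message-global",
        `send-message-client|${clientId}`,
        `send-message-phone|${number}`,
      ],
    } as any,
  );
};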

I think this could be accomplished via:

  1. adding an optional buckets parameter to add_job
  2. triggers on the jobs table that only run when the job has buckets, and as a result have no negative performance impact for users who don't need this feature

To keep it performant, we would need to delete past buckets. This could either be done on job completion / failure, or we could just write a SQL function to do it and leave periodically calling that function up to the user.
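
For example, the cleanup could be as small as this (TypeScript with node-postgres; the table and column names match the sketch above - nothing here exists in graphile-worker itself):

import { Pool } from "pg";

const pool = new Pool(); // connection settings from the usual PG* env vars

// Rows whose time slot is long past can never affect scheduling again, so a
// periodic delete keeps bucket_intervals small. One hour is a generous margin
// over the largest interval ('minute').
export async function pruneBucketIntervals(): Promise<number> {
  const result = await pool.query(
    "delete from bucket_intervals where bucket_time < now() - interval '1 hour'",
  );
  return result.rowCount ?? 0;
}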

Although I'd like to be able to use an approach similar to the one described here, in this case we have multiple different queues whose rate limits interact.

Although it's also possible to implement this in userland with a custom add_job function and by overwriting fail_job, the project this is part of is closed source, and other users may benefit from having a simple way to rate limit jobs that interact with rate-limited external APIs.

Do you think this is a good solution / would you like a PR for it, or do you think this level of complexity is best kept outside of this project?

@benjie
Member

benjie commented Jun 30, 2020

I think it'd be appropriate to handle this within the jobs themselves rather than adding more logic to the database. I'd make the job definition something like (PSEUDOCODE!)

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
const MESSAGES_PER_MINUTE = 3000;
const MESSAGES_PER_SECOND_PER_NUMBER = 6;
const AVERAGE_DELAY_IN_MILLISECONDS = 1000;
const MIN_DELAY_IN_MILLISECONDS = 100;
const MAX_ATTEMPTS = 20; // We'll try to find a send slot this many times.
                         // With these values, it'll take on average 22 seconds
                         // to exhaust attempts.

// This is the real task, sans rate limiting
const performSend = async (payload, helpers) => {
  // Example:
  await sms.send(payload.number, payload.message);
}

// This does the rate limiting then hands off to the above function for execution
const task = async (payload, helpers) => {
  const number = payload.number; // e.g. '+441234567890';

  for (let attempts = 0; attempts < MAX_ATTEMPTS; attempts ++) {
    // Check the number counter first; if this is allowed but the main limit is exceeded,
    // then the global limit will not be permitted _ever_ during this second so we're
    // not blocking anything.
    const numberCounter = `${number}|${Math.floor(Date.now()/1000)}`;
    if (await isCounterLessThan(numberCounter, MESSAGES_PER_SECOND_PER_NUMBER, 1)) {
      // Now check the global counter.
      const globalCounter = `global|${Math.floor(Date.now()/60000)}`;
      if (await isCounterLessThan(globalCounter, MESSAGES_PER_MINUTE, 60)) {
        // Do it! We're returning here so the loop stops and error is not thrown.
        return performSend(payload, helpers);
      }
    }
  
    // Try again once the slots have moved on
    await sleep(
      MIN_DELAY_IN_MILLISECONDS +
      Math.ceil(Math.random() * 2 * AVERAGE_DELAY_IN_MILLISECONDS)
    );
  }
  throw new Error("Attempts exceeded; try again later");
}

async function isCounterLessThan(counterName, limit, validityPeriodInSeconds) {
  // Advance the counter
  const val = await redis.incr(counterName);
  // Make sure counter self-destructs
  await redis.expire(counterName, validityPeriodInSeconds + 60 /* allow for clock skew */);
  // Was the counter less than limit?
  return val <= limit;
}

There are some issues around latency in this - if latency is variable then there's a chance that messages per second may be high one second and low the next - but I expect they actually use a rolling average so this should sort itself out.

With this, you'd want high concurrency (since you spend a hell of a lot of time waiting) and you don't really need a queue name. Having concurrency above the pg pool size would be completely fine for this, despite the warnings it raises.

What do you think?

I worry that because scheduling and execution are independent, performing leaky bucket at scheduling time would not work very well: if you ever scale your workers down, all the jobs might well be executed in a massive batch when the workers scale back up again. Performing leaky bucket at execution time in Postgres would probably slow the queue down and put more load on the DB.

I'd be well up for somehow making the above algorithm something you can automatically wrap tasks with; we've talked about task wrappers before and this is just the kind of thing they'd be great for.
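
A minimal sketch of what such a wrapper might look like (the withRateLimit name, the isSlotFree callback and the option defaults are all illustrative, not an existing API):

import type { Task } from "graphile-worker";

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Wraps any task in the retry-until-a-slot-is-free loop from the pseudocode
// above; `isSlotFree` is whatever counter check you use (e.g. the Redis-based
// isCounterLessThan above).
function withRateLimit(
  task: Task,
  isSlotFree: (payload: unknown) => Promise<boolean>,
  { maxAttempts = 20, minDelayMs = 100, avgDelayMs = 1000 } = {},
): Task {
  return async (payload, helpers) => {
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      if (await isSlotFree(payload)) {
        return task(payload, helpers);
      }
      // Back off for a randomised delay, then try again.
      await sleep(minDelayMs + Math.ceil(Math.random() * 2 * avgDelayMs));
    }
    throw new Error("Attempts exceeded; try again later");
  };
}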

@ben-pr-p
Contributor Author

Hm, I see what you're saying about this not working if the worker goes down and starts back up again - the fact that the original run_ats were spaced out won't matter several minutes into the future.

I think the problem with this approach is that our queues will still get clogged up. If client A sends 5,000 messages in a minute, and those are passed to the workers first, then with this approach we'd end up waiting 20 seconds before deciding not to send each message after the first 1,000 in that minute, so we'd need our worker concurrency to be well over 4,000 in order to keep processing other clients' messages.

I'll keep thinking! I think it should be possible to do the ahead of time scheduling pretty efficiently (not 10,000 jobs a second, but that's ok for me and probably others).

How about an alternate approach that uses Redis / some external process to know which buckets are overloaded this minute or this second, and then uses that information to skip over jobs with those buckets when the worker looks for new jobs? That would avoid taking up concurrency slots with waiting jobs, avoid the compute cost of ahead-of-time database scheduling, and still work if the jobs were not fresh / the workers were just starting up again.
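
Roughly, the "which buckets are overloaded" lookup could be something like this (a sketch only - it reuses the per-slot Redis counters from the pseudocode above, and all names are illustrative):

import Redis from "ioredis";

const redis = new Redis();

// Returns every bucket whose counter for the current time slot has already hit
// its capacity; jobs carrying any of these buckets would be skipped.
async function getOverloadedBuckets(
  buckets: { key: string; intervalMs: number; capacity: number }[],
): Promise<string[]> {
  const overloaded: string[] = [];
  for (const { key, intervalMs, capacity } of buckets) {
    const slot = Math.floor(Date.now() / intervalMs);
    const used = Number((await redis.get(`${key}|${slot}`)) ?? 0);
    if (used >= capacity) overloaded.push(key);
  }
  return overloaded;
}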

@benjie
Member

benjie commented Jul 22, 2020

I've pondered this a bit, and the get_job function already takes task_identifiers which it uses for filtering. If you can achieve this by also passing an additional argument for filtering (e.g. from the result of a redis lookup) I'm open to it. Would that work for what you need?

create or replace function :GRAPHILE_WORKER_SCHEMA.get_job(worker_id text, task_identifiers text[] = null, job_expiry interval = interval '4 hours') returns :GRAPHILE_WORKER_SCHEMA.jobs as $$

@ben-pr-p
Contributor Author

Almost - if I could pass a getTaskIdentifiers function that would get called for each get_job invocation, that would let me skip over a particular task, but since a task can only have one task_identifier, I couldn't do the kind of multiple overlapping rate-limiting buckets that I need for my application.

This is one of the many scenarios where I wish pl/pgsql had higher-order functions - then we could just do get_job(worker_id text, filter_fn (job) => boolean) (I'm mixing TypeScript / pl/pgsql here) and that filter_fn could be user defined.

@benjie
Member

benjie commented Jul 22, 2020

If you were to do that mixed syntax; what would the content of the lambda/callback be?

@ben-pr-p
Contributor Author

Something like:

const filterFn = async (job): Promise<boolean> => {
  const rateLimitedBuckets = await getOverloadedBuckets();
  return job.buckets.find(b => rateLimitedBuckets.includes(b)) === undefined;
  // or the Postgres && operator for array overlap between the job's buckets and the buckets that are currently rate limited
};

@benjie
Member

benjie commented Jul 22, 2020

To be clear, is job.buckets a new property we need to add, or is it details from within the job payload?

@ben-pr-p
Contributor Author

Could go either way there! I think if we were to implement it as a fork, we would probably use an __buckets property in the payload.

@benjie
Member

benjie commented Aug 4, 2020

Discussed with Ben via Discord voice chat; current plan is for Ben to look into the performance of:

  • adding a text[] column "flags" (or similar) to jobs
  • adding a filter like this one for forbidden_flags such that a job won't be matched if any of its flags are in forbidden_flags
  • should flags be nullable, or is it more performant for it to be an empty array?

We'll then have a worker configuration option that would be something like:

  forbiddenFlags: null | string[] | (() => string[] | Promise<string[]>)

We'll call this (if it's a function) before each get_job call to determine the list of forbidden flags.

This should generalise to rate-limiting leaky-bucket style across any number of APIs. It will be up to the user to manage their own empty bucket list (e.g. via Redis, or whatever).
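
As a sketch, usage might end up looking something like this (to be clear, this reflects the plan above rather than a shipped API, and the "full_buckets" Redis set is a made-up example of user-managed bookkeeping):

import { run } from "graphile-worker";
import Redis from "ioredis";

const redis = new Redis();

async function main() {
  const runner = await run({
    connectionString: process.env.DATABASE_URL,
    taskDirectory: `${__dirname}/tasks`,
    concurrency: 10,
    // Called before each get_job; returns the flags that must not be matched.
    forbiddenFlags: async () => redis.smembers("full_buckets"),
  });
  await runner.promise;
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});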

@benjie
Member

benjie commented Sep 2, 2020

Solved via #131
