Skip to content

Commit

Permalink
Fix orphaned rate-limit buckets (#2585)
Browse files Browse the repository at this point in the history
  • Loading branch information
MinnDevelopment authored Nov 18, 2023
1 parent 8937fd4 commit 9a55479
Showing 1 changed file with 14 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private void cleanup()
bucket.requests.removeIf(Work::isSkipped); // Remove cancelled requests

// Check if the bucket is empty
if (bucket.requests.isEmpty())
if (bucket.requests.isEmpty() && !rateLimitQueue.containsKey(bucket))
{
// remove uninit if requests are empty
if (bucket.isUninit())
Expand Down Expand Up @@ -231,8 +231,8 @@ private void runBucket(Bucket bucket)
return;
// Schedule a new bucket worker if no worker is running
MiscUtil.locked(lock, () ->
rateLimitQueue.computeIfAbsent(bucket,
(k) -> config.getPool().schedule(bucket, bucket.getRateLimit(), TimeUnit.MILLISECONDS)));
rateLimitQueue.computeIfAbsent(bucket,
k -> config.getPool().schedule(bucket, bucket.getRateLimit(), TimeUnit.MILLISECONDS)));
}

private long parseLong(String input)
Expand All @@ -252,9 +252,9 @@ private long getNow()
return System.currentTimeMillis();
}

private void updateBucket(Route.CompiledRoute route, Response response)
private Bucket updateBucket(Route.CompiledRoute route, Response response)
{
MiscUtil.locked(lock, () ->
return MiscUtil.locked(lock, () ->
{
try
{
Expand Down Expand Up @@ -302,14 +302,16 @@ else if (cloudflare)
boolean firstHit = hitRatelimit.add(baseRoute) && retryAfter < 60000;
// Update the bucket to the new information
bucket.remaining = 0;
bucket.reset = getNow() + retryAfter;
bucket.reset = now + retryAfter;
// don't log warning if we hit the rate limit for the first time, likely due to initialization of the bucket
// unless its a long retry-after delay (more than a minute)
if (firstHit)
log.debug("Encountered 429 on route {} with bucket {} Retry-After: {} ms Scope: {}", baseRoute, bucket.bucketId, retryAfter, scope);
else
log.warn("Encountered 429 on route {} with bucket {} Retry-After: {} ms Scope: {}", baseRoute, bucket.bucketId, retryAfter, scope);
}

log.trace("Updated bucket {} to retry after {}", bucket.bucketId, bucket.reset - now);
return bucket;
}

Expand Down Expand Up @@ -367,7 +369,8 @@ public void enqueue(Work request)

public void retry(Work request)
{
requests.addFirst(request);
if (!moveRequest(request))
requests.addFirst(request);
}

public long getReset()
Expand Down Expand Up @@ -423,7 +426,7 @@ public Queue<Work> getRequests()
return requests;
}

protected Boolean moveRequest(Work request)
protected boolean moveRequest(Work request)
{
return MiscUtil.locked(lock, () ->
{
Expand All @@ -433,9 +436,8 @@ protected Boolean moveRequest(Work request)
{
bucket.enqueue(request);
runBucket(bucket);
return true;
}
return false;
return bucket != this;
});
}

Expand Down Expand Up @@ -480,12 +482,8 @@ public void run()
if (request.isSkipped())
continue;

// Check if a bucket has been discovered and initialized for this route
if (isUninit())
{
boolean shouldSkip = moveRequest(request);
if (shouldSkip) continue;
}
if (isUninit() && moveRequest(request))
continue;

if (execute(request)) break;
}
Expand Down

0 comments on commit 9a55479

Please sign in to comment.