Skip to content

Commit

Permalink
Direct Port RateLimiter from Rezilience (#2083)
Browse files Browse the repository at this point in the history
Co-authored-by: Harish Subramanium <hsubra89@gmail.com>
  • Loading branch information
mikearnaldi and hsubra89 authored Feb 9, 2024
1 parent e94130a commit be19ce0
Show file tree
Hide file tree
Showing 7 changed files with 404 additions and 6 deletions.
51 changes: 51 additions & 0 deletions .changeset/strong-peas-repeat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
"effect": patch
---

Add `Ratelimiter` which limits the number of calls to a resource within a time window using the token bucket algorithm.

Usage Example:

```ts
import { Effect, RateLimiter } from "effect";

// we need a scope because the rate limiter needs to allocate a state and a background job
const program = Effect.scoped(
Effect.gen(function* ($) {
// create a rate limiter that executes up to 10 requests within 2 seconds
const rateLimit = yield* $(RateLimiter.make(10, "2 seconds"));
// simulate repeated calls
for (let n = 0; n < 100; n++) {
// wrap the effect we want to limit with rateLimit
yield* $(rateLimit(Effect.log("Calling RateLimited Effect")));
}
})
);

// will print 10 calls immediately and then throttle
program.pipe(Effect.runFork);
```

Or, in a more real world scenario, with a dedicated Service + Layer:

```ts
import { Context, Effect, Layer, RateLimiter } from "effect";

class ApiLimiter extends Context.Tag("@services/ApiLimiter")<
ApiLimiter,
RateLimiter.RateLimiter
>() {
static Live = RateLimiter.make(10, "2 seconds").pipe(
Layer.scoped(ApiLimiter)
);
}

const program = Effect.gen(function* ($) {
const rateLimit = yield* $(ApiLimiter);
for (let n = 0; n < 100; n++) {
yield* $(rateLimit(Effect.log("Calling RateLimited Effect")));
}
});

program.pipe(Effect.provide(ApiLimiter.Live), Effect.runFork);
```
34 changes: 34 additions & 0 deletions packages/effect/src/RateLimiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Limits the number of calls to a resource to a maximum amount in some interval using the token bucket algorithm.
*
* Note that only the moment of starting the effect is rate limited: the number of concurrent executions is not bounded.
*
* Calls are queued up in an unbounded queue until capacity becomes available.
*
* @since 2.0.0
*/
import type { DurationInput } from "./Duration.js"
import type { Effect } from "./Effect.js"
import * as internal from "./internal/rateLimiter.js"
import type { Scope } from "./Scope.js"

/**
* Limits the number of calls to a resource to a maximum amount in some interval using the token bucket algorithm.
*
* Note that only the moment of starting the effect is rate limited: the number of concurrent executions is not bounded.
*
* Calls are queued up in an unbounded queue until capacity becomes available.
*
* @since 2.0.0
* @category models
*/
export interface RateLimiter {
<A, E, R>(task: Effect<A, E, R>): Effect<A, E, R>
}

/**
* @since 2.0.0
* @category constructors
*/
export const make = (limit: number, window: DurationInput): Effect<RateLimiter, never, Scope> =>
internal.make(limit, window)
11 changes: 11 additions & 0 deletions packages/effect/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,17 @@ export * as Queue from "./Queue.js"
*/
export * as Random from "./Random.js"

/**
* Limits the number of calls to a resource to a maximum amount in some interval using the token bucket algorithm.
*
* Note that only the moment of starting the effect is rate limited: the number of concurrent executions is not bounded.
*
* Calls are queued up in an unbounded queue until capacity becomes available.
*
* @since 2.0.0
*/
export * as RateLimiter from "./RateLimiter.js"

/**
* This module provides utility functions for working with arrays in TypeScript.
*
Expand Down
5 changes: 5 additions & 0 deletions packages/effect/src/internal/nextPow2.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/** @internal */
export const nextPow2 = (n: number): number => {
const nextPow = Math.ceil(Math.log(n) / Math.log(2))
return Math.max(Math.pow(2, nextPow), 2)
}
7 changes: 1 addition & 6 deletions packages/effect/src/internal/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type * as Scope from "../Scope.js"
import * as core from "./core.js"
import * as executionStrategy from "./executionStrategy.js"
import * as fiberRuntime from "./fiberRuntime.js"
import { nextPow2 } from "./nextPow2.js"
import * as queue from "./queue.js"

const AbsentValue = Symbol.for("effect/PubSub/AbsentValue")
Expand Down Expand Up @@ -1171,12 +1172,6 @@ export const unsafeMakePubSub = <A>(
return new PubSubImpl(pubsub, subscribers, scope, shutdownHook, shutdownFlag, strategy)
}

/** @internal */
const nextPow2 = (n: number): number => {
const nextPow = Math.ceil(Math.log(n) / Math.log(2.0))
return Math.max(Math.pow(2, nextPow), 2)
}

/** @internal */
const ensureCapacity = (capacity: number): void => {
if (capacity <= 0) {
Expand Down
79 changes: 79 additions & 0 deletions packages/effect/src/internal/rateLimiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* This is a direct port of `RateLimiter` from Rezilience
* https://github.com/svroonland/rezilience/blob/master/rezilience/shared/src/main/scala/nl/vroste/rezilience/RateLimiter.scala
*/

import * as Chunk from "../Chunk.js"
import * as Deferred from "../Deferred.js"
import type { DurationInput } from "../Duration.js"
import * as Effect from "../Effect.js"
import { pipe } from "../Function.js"
import * as Queue from "../Queue.js"
import * as Ref from "../Ref.js"
import * as Stream from "../Stream.js"
import { nextPow2 } from "./nextPow2.js"

/** @internal */
export const make = (limit: number, window: DurationInput) => {
return Effect.gen(function*($) {
const q = yield* $(Queue.bounded<[Ref.Ref<boolean>, Effect.Effect<void>]>(nextPow2(limit)))

yield* $(
pipe(
Stream.fromQueue(q, { maxChunkSize: 1 }),
Stream.filterEffect(([interrupted]) => {
return pipe(
Ref.get(interrupted),
Effect.map((b) => !b)
)
}),
Stream.throttle({
strategy: "shape",
duration: window,
cost: Chunk.size,
units: limit
}),
Stream.mapEffect(([_interrupted, eff]) => eff, { concurrency: "unbounded", unordered: true }),
Stream.runDrain,
Effect.interruptible,
Effect.forkScoped
)
)

const apply = <A, E, R>(task: Effect.Effect<A, E, R>) =>
Effect.gen(function*($) {
const start = yield* $(Deferred.make<void>())
const done = yield* $(Deferred.make<void>())
const interruptedRef = yield* $(Ref.make(false))

const action = pipe(
Deferred.succeed(start, void 0),
Effect.flatMap(() => Deferred.await(done))
)

const onInterruptOrCompletion = pipe(
Ref.set(interruptedRef, true),
Effect.flatMap(() => Deferred.succeed(done, void 0))
)

const run = pipe(
Queue.offer(q, [interruptedRef, action]),
Effect.onInterrupt(() => onInterruptOrCompletion)
)

const result = yield* $(
Effect.scoped(
pipe(
Effect.acquireReleaseInterruptible(run, () => onInterruptOrCompletion),
Effect.flatMap(() => Deferred.await(start)),
Effect.flatMap(() => task)
)
)
)

return result
})

return apply
})
}
Loading

0 comments on commit be19ce0

Please sign in to comment.