Skip to content

Commit

Permalink
Adds new algorithm for throttling (#113)
Browse files Browse the repository at this point in the history
* Adds new algorithm for throttling
  • Loading branch information
epinault authored Dec 13, 2024
1 parent acd2879 commit b2686bb
Show file tree
Hide file tree
Showing 14 changed files with 1,134 additions and 133 deletions.
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
# Changelog

## 7.0.0-rc.0 - 2024-12-04
## 7.0.0-rc.1 - 2024-12-13

- Improved API a little more. Should be compatibe with previous RC
- Made ETS backend more flexible with `:algorithm` option
- Added `:key_older_than` option to the ETS backend
- Added `:algorithm` option to the ETS backend with support for:
- `:fix_window` (default) - Fixed time window rate limiting
- `:sliding_window` - Sliding time window for smoother rate limiting
- `:leaky_bucket` - Constant rate limiting with burst capacity
- `:token_bucket` - Token-based rate limiting with burst capacity

## 7.0.0-rc.0 - 2024-12-13

- Breaking change. Completely new API. Consider upgrading if you are experiencing performance or usability problems with Hammer v6. See [./guides/upgrade-v7.md] for upgrade instructions. https://github.com/ExHammer/hammer/pull/104
- Hammer.Plug has been removed. See documentation for using Hammer as a plug in Phoenix.
Expand Down
7 changes: 4 additions & 3 deletions lib/hammer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule Hammer do
# Check the rate limit allowing 10 requests per second
MyApp.RateLimit.hit("some-key", _scale = :timer.seconds(1), _limit = 10)
"""

@type key :: term
Expand All @@ -23,12 +22,14 @@ defmodule Hammer do
@type increment :: non_neg_integer

@doc """
Checks if a key is allowed to perform an action, and increment the counter.
Same as `hit/4` with `increment` set to 1.
"""
@callback hit(key, scale, limit) :: {:allow, count} | {:deny, timeout}

@doc """
Checks if a key is allowed to perform an action, and increment the counter.
Optional callback to check if a key is allowed to perform an action, and increment the counter.
Returns `{:allow, count}` if the action is allowed, or `{:deny, timeout}` if the action is denied.
Expand Down Expand Up @@ -62,7 +63,7 @@ defmodule Hammer do
"""
@callback get(key, scale) :: count

@optional_callbacks inc: 2, inc: 3, set: 3, get: 2
@optional_callbacks hit: 4, inc: 2, inc: 3, set: 3, get: 2

@doc """
Use the Hammer library in a module to create a rate limiter.
Expand Down
183 changes: 96 additions & 87 deletions lib/hammer/ets.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,60 @@ defmodule Hammer.ETS do
Runtime configuration:
- `:clean_period` - (in milliseconds) period to clean up expired entries, defaults to 1 minute
- `:key_older_than` - (in milliseconds) maximum age for entries before they are cleaned up, defaults to 1 hour
- `:algorithm` - the rate limiting algorithm to use, one of: `:fix_window`, `:sliding_window`, `:leaky_bucket`, `:token_bucket`. Defaults to `:fix_window`
"""

use GenServer
require Logger

defmacro __before_compile__(_env) do
@type start_option ::
{:clean_period, pos_integer()}
| {:table, atom()}
| {:algorithm, module()}
| {:key_older_than, pos_integer()}
| GenServer.option()

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
defmacro __before_compile__(%{module: module}) do
hammer_opts = Module.get_attribute(module, :hammer_opts)

algorithm =
case Keyword.get(hammer_opts, :algorithm) do
nil ->
Hammer.ETS.FixWindow

:ets ->
Hammer.ETS.FixWindow

:fix_window ->
Hammer.ETS.FixWindow

:sliding_window ->
Hammer.ETS.SlidingWindow

:leaky_bucket ->
Hammer.ETS.LeakyBucket

:token_bucket ->
Hammer.ETS.TokenBucket

_module ->
raise ArgumentError, """
Hammer requires a valid backend to be specified. Must be one of: :ets,:fix_window, :sliding_window, :leaky_bucket, :token_bucket.
If none is specified, :fix_window is used.
Example:
use Hammer, backend: :ets
"""
end

Code.ensure_loaded!(algorithm)

quote do
@table __MODULE__
@algorithm unquote(algorithm)

def child_spec(opts) do
%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}, type: :worker}
Expand All @@ -28,38 +74,57 @@ defmodule Hammer.ETS do
def start_link(opts) do
opts = Keyword.put(opts, :table, @table)
opts = Keyword.put_new(opts, :clean_period, :timer.minutes(1))
opts = Keyword.put_new(opts, :algorithm, @algorithm)
Hammer.ETS.start_link(opts)
end

@impl Hammer
def hit(key, scale, limit, increment \\ 1) do
Hammer.ETS.hit(@table, key, scale, limit, increment)
if function_exported?(@algorithm, :hit, 4) do
def hit(key, scale, limit) do
@algorithm.hit(@table, key, scale, limit)
end
end

if function_exported?(@algorithm, :hit, 5) do
def hit(key, scale, limit, increment \\ 1) do
@algorithm.hit(@table, key, scale, limit, increment)
end
end

if function_exported?(@algorithm, :inc, 4) do
def inc(key, scale, increment \\ 1) do
@algorithm.inc(@table, key, scale, increment)
end
end

@impl Hammer
def inc(key, scale, increment \\ 1) do
Hammer.ETS.inc(@table, key, scale, increment)
if function_exported?(@algorithm, :set, 4) do
def set(key, scale, count) do
@algorithm.set(@table, key, scale, count)
end
end

@impl Hammer
def set(key, scale, count) do
Hammer.ETS.set(@table, key, scale, count)
if function_exported?(@algorithm, :get, 3) do
def get(key, scale) do
@algorithm.get(@table, key, scale)
end
end

@impl Hammer
def get(key, scale) do
Hammer.ETS.get(@table, key, scale)
if function_exported?(@algorithm, :get, 2) do
def get(key, scale) do
@algorithm.get(@table, key)
end
end
end
end

@type start_option :: {:clean_period, timeout} | GenServer.option()

@doc """
Starts the process that creates and cleans the ETS table.
Accepts the following options:
- `:clean_period` for how often to perform garbage collection
- `:clean_period` - How often to run the cleanup process (in milliseconds). Defaults to 1 minute.
- `:key_older_than` - Optional maximum age for bucket entries (in milliseconds). Defaults to 24 hours.
Entries older than this will be removed during cleanup.
- `:algorithm` - The rate limiting algorithm to use. Can be `:fixed_window`, `:sliding_window`,
`:token_bucket`, or `:leaky_bucket`. Defaults to `:fixed_window`.
- optional `:debug`, `:spawn_opts`, and `:hibernate_after` GenServer options
"""
@spec start_link([start_option]) :: GenServer.on_start()
Expand All @@ -68,6 +133,8 @@ defmodule Hammer.ETS do

{clean_period, opts} = Keyword.pop!(opts, :clean_period)
{table, opts} = Keyword.pop!(opts, :table)
{algorithm, opts} = Keyword.pop!(opts, :algorithm)
{key_older_than, opts} = Keyword.pop(opts, :key_older_than, :timer.hours(24))

case opts do
[] ->
Expand All @@ -79,102 +146,44 @@ defmodule Hammer.ETS do
)
end

config = %{table: table, clean_period: clean_period}
GenServer.start_link(__MODULE__, config, gen_opts)
end

@doc false
@spec hit(
table :: atom(),
key :: String.t(),
scale :: integer(),
limit :: integer(),
increment :: integer()
) :: {:allow, integer()} | {:deny, integer()}
def hit(table, key, scale, limit, increment) do
now = now()
window = div(now, scale)
full_key = {key, window}
expires_at = (window + 1) * scale
count = update_counter(table, full_key, increment, expires_at)

if count <= limit do
{:allow, count}
else
{:deny, expires_at - now}
end
end

@doc false
@spec inc(table :: atom(), key :: String.t(), scale :: integer(), increment :: integer()) ::
integer()
def inc(table, key, scale, increment) do
window = div(now(), scale)
full_key = {key, window}
expires_at = (window + 1) * scale
update_counter(table, full_key, increment, expires_at)
end

@doc false
@spec set(table :: atom(), key :: String.t(), scale :: integer(), count :: integer()) ::
integer()
def set(table, key, scale, count) do
window = div(now(), scale)
full_key = {key, window}
expires_at = (window + 1) * scale
update_counter(table, full_key, {2, 1, 0, count}, expires_at)
end

@doc false
@spec get(table :: atom(), key :: String.t(), scale :: integer()) :: integer()
def get(table, key, scale) do
window = div(now(), scale)
full_key = {key, window}
config = %{
table: table,
table_opts: algorithm.ets_opts(),
clean_period: clean_period,
key_older_than: key_older_than,
algorithm: algorithm
}

case :ets.lookup(table, full_key) do
[{_full_key, count, _expires_at}] -> count
[] -> 0
end
GenServer.start_link(__MODULE__, config, gen_opts)
end

@compile inline: [update_counter: 4]
defp update_counter(table, key, op, expires_at) do
def update_counter(table, key, op, expires_at) do
:ets.update_counter(table, key, op, {key, 0, expires_at})
end

@compile inline: [now: 0]
defp now do
def now do
System.system_time(:millisecond)
end

@impl GenServer
def init(config) do
:ets.new(config.table, [
:named_table,
:set,
:public,
{:read_concurrency, true},
{:write_concurrency, true},
{:decentralized_counters, true}
])
:ets.new(config.table, config.table_opts)

schedule(config.clean_period)
{:ok, config}
end

@impl GenServer
def handle_info(:clean, config) do
clean(config.table)
algorithm = config.algorithm
algorithm.clean(config)
schedule(config.clean_period)
{:noreply, config}
end

defp schedule(clean_period) do
Process.send_after(self(), :clean, clean_period)
end

defp clean(table) do
ms = [{{{:_, :_}, :_, :"$1"}, [], [{:<, :"$1", {:const, now()}}]}]
:ets.select_delete(table, ms)
end
end
Loading

0 comments on commit b2686bb

Please sign in to comment.