Skip to content

Commit

Permalink
Add PartitionSupervisor.resize!/2
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim committed Oct 29, 2024
1 parent 1572aac commit 4b72da2
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 41 deletions.
144 changes: 103 additions & 41 deletions lib/elixir/lib/partition_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ defmodule PartitionSupervisor do
raise "the call to the function in :with_arguments must return a list, got: #{inspect(args)}"
end

start = {__MODULE__, :start_child, [mod, fun, args, name, partition]}
start = {__MODULE__, :start_child, [mod, fun, args, partition]}
Map.merge(map, %{id: partition, start: start, modules: modules})
end

Expand All @@ -282,48 +282,112 @@ defmodule PartitionSupervisor do
end

@doc false
def start_child(mod, fun, args, name, partition) do
def start_child(mod, fun, args, partition) do
case apply(mod, fun, args) do
{:ok, pid} ->
register_child(name, partition, pid)
register_child(partition, pid)
{:ok, pid}

{:ok, pid, info} ->
register_child(name, partition, pid)
register_child(partition, pid)
{:ok, pid, info}

other ->
other
end
end

defp register_child(name, partition, pid) when is_atom(name) do
:ets.insert(name, {partition, pid})
end

defp register_child({:via, _, _}, partition, pid) do
Registry.register(@registry, {self(), partition}, pid)
defp register_child(partition, pid) do
:ets.insert(Process.get(:ets_table), {partition, pid})
end

@impl true
def init({name, partitions, children, init_opts}) do
init_partitions(name, partitions)
table = init_table(name)
:ets.insert(table, {:partitions, partitions, partitions})
Process.put(:ets_table, table)
Supervisor.init(children, Keyword.put_new(init_opts, :strategy, :one_for_one))
end

defp init_partitions(name, partitions) when is_atom(name) do
:ets.new(name, [:set, :named_table, :protected, read_concurrency: true])
:ets.insert(name, {:partitions, partitions})
defp init_table(name) when is_atom(name) do
:ets.new(name, [:set, :named_table, :public, read_concurrency: true])
end

defp init_partitions({:via, _, _}, partitions) do
child_spec = {Registry, keys: :unique, name: @registry}
defp init_table({:via, _, _}) do
table = :ets.new(__MODULE__, [:set, :public, read_concurrency: true])
ensure_registry()
Registry.register(@registry, self(), table)
table
end

if !Process.whereis(@registry) do
Supervisor.start_child(:elixir_sup, child_spec)
defp ensure_registry do
if Process.whereis(@registry) == nil do
Supervisor.start_child(:elixir_sup, {Registry, keys: :unique, name: @registry})
end
end

@doc """
Resizes the number of partitions in the PartitionSupervisor.
Registry.register(@registry, self(), partitions)
This is done by starting or stopping a given number of
partitions in the supervisor. All of the child specifications
are kept in the `PartitionSupervisor` itself.
The final number of partitions cannot be less than zero and
cannot be more than the number of partitions the supervisor
started with.
"""
@doc since: "1.18.0"
@spec resize!(name(), non_neg_integer()) :: non_neg_integer()
def resize!(name, partitions) when is_integer(partitions) do
supervisor =
GenServer.whereis(name) || exit({:noproc, {__MODULE__, :resize!, [name, partitions]}})

table = table(name)
ensure_registry()

Registry.lock(@registry, supervisor, fn ->
case :ets.lookup(table, :partitions) do
[{:partitions, _current, max}] when partitions not in 0..max//1 ->
raise ArgumentError,
"the number of partitions to resize to must be a number between 0 and #{max}, got: #{partitions}"

[{:partitions, current, max}] when partitions > current ->
for id <- current..(partitions - 1) do
case Supervisor.restart_child(supervisor, id) do
{:ok, _} ->
:ok

{:ok, _, _} ->
:ok

{:error, reason} ->
raise "cannot restart partition #{id} of PartitionSupervisor #{inspect(name)} due to reason #{inspect(reason)}"
end
end

:ets.insert(table, {:partitions, partitions, max})
current

[{:partitions, current, max}] when partitions < current ->
:ets.insert(table, {:partitions, partitions, max})

for id <- partitions..(current - 1) do
case Supervisor.terminate_child(supervisor, id) do
:ok ->
:ok

{:error, reason} ->
raise "cannot terminate partition #{id} of PartitionSupervisor #{inspect(name)} due to reason #{inspect(reason)}"
end
end

current

[{:partitions, current, _max}] ->
current
end
end)
end

@doc """
Expand All @@ -332,24 +396,27 @@ defmodule PartitionSupervisor do
@doc since: "1.14.0"
@spec partitions(name()) :: pos_integer()
def partitions(name) do
{_name, partitions} = name_partitions(name)
partitions
name |> table() |> partitions(name)
end

# For whereis_name, we want to lookup on GenServer.whereis/1
# just once, so we lookup the name and partitions together.
defp name_partitions(name) when is_atom(name) do
defp partitions(table, name) do
try do
{name, :ets.lookup_element(name, :partitions, 2)}
:ets.lookup_element(table, :partitions, 2)
rescue
_ -> exit({:noproc, {__MODULE__, :partitions, [name]}})
end
end

defp name_partitions(name) when is_tuple(name) do
defp table(name) when is_atom(name) do
name
end

# For whereis_name, we want to lookup on GenServer.whereis/1
# just once, so we lookup the name and partitions together.
defp table(name) when is_tuple(name) do
with pid when is_pid(pid) <- GenServer.whereis(name),
[name_partitions] <- Registry.lookup(@registry, pid) do
name_partitions
[{_, table}] <- Registry.lookup(@registry, pid) do
table
else
_ -> exit({:noproc, {__MODULE__, :partitions, [name]}})
end
Expand All @@ -374,7 +441,7 @@ defmodule PartitionSupervisor do
@doc since: "1.14.0"
@spec which_children(name()) :: [
# Inlining [module()] | :dynamic here because :supervisor.modules() is not exported
{:undefined, pid | :restarting, :worker | :supervisor, [module()] | :dynamic}
{integer(), pid | :restarting, :worker | :supervisor, [module()] | :dynamic}
]
def which_children(name) when is_atom(name) or elem(name, 0) == :via do
Supervisor.which_children(name)
Expand Down Expand Up @@ -428,22 +495,17 @@ defmodule PartitionSupervisor do

@doc false
def whereis_name({name, key}) when is_atom(name) or is_tuple(name) do
{name, partitions} = name_partitions(name)
table = table(name)
partitions = partitions(table, name)

if partitions == 0 do
raise ArgumentError, "PartitionSupervisor #{inspect(name)} has zero partitions"
end

partition =
if is_integer(key), do: rem(abs(key), partitions), else: :erlang.phash2(key, partitions)

whereis_name(name, partition)
end

defp whereis_name(name, partition) when is_atom(name) do
:ets.lookup_element(name, partition, 2)
end

defp whereis_name(name, partition) when is_pid(name) do
@registry
|> Registry.values({name, partition}, name)
|> List.first(:undefined)
:ets.lookup_element(table, partition, 2)
end

@doc false
Expand Down
59 changes: 59 additions & 0 deletions lib/elixir/test/elixir/partition_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,65 @@ defmodule PartitionSupervisorTest do
end
end

describe "resize!/1" do
test "resizes the number of children", config do
{:ok, _} =
PartitionSupervisor.start_link(
child_spec: {Agent, fn -> %{} end},
name: config.test,
partitions: 8
)

for range <- [8..0//1, 0..8//1, Enum.shuffle(0..8)], i <- range do
PartitionSupervisor.resize!(config.test, i)
assert PartitionSupervisor.partitions(config.test) == i

assert PartitionSupervisor.count_children(config.test) ==
%{active: i, specs: 8, supervisors: 0, workers: 8}

# Assert that we can still query across all range,
# but they are routed properly, as long as we have
# a single partition.
children =
for partition <- 0..7, i != 0, uniq: true do
GenServer.whereis({:via, PartitionSupervisor, {config.test, partition}})
end

assert length(children) == i
end
end

test "raises on lookup after resizing to zero", config do
{:ok, _} =
PartitionSupervisor.start_link(
child_spec: {Agent, fn -> %{} end},
name: config.test,
partitions: 8
)

assert PartitionSupervisor.resize!(config.test, 0) == 8

assert_raise ArgumentError, ~r"has zero partitions", fn ->
GenServer.whereis({:via, PartitionSupervisor, {config.test, 0}})
end

assert PartitionSupervisor.resize!(config.test, 8) == 0
end

test "raises if trying to increase the number of partitions", config do
{:ok, _} =
PartitionSupervisor.start_link(
child_spec: {Agent, fn -> %{} end},
name: config.test,
partitions: 8
)

assert_raise ArgumentError,
"the number of partitions to resize to must be a number between 0 and 8, got: 9",
fn -> PartitionSupervisor.resize!(config.test, 9) end
end
end

describe "which_children/1" do
test "returns all partitions", config do
{:ok, _} =
Expand Down

0 comments on commit 4b72da2

Please sign in to comment.