Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a test for handover of many processes #4

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions lib/horde/dynamic_supervisor_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -230,17 +230,21 @@ defmodule Horde.DynamicSupervisorImpl do

# TODO think of a better name than "disown_child_process"
def handle_cast({:disown_child_process, child_id}, state) do
{{_, _, child_pid}, new_processes_by_id} = pop_item(state.processes_by_id, child_id)
case pop_item(state.processes_by_id, child_id) do
{{_, _, child_pid}, new_processes_by_id} ->
new_state = %{
state
| processes_by_id: new_processes_by_id,
process_pid_to_id: delete_item(state.process_pid_to_id, child_pid),
local_process_count: state.local_process_count - 1
}

new_state = %{
state
| processes_by_id: new_processes_by_id,
process_pid_to_id: delete_item(state.process_pid_to_id, child_pid),
local_process_count: state.local_process_count - 1
}
DeltaCrdt.delete(crdt_name(state.name), {:process, child_id}, :infinity)
{:noreply, new_state}

DeltaCrdt.delete(crdt_name(state.name), {:process, child_id}, :infinity)
{:noreply, new_state}
{nil, _} ->
{:noreply, state}
end
end

defp set_child_pid(state, child_id, new_child_pid) do
Expand Down
10 changes: 5 additions & 5 deletions lib/horde/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ defmodule Horde.Registry do
@doc "See `Registry.lookup/2`."
def lookup({:via, _, {registry, name}}), do: lookup(registry, name)

def lookup(registry, key) when is_atom(registry) do
def lookup(registry, key, rpc_timeout \\ 5_000) when is_atom(registry) do
with [{^key, member, {pid, value}}] <- :ets.lookup(keys_ets_table(registry), key),
true <- member_in_cluster?(registry, member),
true <- process_alive?(pid) do
true <- process_alive?(pid, rpc_timeout) do
[{pid, value}]
else
_ -> []
Expand Down Expand Up @@ -412,13 +412,13 @@ defmodule Horde.Registry do

defp maybe_add_node_manager(children, _, _), do: children

defp process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid)
defp process_alive?(pid, _) when node(pid) == node(), do: Process.alive?(pid)

defp process_alive?(pid) do
defp process_alive?(pid, rpc_timeout) do
n = node(pid)

Node.list() |> Enum.member?(n) &&
:rpc.call(n, Process, :alive?, [pid])
:rpc.call(n, Process, :alive?, [pid], rpc_timeout)
end

defp member_in_cluster?(registry, member) do
Expand Down
37 changes: 25 additions & 12 deletions lib/horde/registry_impl.ex
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,27 @@ defmodule Horde.RegistryImpl do
defp process_diff(state, {:add, {:key, key}, {member, pid, value}}) do
link_local_pid(pid)

case member == fully_qualified_name(state.name) do
true ->
state

false ->
process_add(state, key, member, pid, value)
end
end

defp process_diff(state, {:remove, {:key, key}}) do
unregister_local(state, key)

state
end

defp process_diff(state, {:add, {:registry, key}, value}) do
:ets.insert(state.registry_ets_table, {key, value})
state
end

defp process_add(state, key, member, pid, value) do
add_key_to_pids_table(state, pid, key)

with [{^key, _member, {other_pid, other_value}}] when other_pid != pid <-
Expand All @@ -232,17 +253,6 @@ defmodule Horde.RegistryImpl do
state
end

defp process_diff(state, {:remove, {:key, key}}) do
unregister_local(state, key)

state
end

defp process_diff(state, {:add, {:registry, key}, value}) do
:ets.insert(state.registry_ets_table, {key, value})
state
end

defp add_key_to_pids_table(state, pid, key) do
case :ets.lookup(state.pids_ets_table, pid) do
[] ->
Expand Down Expand Up @@ -305,10 +315,13 @@ defmodule Horde.RegistryImpl do
def handle_call({:register, key, value, pid}, _from, state) do
Process.link(pid)

member = fully_qualified_name(state.name)
state = process_add(state, key, member, pid, value)

DeltaCrdt.put(
crdt_name(state.name),
{:key, key},
{fully_qualified_name(state.name), pid, value},
{member, pid, value},
:infinity
)

Expand Down
86 changes: 86 additions & 0 deletions test/large_scale_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule DynamicSupervisorTaintsTest do
require Logger
use ExUnit.Case
import Liveness

setup do
n1 = :horde_1
n2 = :horde_2
n3 = :horde_3

{:ok, _} =
Horde.DynamicSupervisor.start_link(
name: n1,
strategy: :one_for_one,
delta_crdt_options: [sync_interval: 20]
)

{:ok, _} =
Horde.DynamicSupervisor.start_link(
name: n2,
strategy: :one_for_one,
delta_crdt_options: [sync_interval: 20]
)

{:ok, _} =
Horde.DynamicSupervisor.start_link(
name: n3,
strategy: :one_for_one,
delta_crdt_options: [sync_interval: 20]
)

Horde.Cluster.set_members(n1, [n1, n2, n3])

# give the processes a couple ms to sync up
Process.sleep(100)

[n1: n1, n2: n2, n3: n3]
end

test "migration after shutdown", %{
n1: n1,
n2: n2,
n3: n3
} do
proc_count = 10_000

for i <- 1..proc_count do
sup = Enum.random([n1, n2, n3])
child_spec = make_child_spec(i)

{:ok, _} = Horde.DynamicSupervisor.start_child(sup, child_spec)
end

eventually(fn ->
Horde.DynamicSupervisor.count_children(n1).active == proc_count
end)

count1 = count_local_children(n1) |> IO.inspect(label: "count1")
count2 = count_local_children(n2) |> IO.inspect(label: "count2")
count3 = count_local_children(n2) |> IO.inspect(label: "count2")

Process.flag(:trap_exit, true)
Horde.DynamicSupervisor.stop(n3, :shutdown)

eventually(
fn ->
count1 = count_local_children(n1) |> IO.inspect(label: "count1")
count2 = count_local_children(n2) |> IO.inspect(label: "count2")

assert count1 + count2 == proc_count
end,
250,
100
)
end

defp count_local_children(dynamic_sup) do
proc_sup_name = :"#{dynamic_sup}.ProcessesSupervisor"
Supervisor.count_children(proc_sup_name).active
end

defp make_child_spec(i) do
random_state = :rand.uniform(100_000_000)
%{id: i, start: {Agent, :start_link, [fn -> random_state end]}}
end
end