From a42e3b9a51897e85d9fc032553744b325732386d Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Thu, 27 Oct 2022 19:16:33 +0200 Subject: [PATCH 1/6] Add lookup timeout for rpc process alive check (#1) Co-authored-by: Rafal Studnicki --- lib/horde/registry.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/horde/registry.ex b/lib/horde/registry.ex index e6e28ac..f55cd0b 100644 --- a/lib/horde/registry.ex +++ b/lib/horde/registry.ex @@ -247,10 +247,10 @@ defmodule Horde.Registry do @doc "See `Registry.lookup/2`." def lookup({:via, _, {registry, name}}), do: lookup(registry, name) - def lookup(registry, key) when is_atom(registry) do + def lookup(registry, key, rpc_timeout \\ 5_000) when is_atom(registry) do with [{^key, member, {pid, value}}] <- :ets.lookup(keys_ets_table(registry), key), true <- member_in_cluster?(registry, member), - true <- process_alive?(pid) do + true <- process_alive?(pid, rpc_timeout) do [{pid, value}] else _ -> [] @@ -412,13 +412,13 @@ defmodule Horde.Registry do defp maybe_add_node_manager(children, _, _), do: children - defp process_alive?(pid) when node(pid) == node(), do: Process.alive?(pid) + defp process_alive?(pid, _) when node(pid) == node(), do: Process.alive?(pid) - defp process_alive?(pid) do + defp process_alive?(pid, rpc_timeout) do n = node(pid) Node.list() |> Enum.member?(n) && - :rpc.call(n, Process, :alive?, [pid]) + :rpc.call(n, Process, :alive?, [pid], rpc_timeout) end defp member_in_cluster?(registry, member) do From 197bfabb369b85c07844295e5f57188f18da5181 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Studnicki?= Date: Fri, 28 Oct 2022 15:04:04 +0200 Subject: [PATCH 2/6] Add process added to local Register synchronously (#2) --- lib/horde/registry_impl.ex | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/lib/horde/registry_impl.ex b/lib/horde/registry_impl.ex index cf0d4c7..afd44aa 100644 --- a/lib/horde/registry_impl.ex +++ b/lib/horde/registry_impl.ex @@ -211,6 +211,27 @@ defmodule Horde.RegistryImpl do defp process_diff(state, {:add, {:key, key}, {member, pid, value}}) do link_local_pid(pid) + case member == fully_qualified_name(state.name) do + true -> + state + + false -> + process_add(state, key, member, pid, value) + end + end + + defp process_diff(state, {:remove, {:key, key}}) do + unregister_local(state, key) + + state + end + + defp process_diff(state, {:add, {:registry, key}, value}) do + :ets.insert(state.registry_ets_table, {key, value}) + state + end + + defp process_add(state, key, member, pid, value) do add_key_to_pids_table(state, pid, key) with [{^key, _member, {other_pid, other_value}}] when other_pid != pid <- @@ -232,17 +253,6 @@ defmodule Horde.RegistryImpl do state end - defp process_diff(state, {:remove, {:key, key}}) do - unregister_local(state, key) - - state - end - - defp process_diff(state, {:add, {:registry, key}, value}) do - :ets.insert(state.registry_ets_table, {key, value}) - state - end - defp add_key_to_pids_table(state, pid, key) do case :ets.lookup(state.pids_ets_table, pid) do [] -> @@ -305,10 +315,13 @@ defmodule Horde.RegistryImpl do def handle_call({:register, key, value, pid}, _from, state) do Process.link(pid) + member = fully_qualified_name(state.name) + state = process_add(state, key, member, pid, value) + DeltaCrdt.put( crdt_name(state.name), {:key, key}, - {fully_qualified_name(state.name), pid, value}, + {member, pid, value}, :infinity ) From e09f7bad309b313e4eb29032e376cb3e5aa400e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Studnicki?= Date: Thu, 5 Jan 2023 17:22:05 +0100 Subject: [PATCH 3/6] Handle a race condition when a child was disowned after it was deleted (#3) --- lib/horde/dynamic_supervisor_impl.ex | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/lib/horde/dynamic_supervisor_impl.ex b/lib/horde/dynamic_supervisor_impl.ex index a39a520..858f4d2 100644 --- a/lib/horde/dynamic_supervisor_impl.ex +++ b/lib/horde/dynamic_supervisor_impl.ex @@ -230,17 +230,21 @@ defmodule Horde.DynamicSupervisorImpl do # TODO think of a better name than "disown_child_process" def handle_cast({:disown_child_process, child_id}, state) do - {{_, _, child_pid}, new_processes_by_id} = pop_item(state.processes_by_id, child_id) + case pop_item(state.processes_by_id, child_id) do + {{_, _, child_pid}, new_processes_by_id} -> + new_state = %{ + state + | processes_by_id: new_processes_by_id, + process_pid_to_id: delete_item(state.process_pid_to_id, child_pid), + local_process_count: state.local_process_count - 1 + } - new_state = %{ - state - | processes_by_id: new_processes_by_id, - process_pid_to_id: delete_item(state.process_pid_to_id, child_pid), - local_process_count: state.local_process_count - 1 - } + DeltaCrdt.delete(crdt_name(state.name), {:process, child_id}, :infinity) + {:noreply, new_state} - DeltaCrdt.delete(crdt_name(state.name), {:process, child_id}, :infinity) - {:noreply, new_state} + {nil, _} -> + {:noreply, state} + end end defp set_child_pid(state, child_id, new_child_pid) do From ad6c4408d5ee25267ed859606cd4d6a2de4b0ccd Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Wed, 1 Feb 2023 20:13:58 +0100 Subject: [PATCH 4/6] Add a test for handover of many processes --- test/large_scale_test.exs | 86 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 test/large_scale_test.exs diff --git a/test/large_scale_test.exs b/test/large_scale_test.exs new file mode 100644 index 0000000..c9877b3 --- /dev/null +++ b/test/large_scale_test.exs @@ -0,0 +1,86 @@ +defmodule DynamicSupervisorTaintsTest do + require Logger + use ExUnit.Case + import Liveness + + setup do + n1 = :horde_1 + n2 = :horde_2 + n3 = :horde_3 + + {:ok, _} = + Horde.DynamicSupervisor.start_link( + name: n1, + strategy: :one_for_one, + delta_crdt_options: [sync_interval: 100] + ) + + {:ok, _} = + Horde.DynamicSupervisor.start_link( + name: n2, + strategy: :one_for_one, + delta_crdt_options: [sync_interval: 100] + ) + + {:ok, _} = + Horde.DynamicSupervisor.start_link( + name: n3, + strategy: :one_for_one, + delta_crdt_options: [sync_interval: 100] + ) + + Horde.Cluster.set_members(n1, [n1, n2, n3]) + + # give the processes a couple ms to sync up + Process.sleep(100) + + [n1: n1, n2: n2, n3: n3] + end + + test "migration after shutdown", %{ + n1: n1, + n2: n2, + n3: n3 + } do + proc_count = 10_000 + + for i <- 1..proc_count do + sup = Enum.random([n1, n2, n3]) + child_spec = make_child_spec(i) + + {:ok, _} = Horde.DynamicSupervisor.start_child(sup, child_spec) + end + + eventually(fn -> + Horde.DynamicSupervisor.count_children(n1).active == proc_count + end) + + count1 = count_local_children(n1) |> IO.inspect(label: "count1") + count2 = count_local_children(n2) |> IO.inspect(label: "count2") + count3 = count_local_children(n2) |> IO.inspect(label: "count2") + + Process.flag(:trap_exit, true) + Horde.DynamicSupervisor.stop(n3, :shutdown) + + eventually( + fn -> + # count1 = count_local_children(n1) |> IO.inspect(label: "count1") + # count2 = count_local_children(n2) |> IO.inspect(label: "count2") + + assert count1 + count2 == proc_count + end, + 250, + 100 + ) + end + + defp count_local_children(dynamic_sup) do + proc_sup_name = :"#{dynamic_sup}.ProcessesSupervisor" + Supervisor.count_children(proc_sup_name).active + end + + defp make_child_spec(i) do + random_state = :rand.uniform(100_000_000) + %{id: i, start: {Agent, :start_link, [fn -> random_state end]}} + end +end From f7a6d3f1b99e259eacaa25e06bcc66819a1e108e Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Wed, 1 Feb 2023 20:16:23 +0100 Subject: [PATCH 5/6] Fix assertion --- test/large_scale_test.exs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/large_scale_test.exs b/test/large_scale_test.exs index c9877b3..26c51f0 100644 --- a/test/large_scale_test.exs +++ b/test/large_scale_test.exs @@ -64,8 +64,8 @@ defmodule DynamicSupervisorTaintsTest do eventually( fn -> - # count1 = count_local_children(n1) |> IO.inspect(label: "count1") - # count2 = count_local_children(n2) |> IO.inspect(label: "count2") + count1 = count_local_children(n1) |> IO.inspect(label: "count1") + count2 = count_local_children(n2) |> IO.inspect(label: "count2") assert count1 + count2 == proc_count end, From 6e5eff3506cd2155d1583fb24caa430940ea4e72 Mon Sep 17 00:00:00 2001 From: Arek Gil Date: Wed, 1 Feb 2023 20:18:11 +0100 Subject: [PATCH 6/6] Decrease sync interval --- test/large_scale_test.exs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/large_scale_test.exs b/test/large_scale_test.exs index 26c51f0..c30a0fb 100644 --- a/test/large_scale_test.exs +++ b/test/large_scale_test.exs @@ -12,21 +12,21 @@ defmodule DynamicSupervisorTaintsTest do Horde.DynamicSupervisor.start_link( name: n1, strategy: :one_for_one, - delta_crdt_options: [sync_interval: 100] + delta_crdt_options: [sync_interval: 20] ) {:ok, _} = Horde.DynamicSupervisor.start_link( name: n2, strategy: :one_for_one, - delta_crdt_options: [sync_interval: 100] + delta_crdt_options: [sync_interval: 20] ) {:ok, _} = Horde.DynamicSupervisor.start_link( name: n3, strategy: :one_for_one, - delta_crdt_options: [sync_interval: 100] + delta_crdt_options: [sync_interval: 20] ) Horde.Cluster.set_members(n1, [n1, n2, n3])