Skip to content

Commit

Permalink
Merge pull request #720 from bradhanks/main
Browse files Browse the repository at this point in the history
strictly not equal
  • Loading branch information
danschultzer authored Feb 7, 2024
2 parents 830096f + 481fa49 commit c0c768d
Showing 1 changed file with 50 additions and 30 deletions.
80 changes: 50 additions & 30 deletions lib/pow/store/backend/mnesia_cache/unsplit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ defmodule Pow.Store.Backend.MnesiaCache.Unsplit do
{:noreply, state}
end

def handle_info({:mnesia_system_event, {:inconsistent_database, _context, node}}, %{config: config} = state) do
def handle_info(
{:mnesia_system_event, {:inconsistent_database, _context, node}},
%{config: config} = state
) do
:global.trans({__MODULE__, self()}, fn -> autoheal(node, config) end)

{:noreply, state}
Expand All @@ -123,18 +126,14 @@ defmodule Pow.Store.Backend.MnesiaCache.Unsplit do
def handle_info({:nodedown, _node}, state), do: {:noreply, state}

defp autoinit(node, config) do
cond do
Config.get(config, :auto_initialize_cluster, true) != true ->
:ok

node in :mnesia.system_info(:db_nodes) ->
:ok

is_nil(:rpc.call(node, Process, :whereis, [Pow.Store.Backend.MnesiaCache])) ->
:ok

true ->
do_autoinit(node, config)
with true <- Config.get(config, :auto_initialize_cluster, true),
# The node must not already be in the cluster
false <- node in :mnesia.system_info(:db_nodes),
# MnesiaCache must run on the node
false <- is_nil(:rpc.call(node, Process, :whereis, [Pow.Store.Backend.MnesiaCache])) do
do_autoinit(node, config)
else
_any -> :ok
end
end

Expand All @@ -144,28 +143,39 @@ defmodule Pow.Store.Backend.MnesiaCache.Unsplit do

case {local_cluster_nodes, remote_cluster_nodes} do
{[_local_node], [_remote_node]} ->
Logger.info("Connection to #{inspect node} established with no mnesia cluster found for either #{inspect node()} or #{inspect node}")
Logger.info(
"Connection to #{inspect(node)} established with no mnesia cluster found for either #{inspect(node())} or #{inspect(node)}"
)

{local_node_uptime, _} = :erlang.statistics(:wall_clock)
{remote_node_uptime, _} = :rpc.call(node, :erlang, :statistics, [:wall_clock])

if local_node_uptime < remote_node_uptime do
reset_node(node, config)
else
Logger.info("Skipping reset for #{inspect node()} as #{inspect node} is the most recent node")
Logger.info(
"Skipping reset for #{inspect(node())} as #{inspect(node)} is the most recent node"
)
end

{[_local_node], _remote_cluster_nodes} ->
Logger.info("Connection to #{inspect node} established with no mnesia cluster running on #{inspect node()}")
Logger.info(
"Connection to #{inspect(node)} established with no mnesia cluster running on #{inspect(node())}"
)

reset_node(node, config)

{_local_cluster_nodes, _remote_cluster_node_or_nodes} ->
Logger.info("Connection to #{inspect node} established with #{inspect node()} already being part of a mnesia cluster")
Logger.info(
"Connection to #{inspect(node)} established with #{inspect(node())} already being part of a mnesia cluster"
)
end
end

defp reset_node(node, _config) do
Logger.warning("Resetting mnesia on #{inspect node()} and restarting the mnesia cache to connect to #{inspect node}")
Logger.warning(
"Resetting mnesia on #{inspect(node())} and restarting the mnesia cache to connect to #{inspect(node)}"
)

:mnesia.stop()
:mnesia.delete_schema([node()])
Expand All @@ -178,12 +188,14 @@ defmodule Pow.Store.Backend.MnesiaCache.Unsplit do
|> Enum.member?(node)
|> case do
true ->
Logger.info("The node #{inspect node} has already been healed and joined the mnesia cluster")
Logger.info(
"The node #{inspect(node)} has already been healed and joined the mnesia cluster"
)

:ok

false ->
Logger.warning("Detected a netsplit in the mnesia cluster with node #{inspect node}")
Logger.warning("Detected a netsplit in the mnesia cluster with node #{inspect(node)}")

heal(node, config)
end
Expand All @@ -200,11 +212,11 @@ defmodule Pow.Store.Backend.MnesiaCache.Unsplit do
|> :mnesia.system_info()
|> List.delete(:schema)
|> List.foldl([], fn table, acc ->
nodes = get_all_nodes_for_table(table)
nodes = get_all_nodes_for_table(table)
is_shared = node in nodes && node() in nodes

case is_shared do
true -> [table | acc]
true -> [table | acc]
false -> acc
end
end)
Expand All @@ -219,8 +231,8 @@ defmodule Pow.Store.Backend.MnesiaCache.Unsplit do
defp force_reload(tables, node, config) do
flushable_tables =
case Config.get(config, :flush_tables, false) do
false -> [@mnesia_cache_tab]
:all -> tables
false -> [@mnesia_cache_tab]
:all -> tables
tables -> Enum.uniq([@mnesia_cache_tab | tables])
end

Expand All @@ -233,7 +245,9 @@ defmodule Pow.Store.Backend.MnesiaCache.Unsplit do
do_force_reload(tables, node)

unflushable_tables ->
Logger.error("Can't force reload unexpected tables #{inspect unflushable_tables} to heal #{inspect node}")
Logger.error(
"Can't force reload unexpected tables #{inspect(unflushable_tables)} to heal #{inspect(node)}"
)

{:error, {:unexpected_tables, tables}}
end
Expand All @@ -244,25 +258,31 @@ defmodule Pow.Store.Backend.MnesiaCache.Unsplit do

for node <- nodes do
:stopped = :rpc.call(node, :mnesia, :stop, [])
for table <- tables, do: :ok = :rpc.call(node, :mnesia, :set_master_nodes, [table, master_nodes])

for table <- tables do
:ok = :rpc.call(node, :mnesia, :set_master_nodes, [table, master_nodes])
end

:ok = :rpc.block_call(node, :mnesia, :start, [])
:ok = :rpc.call(node, :mnesia, :wait_for_tables, [tables, :timer.seconds(15)])

Logger.info("The node #{inspect node} has been healed and joined the mnesia cluster #{inspect master_nodes}")
Logger.info(
"The node #{inspect(node)} has been healed and joined the mnesia cluster #{inspect(master_nodes)}"
)
end

:ok
end

defp sorted_cluster_islands(node) do
island_a = :mnesia.system_info(:running_db_nodes)
island_b = :rpc.call(node, :mnesia, :system_info, [:running_db_nodes])
island_a = :mnesia.system_info(:running_db_nodes)
island_b = :rpc.call(node, :mnesia, :system_info, [:running_db_nodes])

Enum.sort([island_a, island_b], &older?/2)
end

defp older?(island_a, island_b) do
all_nodes = get_all_nodes_for_table(@mnesia_cache_tab)
all_nodes = get_all_nodes_for_table(@mnesia_cache_tab)
island_nodes = Enum.concat(island_a, island_b)

oldest_node = all_nodes |> Enum.reverse() |> Enum.find(&(&1 in island_nodes))
Expand Down

0 comments on commit c0c768d

Please sign in to comment.