Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

when bcasttype = all, master node down and up, E.election.monitored elem... #6

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions src/gen_leader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
%%%
%%%
-module(gen_leader).
-compile([{parse_transform, lager_transform}]).


%% Time between rounds of query from the leader
-define(TAU,5000).
Expand Down Expand Up @@ -433,19 +435,24 @@ init_it(Starter,Parent,Name,Mod,{CandidateNodes,OptArgs,Arg},Options) ->
{{ok, State}, true, false} ->
Server = #server{parent = Parent,mod = Mod,
state = State,debug = Debug},
lager:info("gen_leader_a_1"),
Incarn = incarnation(VarDir, Name, node()),
lager:info("gen_leader_a_2"),
NewE = startStage1(Election#election{incarn = Incarn}, Server),
lager:info("gen_leader_a_3"),
proc_lib:init_ack(Starter, {ok, self()}),

lager:info("gen_leader_a_4"),
%% handle the case where there's only one candidate worker and we can't
%% rely on DOWN messages to trigger the elected() call because we never get
%% a DOWN for ourselves
case CandidateNodes =:= [node()] of
true ->
%% there's only one candidate leader; us
lager:info("gen_leader_a_5"),
hasBecomeLeader(NewE,Server,{init});
false ->
%% more than one candidate worker, continue as normal
lager:info("gen_leader_a_6"),
safe_loop(#server{parent = Parent,mod = Mod,
state = State,debug = Debug},
candidate, NewE,{init})
Expand Down Expand Up @@ -1214,10 +1221,13 @@ startStage1(E, Server) ->
nextel = E#election.nextel + 1,
down = [],
status = elec1},
lager:info("gen_leader_startStage1_1:~p",[NewE]),
case NodePos of
1 ->
lager:info("gen_leader_startStage1_2"),
startStage2(NewE, Server);
_ ->
lager:info("gen_leader_startStage1_3"),
mon_nodes(NewE, lesser(node(),E#election.candidate_nodes), Server)
end.

Expand Down Expand Up @@ -1331,6 +1341,7 @@ broadcast(Msg, #election{monitored = Monitored} = E) ->
%% This function is used for broadcasts,
%% and we make sure only to broadcast to already known nodes.
ToNodes = [N || {_,N} <- Monitored],
%% lager:info("gen_leader_broadcast_1:~p",[Monitored]),
broadcast(Msg, ToNodes, E).

broadcast({from_leader, Msg}, ToNodes, E) ->
Expand Down Expand Up @@ -1398,18 +1409,32 @@ mon_nodes(E,Nodes,Server) ->
E
end,
FromNode = node(),
lager:info("gen_leader_mon_nodes_1:~p",[FromNode]),
lists:foldl(
fun(ToNode,El) ->
Pid = {El#election.name, ToNode},
lager:info("gen_leader_mon_nodes_2:~p,~p",[Pid,FromNode]),
Pid ! {heartbeat, FromNode},
mon_node(El, Pid, Server)
end,E1,Nodes -- [node()]).

%% Star monitoring one Process
%% Start monitoring one Process
mon_node(E,Proc,Server) ->
{Ref,Node} = do_monitor(Proc, Server),
E#election{monitored = [{Ref,Node} | E#election.monitored]}.

try
lager:info("gen_leader_mon_node_1"),
{Ref,Node} = do_monitor(Proc, Server),
%% E#election{monitored = [{Ref,Node} | E#election.monitored]}.
A = {Ref,Node},
case lists:member(A,E#election.monitored) of
true ->
E;
false ->
E#election{monitored = [A | E#election.monitored]}
end
catch _:_->
lager:info("gen_leader_mon_node_2"),
E
end.

spawn_monitor_proc() ->
Parent = self(),
Expand Down Expand Up @@ -1444,7 +1469,8 @@ mon_handle_req({monitor, P}, From, Refs) ->
Pid when is_pid(Pid) -> node(Pid)
end,
case lists:keysearch(Node, 2, Refs) of
{value, {_, Ref}} ->
%% {value, {_, Ref}} ->
{value, {Ref,_}} ->
mon_reply(From, {Ref,Node}),
Refs;
false ->
Expand Down