Skip to content

Commit

Permalink
expand state
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed Jun 30, 2023
1 parent 9be1971 commit 484c209
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 29 deletions.
1 change: 0 additions & 1 deletion src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,6 @@ add_member(ServerLoc, ServerId, Timeout) ->
{'$ra_join', ServerId, after_log_append},
Timeout).


%% @doc Removes a server from the cluster's membership configuration.
%% This function returns after appending a cluster membership change
%% command to the log.
Expand Down
6 changes: 6 additions & 0 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@
suspended |
disconnected.

-type ra_voter_status() :: yes | {no, ra_nonvoter_reason()}.

-type ra_nonvoter_reason() :: #{target := ra_index()}.

-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
query_index := non_neg_integer(),
% the commit index last sent
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether the peer is part of the consensus
voter := ra_voter_status(),
%% indicates that a snapshot is being sent
%% to the peer
status := ra_peer_status()}.
Expand Down
4 changes: 3 additions & 1 deletion src/ra_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,13 @@ overview(System) when is_atom(System) ->
States = maps:from_list(ets:tab2list(ra_state)),
Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)),
lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) ->
{State, Voter} = maps:get(ServerName, States, {undefined, undefined}),
Acc#{ServerName =>
#{uid => UId,
pid => Pid,
parent => Parent,
state => maps:get(ServerName, States, undefined),
state => State,
voter => Voter,
cluster_name => ClusterName,
snapshot_state => maps:get(UId, Snaps,
undefined)}}
Expand Down
115 changes: 109 additions & 6 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ init(#{id := Id,
cluster_name := _ClusterName,
initial_members := InitialNodes,
log_init_args := LogInitArgs,
tick_timeout := Timeout,
machine := MachineConf} = Config) ->
SystemConfig = maps:get(system_config, Config,
ra_system:default_config()),
Expand Down Expand Up @@ -315,6 +316,7 @@ init(#{id := Id,
uid = UId,
log_id = LogId,
metrics_key = MetricKey,
tick_timeout = Timeout,
machine = Machine,
machine_version = LatestMacVer,
machine_versions = [{SnapshotIdx, MacVer}],
Expand Down Expand Up @@ -394,11 +396,16 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
Peer = Peer0#{match_index => max(MI, LastIdx),
next_index => max(NI, NextIdx)},
State1 = put_peer(PeerId, Peer, State0),
{State2, Effects0} = evaluate_quorum(State1, []),

Effects00 = maybe_promote_voter(PeerId, State1, []),

{State2, Effects0} = evaluate_quorum(State1, Effects00),

{State, Effects1} = process_pending_consistent_queries(State2,
Effects0),

Effects = [{next_event, info, pipeline_rpcs} | Effects1],

case State of
#{cluster := #{Id := _}} ->
% leader is in the cluster
Expand Down Expand Up @@ -776,7 +783,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
NewVotes = Votes + 1,
?DEBUG("~ts: vote granted for term ~b votes ~b",
[LogId, Term, NewVotes]),
case trunc(maps:size(Nodes) / 2) + 1 of
case required_quorum(Nodes) of
NewVotes ->
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
Noop = {noop, #{ts => erlang:system_time(millisecond)},
Expand Down Expand Up @@ -922,7 +929,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true,
[LogId, Token, Term, Votes + 1]),
NewVotes = Votes + 1,
State = update_term(Term, State0),
case trunc(maps:size(Nodes) / 2) + 1 of
case required_quorum(Nodes) of
NewVotes ->
call_for_election(candidate, State);
_ ->
Expand Down Expand Up @@ -1103,8 +1110,16 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
% simply forward all other events to ra_log
{Log, Effects} = ra_log:handle_event(Evt, Log0),
{follower, State#{log => Log}, Effects};
handle_follower(#pre_vote_rpc{},
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
?DEBUG("~w: follower ignored pre_vote_rpc, non-voter: ~p", [LogId, Voter]),
{follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
handle_follower(#request_vote_rpc{},
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
?DEBUG("~w: follower ignored request_vote_rpc, non-voter: ~p", [LogId, Voter]),
{follower, State, []};
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
#{current_term := Term, voted_for := VotedFor,
cfg := #cfg{log_id = LogId}} = State)
Expand Down Expand Up @@ -1202,6 +1217,11 @@ handle_follower(#append_entries_reply{}, State) ->
%% handle to avoid logging as unhandled
%% could receive a lot of these shortly after standing down as leader
{follower, State, []};
handle_follower(election_timeout,
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
?DEBUG("~w: follower ignored election_timeout, non-voter: ~p",
[LogId, Voter]),
{follower, State, []};
handle_follower(election_timeout, State) ->
call_for_election(pre_vote, State);
handle_follower(try_become_leader, State) ->
Expand Down Expand Up @@ -1369,12 +1389,14 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
last_applied,
cluster,
leader_id,
voter,
voted_for,
cluster_change_permitted,
cluster_index_term,
query_index
], State),
O = maps:merge(O0, cfg_to_map(Cfg)),
O1 = O0#{voter => maps:get(voter, O0, yes)}, % implicit voter for initial leaders
O = maps:merge(O1, cfg_to_map(Cfg)),
LogOverview = ra_log:overview(Log),
MacOverview = ra_machine:overview(MacMod, MacState),
O#{log => LogOverview,
Expand Down Expand Up @@ -2087,6 +2109,7 @@ new_peer() ->
match_index => 0,
commit_index_sent => 0,
query_index => 0,
voter => yes,
status => normal}.

new_peer_with(Map) ->
Expand Down Expand Up @@ -2318,6 +2341,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
voter => voter_status(id(State0), NewCluster),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2450,16 +2474,34 @@ append_log_leader({CmdTag, _, _, _},
when CmdTag == '$ra_join' orelse
CmdTag == '$ra_leave' ->
{not_appended, cluster_change_not_permitted, State};
append_log_leader({'$ra_join', From, #{node := JoiningNode, voter := Voter}, ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := #{voter := Voter}} ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State};
#{JoiningNode := Peer} ->
% Update member status.
Cluster = OldCluster#{JoiningNode => Peer#{voter => Voter}},
append_cluster_change(Cluster, From, ReplyMode, State);
_ ->
% Insert new member.
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => Voter})},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
% Legacy $ra_join, join as future voter iff no such member in the cluster.
case OldCluster of
#{JoiningNode := _} ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State};
_ ->
Cluster = OldCluster#{JoiningNode => new_peer()},
append_cluster_change(Cluster, From, ReplyMode, State)
append_log_leader({'$ra_join', From,
#{node => JoiningNode, voter => new_nonvoter(State)},
ReplyMode}, State)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
State = #{cfg := #cfg{log_id = LogId},
Expand Down Expand Up @@ -2501,6 +2543,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
voter => voter_status(id(State), Cluster),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down Expand Up @@ -2577,6 +2620,8 @@ query_indexes(#{cfg := #cfg{id = Id},
query_index := QueryIndex}) ->
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter := {no, _}}, Acc) ->
Acc;
(_K, #{query_index := Idx}, Acc) ->
[Idx | Acc]
end, [QueryIndex], Cluster).
Expand All @@ -2587,6 +2632,8 @@ match_indexes(#{cfg := #cfg{id = Id},
{LWIdx, _} = ra_log:last_written(Log),
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter := {no, _}}, Acc) ->
Acc;
(_K, #{match_index := Idx}, Acc) ->
[Idx | Acc]
end, [LWIdx], Cluster).
Expand Down Expand Up @@ -2803,6 +2850,62 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) ->
Name;
meta_name(#{names := #{log_meta := Name}}) ->
Name.

%%% ====================
%%% Voter status helpers
%%% ====================

-spec new_nonvoter(ra_server_state()) -> ra_voter_status().

new_nonvoter(#{commit_index := Target} = _State) ->
{no, #{target => Target}}.

-spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects().

maybe_promote_voter(PeerID, #{cluster := Cluster} = _State, Effects) ->
% Unknown peer handled in the caller.
#{PeerID := #{match_index := MI, voter := OldStatus}} = Cluster,
case update_voter_status(OldStatus, MI) of
OldStatus ->
Effects;
yes ->
[{next_event,
{command, {'$ra_join',
#{ts => os:system_time(millisecond)},
#{node => PeerID, voter => yes},
noreply}}} |
Effects]
end.

update_voter_status({no, #{target := Target}}, MI)
when MI >= Target ->
yes;
update_voter_status(Permanent, _) ->
Permanent.

-spec voter_status(ra_server_id(), ra_cluster()) -> ra_voter_status().

voter_status(PeerId, Cluster) ->
case maps:get(PeerId, Cluster, undefined) of
undefined ->
{no, undefined};
Peer ->
maps:get(voter, Peer, yes)
end.

-spec required_quorum(ra_cluster()) -> pos_integer().

required_quorum(Cluster) ->
Voters = count_voters(Cluster),
trunc(Voters / 2) + 1.

count_voters(Cluster) ->
maps:fold(
fun (_, #{voter := {no, _}}, Count) -> Count;
(_, _, Count) -> Count + 1
end,
0, Cluster).

%%% ===================
%%% Internal unit tests
%%% ===================
Expand Down
2 changes: 2 additions & 0 deletions src/ra_server.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
-define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB
-define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000).
-define(FLUSH_COMMANDS_SIZE, 16).
-define(MAX_NONVOTER_ROUNDS, 4).

-record(cfg,
{id :: ra_server_id(),
uid :: ra_uid(),
log_id :: unicode:chardata(),
metrics_key :: term(),
tick_timeout :: non_neg_integer(),
machine :: ra_machine:machine(),
machine_version :: ra_machine:version(),
machine_versions :: [{ra_index(), ra_machine:version()}, ...],
Expand Down
15 changes: 12 additions & 3 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -783,9 +783,17 @@ follower(_, tick_timeout, State0) ->
set_tick_timer(State, Actions)};
follower({call, From}, {log_fold, Fun, Term}, State) ->
fold_log(From, Fun, Term, State);
follower(EventType, Msg, State0) ->
follower(EventType, Msg, #state{conf = #conf{name = Name},
server_state = SS0} = State0) ->
Voter0 = maps:get(voter, SS0, yes),
case handle_follower(Msg, State0) of
{follower, State1, Effects} ->
{follower, #state{server_state = SS1} = State1, Effects} ->
case maps:get(voter, SS1, yes) of
Voter0 ->
ok;
Voter ->
true = ets:insert(ra_state, {Name, {follower, Voter}})
end,
{State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
State = follower_leader_change(State0, State2),
{keep_state, State, Actions};
Expand Down Expand Up @@ -1028,7 +1036,8 @@ format_status(Opt, [_PDict, StateName,
handle_enter(RaftState, OldRaftState,
#state{conf = #conf{name = Name},
server_state = ServerState0} = State) ->
true = ets:insert(ra_state, {Name, RaftState}),
Voter = maps:get(voter, ServerState0, yes),
true = ets:insert(ra_state, {Name, {RaftState, Voter}}),
{ServerState, Effects} = ra_server:handle_state_enter(RaftState,
ServerState0),
case RaftState == leader orelse OldRaftState == leader of
Expand Down
3 changes: 2 additions & 1 deletion test/ra_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,8 @@ force_start_follower_as_single_member(Config) ->
Conf4 = conf(ClusterName, UId4, ServerId4, PrivDir, [ServerId3]),
{ok, _, _} = ra:add_member(ServerId3, ServerId4),
%% the membership has changed but member not running yet
{timeout,_} = ra:process_command(ServerId3, {enq, banana}),
%% it is nonvoter and does not affect quorum size yet
{ok, _, _} = ra:process_command(ServerId3, {enq, banana}),
%% start new member
ok = ra:start_server(?SYS, Conf4),
{ok, _, ServerId3} = ra:members(ServerId4),
Expand Down
Loading

0 comments on commit 484c209

Please sign in to comment.