Skip to content

Commit

Permalink
Change vhost supervision recovery process.
Browse files Browse the repository at this point in the history
In order to be aware if vhost is alive or not, introduce
a rabbit_vhost_process gen_server, which manages recovery and
teardown of a vhost.
Also aliveness of the process can be used to determine a vhost state.
Vhost process termination emits an event to close all the vhost connections.
Addresses [#145106713]
  • Loading branch information
Daniil Fedotov committed Jul 19, 2017
1 parent 49a6cff commit 7a82b43
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 61 deletions.
12 changes: 11 additions & 1 deletion src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
clear_tracked_connection_tables_for_this_node/0,
register_connection/1, unregister_connection/1,
list/0, list/1, list_on_node/1, list_of_user/1,
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
count_connections_in/1]).
Expand Down Expand Up @@ -217,6 +217,16 @@ list_on_node(Node) ->
catch exit:{aborted, {no_exists, _}} -> []
end.

-spec list_on_node(node(), rabbit_types:vhsot()) -> [rabbit_types:tracked_connection()].

list_on_node(Node, VHost) ->
try mnesia:dirty_match_object(
tracked_connection_table_name_for(Node),
#tracked_connection{vhost = VHost, _ = '_'})
catch exit:{aborted, {no_exists, _}} -> []
end.


-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].

list_of_user(Username) ->
Expand Down
9 changes: 9 additions & 0 deletions src/rabbit_connection_tracking_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
handle_event(#event{type = vhost_down, props = Details}, State) ->
VHost = pget(name, Details),
Node = pget(node, Details),
rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
" because the vhost database has stopped working",
[VHost, Node]),
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
rabbit_misc:format("vhost '~s' is down", [VHost])),
{ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
Username = pget(name, Details),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
Expand Down
2 changes: 1 addition & 1 deletion src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
ok;
{error, RTErr} ->
rabbit_log:error("Unable to save message store recovery terms"
"for directory ~p~nError: ~p~n",
" for directory ~p~nError: ~p~n",
[Dir, RTErr])
end,
State3 #msstate { index_state = undefined,
Expand Down
7 changes: 7 additions & 0 deletions src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
-export([vhost_down/1]).

-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
Expand Down Expand Up @@ -144,6 +145,12 @@ delete(VHostPath, ActingUser) ->
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
ok.

vhost_down(VHostPath) ->
ok = rabbit_event:notify(vhost_down,
[{name, VHostPath},
{node, node()},
{user_who_performed_action, ?INTERNAL_USER}]).

delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
Expand Down
42 changes: 34 additions & 8 deletions src/rabbit_vhost_sup_watcher.erl → src/rabbit_vhost_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,21 @@
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
%%

%% This module implements a watcher process which should stop
%% the parent supervisor if its vhost is missing from the mnesia DB
%% This module implements a vhost identity process.

-module(rabbit_vhost_sup_watcher).
%% On start this process will try to recover the vhost data and
%% processes structure (queues and message stores).
%% If recovered successfully, the process will save it's PID
%% to vhost process registry. If vhost process PID is in the registry and the
%% process is alive - the vhost is considered running.

%% On termination, the ptocess will notify of vhost going down.

%% The process will also check periodically if the vhost still
%% present in mnesia DB and stop the vhost supervision tree when it
%% desapears.

-module(rabbit_vhost_process).

-include("rabbit.hrl").

Expand All @@ -29,15 +40,26 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).


start_link(VHost) ->
gen_server2:start_link(?MODULE, [VHost], []).


init([VHost]) ->
Interval = interval(),
timer:send_interval(Interval, check_vhost),
{ok, VHost}.
process_flag(trap_exit, true),
rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]),
try
%% Recover the vhost data and save it to vhost registry.
ok = rabbit_vhost:recover(VHost),
rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
Interval = interval(),
timer:send_interval(Interval, check_vhost),
{ok, VHost}
catch _:Reason ->
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
" Stacktrace ~p",
[VHost, Reason, erlang:get_stacktrace()]),
{stop, Reason}
end.

handle_call(_,_,VHost) ->
{reply, ok, VHost}.
Expand All @@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) ->
handle_info(_, VHost) ->
{noreply, VHost}.

terminate(_, _) -> ok.
terminate(shutdown, VHost) ->
%% Notify that vhost is stopped.
rabbit_vhost:vhost_down(VHost);

This comment has been minimized.

Copy link
@michaelklishin

michaelklishin Jul 31, 2017

Member

This has one unfortunate issue: it will be called when a vhost is deleted normally, e.g. using CLI tools. Which will leave scary messages about "vhost database failures" in the log and crash rabbit_connection_tracking_handler. I will address both as part of #1315. Kudos to @lukebakken for helping me investigate.

This comment has been minimized.

Copy link
@michaelklishin

michaelklishin Jul 31, 2017

Member

See 613b2a8.

terminate(_, _VHost) ->
ok.

code_change(_OldVsn, VHost, _Extra) ->
{ok, VHost}.
Expand Down
7 changes: 2 additions & 5 deletions src/rabbit_vhost_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,5 @@ start_link(VHost) ->
supervisor2:start_link(?MODULE, [VHost]).

init([VHost]) ->
{ok, {{one_for_all, 0, 1},
[{rabbit_vhost_sup_watcher,
{rabbit_vhost_sup_watcher, start_link, [VHost]},
intrinsic, ?WORKER_WAIT, worker,
[rabbit_vhost_sup]}]}}.
rabbit_log:error("Starting VHost sup ~p~n", [VHost]),
{ok, {{one_for_all, 0, 1}, []}}.
90 changes: 52 additions & 38 deletions src/rabbit_vhost_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
-export([start_link/0, start/0]).
-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]).
-export([start_on_all_nodes/1]).

-export([save_vhost_process/2]).
-export([is_vhost_alive/1]).

%% Internal
-export([stop_and_delete_vhost/1]).

-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).

start() ->
case supervisor:start_child(rabbit_sup, {?MODULE,
Expand All @@ -56,48 +59,18 @@ init([]) ->
[rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.

start_on_all_nodes(VHost) ->
[ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
[ {ok, _} = vhost_sup(VHost, Node) || Node <- rabbit_nodes:all_running() ],
ok.

delete_on_all_nodes(VHost) ->
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
ok.

start_vhost(VHost, Node) when Node == node(self()) ->
start_vhost(VHost);
start_vhost(VHost, Node) ->
case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{badrpc, RpcErr} ->
{error, RpcErr}
end.

start_vhost(VHost) ->
case rabbit_vhost:exists(VHost) of
false -> {error, {no_such_vhost, VHost}};
true ->
case vhost_sup_pid(VHost) of
no_pid ->
case supervisor2:start_child(?MODULE, [VHost]) of
{ok, _} -> ok;
{error, {already_started, _}} -> ok;
Error ->
rabbit_log:error("Could not start process tree "
"for vhost '~s': ~p", [VHost, Error]),
throw(Error)
end,
{ok, _} = vhost_sup_pid(VHost);
{ok, Pid} when is_pid(Pid) ->
{ok, Pid}
end
end.

stop_and_delete_vhost(VHost) ->
case get_vhost_sup(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid} = VHostSup ->
vhost_sup_pid = VHostSupPid} ->
case is_process_alive(WrapperPid) of
false -> ok;
true ->
Expand All @@ -106,7 +79,7 @@ stop_and_delete_vhost(VHost) ->
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
ets:delete_object(?MODULE, VHostSup),
ets:delete(?MODULE, VHost),
ok = rabbit_vhost:delete_storage(VHost);
Other ->
Other
Expand All @@ -128,7 +101,7 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.

-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}.
-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
vhost_sup(VHost, Local) when Local == node(self()) ->
vhost_sup(VHost);
vhost_sup(VHost, Node) ->
Expand All @@ -139,9 +112,44 @@ vhost_sup(VHost, Node) ->
{error, RpcErr}
end.

-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}.
-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
vhost_sup(VHost) ->
start_vhost(VHost).
case rabbit_vhost:exists(VHost) of
false -> {error, {no_such_vhost, VHost}};
true ->
case vhost_sup_pid(VHost) of
no_pid ->
case supervisor2:start_child(?MODULE, [VHost]) of
{ok, _} -> ok;
{error, {already_started, _}} -> ok;
Error -> throw(Error)
end,
{ok, _} = vhost_sup_pid(VHost);
{ok, Pid} when is_pid(Pid) ->
{ok, Pid}
end
end.


-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().
is_vhost_alive(VHost) ->
%% A vhost is considered alive if it's supervision tree is alive and
%% saved in the ETS table
case get_vhost_sup(VHost) of
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid,
vhost_process_pid = VHostProcessPid}
when is_pid(WrapperPid),
is_pid(VHostSupPid),
is_pid(VHostProcessPid) ->
is_process_alive(WrapperPid)
andalso
is_process_alive(VHostSupPid)
andalso
is_process_alive(VHostProcessPid);
_ -> false
end.


-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok.
save_vhost_sup(VHost, WrapperPid, VHostPid) ->
Expand All @@ -150,6 +158,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) ->
wrapper_pid = WrapperPid}),
ok.

-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok.
save_vhost_process(VHost, VHostProcessPid) ->
true = ets:update_element(?MODULE, VHost,
{#vhost_sup.vhost_process_pid, VHostProcessPid}),
ok.

-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
get_vhost_sup(VHost) ->
case ets:lookup(?MODULE, VHost) of
Expand Down
27 changes: 19 additions & 8 deletions src/rabbit_vhost_sup_wrapper.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,35 @@
-export([start_vhost_sup/1]).

start_link(VHost) ->
supervisor2:start_link(?MODULE, [VHost]).
%% Using supervisor, because supervisor2 does not stop a started child when
%% another one fails to start. Bug?
supervisor:start_link(?MODULE, [VHost]).

init([VHost]) ->
%% 2 restarts in 5 minutes. One per message store.
{ok, {{one_for_all, 2, 300},
[{rabbit_vhost_sup,
{rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
permanent, infinity, supervisor,
[rabbit_vhost_sup]}]}}.
[
%% rabbit_vhost_sup is an empty supervisor container for
%% all data processes.
{rabbit_vhost_sup,
{rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
permanent, infinity, supervisor,
[rabbit_vhost_sup]},
%% rabbit_vhost_process is a vhost identity process, which
%% is responsible for data recovery and vhost aliveness status.
%% See the module comments for more info.
{rabbit_vhost_process,
{rabbit_vhost_process, start_link, [VHost]},
permanent, ?WORKER_WAIT, worker,
[rabbit_vhost_process]}]}}.


start_vhost_sup(VHost) ->
case rabbit_vhost_sup:start_link(VHost) of
{ok, Pid} ->
%% Save vhost sup record with wrapper pid and vhost sup pid.
ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid),
%% We can start recover as soon as we have vhost_sup record saved
ok = rabbit_vhost:recover(VHost),
{ok, Pid};
Other ->
Other
end.
end.

0 comments on commit 7a82b43

Please sign in to comment.