diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl index f8c4c6541bf8..27c4bff81012 100644 --- a/src/rabbit_connection_tracking.erl +++ b/src/rabbit_connection_tracking.erl @@ -34,7 +34,7 @@ delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1, clear_tracked_connection_tables_for_this_node/0, register_connection/1, unregister_connection/1, - list/0, list/1, list_on_node/1, list_of_user/1, + list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1, tracked_connection_from_connection_created/1, tracked_connection_from_connection_state/1, count_connections_in/1]). @@ -217,6 +217,16 @@ list_on_node(Node) -> catch exit:{aborted, {no_exists, _}} -> [] end. +-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()]. + +list_on_node(Node, VHost) -> + try mnesia:dirty_match_object( + tracked_connection_table_name_for(Node), + #tracked_connection{vhost = VHost, _ = '_'}) + catch exit:{aborted, {no_exists, _}} -> [] + end. + + -spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()]. 
list_of_user(Username) -> diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl index f1b844c60cb5..3ae17677e0d6 100644 --- a/src/rabbit_connection_tracking_handler.erl +++ b/src/rabbit_connection_tracking_handler.erl @@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) -> close_connections(rabbit_connection_tracking:list(VHost), rabbit_misc:format("vhost '~s' is deleted", [VHost])), {ok, State}; +handle_event(#event{type = vhost_down, props = Details}, State) -> + VHost = pget(name, Details), + Node = pget(node, Details), + rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'" + " because the vhost database has stopped working", + [VHost, Node]), + close_connections(rabbit_connection_tracking:list_on_node(Node, VHost), + rabbit_misc:format("vhost '~s' is down", [VHost])), + {ok, State}; handle_event(#event{type = user_deleted, props = Details}, State) -> Username = pget(name, Details), rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 275a9127d1f0..fe78075d0fed 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, ok; {error, RTErr} -> rabbit_log:error("Unable to save message store recovery terms" - "for directory ~p~nError: ~p~n", + " for directory ~p~nError: ~p~n", [Dir, RTErr]) end, State3 #msstate { index_state = undefined, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 7513c23925ee..85a967816d71 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -26,6 +26,7 @@ -export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]). -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([delete_storage/1]). +-export([vhost_down/1]). 
-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). -spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). @@ -144,6 +145,12 @@ delete(VHostPath, ActingUser) -> rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), ok. +vhost_down(VHostPath) -> + ok = rabbit_event:notify(vhost_down, + [{name, VHostPath}, + {node, node()}, + {user_who_performed_action, ?INTERNAL_USER}]). + delete_storage(VHost) -> VhostDir = msg_store_dir_path(VHost), rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_process.erl similarity index 58% rename from src/rabbit_vhost_sup_watcher.erl rename to src/rabbit_vhost_process.erl index be2c5f20bb18..c16a24eb073e 100644 --- a/src/rabbit_vhost_sup_watcher.erl +++ b/src/rabbit_vhost_process.erl @@ -14,10 +14,21 @@ %% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. %% -%% This module implements a watcher process which should stop -%% the parent supervisor if its vhost is missing from the mnesia DB +%% This module implements a vhost identity process. --module(rabbit_vhost_sup_watcher). +%% On start this process will try to recover the vhost data and +%% processes structure (queues and message stores). +%% If recovered successfully, the process will save its PID +%% to vhost process registry. If vhost process PID is in the registry and the +%% process is alive - the vhost is considered running. + +%% On termination, the process will notify of vhost going down. + +%% The process will also check periodically if the vhost is still +%% present in mnesia DB and stop the vhost supervision tree when it +%% disappears. + +-module(rabbit_vhost_process). -include("rabbit.hrl"). @@ -29,15 +40,26 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - start_link(VHost) -> gen_server2:start_link(?MODULE, [VHost], []). 
init([VHost]) -> - Interval = interval(), - timer:send_interval(Interval, check_vhost), - {ok, VHost}. + process_flag(trap_exit, true), + rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]), + try + %% Recover the vhost data and save it to vhost registry. + ok = rabbit_vhost:recover(VHost), + rabbit_vhost_sup_sup:save_vhost_process(VHost, self()), + Interval = interval(), + timer:send_interval(Interval, check_vhost), + {ok, VHost} + catch _:Reason -> + rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n" + " Stacktrace ~p", + [VHost, Reason, erlang:get_stacktrace()]), + {stop, Reason} + end. handle_call(_,_,VHost) -> {reply, ok, VHost}. @@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) -> handle_info(_, VHost) -> {noreply, VHost}. -terminate(_, _) -> ok. +terminate(shutdown, VHost) -> + %% Notify that vhost is stopped. + rabbit_vhost:vhost_down(VHost); +terminate(_, _VHost) -> + ok. code_change(_OldVsn, VHost, _Extra) -> {ok, VHost}. diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl index bbf006fbd3ad..4e8688ac9ac8 100644 --- a/src/rabbit_vhost_sup.erl +++ b/src/rabbit_vhost_sup.erl @@ -28,8 +28,5 @@ start_link(VHost) -> supervisor2:start_link(?MODULE, [VHost]). init([VHost]) -> - {ok, {{one_for_all, 0, 1}, - [{rabbit_vhost_sup_watcher, - {rabbit_vhost_sup_watcher, start_link, [VHost]}, - intrinsic, ?WORKER_WAIT, worker, - [rabbit_vhost_sup]}]}}. + rabbit_log:debug("Starting VHost sup ~p~n", [VHost]), + {ok, {{one_for_all, 0, 1}, []}}. diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 7ecac7a5d40e..3d33b872ab82 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -25,12 +25,15 @@ -export([start_link/0, start/0]). -export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). -export([delete_on_all_nodes/1]). --export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]). +-export([start_on_all_nodes/1]). + +-export([save_vhost_process/2]). 
+-export([is_vhost_alive/1]). %% Internal -export([stop_and_delete_vhost/1]). --record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). +-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}). start() -> case supervisor:start_child(rabbit_sup, {?MODULE, @@ -56,48 +59,18 @@ init([]) -> [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. start_on_all_nodes(VHost) -> - [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], + [ {ok, _} = vhost_sup(VHost, Node) || Node <- rabbit_nodes:all_running() ], ok. delete_on_all_nodes(VHost) -> [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], ok. -start_vhost(VHost, Node) when Node == node(self()) -> - start_vhost(VHost); -start_vhost(VHost, Node) -> - case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {badrpc, RpcErr} -> - {error, RpcErr} - end. - -start_vhost(VHost) -> - case rabbit_vhost:exists(VHost) of - false -> {error, {no_such_vhost, VHost}}; - true -> - case vhost_sup_pid(VHost) of - no_pid -> - case supervisor2:start_child(?MODULE, [VHost]) of - {ok, _} -> ok; - {error, {already_started, _}} -> ok; - Error -> - rabbit_log:error("Could not start process tree " - "for vhost '~s': ~p", [VHost, Error]), - throw(Error) - end, - {ok, _} = vhost_sup_pid(VHost); - {ok, Pid} when is_pid(Pid) -> - {ok, Pid} - end - end. 
- stop_and_delete_vhost(VHost) -> case get_vhost_sup(VHost) of not_found -> ok; #vhost_sup{wrapper_pid = WrapperPid, - vhost_sup_pid = VHostSupPid} = VHostSup -> + vhost_sup_pid = VHostSupPid} -> case is_process_alive(WrapperPid) of false -> ok; true -> @@ -106,7 +79,7 @@ stop_and_delete_vhost(VHost) -> [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> - ets:delete_object(?MODULE, VHostSup), + ets:delete(?MODULE, VHost), ok = rabbit_vhost:delete_storage(VHost); Other -> Other @@ -128,7 +101,7 @@ stop_and_delete_vhost(VHost, Node) -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}. +-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}. vhost_sup(VHost, Local) when Local == node(self()) -> vhost_sup(VHost); vhost_sup(VHost, Node) -> @@ -139,9 +112,44 @@ vhost_sup(VHost, Node) -> {error, RpcErr} end. --spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}. +-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}. vhost_sup(VHost) -> - start_vhost(VHost). + case rabbit_vhost:exists(VHost) of + false -> {error, {no_such_vhost, VHost}}; + true -> + case vhost_sup_pid(VHost) of + no_pid -> + case supervisor2:start_child(?MODULE, [VHost]) of + {ok, _} -> ok; + {error, {already_started, _}} -> ok; + Error -> throw(Error) + end, + {ok, _} = vhost_sup_pid(VHost); + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} + end + end. + + +-spec is_vhost_alive(rabbit_types:vhost()) -> boolean(). 
+is_vhost_alive(VHost) -> +%% A vhost is considered alive if its supervision tree is alive and +%% saved in the ETS table + case get_vhost_sup(VHost) of + #vhost_sup{wrapper_pid = WrapperPid, + vhost_sup_pid = VHostSupPid, + vhost_process_pid = VHostProcessPid} + when is_pid(WrapperPid), + is_pid(VHostSupPid), + is_pid(VHostProcessPid) -> + is_process_alive(WrapperPid) + andalso + is_process_alive(VHostSupPid) + andalso + is_process_alive(VHostProcessPid); + _ -> false + end. + + -spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok. save_vhost_sup(VHost, WrapperPid, VHostPid) -> @@ -150,6 +158,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) -> wrapper_pid = WrapperPid}), ok. +-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok. +save_vhost_process(VHost, VHostProcessPid) -> + true = ets:update_element(?MODULE, VHost, + {#vhost_sup.vhost_process_pid, VHostProcessPid}), + ok. + -spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}. get_vhost_sup(VHost) -> case ets:lookup(?MODULE, VHost) of diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 8dbec30bff29..8e23389bb9bb 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -27,24 +27,35 @@ -export([start_vhost_sup/1]). start_link(VHost) -> - supervisor2:start_link(?MODULE, [VHost]). + %% Using supervisor, because supervisor2 does not stop a started child when + %% another one fails to start. Bug? + supervisor:start_link(?MODULE, [VHost]). init([VHost]) -> %% 2 restarts in 5 minutes. One per message store. {ok, {{one_for_all, 2, 300}, - [{rabbit_vhost_sup, - {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, - permanent, infinity, supervisor, - [rabbit_vhost_sup]}]}}. + [ + %% rabbit_vhost_sup is an empty supervisor container for + %% all data processes. 
+ {rabbit_vhost_sup, + {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, + permanent, infinity, supervisor, + [rabbit_vhost_sup]}, + %% rabbit_vhost_process is a vhost identity process, which + %% is responsible for data recovery and vhost aliveness status. + %% See the module comments for more info. + {rabbit_vhost_process, + {rabbit_vhost_process, start_link, [VHost]}, + permanent, ?WORKER_WAIT, worker, + [rabbit_vhost_process]}]}}. + start_vhost_sup(VHost) -> case rabbit_vhost_sup:start_link(VHost) of {ok, Pid} -> %% Save vhost sup record with wrapper pid and vhost sup pid. ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid), - %% We can start recover as soon as we have vhost_sup record saved - ok = rabbit_vhost:recover(VHost), {ok, Pid}; Other -> Other - end. + end. \ No newline at end of file