Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close connections when vhost is unavailable (supervision tree is down) #1293

Merged
merged 8 commits into from
Jul 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
clear_tracked_connection_tables_for_this_node/0,
register_connection/1, unregister_connection/1,
list/0, list/1, list_on_node/1, list_of_user/1,
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
count_connections_in/1]).
Expand Down Expand Up @@ -217,6 +217,16 @@ list_on_node(Node) ->
catch exit:{aborted, {no_exists, _}} -> []
end.

%% Lists the tracked connections stored for `Node' whose vhost field
%% equals `VHost'. The lookup is a dirty match against the table named by
%% tracked_connection_table_name_for(Node); if that table does not exist
%% the result is an empty list.
-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].

list_on_node(Node, VHost) ->
    try mnesia:dirty_match_object(
          tracked_connection_table_name_for(Node),
          #tracked_connection{vhost = VHost, _ = '_'})
    catch exit:{aborted, {no_exists, _}} -> []
    end.


-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].

list_of_user(Username) ->
Expand Down
9 changes: 9 additions & 0 deletions src/rabbit_connection_tracking_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
%% vhost_down: close the connections tracked for the reported vhost on the
%% reported node only.
handle_event(#event{type = vhost_down, props = Details}, State) ->
    VHost = pget(name, Details),
    Node = pget(node, Details),
    rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
                               " because the vhost database has stopped working",
                               [VHost, Node]),
    Connections = rabbit_connection_tracking:list_on_node(Node, VHost),
    Reason = rabbit_misc:format("vhost '~s' is down", [VHost]),
    close_connections(Connections, Reason),
    {ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
Username = pget(name, Details),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
Expand Down
40 changes: 30 additions & 10 deletions src/rabbit_direct.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,21 @@ connect(Creds, VHost, Protocol, Pid, Infos) ->
true ->
{error, not_allowed};
false ->
case AuthFun() of
{ok, User = #user{username = Username}} ->
notify_auth_result(Username,
user_authentication_success, []),
connect1(User, VHost, Protocol, Pid, Infos);
{refused, Username, Msg, Args} ->
notify_auth_result(Username,
user_authentication_failure,
[{error, rabbit_misc:format(Msg, Args)}]),
{error, {auth_failure, "Refused"}}
case is_vhost_alive(VHost, Creds, Pid) of
false ->
{error, {internal_error, vhost_is_down}};
true ->
case AuthFun() of
{ok, User = #user{username = Username}} ->
notify_auth_result(Username,
user_authentication_success, []),
connect1(User, VHost, Protocol, Pid, Infos);
{refused, Username, Msg, Args} ->
notify_auth_result(Username,
user_authentication_failure,
[{error, rabbit_misc:format(Msg, Args)}]),
{error, {auth_failure, "Refused"}}
end
end
end;
false -> {error, broker_not_found_on_node}
Expand Down Expand Up @@ -140,6 +145,21 @@ maybe_call_connection_info_module(Protocol, Creds, VHost, Pid, Infos) ->
[]
end.

%% Tells whether a direct connection to `VHost' may proceed.
%% Returns `true' when rabbit_vhost_sup_sup reports the vhost as alive.
%% Otherwise logs the refusal, including the connecting pid and user,
%% and returns `false'.
is_vhost_alive(VHost, {Username, _Password}, Pid) ->
    case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
        false ->
            %% A username of `none' is logged as an empty string.
            PrintedUsername = if
                                  Username =:= none -> "";
                                  true              -> Username
                              end,
            rabbit_log_connection:error(
              "Error on Direct connection ~p~n"
              "access to vhost '~s' refused for user '~s': "
              "vhost '~s' is down",
              [Pid, VHost, PrintedUsername, VHost]),
            false;
        true ->
            true
    end.

is_over_connection_limit(VHost, {Username, _Password}, Pid) ->
PrintedUsername = case Username of
Expand Down
2 changes: 1 addition & 1 deletion src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
ok;
{error, RTErr} ->
rabbit_log:error("Unable to save message store recovery terms"
"for directory ~p~nError: ~p~n",
" for directory ~p~nError: ~p~n",
[Dir, RTErr])
end,
State3 #msstate { index_state = undefined,
Expand Down
23 changes: 17 additions & 6 deletions src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ handle_other(handshake_timeout, State) ->
throw({handshake_timeout, State#v1.callback});
handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
State;
handle_other(heartbeat_timeout,
handle_other(heartbeat_timeout,
State = #v1{connection = #connection{timeout_sec = T}}) ->
maybe_emit_stats(State),
throw({heartbeat_timeout, T});
Expand Down Expand Up @@ -623,7 +623,7 @@ send_blocked(#v1{connection = #connection{protocol = Protocol,
sock = Sock}, Reason) ->
case rabbit_misc:table_lookup(Capabilities, <<"connection.blocked">>) of
{bool, true} ->

ok = send_on_channel0(Sock, #'connection.blocked'{reason = Reason},
Protocol);
_ ->
Expand Down Expand Up @@ -1164,6 +1164,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},

ok = is_over_connection_limit(VHost, User),
ok = rabbit_access_control:check_vhost_access(User, VHost, Sock),
ok = is_vhost_alive(VHost, User),
NewConnection = Connection#connection{vhost = VHost},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),

Expand Down Expand Up @@ -1209,6 +1210,16 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).

%% Returns `ok' when rabbit_vhost_sup_sup reports the vhost as alive.
%% Otherwise calls rabbit_misc:protocol_error/3 with `internal_error',
%% naming the vhost and the user that was refused.
is_vhost_alive(VHostPath, User) ->
    Alive = rabbit_vhost_sup_sup:is_vhost_alive(VHostPath),
    case Alive of
        false ->
            Username = User#user.username,
            rabbit_misc:protocol_error(
              internal_error,
              "access to vhost '~s' refused for user '~s': "
              "vhost '~s' is down",
              [VHostPath, Username, VHostPath]);
        true ->
            ok
    end.

is_over_connection_limit(VHostPath, User) ->
try rabbit_vhost_limit:is_over_connection_limit(VHostPath) of
false -> ok;
Expand Down Expand Up @@ -1567,7 +1578,7 @@ maybe_block(State = #v1{connection_state = CS, throttle = Throttle}) ->
State1 = State#v1{connection_state = blocked,
throttle = update_last_blocked_at(Throttle)},
case CS of
running ->
running ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater);
_ -> ok
end,
Expand All @@ -1589,7 +1600,7 @@ maybe_send_unblocked(State = #v1{throttle = Throttle}) ->
case should_send_unblocked(Throttle) of
true ->
ok = send_unblocked(State),
State#v1{throttle =
State#v1{throttle =
Throttle#throttle{connection_blocked_message_sent = false}};
false -> State
end.
Expand All @@ -1598,7 +1609,7 @@ maybe_send_blocked_or_unblocked(State = #v1{throttle = Throttle}) ->
case should_send_blocked(Throttle) of
true ->
ok = send_blocked(State, blocked_by_message(Throttle)),
State#v1{throttle =
State#v1{throttle =
Throttle#throttle{connection_blocked_message_sent = true}};
false -> maybe_send_unblocked(State)
end.
Expand All @@ -1624,7 +1635,7 @@ control_throttle(State = #v1{connection_state = CS,
running -> maybe_block(State1);
%% unblock or re-enable blocking
blocked -> maybe_block(maybe_unblock(State1));
_ -> State1
_ -> State1
end.

augment_connection_log_name(#connection{client_properties = ClientProperties,
Expand Down
12 changes: 10 additions & 2 deletions src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
-export([vhost_down/1]).

-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
Expand Down Expand Up @@ -54,8 +55,9 @@ recover() ->
%% rabbit_vhost_sup_sup will start the actual recovery.
%% So recovery will be run every time a vhost supervisor is restarted.
ok = rabbit_vhost_sup_sup:start(),
[{ok, _} = rabbit_vhost_sup_sup:vhost_sup(VHost)
|| VHost <- rabbit_vhost:list()],

[ ok = rabbit_vhost_sup_sup:init_vhost(VHost)
|| VHost <- rabbit_vhost:list()],
ok.

recover(VHost) ->
Expand Down Expand Up @@ -144,6 +146,12 @@ delete(VHostPath, ActingUser) ->
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
ok.

%% Emits a `vhost_down' event for `VHostPath', tagged with the local node
%% and attributed to the internal user. Asserts that the notification
%% returns `ok'.
vhost_down(VHostPath) ->
    Props = [{name, VHostPath},
             {node, node()},
             {user_who_performed_action, ?INTERNAL_USER}],
    ok = rabbit_event:notify(vhost_down, Props).

delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
Expand Down
42 changes: 34 additions & 8 deletions src/rabbit_vhost_sup_watcher.erl → src/rabbit_vhost_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,21 @@
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
%%

%% This module implements a watcher process which should stop
%% the parent supervisor if its vhost is missing from the mnesia DB
%% This module implements a vhost identity process.

-module(rabbit_vhost_sup_watcher).
%% On start this process will try to recover the vhost data and
%% processes structure (queues and message stores).
%% If recovered successfully, the process will save its PID
%% to vhost process registry. If vhost process PID is in the registry and the
%% process is alive - the vhost is considered running.

%% On termination, the process will notify of vhost going down.

%% The process will also check periodically if the vhost is still
%% present in mnesia DB and stop the vhost supervision tree when it
%% disappears.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spelling


-module(rabbit_vhost_process).

-include("rabbit.hrl").

Expand All @@ -29,15 +40,26 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).


start_link(VHost) ->
gen_server2:start_link(?MODULE, [VHost], []).


init([VHost]) ->
Interval = interval(),
timer:send_interval(Interval, check_vhost),
{ok, VHost}.
process_flag(trap_exit, true),
rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]),
try
%% Recover the vhost data and save it to vhost registry.
ok = rabbit_vhost:recover(VHost),
rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
Interval = interval(),
timer:send_interval(Interval, check_vhost),
{ok, VHost}
catch _:Reason ->
rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
" Stacktrace ~p",
[VHost, Reason, erlang:get_stacktrace()]),
{stop, Reason}
end.

handle_call(_,_,VHost) ->
{reply, ok, VHost}.
Expand All @@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) ->
handle_info(_, VHost) ->
{noreply, VHost}.

terminate(_, _) -> ok.
%% gen_server2 terminate callback. Only a `shutdown' termination announces
%% that the vhost has stopped; any other reason does nothing.
terminate(Reason, VHost) ->
    case Reason of
        shutdown ->
            %% Notify that vhost is stopped.
            rabbit_vhost:vhost_down(VHost);
        _ ->
            ok
    end.

code_change(_OldVsn, VHost, _Extra) ->
{ok, VHost}.
Expand Down
6 changes: 1 addition & 5 deletions src/rabbit_vhost_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,4 @@ start_link(VHost) ->
supervisor2:start_link(?MODULE, [VHost]).

init([VHost]) ->
{ok, {{one_for_all, 0, 1},
[{rabbit_vhost_sup_watcher,
{rabbit_vhost_sup_watcher, start_link, [VHost]},
intrinsic, ?WORKER_WAIT, worker,
[rabbit_vhost_sup]}]}}.
{ok, {{one_for_all, 0, 1}, []}}.
Loading