Skip to content

Commit

Permalink
Ask nodes that come back to re-register their connections
Browse files Browse the repository at this point in the history
Depending on the partition handling mode used there may or may not
be any clients still connected. We make sure that registration
and deregistration functions are idempotent and assume there
may be connections on the node that has come back.

Point of improvement: when a node comes back up, N-1 nodes
will tell it to re-register connections. It could be fewer
than N-1, ideally just 1.
  • Loading branch information
michaelklishin committed Jul 19, 2016
1 parent 24e4c0e commit a774c7b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
18 changes: 12 additions & 6 deletions src/rabbit_connection_tracker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
-behaviour(gen_server2).

%% API
-export([boot/0, start_link/0, reregister/0]).
-export([boot/0, start_link/0, reregister/1]).

%% gen_fsm callbacks
-export([init/1,
Expand All @@ -52,8 +52,9 @@ boot() ->
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).

reregister() ->
gen_server2:cast({local, ?SERVER}, reregister).
reregister(Node) ->
rabbit_log:info("Telling node ~p to re-register tracked connections", [Node]),
gen_server2:cast({?SERVER, Node}, reregister).

%%%===================================================================
%%% gen_server callbacks
Expand All @@ -66,8 +67,9 @@ handle_call(_Req, _From, State) ->
{noreply, State}.

handle_cast(reregister, State) ->
rabbit_log:info("Connection tracker: asked to re-register client connections"),
case rabbit_networking:connections_local() of
Cs = rabbit_networking:connections_local(),
rabbit_log:info("Connection tracker: asked to re-register ~p client connections", [length(Cs)]),
case Cs of
[] -> ok;
Cs ->
[reregister_connection(C) || C <- Cs],
Expand All @@ -90,4 +92,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================

reregister_connection(Conn) ->
Conn ! reregister.
try
Conn ! reregister
catch _:Error ->
rabbit_log:error("Failed to re-register connection ~p after a network split: ~p", [Conn, Error])
end.
18 changes: 12 additions & 6 deletions src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@

-spec register_connection(rabbit_types:tracked_connection()) -> ok.

register_connection(#tracked_connection{vhost = VHost} = Conn) ->
register_connection(#tracked_connection{vhost = VHost, id = ConnId} = Conn) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
mnesia:write(?TABLE, Conn, write),
mnesia:dirty_update_counter(
rabbit_tracked_connection_per_vhost, VHost, 1),
%% upsert
case mnesia:dirty_read(?TABLE, ConnId) of
[] ->
mnesia:write(?TABLE, Conn, write),
mnesia:dirty_update_counter(
rabbit_tracked_connection_per_vhost, VHost, 1);
[_Row] ->
ok
end,
ok
end).

Expand Down Expand Up @@ -103,8 +109,8 @@ on_node_down(Node) ->
end.

-spec on_node_up(node()) -> ok.
on_node_up(_Node) ->
%% TODO
on_node_up(Node) ->
rabbit_connection_tracker:reregister(Node),
ok.

-spec is_over_connection_limit(rabbit_types:vhost()) -> boolean().
Expand Down
12 changes: 7 additions & 5 deletions test/per_vhost_connection_limit_partitions_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ all() ->
groups() ->
[
{net_ticktime_1, [], [
cluster_full_partition_test
cluster_full_partition_with_autoheal_test
]}
].

%% see partitions_SUITE
-define(DELAY, 9000).
-define(DELAY, 12000).

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
Expand Down Expand Up @@ -87,7 +87,7 @@ end_per_testcase(Testcase, Config) ->
%% Test cases.
%% -------------------------------------------------------------------

cluster_full_partition_test(Config) ->
cluster_full_partition_with_autoheal_test(Config) ->
VHost = <<"/">>,
rabbit_ct_broker_helpers:set_partition_handling_mode_globally(Config, autoheal),

Expand Down Expand Up @@ -115,11 +115,13 @@ cluster_full_partition_test(Config) ->
rabbit_ct_broker_helpers:allow_traffic_between(B, C),
timer:sleep(?DELAY),

?assertEqual(6, count_connections_in(Config, VHost)),
%% during autoheal B's connections were dropped
?assertEqual(4, count_connections_in(Config, VHost)),

lists:foreach(fun (Conn) ->
(catch rabbit_ct_client_helpers:close_connection(Conn))
end, [Conn1, Conn2, Conn3]),
end, [Conn1, Conn2, Conn3, Conn4,
Conn5, Conn6]),

passed.

Expand Down

0 comments on commit a774c7b

Please sign in to comment.