Skip to content

Commit

Permalink
Merge branch 'master' into rabbitmq-server-500
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Feb 10, 2016
2 parents 49a1886 + 8974581 commit ba20887
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/rabbit_alarm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
alarms :: [alarm()]}).

-type(local_alarm() :: 'file_descriptor_limit').
-type(resource_alarm_source() :: 'disk' | 'node').
-type(resource_alarm_source() :: 'disk' | 'memory').
-type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}).
-type(alarm() :: local_alarm() | resource_alarm()).

Expand Down
59 changes: 56 additions & 3 deletions src/rabbit_mirror_queue_sync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

-include("rabbit.hrl").

-export([master_prepare/4, master_go/8, slave/7]).
-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).

-define(SYNC_PROGRESS_INTERVAL, 1000000).

Expand Down Expand Up @@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) ->
[] -> Log("all slaves already synced", []);
SPids1 -> MPid ! {ready, self()},
Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
syncer_loop(Ref, MPid, SPids1)
syncer_check_resources(Ref, MPid, SPids1)
end.

await_slaves(Ref, SPids) ->
Expand All @@ -217,12 +217,43 @@ await_slaves(Ref, SPids) ->
%% 'sync_start' and so will not reply. We need to act as though they are
%% down.

%% Entry point of the syncer once the set of mirrors to sync is known.
%%
%% Registers this process with rabbit_alarm, passing conserve_resources/3
%% of this module as the callback, then inspects the alarms currently
%% reported by rabbit_alarm:get_alarms/0. If a memory resource_limit alarm
%% is already present, we block in wait_for_resources/2 until it is
%% cleared (or the sync is cancelled) instead of asking the master for
%% messages straight away.
%%
%% Returns 'ok' when the sync is cancelled while waiting; otherwise sends
%% the first {next, Ref} request to MPid and continues in syncer_loop/3.
syncer_check_resources(Ref, MPid, SPids) ->
    rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
    IsMemoryAlarm = fun ({{resource_limit, memory, _}, _}) -> true;
                        ({_, _})                           -> false
                    end,
    %% Either the (possibly reduced) list of mirrors to sync, or 'cancel'.
    Outcome = case lists:any(IsMemoryAlarm, rabbit_alarm:get_alarms()) of
                  false -> SPids;
                  true  -> wait_for_resources(Ref, SPids)
              end,
    case Outcome of
        cancel ->
            ok;
        ReadySPids ->
            %% Ask the master for the first batch only once no memory
            %% alarm is in effect.
            MPid ! {next, Ref},
            syncer_loop(Ref, MPid, ReadySPids)
    end.

syncer_loop(Ref, MPid, SPids) ->
MPid ! {next, Ref},
receive
{conserve_resources, memory, true} ->
case wait_for_resources(Ref, SPids) of
cancel -> ok;
SPids1 -> syncer_loop(Ref, MPid, SPids1)
end;
{conserve_resources, _, _} ->
%% Ignore other alerts.
syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
broadcast(SPids1, {sync_msgs, Ref, Msgs}),
MPid ! {next, Ref},
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
Expand All @@ -239,6 +270,10 @@ broadcast(SPids, Msg) ->
SPid ! Msg
end || SPid <- SPids].

%% Callback registered with rabbit_alarm by syncer_check_resources/3.
%% Forwards the alarm to Pid as {conserve_resources, Source, Conserve},
%% where Conserve is the second element of the 3-tuple passed as the last
%% argument; the other two elements are ignored. Always returns 'ok'.
conserve_resources(Pid, Source, {_, Conserve, _}) ->
    Notification = {conserve_resources, Source, Conserve},
    Pid ! Notification,
    ok.

wait_for_credit(SPids) ->
case credit_flow:blocked() of
true -> receive
Expand All @@ -252,6 +287,24 @@ wait_for_credit(SPids) ->
false -> SPids
end.

%% Blocks until the memory alarm is reported as cleared.
%%
%% Returns the list of mirror pids still being synced once a
%% {conserve_resources, memory, false} message arrives, or 'cancel' if a
%% {cancel, Ref} message is received first. Mirrors whose 'DOWN' message
%% arrives while waiting are removed from the list.
wait_for_resources(Ref, SPids) ->
    receive
        {cancel, Ref} ->
            %% We deliberately do not notify the slaves that we are about
            %% to die: when we do, they treat it as a failure, which is
            %% exactly the behaviour we want.
            cancel;
        {'DOWN', _, process, DownPid, _} ->
            credit_flow:peer_down(DownPid),
            wait_for_resources(
              Ref, wait_for_credit(lists:delete(DownPid, SPids)));
        {conserve_resources, memory, false} ->
            %% Memory alarm cleared: hand back the surviving mirrors.
            SPids;
        {conserve_resources, _, _} ->
            %% Any other alarm notification is irrelevant here; keep
            %% waiting.
            wait_for_resources(Ref, SPids)
    end.

%% Syncer
%% ---------------------------------------------------------------------------
%% Slave
Expand Down

0 comments on commit ba20887

Please sign in to comment.