Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
Handle node_node_stats and emit data rates and history for cluster li…
Browse files Browse the repository at this point in the history
…nks (bug 26598)
  • Loading branch information
Simon MacMullen committed Feb 27, 2015
1 parent 30ffe3b commit dc4a42a
Showing 1 changed file with 34 additions and 11 deletions.
45 changes: 34 additions & 11 deletions src/rabbit_mgmt_db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
channel_queue_exchange_stats]).
-define(TABLES, [queue_stats, connection_stats, channel_stats,
consumers_by_queue, consumers_by_channel,
node_stats]).
node_stats, node_node_stats]).

-define(DELIVER_GET, [deliver, deliver_no_ack, get, get_no_ack]).
-define(FINE_STATS, [publish, publish_in, publish_out,
Expand All @@ -173,6 +173,8 @@
queue_index_journal_write_count,
queue_index_write_count, queue_index_read_count]).

-define(COARSE_NODE_NODE_STATS, [send_bytes, recv_bytes]).

%% Normally 0 and no history means "has never happened, don't
%% report". But for these things we do want to report even at 0 with
%% no history.
Expand Down Expand Up @@ -453,6 +455,7 @@ pget(Key, List) -> pget(Key, List, unknown).
%% passed a queue proplist that will already have been formatted -
%% i.e. it will have name and vhost keys.
id_name(node_stats) -> name;
id_name(node_node_stats) -> route;
id_name(vhost_stats) -> name;
id_name(queue_stats) -> name;
id_name(exchange_stats) -> name;
Expand Down Expand Up @@ -526,8 +529,7 @@ handle_event(Event = #event{type = exchange_deleted,

handle_event(#event{type = vhost_deleted,
props = [{name, Name}]}, State) ->
delete_samples(vhost_stats, Name, State),
{ok, State};
delete_samples(vhost_stats, Name, State);

handle_event(#event{type = connection_created, props = Stats}, State) ->
handle_created(
Expand Down Expand Up @@ -562,8 +564,7 @@ handle_event(#event{type = channel_stats, props = Stats, timestamp = Timestamp},
ets:match_delete(OldTable, {{fine, {ChPid, '_'}}, '_'}),
ets:match_delete(OldTable, {{fine, {ChPid, '_', '_'}}, '_'}),
[handle_fine_stats(Timestamp, AllStatsElem, State)
|| AllStatsElem <- AllStats],
{ok, State};
|| AllStatsElem <- AllStats];

handle_event(Event = #event{type = channel_closed,
props = [{pid, Pid}]},
Expand Down Expand Up @@ -597,8 +598,18 @@ handle_event(#event{type = node_stats, props = Stats0, timestamp = Timestamp},
pget(persister_stats, Stats0),
handle_stats(node_stats, Stats, Timestamp, [], ?COARSE_NODE_STATS, State);

handle_event(_Event, State) ->
{ok, State}.
handle_event(#event{type = node_node_stats, props = Stats,
timestamp = Timestamp}, State) ->
handle_stats(node_node_stats, Stats, Timestamp, [], ?COARSE_NODE_NODE_STATS,
State);

handle_event(Event = #event{type = node_node_deleted,
props = [{route, Route}]}, State) ->
delete_samples(node_node_stats, Route, State),
handle_deleted(node_node_stats, Event, State);

handle_event(_Event, _State) ->
ok.

handle_created(TName, Stats, Funs, State = #state{tables = Tables}) ->
Formatted = rabbit_mgmt_format:format(Stats, Funs),
Expand Down Expand Up @@ -742,6 +753,9 @@ ignore_coarse_sample(_, _) ->
record_sample({coarse, {node_stats, _Node} = Id}, Args, true, _State) ->
record_sample0(Id, Args);

record_sample({coarse, {node_node_stats, _Names} = Id}, Args, true, _State) ->
record_sample0(Id, Args);

record_sample({coarse, Id}, Args, false, _State) ->
record_sample0(Id, Args);

Expand Down Expand Up @@ -831,7 +845,8 @@ record_sampleX(_RenamePublishTo, X, {Type, Diff, TS, State}) ->

%% Ignore case where ID1 and ID2 are in a tuple, i.e. detailed stats,
%% when in basic mode
record_sample0({_, {_ID1, _ID2}}, {_, _, _, #state{rates_mode = basic}}) ->
record_sample0({Type, {_ID1, _ID2}}, {_, _, _, #state{rates_mode = basic}})
when Type =/= node_node_stats ->
ok;
record_sample0(Id0, {Key, Diff, TS, #state{aggregated_stats = ETS,
aggregated_stats_index = ETSi}}) ->
Expand Down Expand Up @@ -865,6 +880,9 @@ record_sample0(Id0, {Key, Diff, TS, #state{aggregated_stats = ETS,
{channel_stats, [{publishes, channel_exchange_stats, fun first/1},
{deliveries, channel_queue_stats, fun first/1}]}).

-define(NODE_DETAILS,
{node_stats, [{cluster_links, node_node_stats, fun first/1}]}).

first(Id) -> {Id, '$1'}.
second(Id) -> {'$1', Id}.

Expand Down Expand Up @@ -918,7 +936,8 @@ vhost_stats(Ranges, Objs, State) ->

node_stats(Ranges, Objs, State) ->
merge_stats(Objs, [basic_stats_fun(node_stats, State),
simple_stats_fun(Ranges, node_stats, State)]).
simple_stats_fun(Ranges, node_stats, State),
detail_stats_fun(Ranges, ?NODE_DETAILS, State)]).

merge_stats(Objs, Funs) ->
[lists:foldl(fun (Fun, Props) -> combine(Fun(Props), Props) end, Obj, Funs)
Expand Down Expand Up @@ -991,7 +1010,9 @@ format_detail_id(ChPid, State) when is_pid(ChPid) ->
augment_msg_stats([{channel, ChPid}], State);
format_detail_id(#resource{name = Name, virtual_host = Vhost, kind = Kind},
_State) ->
[{Kind, [{name, Name}, {vhost, Vhost}]}].
[{Kind, [{name, Name}, {vhost, Vhost}]}];
format_detail_id(Node, _State) when is_atom(Node) ->
[{node, Node}].

format_samples(Ranges, ManyStats, #state{interval = Interval}) ->
lists:append(
Expand All @@ -1009,7 +1030,8 @@ pick_range(K, {RangeL, RangeM, RangeD, RangeN}) ->
case {lists:member(K, ?QUEUE_MSG_COUNTS),
lists:member(K, ?MSG_RATES),
lists:member(K, ?COARSE_CONN_STATS),
lists:member(K, ?COARSE_NODE_STATS)} of
lists:member(K, ?COARSE_NODE_STATS)
orelse lists:member(K, ?COARSE_NODE_NODE_STATS)} of
{true, false, false, false} -> RangeL;
{false, true, false, false} -> RangeM;
{false, false, true, false} -> RangeD;
Expand Down Expand Up @@ -1148,6 +1170,7 @@ gc({{Type, Id}, Key}, Stats, Policies, Now, ETS) ->
end.

retention_policy(node_stats) -> global;
retention_policy(node_node_stats) -> global;
retention_policy(vhost_stats) -> global;
retention_policy(queue_stats) -> basic;
retention_policy(exchange_stats) -> basic;
Expand Down

0 comments on commit dc4a42a

Please sign in to comment.