Skip to content

Commit

Permalink
Expose max offset lag of stream consumers via Prometheus
Browse files Browse the repository at this point in the history
Supports both per stream (detailed) and aggregated (metrics) values.
  • Loading branch information
markus812498 authored and gomoripeti committed Nov 19, 2024
1 parent cf8a00c commit e82058e
Showing 1 changed file with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
]},

%% the family name for this metric is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
{stream_consumer_metrics, [
{2, undefined, stream_consumer_max_offset_lag, gauge, "Current maximum of offset lag of consumers"}
]},

{connection_metrics, [
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
Expand Down Expand Up @@ -578,6 +583,17 @@ get_data(channel_metrics = Table, false, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(stream_consumer_metrics = MF, false, _) ->
Table = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({_, Props}, OldMax) ->
erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax)
end, 0, Table) of
MaxOffsetLag ->
[{MF, MaxOffsetLag}]
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
Expand Down Expand Up @@ -708,6 +724,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
end, [], Table);
get_data(queue_consumer_count, true, _) ->
ets:tab2list(queue_metrics);
get_data(stream_consumer_metrics, true, _) ->
Table = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({{QueueName, _Pid, _SubId}, Props}, Map0) ->
Value = proplists:get_value(offset_lag, Props, 0),
maps:update_with(
QueueName,
fun(OldMax) -> erlang:max(Value, OldMax) end,
Value,
Map0)
end, #{}, Table) of
Map1 ->
maps:to_list(Map1)
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(vhost_status, _, _) ->
[ { #{<<"vhost">> => VHost},
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
Expand Down

0 comments on commit e82058e

Please sign in to comment.