Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

By @gomoripeti: Streams: two additional Prometheus metrics for connections #12765

Merged
merged 4 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions deps/amqp10_common/src/serial_number.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
diff/2,
foldl/4]).

-ifdef(TEST).
%% For tests.
-export([usort/1]).
-endif.

-type serial_number() :: sequence_no().
-export_type([serial_number/0]).
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_ct_helpers/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PROJECT = rabbitmq_ct_helpers
PROJECT_DESCRIPTION = Common Test helpers for RabbitMQ

DEPS = rabbit_common proper inet_tcp_proxy meck
DEPS = rabbit_common amqp10_common rabbitmq_stream_common proper inet_tcp_proxy meck
LOCAL_DEPS = common_test eunit inets
#TEST_DEPS = rabbit

Expand Down
137 changes: 137 additions & 0 deletions deps/rabbitmq_ct_helpers/src/stream_test_utils.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

%% There is no open source Erlang RabbitMQ Stream client.
%% Therefore, we have to build the Stream protocol commands manually.

-module(stream_test_utils).

-compile([export_all, nowarn_export_all]).

-include_lib("amqp10_common/include/amqp10_framing.hrl").

-define(RESPONSE_CODE_OK, 1).

connect(Config, Node) ->
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_stream),
{ok, Sock} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),

C0 = rabbit_stream_core:init(0),
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
ok = gen_tcp:send(Sock, PeerPropertiesFrame),
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(Sock, C0),

ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 1, sasl_handshake})),
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(Sock, C1),
Username = <<"guest">>,
Password = <<"guest">>,
Null = 0,
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(Sock, C2),
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(Sock, C3),

ok = gen_tcp:send(Sock, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
ok = gen_tcp:send(Sock, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(Sock, C4),
{ok, Sock, C5}.

create_stream(Sock, C0, Stream) ->
CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
ok = gen_tcp:send(Sock, CreateStreamFrame),
{{response, 1, {create_stream, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

declare_publisher(Sock, C0, Stream, PublisherId) ->
DeclarePublisherFrame = rabbit_stream_core:frame({request, 1, {declare_publisher, PublisherId, <<>>, Stream}}),
ok = gen_tcp:send(Sock, DeclarePublisherFrame),
{{response, 1, {declare_publisher, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
SubscribeFrame = rabbit_stream_core:frame({request, 1, {subscribe, SubscriptionId, Stream, _OffsetSpec = first, InitialCredit, _Props = #{}}}),
ok = gen_tcp:send(Sock, SubscribeFrame),
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
{ok, C1}.

publish(Sock, C0, PublisherId, Sequence0, Payloads) ->
SeqIds = lists:seq(Sequence0, Sequence0 + length(Payloads) - 1),
Messages = [simple_entry(Seq, P)
|| {Seq, P} <- lists:zip(SeqIds, Payloads)],
{ok, SeqIds, C1} = publish_entries(Sock, C0, PublisherId, length(Messages), Messages),
{ok, C1}.

publish_entries(Sock, C0, PublisherId, MsgCount, Messages) ->
PublishFrame1 = rabbit_stream_core:frame({publish, PublisherId, MsgCount, Messages}),
ok = gen_tcp:send(Sock, PublishFrame1),
{{publish_confirm, PublisherId, SeqIds}, C1} = receive_stream_commands(Sock, C0),
{ok, SeqIds, C1}.

%% Streams contain AMQP 1.0 encoded messages.
%% In this case, the AMQP 1.0 encoded message contains a single data section.
simple_entry(Sequence, Body)
when is_binary(Body) ->
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
DataSectSize = byte_size(DataSect),
<<Sequence:64, 0:1, DataSectSize:31, DataSect:DataSectSize/binary>>.

%% Streams contain AMQP 1.0 encoded messages.
%% In this case, the AMQP 1.0 encoded message consists of an application-properties section and a data section.
simple_entry(Sequence, Body, AppProps)
when is_binary(Body) ->
AppPropsSect = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
DataSect = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
Sects = <<AppPropsSect/binary, DataSect/binary>>,
SectSize = byte_size(Sects),
<<Sequence:64, 0:1, SectSize:31, Sects:SectSize/binary>>.

%% Here, each AMQP 1.0 encoded message consists of an application-properties section and a data section.
%% All data sections are delivered uncompressed in 1 batch.
sub_batch_entry_uncompressed(Sequence, Bodies) ->
Batch = lists:foldl(fun(Body, Acc) ->
AppProps = #'v1_0.application_properties'{
content = [{{utf8, <<"my key">>}, {utf8, <<"my value">>}}]},
Sect0 = iolist_to_binary(amqp10_framing:encode_bin(AppProps)),
Sect1 = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
Sect = <<Sect0/binary, Sect1/binary>>,
<<Acc/binary, 0:1, (byte_size(Sect)):31, Sect/binary>>
end, <<>>, Bodies),
Size = byte_size(Batch),
<<Sequence:64, 1:1, 0:3, 0:4, (length(Bodies)):16, Size:32, Size:32, Batch:Size/binary>>.

%% Here, each AMQP 1.0 encoded message contains a single data section.
%% All data sections are delivered in 1 gzip compressed batch.
sub_batch_entry_compressed(Sequence, Bodies) ->
Uncompressed = lists:foldl(fun(Body, Acc) ->
Bin = iolist_to_binary(amqp10_framing:encode_bin(#'v1_0.data'{content = Body})),
<<Acc/binary, Bin/binary>>
end, <<>>, Bodies),
Compressed = zlib:gzip(Uncompressed),
CompressedLen = byte_size(Compressed),
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
CompressedLen:32, Compressed:CompressedLen/binary>>.

receive_stream_commands(Sock, C0) ->
case rabbit_stream_core:next_command(C0) of
empty ->
case gen_tcp:recv(Sock, 0, 5000) of
{ok, Data} ->
C1 = rabbit_stream_core:incoming_data(Data, C0),
case rabbit_stream_core:next_command(C1) of
empty ->
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
rabbit_stream_core:next_command(
rabbit_stream_core:incoming_data(Data2, C1));
Res ->
Res
end;
{error, Err} ->
ct:fail("error receiving stream data ~w", [Err])
end;
Res ->
Res
end.
2 changes: 1 addition & 1 deletion deps/rabbitmq_prometheus/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
PROJECT_MOD := rabbit_prometheus_app
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch
BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_stream

EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
]},

%% the family name for this metric is stream_consumer_metrics but the real table used for data is rabbit_stream_consumer_created.
{stream_consumer_metrics, [
{2, undefined, stream_consumer_max_offset_lag, gauge, "Current maximum of offset lag of consumers"}
]},

{connection_metrics, [
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
{2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
Expand Down Expand Up @@ -578,6 +583,17 @@ get_data(channel_metrics = Table, false, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(stream_consumer_metrics = MF, false, _) ->
Table = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({_, Props}, OldMax) ->
erlang:max(proplists:get_value(offset_lag, Props, 0), OldMax)
end, 0, Table) of
MaxOffsetLag ->
[{MF, MaxOffsetLag}]
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(queue_consumer_count = MF, false, VHostsFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
Expand Down Expand Up @@ -708,6 +724,22 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
end, [], Table);
get_data(queue_consumer_count, true, _) ->
ets:tab2list(queue_metrics);
get_data(stream_consumer_metrics, true, _) ->
Table = rabbit_stream_consumer_created, %% real table name
try ets:foldl(fun({{QueueName, _Pid, _SubId}, Props}, Map0) ->
Value = proplists:get_value(offset_lag, Props, 0),
maps:update_with(
QueueName,
fun(OldMax) -> erlang:max(Value, OldMax) end,
Value,
Map0)
end, #{}, Table) of
Map1 ->
maps:to_list(Map1)
catch error:badarg ->
%% rabbitmq_stream plugin is not enabled
[]
end;
get_data(vhost_status, _, _) ->
[ { #{<<"vhost">> => VHost},
case rabbit_vhost_sup_sup:is_vhost_alive(VHost) of
Expand Down
61 changes: 59 additions & 2 deletions deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile(export_all).
-compile([export_all, nowarn_export_all]).

all() ->
[
Expand Down Expand Up @@ -70,7 +71,8 @@ groups() ->
queue_consumer_count_and_queue_metrics_mutually_exclusive_test,
vhost_status_metric,
exchange_bindings_metric,
exchange_names_metric
exchange_names_metric,
stream_pub_sub_metrics
]},
{special_chars, [], [core_metrics_special_chars]},
{authentication, [], [basic_auth]}
Expand Down Expand Up @@ -739,6 +741,37 @@ exchange_names_metric(Config) ->
}, Names),
ok.

stream_pub_sub_metrics(Config) ->
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
MsgPerBatch1 = 2,
publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
MsgPerBatch2 = 3,
publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),

%% aggregated metrics

%% wait for the stream to emit stats
%% (collect_statistics_interval set to 100ms in this test group)
?awaitMatch(V when V == #{rabbitmq_stream_consumer_max_offset_lag => #{undefined => [3]}},
begin
{_, Body1} = http_get_with_pal(Config, "/metrics", [], 200),
maps:with([rabbitmq_stream_consumer_max_offset_lag],
parse_response(Body1))
end,
100),

%% per-object metrics
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
[], 200),
ParsedBody2 = parse_response(Body2),
#{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2,

?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
{#{vhost => "/", queue => Stream2}, [3]}],
lists:sort(maps:to_list(MaxOffsetLag))),
ok.

core_metrics_special_chars(Config) ->
{_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200),
?assertMatch(#{rabbitmq_detailed_queue_messages :=
Expand Down Expand Up @@ -784,6 +817,30 @@ basic_auth(Config) ->
rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>),
rabbit_ct_broker_helpers:delete_user(Config, <<"management">>).

%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------

publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
{ok, S, C0} = stream_test_utils:connect(Config, 0),
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
PublisherId = 98,
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
Payloads = lists:duplicate(MsgPerBatch, <<"m1">>),
SequenceFrom1 = 1,
{ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads),

PublisherId2 = 99,
{ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2),
Payloads2 = lists:duplicate(MsgPerBatch, <<"m2">>),
SequenceFrom2 = SequenceFrom1 + MsgPerBatch,
{ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, SequenceFrom2, Payloads2),

SubscriptionId = 97,
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
%% delivery of first batch of messages
{{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
ok.

http_get(Config, ReqHeaders, CodeExp) ->
Path = proplists:get_value(prometheus_path, Config, "/metrics"),
Expand Down
Loading
Loading