Skip to content

Commit

Permalink
Add test for stream consumer max offset lag prometheus metric
Browse files Browse the repository at this point in the history
  • Loading branch information
gomoripeti committed Nov 19, 2024
1 parent e82058e commit 0c76054
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 3 deletions.
2 changes: 1 addition & 1 deletion deps/rabbitmq_prometheus/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ PROJECT_DESCRIPTION = Prometheus metrics for RabbitMQ
PROJECT_MOD := rabbit_prometheus_app
DEPS = accept cowboy rabbit rabbitmq_management_agent prometheus rabbitmq_web_dispatch
BUILD_DEPS = amqp_client rabbit_common rabbitmq_management
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers eunit_formatters rabbitmq_stream

EUNIT_OPTS = no_tty, {report, {eunit_progress, [colored, profile]}}

Expand Down
61 changes: 59 additions & 2 deletions deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile(export_all).
-compile([export_all, nowarn_export_all]).

all() ->
[
Expand Down Expand Up @@ -70,7 +71,8 @@ groups() ->
queue_consumer_count_and_queue_metrics_mutually_exclusive_test,
vhost_status_metric,
exchange_bindings_metric,
exchange_names_metric
exchange_names_metric,
stream_pub_sub_metrics
]},
{special_chars, [], [core_metrics_special_chars]},
{authentication, [], [basic_auth]}
Expand Down Expand Up @@ -739,6 +741,37 @@ exchange_names_metric(Config) ->
}, Names),
ok.

stream_pub_sub_metrics(Config) ->
Stream1 = atom_to_list(?FUNCTION_NAME) ++ "1",
MsgPerBatch1 = 2,
publish_via_stream_protocol(list_to_binary(Stream1), MsgPerBatch1, Config),
Stream2 = atom_to_list(?FUNCTION_NAME) ++ "2",
MsgPerBatch2 = 3,
publish_via_stream_protocol(list_to_binary(Stream2), MsgPerBatch2, Config),

%% aggregated metrics

%% wait for the stream to emit stats
%% (collect_statistics_interval set to 100ms in this test group)
?awaitMatch(V when V == #{rabbitmq_stream_consumer_max_offset_lag => #{undefined => [3]}},
begin
{_, Body1} = http_get_with_pal(Config, "/metrics", [], 200),
maps:with([rabbitmq_stream_consumer_max_offset_lag],
parse_response(Body1))
end,
100),

%% per-object metrics
{_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=stream_consumer_metrics",
[], 200),
ParsedBody2 = parse_response(Body2),
#{rabbitmq_detailed_stream_consumer_max_offset_lag := MaxOffsetLag} = ParsedBody2,

?assertEqual([{#{vhost => "/", queue => Stream1}, [2]},
{#{vhost => "/", queue => Stream2}, [3]}],
lists:sort(maps:to_list(MaxOffsetLag))),
ok.

core_metrics_special_chars(Config) ->
{_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200),
?assertMatch(#{rabbitmq_detailed_queue_messages :=
Expand Down Expand Up @@ -784,6 +817,30 @@ basic_auth(Config) ->
rabbit_ct_broker_helpers:delete_user(Config, <<"monitor">>),
rabbit_ct_broker_helpers:delete_user(Config, <<"management">>).

%% -------------------------------------------------------------------
%% Helpers
%% -------------------------------------------------------------------

publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
{ok, S, C0} = stream_test_utils:connect(Config, 0),
{ok, C1} = stream_test_utils:create_stream(S, C0, Stream),
PublisherId = 98,
{ok, C2} = stream_test_utils:declare_publisher(S, C1, Stream, PublisherId),
Payloads = lists:duplicate(MsgPerBatch, <<"m1">>),
SequenceFrom1 = 1,
{ok, C3} = stream_test_utils:publish(S, C2, PublisherId, SequenceFrom1, Payloads),

PublisherId2 = 99,
{ok, C4} = stream_test_utils:declare_publisher(S, C3, Stream, PublisherId2),
Payloads2 = lists:duplicate(MsgPerBatch, <<"m2">>),
SequenceFrom2 = SequenceFrom1 + MsgPerBatch,
{ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, SequenceFrom2, Payloads2),

SubscriptionId = 97,
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
%% delivery of first batch of messages
{{deliver, SubscriptionId, _Bin1}, _C7} = stream_test_utils:receive_stream_commands(S, C6),
ok.

http_get(Config, ReqHeaders, CodeExp) ->
Path = proplists:get_value(prometheus_path, Config, "/metrics"),
Expand Down

0 comments on commit 0c76054

Please sign in to comment.