Skip to content

Commit

Permalink
Merge pull request #11711 from rabbitmq/mergify/bp/v4.0.x/pr-11705
Browse files Browse the repository at this point in the history
Support consumer priority in AMQP (backport #11705)
  • Loading branch information
michaelklishin authored Jul 13, 2024
2 parents 9abe954 + 38b7726 commit efae3e5
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 17 deletions.
3 changes: 1 addition & 2 deletions deps/amqp10_client/src/amqp10_client_types.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ utf8(B) when is_binary(B) -> {utf8, B}.
uint(N) -> {uint, N}.

make_properties(#{properties := Props})
when is_map(Props) andalso
map_size(Props) > 0 ->
when map_size(Props) > 0 ->
{map, maps:fold(fun(K, V, L) ->
[{{symbol, K}, V} | L]
end, [], Props)};
Expand Down
43 changes: 30 additions & 13 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
mode => Mode,
consumer_tag => handle_to_ctag(HandleInt),
exclusive_consume => false,
args => source_filters_to_consumer_args(Source),
args => consumer_arguments(Attach),
ok_msg => undefined,
acting_user => Username},
case rabbit_queue_type:consume(Q, Spec, QStates0) of
Expand Down Expand Up @@ -2852,19 +2852,36 @@ encode_frames(T, Msg, MaxPayloadSize, Transfers) ->
lists:reverse([[T, Msg] | Transfers])
end.

source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
source_filters_to_consumer_args(
consumer_arguments(#'v1_0.attach'{
source = #'v1_0.source'{filter = Filter},
properties = Properties}) ->
properties_to_consumer_args(Properties) ++
filter_to_consumer_args(Filter).

properties_to_consumer_args({map, KVList}) ->
Key = {symbol, <<"rabbitmq:priority">>},
case proplists:lookup(Key, KVList) of
{Key, Val = {int, _Prio}} ->
[mc_amqpl:to_091(<<"x-priority">>, Val)];
_ ->
[]
end;
properties_to_consumer_args(_) ->
[].

filter_to_consumer_args({map, KVList}) ->
filter_to_consumer_args(
[<<"rabbitmq:stream-offset-spec">>,
<<"rabbitmq:stream-filter">>,
<<"rabbitmq:stream-match-unfiltered">>],
KVList,
[]);
source_filters_to_consumer_args(_Source) ->
filter_to_consumer_args(_) ->
[].

source_filters_to_consumer_args([], _KVList, Acc) ->
filter_to_consumer_args([], _KVList, Acc) ->
Acc;
source_filters_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) ->
filter_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) ->
Key = {symbol, H},
Arg = case keyfind_unpack_described(Key, KVList) of
{_, {timestamp, Ts}} ->
Expand All @@ -2876,8 +2893,8 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVL
_ ->
[]
end,
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) ->
filter_to_consumer_args(T, KVList, Arg ++ Acc);
filter_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) ->
Key = {symbol, H},
Arg = case keyfind_unpack_described(Key, KVList) of
{_, {list, Filters0}} when is_list(Filters0) ->
Expand All @@ -2892,18 +2909,18 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList,
_ ->
[]
end,
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
source_filters_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) ->
filter_to_consumer_args(T, KVList, Arg ++ Acc);
filter_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) ->
Key = {symbol, H},
Arg = case keyfind_unpack_described(Key, KVList) of
{_, MU} when is_boolean(MU) ->
[{<<"x-stream-match-unfiltered">>, bool, MU}];
_ ->
[]
end,
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
source_filters_to_consumer_args([_ | T], KVList, Acc) ->
source_filters_to_consumer_args(T, KVList, Acc).
filter_to_consumer_args(T, KVList, Arg ++ Acc);
filter_to_consumer_args([_ | T], KVList, Acc) ->
filter_to_consumer_args(T, KVList, Acc).

keyfind_unpack_described(Key, KvList) ->
%% filterset values _should_ be described values
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_fifo.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
credit_mode :: credit_mode(), % part of snapshot data
lifetime = once :: once | auto,
priority = 0 :: non_neg_integer()}).
priority = 0 :: integer()}).

-record(consumer,
{cfg = #consumer_cfg{},
Expand Down
92 changes: 91 additions & 1 deletion deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ groups() ->
stop_classic_queue,
stop_quorum_queue,
stop_stream,
consumer_priority_classic_queue,
consumer_priority_quorum_queue,
single_active_consumer_classic_queue,
single_active_consumer_quorum_queue,
detach_requeues_one_session_classic_queue,
Expand Down Expand Up @@ -1841,6 +1843,95 @@ stop(QType, Config) ->
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
ok = rabbit_ct_client_helpers:close_channel(Ch).

consumer_priority_classic_queue(Config) ->
consumer_priority(<<"classic">>, Config).

consumer_priority_quorum_queue(Config) ->
consumer_priority(<<"quorum">>, Config).

consumer_priority(QType, Config) ->
QName = atom_to_binary(?FUNCTION_NAME),
{Connection, Session, LinkPair} = init(Config),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QType}}},
{ok, #{type := QType}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),

Address = rabbitmq_amqp_address:queue(QName),
{ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, Address),
ok = wait_for_credit(Sender),

%% We test what our RabbitMQ docs state:
%% "Consumers which do not specify a value have priority 0.
%% Larger numbers indicate higher priority, and both positive and negative numbers can be used."
{ok, ReceiverDefaultPrio} = amqp10_client:attach_receiver_link(
Session,
<<"default prio consumer">>,
Address,
unsettled),
{ok, ReceiverHighPrio} = amqp10_client:attach_receiver_link(
Session,
<<"high prio consumer">>,
Address,
unsettled,
none,
#{},
#{<<"rabbitmq:priority">> => {int, 2_000_000_000}}),
{ok, ReceiverLowPrio} = amqp10_client:attach_receiver_link(
Session,
<<"low prio consumer">>,
Address,
unsettled,
none,
#{},
#{<<"rabbitmq:priority">> => {int, -2_000_000_000}}),
ok = amqp10_client:flow_link_credit(ReceiverDefaultPrio, 1, never),
ok = amqp10_client:flow_link_credit(ReceiverHighPrio, 2, never),
ok = amqp10_client:flow_link_credit(ReceiverLowPrio, 1, never),

NumMsgs = 5,
[begin
Bin = integer_to_binary(N),
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(Bin, Bin))
end || N <- lists:seq(1, NumMsgs)],
ok = wait_for_accepts(NumMsgs),

receive {amqp10_msg, Rec1, Msg1} ->
?assertEqual(<<"1">>, amqp10_msg:body_bin(Msg1)),
?assertEqual(ReceiverHighPrio, Rec1),
ok = amqp10_client:accept_msg(Rec1, Msg1)
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Rec2, Msg2} ->
?assertEqual(<<"2">>, amqp10_msg:body_bin(Msg2)),
?assertEqual(ReceiverHighPrio, Rec2),
ok = amqp10_client:accept_msg(Rec2, Msg2)
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Rec3, Msg3} ->
?assertEqual(<<"3">>, amqp10_msg:body_bin(Msg3)),
?assertEqual(ReceiverDefaultPrio, Rec3),
ok = amqp10_client:accept_msg(Rec3, Msg3)
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, Rec4, Msg4} ->
?assertEqual(<<"4">>, amqp10_msg:body_bin(Msg4)),
?assertEqual(ReceiverLowPrio, Rec4),
ok = amqp10_client:accept_msg(Rec4, Msg4)
after 5000 -> ct:fail({missing_msg, ?LINE})
end,
receive {amqp10_msg, _, _} = Unexpected ->
ct:fail({unexpected_msg, Unexpected, ?LINE})
after 5 -> ok
end,

ok = amqp10_client:detach_link(Sender),
ok = amqp10_client:detach_link(ReceiverDefaultPrio),
ok = amqp10_client:detach_link(ReceiverHighPrio),
ok = amqp10_client:detach_link(ReceiverLowPrio),
{ok, #{message_count := 1}} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).

single_active_consumer_classic_queue(Config) ->
single_active_consumer(<<"classic">>, Config).

Expand Down Expand Up @@ -4899,7 +4990,6 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) ->
ok = end_session_sync(Session),
ok = amqp10_client:close_connection(Connection).


%% internal
%%

Expand Down

0 comments on commit efae3e5

Please sign in to comment.