Skip to content

Commit

Permalink
Fix message IDs settlement order
Browse files Browse the repository at this point in the history
 ## What?

This commit fixes issues that were present only on `main`
branch and were introduced by #9022.

1. Classic queues (specifically `rabbit_queue_consumers:subtract_acks/3`)
   expect message IDs to be (n)acked in the order as they were delivered
   to the channel / session proc.
   Hence, the `lists:usort(MsgIds0)` in `rabbit_classic_queue:settle/5`
   was wrong causing not all messages to be acked adding a regression
   to also AMQP 0.9.1.
2. The order in which the session proc requeues or rejects multiple
   message IDs at once is important. For example, if the client sends a
   DISPOSITION with first=3 and last=5, the message IDs corresponding to
   delivery IDs 3,4,5 must be requeued or rejected in exactly that
   order.
   For example, quorum queues use this order of message IDs in
   https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_fifo.erl#L226-L234
   to dead letter in that order.

 ## How?

The session proc will settle (internal) message IDs to queues in ascending
(AMQP) delivery ID order, i.e. in the order messages were sent to the
client and in the order messages were settled by the client.

This commit chooses to keep the session's outgoing_unsettled_map map
data structure.

An alternative would have been to use a queue or lqueue for the
outgoing_unsettled_map as done in
* https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_channel.erl#L135
* https://github.com/rabbitmq/rabbitmq-server/blob/34d3f943742bdcf7d34859edff8d45f35e4007d4/deps/rabbit/src/rabbit_queue_consumers.erl#L43

Whether a queue (as done by `rabbit_channel`) or a map (as done by
`rabbit_amqp_session`) performs better depends on the pattern how
clients ack messages.

A queue will likely perform good enough because usually the oldest
delivered messages will be acked first.
However, given that there can be many different consumers on an AQMP
0.9.1 channel or AMQP 1.0 session, this commit favours a map because
it will likely generate less garbage and is very efficient when for
example a single new message (or few new messages) gets acked while
many (older) messages are still checked out by the session (but by
possibly different AMQP 1.0 receivers).
  • Loading branch information
ansd committed Jun 26, 2024
1 parent 4586236 commit dcb2fe0
Show file tree
Hide file tree
Showing 10 changed files with 733 additions and 16 deletions.
5 changes: 1 addition & 4 deletions deps/amqp10_common/src/serial_number.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ compare(A, B) ->
[serial_number()].
usort(L) ->
lists:usort(fun(A, B) ->
case compare(A, B) of
greater -> false;
_ -> true
end
compare(A, B) =/= greater
end, L).

%% Takes a list of serial numbers and returns tuples
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,10 @@ rabbitmq_integration_suite(
shard_count = 6,
)

rabbitmq_integration_suite(
name = "amqpl_consumer_ack_SUITE",
)

rabbitmq_integration_suite(
name = "message_containers_deaths_v2_SUITE",
size = "medium",
Expand Down
9 changes: 9 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2134,3 +2134,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "amqpl_consumer_ack_SUITE_beam_files",
testonly = True,
srcs = ["test/amqpl_consumer_ack_SUITE.erl"],
outs = ["test/amqpl_consumer_ack_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
)
11 changes: 8 additions & 3 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1307,9 +1307,13 @@ handle_control(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
case DispositionRangeSize =< UnsettledMapSize of
true ->
%% It is cheaper to iterate over the range of settled delivery IDs.
serial_number:foldl(fun settle_delivery_id/2, {#{}, UnsettledMap0}, First, Last);
serial_number:foldl(fun settle_delivery_id/2,
{#{}, UnsettledMap0},
First, Last);
false ->
%% It is cheaper to iterate over the outgoing unsettled map.
Iter = maps:iterator(UnsettledMap0,
fun(D1, D2) -> compare(D1, D2) =/= greater end),
{Settled0, UnsettledList} =
maps:fold(
fun (DeliveryId,
Expand All @@ -1329,14 +1333,15 @@ handle_control(#'v1_0.disposition'{role = ?AMQP_ROLE_RECEIVER,
{SettledAcc, [{DeliveryId, Unsettled} | UnsettledAcc]}
end
end,
{#{}, []}, UnsettledMap0),
{#{}, []}, Iter),
{Settled0, maps:from_list(UnsettledList)}
end,

SettleOp = settle_op_from_outcome(Outcome),
{QStates, Actions} =
maps:fold(
fun({QName, Ctag}, MsgIds, {QS0, ActionsAcc}) ->
fun({QName, Ctag}, MsgIdsRev, {QS0, ActionsAcc}) ->
MsgIds = lists:reverse(MsgIdsRev),
case rabbit_queue_type:settle(QName, SettleOp, Ctag, MsgIds, QS0) of
{ok, QS, Actions0} ->
messages_acknowledged(SettleOp, QName, QS, MsgIds),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1972,7 +1972,7 @@ collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
end.

%% Settles (acknowledges) messages at the queue replica process level.
%% This happens in the youngest-first order (ascending by delivery tag).
%% This happens in the oldest-first order (ascending by delivery tag).
settle_acks(Acks, State = #ch{queue_states = QueueStates0}) ->
{QueueStates, Actions} =
foreach_per_queue(
Expand Down
4 changes: 1 addition & 3 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,7 @@ cancel(Q, ConsumerTag, OkMsg, ActingUser, State) ->
-spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(),
rabbit_types:ctag(), [non_neg_integer()], state()) ->
{state(), rabbit_queue_type:actions()}.
settle(_QName, Op, _CTag, MsgIds0, State = #?STATE{pid = Pid}) ->
%% Classic queues expect message IDs in sorted order.
MsgIds = lists:usort(MsgIds0),
settle(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
Arg = case Op of
complete ->
{ack, MsgIds, self()};
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_queue_consumers.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
%% channel record
-record(cr, {ch_pid,
monitor_ref,
acktags,
consumer_count,
acktags :: ?QUEUE:?QUEUE({ack(), rabbit_types:ctag() | none}),
consumer_count :: non_neg_integer(),
%% Queue of {ChPid, #consumer{}} for consumers which have
%% been blocked (rate/prefetch limited) for any reason
blocked_consumers,
Expand Down
Loading

0 comments on commit dcb2fe0

Please sign in to comment.