Skip to content

Commit

Permalink
Apply PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Feb 28, 2024
1 parent 53e9407 commit afd28ba
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 17 deletions.
16 changes: 16 additions & 0 deletions deps/amqp10_common/src/serial_number.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
-export([add/2,
compare/2,
ranges/1,
in_range/3,
diff/2,
foldl/4]).

Expand Down Expand Up @@ -86,6 +87,21 @@ ranges0([H | Rest], [{First, Last} | AccRest] = Acc0) ->
ranges0(Rest, Acc)
end.

-spec in_range(serial_number(), serial_number(), serial_number()) ->
boolean().
in_range(S, First, Last) ->
case compare(S, First) of
less ->
false;
_ ->
case compare(S, Last) of
greater ->
false;
_ ->
true
end
end.

-define(SERIAL_DIFF_BOUND, 16#80000000).
-spec diff(serial_number(), serial_number()) -> integer().
diff(A, B) ->
Expand Down
27 changes: 27 additions & 0 deletions deps/amqp10_common/test/serial_number_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
compare/2,
usort/1,
ranges/1,
in_range/3,
diff/2,
foldl/4]).

all() -> [test_add,
test_compare,
test_usort,
test_ranges,
test_in_range,
test_diff,
test_foldl].

Expand Down Expand Up @@ -96,6 +98,31 @@ test_ranges(_Config) ->
?assertEqual([{4294967294, 1}, {3, 5}, {10, 10}, {18, 19}],
ranges([1, 10, 4294967294, 0, 3, 4, 5, 19, 18, 4294967295])).

test_in_range(_Config) ->
?assert(in_range(0, 0, 0)),
?assert(in_range(0, 0, 1)),
?assert(in_range(4294967295, 4294967295, 4294967295)),
?assert(in_range(4294967295, 4294967295, 0)),
?assert(in_range(0, 4294967295, 0)),
?assert(in_range(4294967230, 4294967200, 1000)),
?assert(in_range(88, 4294967200, 1000)),

?assertNot(in_range(1, 0, 0)),
?assertNot(in_range(4294967295, 0, 0)),
?assertNot(in_range(0, 1, 1)),
?assertNot(in_range(10, 1, 9)),
?assertNot(in_range(1005, 4294967200, 1000)),
?assertNot(in_range(4294967190, 4294967200, 1000)),

%% Pass wrong First and Last.
?assertNot(in_range(1, 3, 2)),
?assertNot(in_range(2, 3, 2)),
?assertNot(in_range(3, 3, 2)),
?assertNot(in_range(4, 3, 2)),

?assertExit({undefined_serial_comparison, 0, 16#80000000},
in_range(0, 16#80000000, 16#80000000)).

test_diff(_Config) ->
?assertEqual(0, diff(0, 0)),
?assertEqual(0, diff(1, 1)),
Expand Down
40 changes: 23 additions & 17 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

-module(rabbit_amqp_session).

-compile({inline, [maps_update_with/4]}).

-behaviour(gen_server).

-include_lib("rabbit_common/include/rabbit.hrl").
Expand Down Expand Up @@ -1061,20 +1063,16 @@ handle_control(#'v1_0.disposition'{role = ?RECV_ROLE,
consumer_tag = Ctag,
msg_id = MsgId} = Unsettled,
{SettledAcc, UnsettledAcc}) ->
DeliveryIdComparedToFirst = compare(DeliveryId, First),
DeliveryIdComparedToLast = compare(DeliveryId, Last),
if DeliveryIdComparedToFirst =:= less orelse
DeliveryIdComparedToLast =:= greater ->
%% Delivery ID is outside the DISPOSITION range.
{SettledAcc, UnsettledAcc#{DeliveryId => Unsettled}};
true ->
%% Delivery ID is inside the DISPOSITION range.
SettledAcc1 = maps:update_with(
{QName, Ctag},
fun(MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
SettledAcc),
{SettledAcc1, UnsettledAcc}
case serial_number:in_range(DeliveryId, First, Last) of
true ->
SettledAcc1 = maps_update_with(
{QName, Ctag},
fun(MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
SettledAcc),
{SettledAcc1, UnsettledAcc};
false ->
{SettledAcc, UnsettledAcc#{DeliveryId => Unsettled}}
end
end,
{#{}, #{}}, UnsettledMap)
Expand Down Expand Up @@ -1209,19 +1207,19 @@ session_flow_control_sent_transfers(
State#state{remote_incoming_window = RemoteIncomingWindow - NumTransfers,
next_outgoing_id = add(NextOutgoingId, NumTransfers)}.

settle_delivery_id(Current, {Settled, Unsettled}) ->
settle_delivery_id(Current, {Settled, Unsettled} = Acc) ->
case maps:take(Current, Unsettled) of
{#outgoing_unsettled{queue_name = QName,
consumer_tag = Ctag,
msg_id = MsgId}, Unsettled1} ->
Settled1 = maps:update_with(
Settled1 = maps_update_with(
{QName, Ctag},
fun(MsgIds) -> [MsgId | MsgIds] end,
[MsgId],
Settled),
{Settled1, Unsettled1};
error ->
{Settled, Unsettled}
Acc
end.

settle_op_from_outcome(#'v1_0.accepted'{}) ->
Expand Down Expand Up @@ -2276,6 +2274,14 @@ check_user_id(Mc, User) ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Reason, Args)
end.

maps_update_with(Key, Fun, Init, Map) ->
case Map of
#{Key := Value} ->
Map#{Key := Fun(Value)};
_ ->
Map#{Key => Init}
end.

format_status(
#{state := #state{cfg = Cfg,
outgoing_pending = OutgoingPending,
Expand Down

0 comments on commit afd28ba

Please sign in to comment.