Skip to content

Commit

Permalink
Protect receiving app from being overloaded
Browse files Browse the repository at this point in the history
What?

Protect receiving application from being overloaded with new messages
while still processing existing messages if the auto credit renewal
feature of the Erlang AMQP 1.0 client library is used.

This feature can therefore be thought of as a prefetch window equivalent
in AMQP 0.9.1 or MQTT 5.0 property Receive Maximum.

How?

The credit auto renewal feature in RabbitMQ 3.x was wrongly implemented.
This commit takes the same approach as done in the server:
The incoming_unsettled map is hold in the link instead of in the session
to accurately and quickly determine the number of unsettled messages for
a receiving link.

The amqp10_client lib will grant more credits to the sender when the sum
of remaining link credits and number of unsettled deliveries falls below
the threshold RenewWhenBelow.

This avoids maintaning additional state like the `link_credit_unsettled`
or an alternative delivery_count_settled sequence number which is more
complex to implement correctly.

This commit breaks the amqp10_client_session:disposition/6 API:
This commit forces the client application to only range settle for a
given link, i.e. not across multiple links on a given session at once.
The latter is allowed according to the AMQP spec.
  • Loading branch information
ansd committed Feb 23, 2024
1 parent d31fdb9 commit 3d67842
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 193 deletions.
3 changes: 0 additions & 3 deletions deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ include erlang.mk
HEX_TARBALL_FILES += rabbitmq-components.mk \
git-revisions.txt

# Dialyze the tests.
DIALYZER_OPTS += --src -r test

# --------------------------------------------------------------------
# ActiveMQ for the testsuite.
# --------------------------------------------------------------------
Expand Down
34 changes: 21 additions & 13 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -301,16 +301,19 @@ attach_link(Session, AttachArgs) ->
%% This is asynchronous and will notify completion of the attach request to the
%% caller using an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, {detached, Why}}}
-spec detach_link(link_ref()) -> _.
-spec detach_link(link_ref()) -> ok | {error, term()}.
detach_link(#link_ref{link_handle = Handle, session = Session}) ->
amqp10_client_session:detach(Session, Handle).

%% @doc Grant credit to a sender.
%% The amqp10_client will automatically grant Credit to the sender when
%% the remaining link credit falls below the value of RenewWhenBelow.
%% If RenewWhenBelow is 'never' the client will never grant more credit. Instead
%% the caller will be notified when the link_credit reaches 0 with an
%% amqp10_event of the following format:
%% @doc Grant Credit to a sender.
%%
%% In addition, if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
%% Credit to the sender when the sum of the remaining link credit and the number of unsettled
%% messages falls below the value of RenewWhenBelow.
%% `Credit + RenewWhenBelow - 1` is the maximum number of in-flight unsettled messages.
%%
%% If RenewWhenBelow is `never` the amqp10_client will never grant more credit. Instead the caller
%% will be notified when the link_credit reaches 0 with an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, credit_exhausted}}
-spec flow_link_credit(link_ref(), Credit :: non_neg_integer(),
RenewWhenBelow :: never | pos_integer()) -> ok.
Expand All @@ -323,10 +326,16 @@ flow_link_credit(Ref, Credit, RenewWhenBelow) ->
flow_link_credit(#link_ref{role = receiver, session = Session,
link_handle = Handle},
Credit, RenewWhenBelow, Drain)
when RenewWhenBelow =:= never orelse
when
%% Drain together with auto renewal doesn't make sense, so disallow it in the API.
((Drain) andalso RenewWhenBelow =:= never
orelse not(Drain))
andalso
%% Check that the RenewWhenBelow value make sense.
(RenewWhenBelow =:= never orelse
is_integer(RenewWhenBelow) andalso
RenewWhenBelow > 0 andalso
RenewWhenBelow =< Credit ->
RenewWhenBelow =< Credit) ->
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
drain = Drain},
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).
Expand Down Expand Up @@ -359,11 +368,10 @@ accept_msg(LinkRef, Msg) ->
%% the chosen delivery state.
-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
amqp10_client_types:delivery_state()) -> ok.
settle_msg(#link_ref{role = receiver,
session = Session}, Msg, Settlement) ->
settle_msg(LinkRef, Msg, Settlement) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId,
DeliveryId, true, Settlement).
amqp10_client_session:disposition(LinkRef, DeliveryId, DeliveryId, true, Settlement).

%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
Expand Down
1 change: 1 addition & 0 deletions deps/amqp10_client/src/amqp10_client.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@

-record(link_ref, {role :: sender | receiver,
session :: pid(),
%% locally chosen output handle
link_handle :: non_neg_integer()}).
115 changes: 62 additions & 53 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
detach/2,
transfer/3,
flow/4,
disposition/6
disposition/5
]).

%% Private API
Expand Down Expand Up @@ -131,8 +131,9 @@
available = 0 :: non_neg_integer(),
drain = false :: boolean(),
partial_transfers :: undefined | {#'v1_0.transfer'{}, [binary()]},
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()}
}).
auto_flow :: never | {auto, RenewWhenBelow :: pos_integer(), Credit :: pos_integer()},
incoming_unsettled = #{} :: #{delivery_number() => ok}
}).

-record(state,
{channel :: pos_integer(),
Expand All @@ -155,7 +156,6 @@
connection_config :: amqp10_client_connection:connection_config(),
outgoing_delivery_id = ?INITIAL_OUTGOING_DELIVERY_ID :: delivery_number(),
outgoing_unsettled = #{} :: #{delivery_number() => {amqp10_msg:delivery_tag(), Notify :: pid()}},
incoming_unsettled = #{} :: #{delivery_number() => output_handle()},
notify :: pid()
}).

Expand Down Expand Up @@ -204,14 +204,18 @@ transfer(Session, Amqp10Msg, Timeout) ->
flow(Session, Handle, Flow, RenewWhenBelow) ->
gen_statem:cast(Session, {flow_link, Handle, Flow, RenewWhenBelow}).

-spec disposition(pid(), link_role(), delivery_number(), delivery_number(), boolean(),
%% Sending a disposition on a sender link (with receiver-settle-mode = second)
%% is currently unsupported.
-spec disposition(link_ref(), delivery_number(), delivery_number(), boolean(),
amqp10_client_types:delivery_state()) -> ok.
disposition(Session, Role, First, Last, Settled, DeliveryState) ->
gen_statem:call(Session, {disposition, Role, First, Last, Settled,
disposition(#link_ref{role = receiver,
session = Session,
link_handle = Handle},
First, Last, Settled, DeliveryState) ->
gen_statem:call(Session, {disposition, Handle, First, Last, Settled,
DeliveryState}, ?TIMEOUT).



%% -------------------------------------------------------------------
%% Private API.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -277,7 +281,7 @@ mapped(cast, 'end', State) ->
send_end(State),
{next_state, end_sent, State};
mapped(cast, {flow_link, OutHandle, Flow0, RenewWhenBelow}, State0) ->
State = send_flow_link(fun send/2, OutHandle, Flow0, RenewWhenBelow, State0),
State = send_flow_link(OutHandle, Flow0, RenewWhenBelow, State0),
{keep_state, State};
mapped(cast, {flow_session, Flow0 = #'v1_0.flow'{incoming_window = {uint, IncomingWindow}}},
#state{next_incoming_id = NII,
Expand Down Expand Up @@ -367,45 +371,43 @@ mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
State = book_partial_transfer_received(
State0#state{links = Links#{OutHandle => Link1}}),
{keep_state, State};
mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
delivery_id = MaybeDeliveryId,
settled = Settled} = Transfer0, Payload0},
#state{incoming_unsettled = Unsettled0} = State0) ->

mapped(cast, {Transfer0 = #'v1_0.transfer'{handle = {uint, InHandle}},
Payload0}, State0) ->
{ok, #link{target = {pid, TargetPid},
output_handle = OutHandle,
ref = LinkRef} = Link0} =
find_link_by_input_handle(InHandle, State0),
ref = LinkRef,
incoming_unsettled = Unsettled
} = Link0} = find_link_by_input_handle(InHandle, State0),

{Transfer, Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0),
Msg = decode_as_msg(Transfer, Payload),

% stash the DeliveryId - not sure for what yet
Unsettled = case MaybeDeliveryId of
{uint, DeliveryId} when Settled =/= true ->
Unsettled0#{DeliveryId => OutHandle};
_ ->
Unsettled0
end,
{Transfer = #'v1_0.transfer'{settled = Settled,
delivery_id = {uint, DeliveryId}},
Payload, Link1} = complete_partial_transfer(Transfer0, Payload0, Link0),

Msg = decode_as_msg(Transfer, Payload),
Link2 = case Settled of
true ->
Link1;
_ ->
%% "If not set on the first (or only) transfer for a (multi-transfer) delivery,
%% then the settled flag MUST be interpreted as being false." [2.7.5]
Link1#link{incoming_unsettled = Unsettled#{DeliveryId => ok}}
end,
% link bookkeeping
% notify when credit is exhausted (link_credit = 0)
% detach the Link with a transfer-limit-exceeded error code if further
% transfers are received
State1 = State0#state{incoming_unsettled = Unsettled},
case book_transfer_received(State1, Link1) of
{ok, Link2, State2} ->
case book_transfer_received(State0, Link2) of
{ok, Link3, State1} ->
% deliver
TargetPid ! {amqp10_msg, LinkRef, Msg},
State = auto_flow(Link2, State2),
State = auto_flow(Link3, State1),
{keep_state, State};
{credit_exhausted, Link2, State} ->
{credit_exhausted, Link3, State} ->
TargetPid ! {amqp10_msg, LinkRef, Msg},
notify_credit_exhausted(Link2),
notify_credit_exhausted(Link3),
{keep_state, State};
{transfer_limit_exceeded, Link2, State} ->
logger:warning("transfer_limit_exceeded for link ~tp", [Link2]),
Link = detach_with_error_cond(Link2, State,
{transfer_limit_exceeded, Link3, State} ->
logger:warning("transfer_limit_exceeded for link ~tp", [Link3]),
Link = detach_with_error_cond(Link3, State,
?V_1_0_LINK_ERROR_TRANSFER_LIMIT_EXCEEDED),
{keep_state, update_link(Link, State)}
end;
Expand Down Expand Up @@ -501,12 +503,15 @@ mapped({call, From},
end;

mapped({call, From},
{disposition, Role, First, Last, Settled0, DeliveryState},
#state{incoming_unsettled = Unsettled0} = State0) ->
{disposition, OutputHandle, First, Last, Settled0, DeliveryState},
#state{links = Links} = State0) ->
#{OutputHandle := Link0 = #link{incoming_unsettled = Unsettled0}} = Links,
Unsettled = serial_number:foldl(fun maps:remove/2, Unsettled0, First, Last),
State = State0#state{incoming_unsettled = Unsettled},
Link = Link0#link{incoming_unsettled = Unsettled},
State1 = State0#state{links = Links#{OutputHandle := Link}},
State = auto_flow(Link, State1),
Disposition = #'v1_0.disposition'{
role = translate_role(Role),
role = translate_role(receiver),
first = {uint, First},
last = {uint, Last},
settled = Settled0,
Expand Down Expand Up @@ -599,7 +604,7 @@ send_transfer(Transfer0, Parts0, MaxMessageSize, #state{socket = Socket,
{ok, length(Frames)}
end.

send_flow_link(Send, OutHandle,
send_flow_link(OutHandle,
#'v1_0.flow'{link_credit = {uint, Credit}} = Flow0, RenewWhenBelow,
#state{links = Links,
next_incoming_id = NII,
Expand All @@ -625,7 +630,7 @@ send_flow_link(Send, OutHandle,
%% initial attach frame from the sender this field MUST NOT be set." [2.7.4]
delivery_count = maybe_uint(DeliveryCount),
available = uint(Available)},
ok = Send(Flow, State),
ok = send(Flow, State),
State#state{links = Links#{OutHandle =>
Link#link{link_credit = Credit,
auto_flow = AutoFlow}}}.
Expand Down Expand Up @@ -777,8 +782,9 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},
max_message_size = MaxMessageSize},
ok = Send(Attach, State),

LinkRef = make_link_ref(element(1, Role), self(), OutHandle),
Link = #link{name = Name,
ref = make_link_ref(element(1, Role), self(), OutHandle),
ref = LinkRef,
output_handle = OutHandle,
state = attach_sent,
role = element(1, Role),
Expand All @@ -790,7 +796,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _},

{State#state{links = Links#{OutHandle => Link},
next_link_handle = NextLinkHandle,
link_index = LinkIndex#{Name => OutHandle}}, Link#link.ref}.
link_index = LinkIndex#{Name => OutHandle}}, LinkRef}.

-spec handle_session_flow(#'v1_0.flow'{}, #state{}) -> #state{}.
handle_session_flow(#'v1_0.flow'{next_incoming_id = MaybeNII,
Expand Down Expand Up @@ -908,7 +914,6 @@ translate_delivery_state({modified,
translate_delivery_state(released) -> #'v1_0.released'{};
translate_delivery_state(received) -> #'v1_0.received'{}.

translate_role(sender) -> false;
translate_role(receiver) -> true.

maybe_notify_link_credit(#link{role = sender,
Expand Down Expand Up @@ -987,9 +992,11 @@ book_transfer_received(#state{next_incoming_id = NID,

auto_flow(#link{link_credit = LC,
auto_flow = {auto, RenewWhenBelow, Credit},
output_handle = OutHandle}, State)
when LC < RenewWhenBelow ->
send_flow_link(fun send/2, OutHandle,
output_handle = OutHandle,
incoming_unsettled = Unsettled},
State)
when LC + map_size(Unsettled) < RenewWhenBelow ->
send_flow_link(OutHandle,
#'v1_0.flow'{link_credit = {uint, Credit}},
RenewWhenBelow, State);
auto_flow(_, State) ->
Expand Down Expand Up @@ -1045,7 +1052,8 @@ socket_send0({tcp, Socket}, Data) ->
socket_send0({ssl, Socket}, Data) ->
ssl:send(Socket, Data).

-spec make_link_ref(_, _, _) -> link_ref().
-spec make_link_ref(link_role(), pid(), output_handle()) ->
link_ref().
make_link_ref(Role, Session, Handle) ->
#link_ref{role = Role, session = Session, link_handle = Handle}.

Expand Down Expand Up @@ -1100,7 +1108,6 @@ format_status(Status = #{data := Data0}) ->
connection_config = ConnectionConfig,
outgoing_delivery_id = OutgoingDeliveryId,
outgoing_unsettled = OutgoingUnsettled,
incoming_unsettled = IncomingUnsettled,
notify = Notify
} = Data0,
Links = maps:map(
Expand All @@ -1119,7 +1126,8 @@ format_status(Status = #{data := Data0}) ->
available = Available,
drain = Drain,
partial_transfers = PartialTransfers0,
auto_flow = AutoFlow
auto_flow = AutoFlow,
incoming_unsettled = IncomingUnsettled
}) ->
PartialTransfers = case PartialTransfers0 of
undefined ->
Expand All @@ -1141,7 +1149,9 @@ format_status(Status = #{data := Data0}) ->
available => Available,
drain => Drain,
partial_transfers => PartialTransfers,
auto_flow => AutoFlow}
auto_flow => AutoFlow,
incoming_unsettled => maps:size(IncomingUnsettled)
}
end, Links0),
Data = #{channel => Channel,
remote_channel => RemoteChannel,
Expand All @@ -1160,7 +1170,6 @@ format_status(Status = #{data := Data0}) ->
connection_config => maps:remove(sasl, ConnectionConfig),
outgoing_delivery_id => OutgoingDeliveryId,
outgoing_unsettled => maps:size(OutgoingUnsettled),
incoming_unsettled => maps:size(IncomingUnsettled),
notify => Notify},
Status#{data := Data}.

Expand Down
Loading

0 comments on commit 3d67842

Please sign in to comment.