Skip to content

Commit

Permalink
Rename unconfirmed to incoming_unsettled_map
Browse files Browse the repository at this point in the history
to better match the AMQP spec terminology.
  • Loading branch information
ansd committed Aug 8, 2023
1 parent 0542696 commit 48e2119
Showing 1 changed file with 34 additions and 64 deletions.
98 changes: 34 additions & 64 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-behaviour(gen_server).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_amqp1_0.hrl").

-define(MAX_SESSION_WINDOW_SIZE, 65_535).
Expand All @@ -26,7 +26,7 @@
?V_1_0_SYMBOL_MODIFIED]).

%% Just make these constant for the time being.
-define(INCOMING_CREDIT, 65536).
-define(INCOMING_CREDIT, 65_536).

-define(MAX_PERMISSION_CACHE_SIZE, 12).

Expand All @@ -40,7 +40,6 @@
get_info/1]).
-export([init/1,
terminate/2,
code_change/3,
handle_call/3,
handle_cast/2,
handle_info/2]).
Expand All @@ -53,17 +52,16 @@
exchange :: rabbit_exchange:name(),
routing_key :: undefined | rabbit_types:routing_key(),
delivery_id :: undefined | delivery_number(),
delivery_count = 0,
delivery_count = 0 :: sequence_no(),
send_settle_mode = undefined,
recv_settle_mode = undefined,
credit_used = ?INCOMING_CREDIT div 2,
msg_acc = []}).

-record(outgoing_link, {
queue :: undefined | rabbit_misc:resource_name(),
delivery_count = 0,
%% TODO below field is not needed?
send_settled}).
delivery_count = 0 :: sequence_no(),
send_settled :: boolean()}).

-record(outgoing_unsettled, {
%% The queue sent us this consumer scoped sequence number.
Expand All @@ -85,8 +83,8 @@

%%TODO put rarely used fields into separate #cfg{}
-record(state, {frame_max,
reader_pid,
writer_pid,
reader_pid :: pid(),
writer_pid :: pid(),
%% These messages were received from queues thanks to sufficient link credit.
%% However, they are buffered here due to session flow control before being sent to the client.
pending_transfers = queue:new() :: queue:queue(#pending_transfer{}),
Expand All @@ -103,15 +101,14 @@
outgoing_window_max,
next_publish_id, %% the 0-9-1-side counter for confirms
next_delivery_id = 0 :: delivery_number(),
incoming_unsettled_map, %%TODO delete
outgoing_unsettled_map = gb_trees:empty() :: gb_trees:tree(delivery_number(), #outgoing_unsettled{}),
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state(),
%% TRANSFER delivery IDs published to queues but not yet confirmed by queues
%% TODO Use a different data structure because
%% 1. we don't need to record exchanges since we don't emit channel stats,
%% 2. mixed mode can result in large gaps across delivery_ids that need to be confirmed. Use a tree?
%% 3. handle wrap around of 32-bit RFC-1982 serial number
unconfirmed = rabbit_confirms:init() :: rabbit_confirms:state()
incoming_unsettled_map = rabbit_confirms:init() :: rabbit_confirms:state(),
outgoing_unsettled_map = gb_trees:empty() :: gb_trees:tree(delivery_number(), #outgoing_unsettled{}),
queue_states = rabbit_queue_type:init() :: rabbit_queue_type:state()
%% TRANSFER delivery IDs confirmed by queues but yet to be sent to the client
%%TODO accumulate confirms and send DISPOSITIONs after processing the mailbox
%%(see rabbit_channel:noreply_coalesce/1
Expand All @@ -138,63 +135,18 @@ init({Channel, ReaderPid, WriterPid, User, Vhost, FrameMax}) ->
user = User,
vhost = Vhost,
channel_num = Channel,
next_publish_id = 0,
incoming_unsettled_map = gb_trees:empty()
next_publish_id = 0
}}.

terminate(_Reason, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.

handle_call(info, _From, #state{reader_pid = ReaderPid} = State) ->
Info = [{reader, ReaderPid}],
{reply, Info, State};
handle_call(Msg, _From, State) ->
{reply, {error, not_understood, Msg}, State}.

handle_info(#'basic.consume_ok'{}, State) ->
%% Handled above
{noreply, State};

handle_info(#'basic.cancel_ok'{}, State) ->
%% just ignore this for now,
%% At some point we should send the detach here but then we'd need to track
%% consumer tags -> link handle somewhere
{noreply, State};

%% A message from the queue saying that there are no more messages
% handle_info(#'basic.credit_drained'{consumer_tag = CTag} = CreditDrained,
% State = #state{writer_pid = WriterPid,
% session = Session}) ->
% Handle = ctag_to_handle(CTag),
% Link = get({out, Handle}),
% {Flow0, Link1} = rabbit_amqp1_0_session:outgoing_link_credit_drained(
% CreditDrained, Handle, Link),
% Flow = rabbit_amqp1_0_session:flow_fields(Flow0, Session),
% rabbit_amqp1_0_writer:send_command(WriterPid, Flow),
% put({out, Handle}, Link1),
% {noreply, State};

% handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
% session = Session}) ->
% {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
% case Reply of
% undefined ->
% ok;
% _ ->
% rabbit_amqp1_0_writer:send_command(
% WriterPid,
% rabbit_amqp1_0_session:flow_fields(Reply, Session)
% )
% end,
% {noreply, state(Session1, State)};

% handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
% rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
% {noreply, state(Session, State)};

handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
{noreply, State};
Expand Down Expand Up @@ -893,12 +845,30 @@ handle_queue_event({queue_event, QRef, Evt},
% rabbit_misc:protocol_error(Type, Reason, ReasonArgs)
end.

% handle_info({#'basic.return'{}, {DTag, _Msg}}, State = #state{writer_pid = WriterPid,
% session = Session}) ->
% {Reply, Session1} = rabbit_amqp1_0_session:return(DTag, Session),
% case Reply of
% undefined ->
% ok;
% _ ->
% rabbit_amqp1_0_writer:send_command(
% WriterPid,
% rabbit_amqp1_0_session:flow_fields(Reply, Session)
% )
% end,
% {noreply, state(Session1, State)};

% handle_info({#'basic.return'{}, _Msg}, State = #state{session = Session}) ->
% rabbit_log:warning("AMQP 1.0 message return without publishing sequence"),
% {noreply, state(Session, State)};

handle_queue_actions(Actions, State0) ->
{ReplyRev, State} =
lists:foldl(
fun ({settled, QName, DelIds}, {Reply, S0 = #state{unconfirmed = U0}}) ->
fun ({settled, QName, DelIds}, {Reply, S0 = #state{incoming_unsettled_map = U0}}) ->
{ConfirmMXs, U} = rabbit_confirms:confirm(DelIds, QName, U0),
S = S0#state{unconfirmed = U},
S = S0#state{incoming_unsettled_map = U},
R = if ConfirmMXs =:= [] ->
Reply;
ConfirmMXs =/= [] ->
Expand Down Expand Up @@ -960,7 +930,7 @@ handle_deliver(ConsumerTag, AckRequired,
Dtag = if is_integer(MsgId) ->
%% delivery-tag must be unique only per link (not per session)
<<MsgId:64>>;
MsgId =:= undefined ->
MsgId =:= undefined andalso SendSettled ->
%% Both ends of the link will always consider this message settled because
%% "the sender will send all deliveries settled to the receiver" [3.8.2].
%% Hence, the delivery tag does not have to be unique on this link.
Expand Down Expand Up @@ -1039,7 +1009,7 @@ incoming_link_transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
send_settle_mode = SSM,
recv_settle_mode = RSM} = Link,
#state{queue_states = QStates0,
unconfirmed = U0,
incoming_unsettled_map = U0,
next_publish_id = NextPublishId0
} = State0) ->
MsgBin = iolist_to_binary(lists:reverse([MsgPart | MsgAcc])),
Expand Down Expand Up @@ -1094,7 +1064,7 @@ incoming_link_transfer(#'v1_0.transfer'{delivery_id = DeliveryId0,
{U, Reply0} = process_routing_confirm(Qs, EffectiveSendSettleMode, DeliveryId, XName, U0),
State1 = State0#state{queue_states = QStates,
next_publish_id = NextPublishId,
unconfirmed = U},
incoming_unsettled_map = U},
{Reply1, State} = handle_queue_actions(Actions, State1),
{SendFlow, CreditUsed1} = case CreditUsed - 1 of
C when C =< 0 ->
Expand Down

0 comments on commit 48e2119

Please sign in to comment.