Skip to content

Commit

Permalink
Support AMQP 1.0 natively
Browse files Browse the repository at this point in the history
ATTACH target

TRANSFER from client to server

ATTACH source

Merge session files

Merge files rabbit_amqp1_0_session_process and rabbit_amqp1_0_session.

Handle deliver action

DETACH link and END session

Fix credit reply from quorum queue

Make following tests green:
```
make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:reliable_send_receive_with_outcomes_classic
make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:reliable_send_receive_with_outcomes_quorum
```

Settle with state released if unroutable

Fixes the following test:
```
make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:publishing_to_non_existing_queue_should_settle_with_released
```

Handle drain

Make follwing tests green:
```
make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:roundtrip_classic_queue_with_drain
make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:roundtrip_quorum_queue_with_drain
```

Handle send_credit_reply action

Make the following test green:
```
make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:roundtrip_stream_queue_with_drain
```

Fix test expectation

Make the following test green:
```
make -C deps/rabbitmq_amqp1_0/ ct-amqp10_client t=tests:message_headers_conversion
```

With Native AMQP, the behaviour of
```
Convert AMQP 0.9.1 message headers to application properties for an AMQP 1.0 consumer
amqp1_0.convert_amqp091_headers_to_app_props = false | true (default false)
Convert AMQP 1.0 Application Properties to AMQP 0.9.1 headers
amqp1_0.convert_app_props_to_amqp091_headers = false | true (default false)
```
https://github.com/rabbitmq/rabbitmq-server/tree/main/deps/rabbitmq_amqp1_0#configuration

will break because we always convert according to the message container conversions.
For example, 091 x-headers will go into message-annotations instead of application properties.
Also, false won’t be respected since we always convert the headers with message containers.
Either we decide to have this breaking change with Native AMQP or we need to respect the
old behaviour of application parameters amqp1_0.convert_amqp091_headers_to_app_props and
amqp1_0.convert_app_props_to_amqp091_headers when doing the message container conversions.

Register connection

Make the following test green:
```
make -C deps/rabbitmq_amqp1_0/ ct-proxy_protocol
```

Delete unused code

Fix message annotation test expectation

With message containers, the broker will include x-exchange and
x-routing-key message annotations.

Serialize footer

makes the following test green:
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:footer
```

Set first-acquirer in header

Makes the following test green:
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:redelivery
```

Fix queue confirmation crash

Makes the following test green:
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:routing
```

Add some authz checks

to make the following tests green:
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:access_failure
make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:access_failure_not_allowed
make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:access_failure_send
```

Delete unused code

Adapt test expectation

Makes the following test green:
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=dotnet:streams
```

Fix dialyzer warnings

bazel run gazelle

excluding any changes for deps/proper

Handle undefined msg ID from queue

Makes the following tests green:
```
make -C deps/amqp10_client/ ct-system
```

Classic queues send an 'undefined' message ID to the channel when
no ack is required. No ack is required when the send settle mode is
settled. In this case it should be perfectly valid to always send the
same (empty binary) delivery-tag from server to client.

mix format deps/rabbitmq_cli/lib/rabbitmqctl.ex

using Mix 1.15.4 (compiled with Erlang/OTP 26)

Convert gen_server2 to gen_server

Fix crash when basic message is received

Makes the following test green:
```
bazel test //deps/rabbitmq_amqp1_0:amqp10_client_SUITE-mixed
```

Remove #outgoing_link.default_outcome

Rename unconfirmed to incoming_unsettled_map

to better match the AMQP spec terminology.

Fix cherry-pick build failure

Add MQTT 5.0 <-> AMQP 1.0 assertions

Simplify rabbit_channel

by removing extra AMQP 1.0 logic for settling unroutable messages with
released state.

This commit reverts the workaround introduced by PR 8015.

Remove rabbit_queue_collector

rabbit_queue_collector is responsible for synchronously deleting
exclusive queues. Since the AMQP 1.0 plugin never creates exclusive
queues, rabbit_queue_collector doesn't need to be started in the first
place. This will save 1 Erlang process per AMQP 1.0 connection.

Use 1 writer process per AMQP 1.0 connection

AMQP 0.9.1 uses a separate rabbit_writer Erlang process per
AMQP 0.9.1 channel.

Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer
Erlang process per AMQP 1.0 session.

Advantage of single writer proc per session (prior to this commit):
* High parallelism for serialising packets if multiple sessions within
  a connection write heavily at the same time.

This commit uses a single writer process per AMQP 1.0 connection that is
shared across all AMQP 1.0 sessions.

Advantages of single writer proc per connection (this commit):
* Lower memory usage with hundreds of thousands of AMQP 1.0 sessions
* Less TCP and IP header overhead given that the single writer process
  can accumulate across all sessions bytes worth a MSS before flushing
  the socket.

In other words, this commit decides that a reader / writer process pair
per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows.
Having a writer per session is too heavy. The final goal by previous
commits and follow-up commits is to reduce the total number of Erlang
processes to allow hundreds of thousands of AMQP clients to connect
while keeping resource usage in RabbitMQ at a low level.

We still ensure high thoughput by having separate reader, writer, and
session processes.

Remove one supervisory level

Given that we now have 1 writer per AMQP 1.0 connection, this commit
changes the supervisor hierarchy such that only 1 additional process
(rabbit_amqp1_0_session) is created per AMQP 1.0 session.

Fix dialyze and xref

`bazel run gazelle` wrongly removes the dependency on amqp_client
due to a bug in the gazelle plugin.

For now, we add a directive.

Transform rabbit_amqp1_0_writer into gen_server

Why:
Prior to this commit, when clicking on the AMQP 1.0 writer process in
observer, the process crashed.
Instead of handling all these debug messages of the sys module, it's
much better to implement a gen_server.
There is no advantage of using a special OTP process over gen_server
for the AMQP 1.0 writer.
gen_server also provides cleaner format status output.

How:
Message callbacks return a timeout of 0.
After all messages in the inbox are processed, the timeout message is
handled by flushing any pending bytes.

Add test for multiple sessions on same connection

given that a single writer is used across multiple sessions.

Remove stats timer from writer

AMQP 1.0 connections haven't emitted any stats previously.

Since Native AMQP 1.0 is targeted for 4.0 where metrics delivery via the
Management API is removed anyway, we remove the stats timer from the 1.0
writer in this commit.

Add better test for CLI connections listing

Display connection properties in Management UI

Make rabbit_confirms more efficient

use lists:foldl/3 instead of lists:foldr/3.
The returned order of confirmed sequence numers is not important since
rabbit_channel will sort them anyway.

Avoid lists:any/2 by checking within preceeding lists:foldl/3

Fix flawed serial number arithmetic

Batch confirms and rejections

When there are contiguous queue confirmations in the session process
mailbox, batch them. When the confirmations are sent to the publisher, a
single DISPOSITION frame is sent for contiguously confirmed delivery
IDs and for the special case where no confirmations are outstanding
anymore.

This approach should be good enough. However it's sub optimal in
scenarios where contiguous delivery IDs that need confirmations are rare,
for example:
* There are multiple links in the session with different sender
  settlement modes and sender publishes across these links interleaved.
* sender settlement mode is mixed and sender publishes interleaved settled
  and unsettled TRANSFERs.
  • Loading branch information
ansd committed Sep 17, 2023
1 parent ee7c6d9 commit 7ae5000
Show file tree
Hide file tree
Showing 50 changed files with 2,908 additions and 3,003 deletions.
14 changes: 7 additions & 7 deletions deps/amqp10_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

This is an [Erlang client for the AMQP 1.0](https://www.amqp.org/resources/specifications) protocol.

It's primary purpose is to be used in RabbitMQ related projects but it is a
generic client that was tested with at least 4 implementations of AMQP 1.0.
Its primary purpose is to be used in RabbitMQ related projects but it is a
generic client that was tested with at least 3 implementations of AMQP 1.0.

If you are looking for an Erlang client for [AMQP 0-9-1](https://www.rabbitmq.com/tutorials/amqp-concepts.html) — a completely different
protocol despite the name — [consider this one](https://github.com/rabbitmq/rabbitmq-erlang-client).
protocol despite the name — [consider this one](../amqp_client).

## Project Maturity and Status

This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100%
feature complete but moderately mature and was tested against at least three AMQP 1.0 servers:
feature complete but moderately mature and was tested against at least 3 AMQP 1.0 servers:
RabbitMQ, Azure ServiceBus, ActiveMQ.

This client library is not officially supported by VMware at this time.
Expand Down Expand Up @@ -80,8 +80,8 @@ after 2000 ->
exit(credited_timeout)
end.
%% create a new message using a delivery-tag, body and indicate
%% it's settlement status (true meaning no disposition confirmation
%% Create a new message using a delivery-tag, body and indicate
%% its settlement status (true meaning no disposition confirmation
%% will be sent by the receiver).
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
ok = amqp10_client:send_msg(Sender, OutMsg),
Expand Down Expand Up @@ -112,7 +112,7 @@ after the `Open` frame has been successfully written to the socket rather than
waiting until the remote end returns with their `Open` frame. The client will
notify the caller of various internal/async events using `amqp10_event`
messages. In the example above when the remote replies with their `Open` frame
a message is sent of the following forma:
a message is sent of the following form:

```
{amqp10_event, {connection, ConnectionPid, opened}}
Expand Down
2 changes: 0 additions & 2 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ echo(#link_ref{role = receiver, session = Session,
%%% messages

%% @doc Send a message on a the link referred to be the 'LinkRef'.
%% Returns ok for "async" transfers when messages are sent with settled=true
%% else it returns the delivery state from the disposition
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
ok | {error, insufficient_credit | link_not_found | half_attached}.
send_msg(#link_ref{role = sender, session = Session,
Expand Down
7 changes: 2 additions & 5 deletions deps/amqp10_client/src/amqp10_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,7 @@
% set to a negative value to allow a sender to "overshoot" the flow
% control by this margin
transfer_limit_margin => 0 | neg_integer(),
sasl => none | anon | {plain, User :: binary(), Pwd :: binary()},
notify => pid(),
notify_when_opened => pid() | none,
notify_when_closed => pid() | none
sasl => none | anon | {plain, User :: binary(), Pwd :: binary()}
}.

-record(state,
Expand Down Expand Up @@ -144,7 +141,7 @@ protocol_header_received(Pid, Protocol, Maj, Min, Rev) ->

-spec begin_session(pid()) -> supervisor:startchild_ret().
begin_session(Pid) ->
gen_statem:call(Pid, begin_session, {dirty_timeout, ?TIMEOUT}).
gen_statem:call(Pid, begin_session, ?TIMEOUT).

heartbeat(Pid) ->
gen_statem:cast(Pid, heartbeat).
Expand Down
10 changes: 4 additions & 6 deletions deps/amqp10_client/src/amqp10_client_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@
notify :: pid()
}).


%% -------------------------------------------------------------------
%% Public API.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -173,18 +172,17 @@ begin_sync(Connection, Timeout) ->

-spec attach(pid(), attach_args()) -> {ok, link_ref()}.
attach(Session, Args) ->
gen_statem:call(Session, {attach, Args}, {dirty_timeout, ?TIMEOUT}).
gen_statem:call(Session, {attach, Args}, ?TIMEOUT).

-spec detach(pid(), link_handle()) -> ok | {error, link_not_found | half_attached}.
detach(Session, Handle) ->
gen_statem:call(Session, {detach, Handle}, {dirty_timeout, ?TIMEOUT}).
gen_statem:call(Session, {detach, Handle}, ?TIMEOUT).

-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
ok | {error, insufficient_credit | link_not_found | half_attached}.
transfer(Session, Amqp10Msg, Timeout) ->
[Transfer | Records] = amqp10_msg:to_amqp_records(Amqp10Msg),
gen_statem:call(Session, {transfer, Transfer, Records},
{dirty_timeout, Timeout}).
gen_statem:call(Session, {transfer, Transfer, Records}, Timeout).

flow(Session, Handle, Flow, RenewAfter) ->
gen_statem:cast(Session, {flow, Handle, Flow, RenewAfter}).
Expand All @@ -193,7 +191,7 @@ flow(Session, Handle, Flow, RenewAfter) ->
amqp10_client_types:delivery_state()) -> ok.
disposition(Session, Role, First, Last, Settled, DeliveryState) ->
gen_statem:call(Session, {disposition, Role, First, Last, Settled,
DeliveryState}, {dirty_timeout, ?TIMEOUT}).
DeliveryState}, ?TIMEOUT).



Expand Down
46 changes: 21 additions & 25 deletions deps/amqp10_client/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,10 @@

-include("src/amqp10_client.hrl").

-compile(export_all).
-compile([export_all, nowarn_export_all]).

-define(UNAUTHORIZED_USER, <<"test_user_no_perm">>).

%% The latch constant defines how many processes are spawned in order
%% to run certain functionality in parallel. It follows the standard
%% countdown latch pattern.
-define(LATCH, 100).

%% The wait constant defines how long a consumer waits before it
%% unsubscribes
-define(WAIT, 200).

%% How to long wait for a process to die after an expected failure
-define(PROCESS_EXIT_TIMEOUT, 5000).
suite() ->
[{timetrap, {seconds, 120}}].

all() ->
[
Expand Down Expand Up @@ -344,7 +333,7 @@ roundtrip(OpenConf, Body) ->
<<"test1">>,
settled,
unsettled_state),
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 60000 * 5),
{ok, OutMsg} = amqp10_client:get_msg(Receiver, 60_000 * 4),
ok = amqp10_client:end_session(Session),
ok = amqp10_client:close_connection(Connection),
% ct:pal(?LOW_IMPORTANCE, "roundtrip message Out: ~tp~nIn: ~tp~n", [OutMsg, Msg]),
Expand Down Expand Up @@ -379,7 +368,7 @@ filtered_roundtrip(OpenConf, Body) ->
settled,
unsettled_state),
ok = amqp10_client:send_msg(Sender, Msg1),
{ok, OutMsg1} = amqp10_client:get_msg(DefaultReceiver, 60000 * 5),
{ok, OutMsg1} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4),
?assertEqual(<<"msg-1-tag">>, amqp10_msg:delivery_tag(OutMsg1)),

timer:sleep(5 * 1000),
Expand All @@ -398,10 +387,10 @@ filtered_roundtrip(OpenConf, Body) ->
unsettled_state,
#{<<"apache.org:selector-filter:string">> => <<"amqp.annotation.x-opt-enqueuedtimeutc > ", Now2Binary/binary>>}),

{ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60000 * 5),
{ok, OutMsg2} = amqp10_client:get_msg(DefaultReceiver, 60_000 * 4),
?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsg2)),

{ok, OutMsgFiltered} = amqp10_client:get_msg(FilteredReceiver, 60000 * 5),
{ok, OutMsgFiltered} = amqp10_client:get_msg(FilteredReceiver, 60_000 * 4),
?assertEqual(<<"msg-2-tag">>, amqp10_msg:delivery_tag(OutMsgFiltered)),

ok = amqp10_client:end_session(Session),
Expand Down Expand Up @@ -676,11 +665,13 @@ incoming_heartbeat(Config) ->
idle_time_out => 1000, notify => self()},
{ok, Connection} = amqp10_client:open_connection(CConf),
receive
{amqp10_event, {connection, Connection,
{closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}} ->
{amqp10_event,
{connection, Connection0,
{closed, {resource_limit_exceeded, <<"remote idle-time-out">>}}}}
when Connection0 =:= Connection ->
ok
after 5000 ->
exit(incoming_heartbeat_assert)
exit(incoming_heartbeat_assert)
end,
demonitor(MockRef).

Expand All @@ -704,25 +695,30 @@ publish_messages(Sender, Data, Num) ->

receive_one(Receiver) ->
receive
{amqp10_msg, Receiver, Msg} ->
{amqp10_msg, Receiver0, Msg}
when Receiver0 =:= Receiver ->
amqp10_client:accept_msg(Receiver, Msg)
after 2000 ->
timeout
end.

await_disposition(DeliveryTag) ->
receive
{amqp10_disposition, {accepted, DeliveryTag}} -> ok
{amqp10_disposition, {accepted, DeliveryTag0}}
when DeliveryTag0 =:= DeliveryTag -> ok
after 3000 ->
flush(),
exit(dispostion_timeout)
end.

await_link(Who, What, Err) ->
receive
{amqp10_event, {link, Who, What}} ->
{amqp10_event, {link, Who0, What0}}
when Who0 =:= Who andalso
What0 =:= What ->
ok;
{amqp10_event, {link, Who, {detached, Why}}} ->
{amqp10_event, {link, Who0, {detached, Why}}}
when Who0 =:= Who ->
exit(Why)
after 5000 ->
flush(),
Expand Down
30 changes: 26 additions & 4 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ get_property(priority, Msg) ->
undefined
end
end;
get_property(subject, Msg) ->
case Msg of
#msg{properties = #'v1_0.properties'{subject = {utf8, Subject}}} ->
Subject;
_ ->
undefined
end;
get_property(_P, _Msg) ->
undefined.

Expand All @@ -185,10 +192,19 @@ convert_to(TargetProto, Msg) ->
serialize(Sections) ->
encode_bin(Sections).

protocol_state(Msg, Anns) ->
protocol_state(Msg0 = #msg{header = Header0}, Anns) ->
Redelivered = maps:get(redelivered, Anns, false),
FirstAcquirer = not Redelivered,
Header = case Header0 of
undefined ->
#'v1_0.header'{first_acquirer = FirstAcquirer};
#'v1_0.header'{} ->
Header0#'v1_0.header'{first_acquirer = FirstAcquirer}
end,
Msg = Msg0#msg{header = Header},

Exchange = maps:get(exchange, Anns),
[RKey | _] = maps:get(routing_keys, Anns),

%% any x-* annotations get added as message annotations
AnnsToAdd = maps:filter(fun (Key, _) -> mc_util:is_x_header(Key) end, Anns),

Expand Down Expand Up @@ -408,6 +424,10 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
Priority = get_property(priority, Msg),
Timestamp = get_property(timestamp, Msg),
Ttl = get_property(ttl, Msg),
RoutingKeys = case get_property(subject, Msg) of
undefined -> undefined;
Subject -> [Subject]
end,

Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
{list, DeathMaps} ->
Expand All @@ -432,8 +452,10 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
maps_put_truthy(
ttl, Ttl,
maps_put_truthy(
deaths, Deaths,
#{}))))),
routing_keys, RoutingKeys,
maps_put_truthy(
deaths, Deaths,
#{})))))),
case MA of
[] ->
Anns;
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
message/3,
message/4,
message/5,
from_basic_message/1
from_basic_message/1,
to_091/2
]).

-import(rabbit_misc,
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
-define(IS_QUORUM(QPid), is_tuple(QPid)).
%%----------------------------------------------------------------------------

-export_type([name/0, qmsg/0, absent_reason/0]).
-export_type([name/0, qmsg/0, msg_id/0, absent_reason/0]).

-type name() :: rabbit_types:r('queue').

Expand All @@ -99,7 +99,7 @@
-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
-type qmsg() :: {name(), pid() | {atom(), pid()}, msg_id(),
boolean(), mc:state()}.
-type msg_id() :: non_neg_integer().
-type msg_id() :: rabbit_types:option(non_neg_integer()).
-type ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}.
-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout.
Expand Down
Loading

0 comments on commit 7ae5000

Please sign in to comment.