Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

For 4.0: Native AMQP 1.0 #9022

Merged
merged 16 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,6 @@ RSYNC_FLAGS += -a $(RSYNC_V) \
--exclude '/cowboy/doc/' \
--exclude '/cowboy/examples/' \
--exclude '/rabbit/escript/' \
--exclude '/rabbitmq_amqp1_0/test/swiftmq/build/'\
--exclude '/rabbitmq_amqp1_0/test/swiftmq/swiftmq*'\
--exclude '/rabbitmq_cli/escript/' \
--exclude '/rabbitmq_mqtt/test/build/' \
--exclude '/rabbitmq_mqtt/test/test_client/'\
Expand Down
1 change: 0 additions & 1 deletion deps/amqp10_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ dialyze(
)

broker_for_integration_suites(
extra_plugins = ["//deps/rabbitmq_amqp1_0:erlang_app"],
)

TEST_DEPS = [
Expand Down
21 changes: 2 additions & 19 deletions deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ PACKAGES_DIR ?= $(abspath PACKAGES)

BUILD_DEPS = rabbit_common elvis_mk
DEPS = amqp10_common credentials_obfuscation
TEST_DEPS = rabbit rabbitmq_amqp1_0 rabbitmq_ct_helpers
TEST_DEPS = rabbit rabbitmq_ct_helpers
LOCAL_DEPS = ssl inets crypto public_key

DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-test.mk
Expand All @@ -51,28 +51,11 @@ include erlang.mk
HEX_TARBALL_FILES += rabbitmq-components.mk \
git-revisions.txt

# --------------------------------------------------------------------
# Compiler flags.
# --------------------------------------------------------------------

# gen_fsm is deprecated starting from Erlang 20, but we want to support
# Erlang 19 as well.

ERTS_VER := $(shell erl -version 2>&1 | sed -E 's/.* version //')
ERLANG_20_ERTS_VER := 9.0

ifeq ($(call compare_version,$(ERTS_VER),$(ERLANG_20_ERTS_VER),>=),true)
ERLC_OPTS += -Dnowarn_deprecated_gen_fsm
endif

# Dialyze the tests.
DIALYZER_OPTS += --src -r test

# --------------------------------------------------------------------
# ActiveMQ for the testsuite.
# --------------------------------------------------------------------

ACTIVEMQ_VERSION := 5.14.4
ACTIVEMQ_VERSION := 5.18.3
ACTIVEMQ_URL := 'https://archive.apache.org/dist/activemq/$(ACTIVEMQ_VERSION)/apache-activemq-$(ACTIVEMQ_VERSION)-bin.tar.gz'

ACTIVEMQ := $(abspath test/system_SUITE_data/apache-activemq-$(ACTIVEMQ_VERSION)/bin/activemq)
Expand Down
14 changes: 7 additions & 7 deletions deps/amqp10_client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

This is an [Erlang client for the AMQP 1.0](https://www.amqp.org/resources/specifications) protocol.

It's primary purpose is to be used in RabbitMQ related projects but it is a
generic client that was tested with at least 4 implementations of AMQP 1.0.
Its primary purpose is to be used in RabbitMQ related projects but it is a
generic client that was tested with at least 3 implementations of AMQP 1.0.

If you are looking for an Erlang client for [AMQP 0-9-1](https://www.rabbitmq.com/tutorials/amqp-concepts.html) — a completely different
protocol despite the name — [consider this one](https://github.com/rabbitmq/rabbitmq-erlang-client).
protocol despite the name — [consider this one](../amqp_client).

## Project Maturity and Status

This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100%
feature complete but moderately mature and was tested against at least three AMQP 1.0 servers:
feature complete but moderately mature and was tested against at least 3 AMQP 1.0 servers:
RabbitMQ, Azure ServiceBus, ActiveMQ.

This client library is not officially supported by VMware at this time.
Expand Down Expand Up @@ -80,8 +80,8 @@ after 2000 ->
exit(credited_timeout)
end.

%% create a new message using a delivery-tag, body and indicate
%% it's settlement status (true meaning no disposition confirmation
%% Create a new message using a delivery-tag, body and indicate
%% its settlement status (true meaning no disposition confirmation
%% will be sent by the receiver).
OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
ok = amqp10_client:send_msg(Sender, OutMsg),
Expand Down Expand Up @@ -112,7 +112,7 @@ after the `Open` frame has been successfully written to the socket rather than
waiting until the remote end returns with their `Open` frame. The client will
notify the caller of various internal/async events using `amqp10_event`
messages. In the example above when the remote replies with their `Open` frame
a message is sent of the following forma:
a message is sent of the following form:

```
{amqp10_event, {connection, ConnectionPid, opened}}
Expand Down
4 changes: 2 additions & 2 deletions deps/amqp10_client/activemq.bzl
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

ACTIVEMQ_VERSION = "5.14.4"
ACTIVEMQ_VERSION = "5.18.3"
ACTIVEMQ_URL = "https://archive.apache.org/dist/activemq/{version}/apache-activemq-{version}-bin.tar.gz".format(version = ACTIVEMQ_VERSION)
SHA_256 = "16ec52bece0a4759f9d70f4132d7d8da67d662e4af029081c492e65510a695c1"
SHA_256 = "943381aa6d340707de6c42eadbf7b41b7fdf93df604156d972d50c4da783544f"

def activemq_archive():
http_archive(
Expand Down
3 changes: 0 additions & 3 deletions deps/amqp10_client/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ def all_beam_files(name = "all_beam_files"):
"src/amqp10_client_app.erl",
"src/amqp10_client_connection.erl",
"src/amqp10_client_connection_sup.erl",
"src/amqp10_client_connections_sup.erl",
"src/amqp10_client_frame_reader.erl",
"src/amqp10_client_session.erl",
"src/amqp10_client_sessions_sup.erl",
Expand Down Expand Up @@ -42,7 +41,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/amqp10_client_app.erl",
"src/amqp10_client_connection.erl",
"src/amqp10_client_connection_sup.erl",
"src/amqp10_client_connections_sup.erl",
"src/amqp10_client_frame_reader.erl",
"src/amqp10_client_session.erl",
"src/amqp10_client_sessions_sup.erl",
Expand Down Expand Up @@ -77,7 +75,6 @@ def all_srcs(name = "all_srcs"):
"src/amqp10_client_app.erl",
"src/amqp10_client_connection.erl",
"src/amqp10_client_connection_sup.erl",
"src/amqp10_client_connections_sup.erl",
"src/amqp10_client_frame_reader.erl",
"src/amqp10_client_session.erl",
"src/amqp10_client_sessions_sup.erl",
Expand Down
71 changes: 40 additions & 31 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
settle_msg/3,
flow_link_credit/3,
flow_link_credit/4,
echo/1,
stop_receiver_link/1,
link_handle/1,
get_msg/1,
get_msg/2,
Expand All @@ -55,7 +55,7 @@
-type attach_role() :: amqp10_client_session:attach_role().
-type attach_args() :: amqp10_client_session:attach_args().
-type filter() :: amqp10_client_session:filter().
-type properties() :: amqp10_client_session:properties().
-type properties() :: amqp10_client_types:properties().

-type connection_config() :: amqp10_client_connection:connection_config().

Expand Down Expand Up @@ -109,10 +109,10 @@ open_connection(ConnectionConfig0) ->
notify_when_closed => NotifyWhenClosed
},
Sasl = maps:get(sasl, ConnectionConfig1),
ConnectionConfig2 = ConnectionConfig1#{sasl => amqp10_client_connection:encrypt_sasl(Sasl)},
amqp10_client_connection:open(ConnectionConfig2).
ConnectionConfig = ConnectionConfig1#{sasl => amqp10_client_connection:encrypt_sasl(Sasl)},
amqp10_client_connection:open(ConnectionConfig).

%% @doc Opens a connection using a connection_config map
%% @doc Closes a connection.
%% This is asynchronous and will notify completion to the caller using
%% an amqp10_event of the following format:
%% {amqp10_event, {connection, ConnectionPid, {closed, Why}}}
Expand Down Expand Up @@ -271,9 +271,8 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
%% This is asynchronous and will notify completion of the attach request to the
%% caller using an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
-spec attach_receiver_link(pid(), binary(), binary(),
snd_settle_mode(), terminus_durability(), filter(),
properties()) ->
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
terminus_durability(), filter(), properties()) ->
{ok, link_ref()}.
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
when is_pid(Session) andalso
Expand Down Expand Up @@ -302,48 +301,59 @@ attach_link(Session, AttachArgs) ->
%% This is asynchronous and will notify completion of the attach request to the
%% caller using an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, {detached, Why}}}
-spec detach_link(link_ref()) -> _.
-spec detach_link(link_ref()) -> ok | {error, term()}.
detach_link(#link_ref{link_handle = Handle, session = Session}) ->
amqp10_client_session:detach(Session, Handle).

%% @doc Grant credit to a sender.
%% The amqp10_client will automatically grant more credit to the sender when
%% the remaining link credit falls below the value of RenewWhenBelow.
%% If RenewWhenBelow is 'never' the client will never grant new credit. Instead
%% the caller will be notified when the link_credit reaches 0 with an
%% amqp10_event of the following format:
%% @doc Grant Credit to a sender.
%%
%% In addition, if RenewWhenBelow is an integer, the amqp10_client will automatically grant more
%% Credit to the sender when the sum of the remaining link credit and the number of unsettled
%% messages falls below the value of RenewWhenBelow.
%% `Credit + RenewWhenBelow - 1` is the maximum number of in-flight unsettled messages.
%%
%% If RenewWhenBelow is `never` the amqp10_client will never grant more credit. Instead the caller
%% will be notified when the link_credit reaches 0 with an amqp10_event of the following format:
%% {amqp10_event, {link, LinkRef, credit_exhausted}}
-spec flow_link_credit(link_ref(), Credit :: non_neg_integer(),
RenewWhenBelow :: never | non_neg_integer()) -> ok.
RenewWhenBelow :: never | pos_integer()) -> ok.
flow_link_credit(Ref, Credit, RenewWhenBelow) ->
flow_link_credit(Ref, Credit, RenewWhenBelow, false).

-spec flow_link_credit(link_ref(), Credit :: non_neg_integer(),
RenewWhenBelow :: never | non_neg_integer(),
RenewWhenBelow :: never | pos_integer(),
Drain :: boolean()) -> ok.
flow_link_credit(#link_ref{role = receiver, session = Session,
link_handle = Handle},
Credit, RenewWhenBelow, Drain) ->
Credit, RenewWhenBelow, Drain)
when
%% Drain together with auto renewal doesn't make sense, so disallow it in the API.
((Drain) andalso RenewWhenBelow =:= never
orelse not(Drain))
andalso
%% Check that the RenewWhenBelow value make sense.
(RenewWhenBelow =:= never orelse
is_integer(RenewWhenBelow) andalso
RenewWhenBelow > 0 andalso
ansd marked this conversation as resolved.
Show resolved Hide resolved
RenewWhenBelow =< Credit) ->
Flow = #'v1_0.flow'{link_credit = {uint, Credit},
drain = Drain},
ok = amqp10_client_session:flow(Session, Handle, Flow, RenewWhenBelow).

%% @doc Request that the sender's flow state is echoed back
%% This may be used to determine when the Link has finally quiesced.
%% see §2.6.10 of the spec
echo(#link_ref{role = receiver, session = Session,
link_handle = Handle}) ->
%% @doc Stop a receiving link.
%% See AMQP 1.0 spec §2.6.10.
stop_receiver_link(#link_ref{role = receiver,
session = Session,
link_handle = Handle}) ->
Flow = #'v1_0.flow'{link_credit = {uint, 0},
echo = true},
ok = amqp10_client_session:flow(Session, Handle, Flow, 0).
ok = amqp10_client_session:flow(Session, Handle, Flow, never).

%%% messages

%% @doc Send a message on a the link referred to be the 'LinkRef'.
%% Returns ok for "async" transfers when messages are sent with settled=true
%% else it returns the delivery state from the disposition
-spec send_msg(link_ref(), amqp10_msg:amqp10_msg()) ->
ok | {error, insufficient_credit | link_not_found | half_attached}.
ok | amqp10_client_session:transfer_error().
send_msg(#link_ref{role = sender, session = Session,
link_handle = Handle}, Msg0) ->
Msg = amqp10_msg:set_handle(Handle, Msg0),
Expand All @@ -358,11 +368,10 @@ accept_msg(LinkRef, Msg) ->
%% the chosen delivery state.
-spec settle_msg(link_ref(), amqp10_msg:amqp10_msg(),
amqp10_client_types:delivery_state()) -> ok.
settle_msg(#link_ref{role = receiver,
session = Session}, Msg, Settlement) ->
settle_msg(LinkRef, Msg, Settlement) ->
DeliveryId = amqp10_msg:delivery_id(Msg),
amqp10_client_session:disposition(Session, receiver, DeliveryId,
DeliveryId, true, Settlement).
amqp10_client_session:disposition(LinkRef, DeliveryId, DeliveryId, true, Settlement).

%% @doc Get a single message from a link.
%% Flows a single link credit then awaits delivery or timeout.
-spec get_msg(link_ref()) -> {ok, amqp10_msg:amqp10_msg()} | {error, timeout}.
Expand Down
3 changes: 1 addition & 2 deletions deps/amqp10_client/src/amqp10_client.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

-define(AMQP_PROTOCOL_HEADER, <<"AMQP", 0, 1, 0, 0>>).
-define(SASL_PROTOCOL_HEADER, <<"AMQP", 3, 1, 0, 0>>).
-define(MIN_MAX_FRAME_SIZE, 512).
-define(MAX_MAX_FRAME_SIZE, 1024 * 1024).
-define(FRAME_HEADER_SIZE, 8).

-define(TIMEOUT, 5000).
Expand All @@ -22,4 +20,5 @@

-record(link_ref, {role :: sender | receiver,
session :: pid(),
%% locally chosen output handle
link_handle :: non_neg_integer()}).
20 changes: 1 addition & 19 deletions deps/amqp10_client/src/amqp10_client_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,12 @@

-behaviour(application).

%% Application callbacks
%% application callbacks
-export([start/2,
stop/1]).

-type start_type() :: (
normal |
{takeover, Node :: node()} |
{failover, Node :: node()}
).
-type state() :: term().

%%====================================================================
%% API
%%====================================================================

-spec start(StartType :: start_type(), StartArgs :: term()) ->
{ok, Pid :: pid()} | {ok, Pid :: pid(), State :: state()} | {error, Reason :: term()}.
start(_Type, _Args) ->
amqp10_client_sup:start_link().

-spec stop(State :: state()) -> ok.
stop(_State) ->
ok.

%%====================================================================
%% Internal functions
%%====================================================================
Loading
Loading