Skip to content

Commit

Permalink
Add MQTT 5.0 <-> AMQP 1.0 assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Aug 25, 2023
1 parent 84a84c7 commit 17fd5be
Showing 1 changed file with 33 additions and 14 deletions.
47 changes: 33 additions & 14 deletions deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("rabbitmq_stomp/include/rabbit_stomp_frame.hrl").

-import(util,
Expand Down Expand Up @@ -170,8 +171,8 @@ amqp(Config) ->
Correlation = <<"some correlation ID">>,
ContentType = <<"text/plain">>,
RequestPayload = <<"my request">>,
UserProperty = [{<<"rabbit🐇"/utf8>>, <<"carrot🥕"/utf8>>},
{<<"x-rabbit🐇"/utf8>>, <<"carrot🥕"/utf8>>},
UserProperty = [{<<"🐇"/utf8>>, <<"🥕"/utf8>>},
{<<"x-🐇"/utf8>>, <<"🥕"/utf8>>},
{<<"key">>, <<"val">>},
{<<"key">>, <<"val">>},
{<<"x-key">>, <<"val">>},
Expand All @@ -180,21 +181,38 @@ amqp(Config) ->
#{'Content-Type' => ContentType,
'Correlation-Data' => Correlation,
'Response-Topic' => MqttResponseTopic,
'User-Property' => UserProperty},
'User-Property' => UserProperty,
'Payload-Format-Indicator' => 1},
RequestPayload, [{qos, 1}]),

%% As of 3.13, AMQP 1.0 is proxied via AMQP 0.9.1 and therefore the conversion from
%% mc_mqtt to mc_amqpl takes place. We therefore lose MQTT User Property and Response Topic
%% which gets converted to AMQP 0.9.1 headers. In the future, Native AMQP 1.0 will convert
%% from mc_mqtt to mc_amqp allowing us to do many more assertions here.
{ok, Msg1} = amqp10_client:get_msg(Receiver),
ct:pal("Received AMQP 1.0 message:~n~p", [Msg1]),
?assertEqual([RequestPayload], amqp10_msg:body(Msg1)),
?assertMatch(#{correlation_id := Correlation,
content_type := ContentType}, amqp10_msg:properties(Msg1)),

?assert(amqp10_msg:header(durable, Msg1)),
?assert(amqp10_msg:header(first_acquirer, Msg1)),

%% We expect to receive x-headers in message annotations.
%% However, since annotation keys are symbols and symbols are only valid ASCII,
%% we expect header
%% {<<"x-🐇"/utf8>>, <<"🥕"/utf8>>}
%% to be dropped.
?assertEqual(#{<<"x-key">> => <<"val">>,
<<"x-exchange">> => <<"amq.topic">>,
<<"x-routing-key">> => <<"topic.1">>},
amqp10_msg:message_annotations(Msg1)),
%% In contrast, application property keys are of type string, and therefore UTF-8 encoded.
?assertEqual(#{<<"🐇"/utf8>> => <<"🥕"/utf8>>,
<<"key">> => <<"val">>},
amqp10_msg:application_properties(Msg1)),

#{correlation_id := Correlation,
content_type := ContentType,
reply_to := ReplyToAddress} = amqp10_msg:properties(Msg1),
?assertEqual(<<"/topic/response.topic">>, ReplyToAddress),

%% Thanks to the 'Payload-Format-Indicator', we get a single utf8 value.
?assertEqual(#'v1_0.amqp_value'{content = {utf8, RequestPayload}}, amqp10_msg:body(Msg1)),

ok = amqp10_client:settle_msg(Receiver, Msg1, accepted),
ok = amqp10_client:detach_link(Receiver),
ok = amqp10_client:end_session(Session1),
Expand All @@ -205,15 +223,14 @@ amqp(Config) ->
{ok, Session2} = amqp10_client:begin_session(Connection2),
SenderLinkName = <<"test-sender">>,
{ok, Sender} = amqp10_client:attach_sender_link(
%% With Native AMQP 1.0, address should be read from received reply-to
Session2, SenderLinkName, <<"/topic/response.topic">>, unsettled),
Session2, SenderLinkName, ReplyToAddress, unsettled),
receive {amqp10_event, {link, Sender, credited}} -> ok
after 1000 -> ct:fail(credited_timeout)
end,

DTag = <<"my-dtag">>,
ReplyPayload = <<"my response">>,
Msg2a = amqp10_msg:new(DTag, ReplyPayload),
Msg2a = amqp10_msg:new(DTag, #'v1_0.amqp_value'{content = {utf8, ReplyPayload}}),
Msg2b = amqp10_msg:set_properties(
#{correlation_id => Correlation,
content_type => ContentType},
Expand All @@ -237,7 +254,9 @@ amqp(Config) ->
payload := ReplyPayload,
properties := #{'Content-Type' := ContentType,
'Correlation-Data' := Correlation,
'Subscription-Identifier' := 999}},
'Subscription-Identifier' := 999,
%% since the AMQP 1.0 client sent UTF-8
'Payload-Format-Indicator' := 1}},
MqttMsg)
after 1000 -> ct:fail("did not receive reply")
end,
Expand Down

0 comments on commit 17fd5be

Please sign in to comment.