diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 0509da510554..1ea00f279737 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -19,10 +19,15 @@ -rabbit_deprecated_feature( {amqp_address_v1, #{deprecation_phase => permitted_by_default, + doc_url => "https://www.rabbitmq.com/docs/next/amqp#address", messages => #{when_permitted => "RabbitMQ AMQP address version 1 is deprecated. " - "Clients should use RabbitMQ AMQP address version 2."}} + "Clients should use RabbitMQ AMQP address version 2.", + when_denied => + "RabbitMQ AMQP address version 1 is unsupported. " + "Clients must use RabbitMQ AMQP address version 2." + }} }). -define(PROTOCOL, amqp10). @@ -2422,12 +2427,20 @@ ensure_source(#'v1_0.source'{address = Address, durable = Durable}, Vhost, User, PermCache, TopicPermCache) -> case Address of + {utf8, <<"/q/", QNameBinQuoted/binary>>} -> + %% The only possible v2 source address format is: + %% /q/:queue + QNameBin = unquote(QNameBinQuoted), + QName = queue_resource(Vhost, QNameBin), + ok = exit_if_absent(QName), + {ok, QName, PermCache, TopicPermCache}; {utf8, SourceAddr} -> case address_v1_permitted() of - true -> ensure_source_v1( - SourceAddr, Vhost, User, Durable, PermCache, TopicPermCache); - false -> ensure_source_v2( - SourceAddr, Vhost, PermCache, TopicPermCache) + true -> + ensure_source_v1(SourceAddr, Vhost, User, Durable, + PermCache, TopicPermCache); + false -> + {error, {amqp_address_v1_not_permitted, Address}} end; _ -> {error, {bad_address, Address}} @@ -2467,19 +2480,10 @@ ensure_source_v1(Address, Err end end; - {error, _} -> - ensure_source_v2(Address, Vhost, PermCache0, TopicPermCache0) + {error, _} = Err -> + Err end. -%% The only possible v2 source address format is: -%% /queue/:queue -ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) -> - QName = queue_resource(Vhost, QNameBin), - ok = exit_if_absent(QName), - {ok, QName, PermCache, TopicPermCache}; -ensure_source_v2(Address, _, _, _) -> - {error, {bad_address, Address}}. - -spec ensure_target(#'v1_0.target'{}, rabbit_types:vhost(), rabbit_types:user(), @@ -2495,29 +2499,28 @@ ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) -> ensure_target(#'v1_0.target'{address = Address, durable = Durable}, Vhost, User, PermCache) -> - case address_v1_permitted() of - true -> - try_target_v1(Address, Vhost, User, Durable, PermCache); - false -> - try_target_v2(Address, Vhost, User, PermCache) - end. - -try_target_v1(Address, Vhost, User, Durable, PermCache0) -> - case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of - {ok, XNameBin, RKey, QNameBin, PermCache} -> - check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); - {error, _} -> - try_target_v2(Address, Vhost, User, PermCache0) - end. - -try_target_v2(Address, Vhost, User, PermCache) -> - case ensure_target_v2(Address, Vhost) of - {ok, to, RKey, QNameBin} -> - {ok, to, RKey, QNameBin, PermCache}; - {ok, XNameBin, RKey, QNameBin} -> - check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); - {error, _} = Err -> - Err + case target_address_version(Address) of + 2 -> + case ensure_target_v2(Address, Vhost) of + {ok, to, RKey, QNameBin} -> + {ok, to, RKey, QNameBin, PermCache}; + {ok, XNameBin, RKey, QNameBin} -> + check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); + {error, _} = Err -> + Err + end; + 1 -> + case address_v1_permitted() of + true -> + case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of + {ok, XNameBin, RKey, QNameBin, PermCache1} -> + check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1); + {error, _} = Err -> + Err + end; + false -> + {error, {amqp_address_v1_not_permitted, Address}} + end end. check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> @@ -2539,29 +2542,24 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> exit_not_found(XName) end. -ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) -> - case rabbit_routing_parser:parse_endpoint(Address, true) of - {ok, Dest} -> - {QNameBin, PermCache} = ensure_terminus( - target, Dest, Vhost, User, Durable, PermCache0), - {XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest), - XNameBin = unicode:characters_to_binary(XNameList1), - RoutingKey = case RK of - undefined -> subject; - [] -> subject; - _ -> unicode:characters_to_binary(RK) - end, - {ok, XNameBin, RoutingKey, QNameBin, PermCache}; - {error, _} = Err -> - Err - end; -ensure_target_v1(Address, _, _, _, _) -> - {error, {bad_address, Address}}. +address_v1_permitted() -> + rabbit_deprecated_features:is_permitted(amqp_address_v1). + +target_address_version({utf8, <<"/e/", _/binary>>}) -> + 2; +target_address_version({utf8, <<"/q/", _/binary>>}) -> + 2; +target_address_version(undefined) -> + %% anonymous terminus + %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay + 2; +target_address_version(_Address) -> + 1. %% The possible v2 target address formats are: -%% /exchange/:exchange/key/:routing-key -%% /exchange/:exchange -%% /queue/:queue +%% /e/:exchange/:routing-key +%% /e/:exchange +%% /q/:queue %% ensure_target_v2({utf8, String}, Vhost) -> case parse_target_v2_string(String) of @@ -2576,43 +2574,77 @@ ensure_target_v2({utf8, String}, Vhost) -> ensure_target_v2(undefined, _) -> %% anonymous terminus %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay - {ok, to, to, undefined}; -ensure_target_v2(Address, _) -> - {error, {bad_address, Address}}. + {ok, to, to, undefined}. -parse_target_v2_string(<<"/exchange/", Rest/binary>>) -> - case split_exchange_target(Rest) of - {?DEFAULT_EXCHANGE_NAME, _} -> +parse_target_v2_string(<<"/e/", Rest/binary>>) -> + Key = cp_slash, + Pattern = try persistent_term:get(Key) + catch error:badarg -> + Cp = binary:compile_pattern(<<"/">>), + ok = persistent_term:put(Key, Cp), + Cp + end, + case binary:split(Rest, Pattern, [global]) of + [?DEFAULT_EXCHANGE_NAME | _] -> {error, bad_address}; - {<<"amq.default">>, _} -> + [<<"amq.default">> | _] -> {error, bad_address}; - {XNameBin, RKey} -> - {ok, XNameBin, RKey, undefined} + [XNameBinQuoted] -> + XNameBin = unquote(XNameBinQuoted), + {ok, XNameBin, <<>>, undefined}; + [XNameBinQuoted, RKeyQuoted] -> + XNameBin = unquote(XNameBinQuoted), + RKey = unquote(RKeyQuoted), + {ok, XNameBin, RKey, undefined}; + _ -> + {error, bad_address} end; -parse_target_v2_string(<<"/queue/">>) -> +parse_target_v2_string(<<"/q/">>) -> %% empty queue name is invalid {error, bad_address}; -parse_target_v2_string(<<"/queue/", QNameBin/binary>>) -> +parse_target_v2_string(<<"/q/", QNameBinQuoted/binary>>) -> + QNameBin = unquote(QNameBinQuoted), {ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin}; parse_target_v2_string(_) -> {error, bad_address}. -%% Empty exchange name (default exchange) is valid. -split_exchange_target(Target) -> - Key = cp_amqp_target_address, - Pattern = try persistent_term:get(Key) - catch error:badarg -> - Cp = binary:compile_pattern(<<"/key/">>), - ok = persistent_term:put(Key, Cp), - Cp - end, - case binary:split(Target, Pattern) of - [XNameBin] -> - {XNameBin, <<>>}; - [XNameBin, RoutingKey] -> - {XNameBin, RoutingKey} +ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) -> + case rabbit_routing_parser:parse_endpoint(Address, true) of + {ok, Dest} -> + {QNameBin, PermCache} = ensure_terminus( + target, Dest, Vhost, User, Durable, PermCache0), + {XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest), + XNameBin = unicode:characters_to_binary(XNameList1), + RoutingKey = case RK of + undefined -> subject; + [] -> subject; + _ -> unicode:characters_to_binary(RK) + end, + {ok, XNameBin, RoutingKey, QNameBin, PermCache}; + {error, _} = Err -> + Err + end; +ensure_target_v1(Address, _, _, _, _) -> + {error, {bad_address, Address}}. + +%% uri_string:unquote/1 is implemented inefficiently because it always creates +%% a new binary. We optimise for the common case: When no character is percent +%% encoded, we avoid a new binary being created. +unquote(Bin) -> + case is_quoted(Bin) of + true -> + uri_string:unquote(Bin); + false -> + Bin end. +is_quoted(<<>>) -> + false; +is_quoted(<<$%, _/binary>>) -> + true; +is_quoted(<<_, Rest/binary>>) -> + is_quoted(Rest). + handle_outgoing_mgmt_link_flow_control( #management_link{delivery_count = DeliveryCountSnd} = Link0, #'v1_0.flow'{handle = Handle = ?UINT(HandleInt), @@ -3355,14 +3387,24 @@ error_not_found(Resource) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, Description}}. -address_v1_permitted() -> - rabbit_deprecated_features:is_permitted(amqp_address_v1). - -spec cap_credit(rabbit_queue_type:credit()) -> 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX. cap_credit(DesiredCredit) -> min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX). +ensure_mc_cluster_compat(Mc) -> + IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1), + case IsEnabled of + true -> + Mc; + false -> + McEnv = #{message_containers_store_amqp_v1 => IsEnabled}, + %% other nodes in the cluster may not understand the new internal + %% amqp mc format - in this case we convert to AMQP legacy format + %% for compatibility + mc:convert(mc_amqpl, Mc, McEnv) + end. + format_status( #{state := #state{cfg = Cfg, outgoing_pending = OutgoingPending, @@ -3407,16 +3449,3 @@ format_status( permission_cache => PermissionCache, topic_permission_cache => TopicPermissionCache}, maps:update(state, State, Status). - -ensure_mc_cluster_compat(Mc) -> - IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1), - case IsEnabled of - true -> - Mc; - false -> - McEnv = #{message_containers_store_amqp_v1 => IsEnabled}, - %% other nodes in the cluster may not understand the new internal - %% amqp mc format - in this case we convert to AMQP legacy format - %% for compatibility - mc:convert(mc_amqpl, Mc, McEnv) - end. diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index eb7f8c98935e..614758ad8c81 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -22,7 +22,7 @@ all() -> [ {group, v1_permitted}, - {group, v2} + {group, v1_denied} ]. groups() -> @@ -30,7 +30,7 @@ groups() -> {v1_permitted, [shuffle], common_tests() }, - {v2, [shuffle], + {v1_denied, [shuffle], [ target_queue_absent, source_queue_absent, @@ -70,7 +70,7 @@ end_per_suite(Config) -> init_per_group(Group, Config0) -> PermitV1 = case Group of v1_permitted -> true; - v2 -> false + v1_denied -> false end, Config = rabbit_ct_helpers:merge_app_env( Config0, @@ -97,14 +97,14 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). %% Test v2 target address -%% /exchange/:exchange/key/:routing-key +%% /e/:exchange/:routing-key target_exchange_routing_key(Config) -> XName = <<"πŸ‘‰"/utf8>>, RKey = <<"πŸ—οΈ"/utf8>>, target_exchange_routing_key0(XName, RKey, Config). %% Test v2 target address -%% /exchange/:exchange/key/:routing-key +%% /e/:exchange/:routing-key %% where both :exchange and :routing-key contains a "/" character. target_exchange_routing_key_with_slash(Config) -> XName = <<"my/exchange">>, @@ -112,14 +112,14 @@ target_exchange_routing_key_with_slash(Config) -> target_exchange_routing_key0(XName, RKey, Config). target_exchange_routing_key0(XName, RKey, Config) -> - TargetAddr = <<"/exchange/", XName/binary, "/key/", RKey/binary>>, + TargetAddr = rabbitmq_amqp_address:exchange(XName, RKey), QName = atom_to_binary(?FUNCTION_NAME), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, RKey, #{}), - SrcAddr = <<"/queue/", QName/binary>>, + SrcAddr = rabbitmq_amqp_address:queue(QName), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), @@ -141,17 +141,17 @@ target_exchange_routing_key0(XName, RKey, Config) -> ok = cleanup(Init). %% Test v2 target address -%% /exchange/:exchange/key/ +%% /e/:exchange/ %% Routing key is empty. target_exchange_routing_key_empty(Config) -> XName = <<"amq.fanout">>, + TargetAddr = rabbitmq_amqp_address:exchange(XName, <<>>), QName = atom_to_binary(?FUNCTION_NAME), - TargetAddr = <<"/exchange/", XName/binary, "/key/">>, Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{}), - SrcAddr = <<"/queue/", QName/binary>>, + SrcAddr = rabbitmq_amqp_address:queue(QName), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), @@ -167,17 +167,17 @@ target_exchange_routing_key_empty(Config) -> ok = cleanup(Init). %% Test v2 target address -%% /exchange/:exchange +%% /e/:exchange %% Routing key is empty. target_exchange(Config) -> XName = <<"amq.fanout">>, - TargetAddr = <<"/exchange/", XName/binary>>, + TargetAddr = rabbitmq_amqp_address:exchange(XName), QName = atom_to_binary(?FUNCTION_NAME), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{}), - SrcAddr = <<"/queue/", QName/binary>>, + SrcAddr = rabbitmq_amqp_address:queue(QName), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), @@ -193,11 +193,11 @@ target_exchange(Config) -> ok = cleanup(Init). %% Test v2 target address -%% /exchange/:exchange +%% /e/:exchange %% where the target exchange does not exist. target_exchange_absent(Config) -> XName = <<"🎈"/utf8>>, - TargetAddr = <<"/exchange/", XName/binary>>, + TargetAddr = rabbitmq_amqp_address:exchange(XName), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -220,20 +220,20 @@ target_exchange_absent(Config) -> ok = amqp10_client:close_connection(Connection). %% Test v2 target and source address -%% /queue/:queue +%% /q/:queue queue(Config) -> QName = <<"🎈"/utf8>>, queue0(QName, Config). %% Test v2 target and source address -%% /queue/:queue +%% /q/:queue %% where :queue contains a "/" character. queue_with_slash(Config) -> QName = <<"my/queue">>, queue0(QName, Config). queue0(QName, Config) -> - Addr = <<"/queue/", QName/binary>>, + Addr = rabbitmq_amqp_address:queue(QName), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), @@ -252,11 +252,11 @@ queue0(QName, Config) -> ok = cleanup(Init). %% Test v2 target address -%% /queue/:queue +%% /q/:queue %% where the target queue does not exist. target_queue_absent(Config) -> QName = <<"🎈"/utf8>>, - TargetAddr = <<"/queue/", QName/binary>>, + TargetAddr = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -279,15 +279,15 @@ target_queue_absent(Config) -> ok = amqp10_client:close_connection(Connection). %% Test v2 target address 'null' and 'to' -%% /exchange/:exchange/key/:routing-key +%% /e/:exchange/:routing-key %% with varying routing keys. target_per_message_exchange_routing_key(Config) -> QName = atom_to_binary(?FUNCTION_NAME), DirectX = <<"amq.direct">>, RKey1 = <<"πŸ—οΈ1"/utf8>>, RKey2 = <<"πŸ—οΈ2"/utf8>>, - To1 = <<"/exchange/", DirectX/binary, "/key/", RKey1/binary>>, - To2 = <<"/exchange/", DirectX/binary, "/key/", RKey2/binary>>, + To1 = rabbitmq_amqp_address:exchange(DirectX, RKey1), + To2 = rabbitmq_amqp_address:exchange(DirectX, RKey2), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), @@ -315,13 +315,13 @@ target_per_message_exchange_routing_key(Config) -> ok = cleanup(Init). %% Test v2 target address 'null' and 'to' -%% /exchange/:exchange +%% /e/:exchange %% with varying exchanges. target_per_message_exchange(Config) -> XFanout = <<"amq.fanout">>, XHeaders = <<"amq.headers">>, - To1 = <<"/exchange/", XFanout/binary>>, - To2 = <<"/exchange/", XHeaders/binary>>, + To1 = rabbitmq_amqp_address:exchange(XFanout), + To2 = rabbitmq_amqp_address:exchange(XHeaders), QName = atom_to_binary(?FUNCTION_NAME), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), @@ -349,14 +349,14 @@ target_per_message_exchange(Config) -> ok = cleanup(Init). %% Test v2 target address 'null' and 'to' -%% /queue/:queue +%% /q/:queue target_per_message_queue(Config) -> Q1 = <<"q1">>, Q2 = <<"q2">>, Q3 = <<"q3">>, - To1 = <<"/queue/", Q1/binary>>, - To2 = <<"/queue/", Q2/binary>>, - To3 = <<"/queue/", Q3/binary>>, + To1 = rabbitmq_amqp_address:queue(Q1), + To2 = rabbitmq_amqp_address:queue(Q2), + To3 = rabbitmq_amqp_address:queue(Q3), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Q1, #{}), @@ -414,10 +414,23 @@ bad_v2_addresses() -> <<"myqueue">>, <<"/queue">>, %% bad v2 target addresses + <<>>, + <<0>>, + <<"/">>, + <<"//">>, + <<"/q">>, + <<"/q/">>, <<"/queue/">>, + <<"/e">>, + %% default exchange in v2 target address is disallowed + <<"/e/">>, + <<"/e//">>, + <<"/e//mykey">>, + <<"/e/amq.default">>, + <<"/e/amq.default/">>, + <<"/e/amq.default/mykey">>, <<"/ex/βœ‹"/utf8>>, <<"/exchange">>, - %% default exchange in v2 target address is disallowed <<"/exchange/">>, <<"/exchange/amq.default">>, <<"/exchange//key/">>, @@ -458,7 +471,7 @@ target_per_message_bad_to_address0(Address, Config) -> target_per_message_exchange_absent(Config) -> Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), XName = <<"🎈"/utf8>>, - Address = <<"/exchange/", XName/binary>>, + Address = rabbitmq_amqp_address:exchange(XName), ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), ok = wait_for_credit(Sender), @@ -514,11 +527,11 @@ target_bad_address0(TargetAddress, Config) -> ok = amqp10_client:close_connection(Connection). %% Test v2 source address -%% /queue/:queue +%% /q/:queue %% where the source queue does not exist. source_queue_absent(Config) -> QName = <<"🎈"/utf8>>, - SourceAddr = <<"/queue/", QName/binary>>, + SourceAddr = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 69f44bedf818..0ff70bf0c520 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -413,7 +413,7 @@ v1_attach_target_internal_exchange(Config) -> attach_source_queue(Config) -> {Conn, Session, LinkPair} = init_pair(Config), QName = <<"🍿"/utf8>>, - Address = <<"/queue/", QName/binary>>, + Address = rabbitmq_amqp_address:queue(QName), %% missing read permission to queue ok = set_permissions(Config, QName, <<>>, <<>>), @@ -433,8 +433,8 @@ attach_source_queue(Config) -> attach_target_exchange(Config) -> XName = <<"amq.fanout">>, - Address1 = <<"/exchange/", XName/binary>>, - Address2 = <<"/exchange/", XName/binary, "/key/some-key", XName/binary>>, + Address1 = rabbitmq_amqp_address:exchange(XName), + Address2 = rabbitmq_amqp_address:exchange(XName, <<"some-key">>), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -457,13 +457,14 @@ attach_target_exchange(Config) -> ok = amqp10_client:close_connection(Connection). attach_target_topic_exchange(Config) -> - TargetAddress = <<"/exchange/amq.topic/key/test vhost.test user.a.b">>, + TargetAddress = rabbitmq_amqp_address:exchange( + <<"amq.topic">>, <<"test vhost.test user.a.b">>), ok = send_to_topic(TargetAddress, Config). attach_target_queue(Config) -> {Conn, Session, LinkPair} = init_pair(Config), QName = <<"🍿"/utf8>>, - Address = <<"/queue/", QName/binary>>, + Address = rabbitmq_amqp_address:queue(QName), %% missing write permission to default exchange ok = set_permissions(Config, QName, <<>>, <<>>), @@ -480,8 +481,8 @@ attach_target_queue(Config) -> target_per_message_exchange(Config) -> TargetAddress = null, - To1 = <<"/exchange/amq.fanout">>, - To2 = <<"/queue/q1">>, + To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), + To2 = rabbitmq_amqp_address:queue(<<"q1">>), %% missing write permission to default exchange ok = set_permissions(Config, <<>>, <<"amq.fanout">>, <<>>), @@ -516,7 +517,7 @@ target_per_message_internal_exchange(Config) -> XName = <<"my internal exchange">>, XProps = #{internal => true}, TargetAddress = null, - To = <<"/exchange/", XName/binary>>, + To = rabbitmq_amqp_address:exchange(XName), ok = set_permissions(Config, XName, XName, <<>>), {Conn1, Session1, LinkPair1} = init_pair(Config), @@ -541,8 +542,8 @@ target_per_message_internal_exchange(Config) -> target_per_message_topic(Config) -> TargetAddress = null, - To1 = <<"/exchange/amq.topic/key/.a">>, - To2 = <<"/exchange/amq.topic/key/.a.b">>, + To1 = rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<".a">>), + To2 = rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<".a.b">>), User = ?config(test_user, Config), Vhost = ?config(test_vhost, Config), ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, Vhost), diff --git a/deps/rabbitmq_amqp_client/app.bzl b/deps/rabbitmq_amqp_client/app.bzl index 6f3e3c4c0446..d80a6dafe4f5 100644 --- a/deps/rabbitmq_amqp_client/app.bzl +++ b/deps/rabbitmq_amqp_client/app.bzl @@ -8,7 +8,7 @@ def all_beam_files(name = "all_beam_files"): ) erlang_bytecode( name = "other_beam", - srcs = ["src/rabbitmq_amqp_client.erl"], + srcs = ["src/rabbitmq_amqp_address.erl", "src/rabbitmq_amqp_client.erl"], hdrs = [":public_and_private_hdrs"], app_name = "rabbitmq_amqp_client", dest = "ebin", @@ -19,7 +19,7 @@ def all_beam_files(name = "all_beam_files"): def all_srcs(name = "all_srcs"): filegroup( name = "srcs", - srcs = ["src/rabbitmq_amqp_client.erl"], + srcs = ["src/rabbitmq_amqp_address.erl", "src/rabbitmq_amqp_client.erl"], ) filegroup(name = "private_hdrs") filegroup( @@ -47,7 +47,7 @@ def all_test_beam_files(name = "all_test_beam_files"): erlang_bytecode( name = "test_other_beam", testonly = True, - srcs = ["src/rabbitmq_amqp_client.erl"], + srcs = ["src/rabbitmq_amqp_address.erl", "src/rabbitmq_amqp_client.erl"], hdrs = [":public_and_private_hdrs"], app_name = "rabbitmq_amqp_client", dest = "test", diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl new file mode 100644 index 000000000000..a0c07d4fde57 --- /dev/null +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl @@ -0,0 +1,30 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term β€œBroadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(rabbitmq_amqp_address). + +-export[exchange/1, + exchange/2, + queue/1]. + +-spec exchange(unicode:unicode_binary()) -> + unicode:unicode_binary(). +exchange(ExchangeName) -> + ExchangeNameQuoted = uri_string:quote(ExchangeName), + <<"/e/", ExchangeNameQuoted/binary>>. + +-spec exchange(unicode:unicode_binary(), unicode:unicode_binary()) -> + unicode:unicode_binary(). +exchange(ExchangeName, RoutingKey) -> + ExchangeNameQuoted = uri_string:quote(ExchangeName), + RoutingKeyQuoted = uri_string:quote(RoutingKey), + <<"/e/", ExchangeNameQuoted/binary, "/", RoutingKeyQuoted/binary>>. + +-spec queue(unicode:unicode_binary()) -> + unicode:unicode_binary(). +queue(QueueName) -> + QueueNameQuoted = uri_string:quote(QueueName), + <<"/q/", QueueNameQuoted/binary>>. diff --git a/moduleindex.yaml b/moduleindex.yaml index 2eb563069e53..cb95f0335636 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -807,6 +807,7 @@ rabbit_common: - worker_pool_sup - worker_pool_worker rabbitmq_amqp_client: +- rabbitmq_amqp_address - rabbitmq_amqp_client rabbitmq_amqp1_0: - rabbitmq_amqp1_0_noop