Skip to content

Commit

Permalink
Fix routing key for AMQP 0.9.1 reading from a stream
Browse files Browse the repository at this point in the history
when message was published to a stream via the stream protocol.

Fixes the following test:
```
./mvnw test -Dtest=AmqpInteroperabilityTest#publishToStreamConsumeFromStreamQueue
```
  • Loading branch information
ansd committed Mar 4, 2024
1 parent de261a9 commit ef9ab12
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 35 deletions.
9 changes: 9 additions & 0 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
correlation_id/1,
user_id/1,
message_id/1,
property/2,
timestamp/1,
priority/1,
set_ttl/2,
Expand Down Expand Up @@ -302,6 +303,14 @@ message_id(#?MODULE{protocol = Proto,
message_id(BasicMsg) ->
mc_compat:message_id(BasicMsg).

-spec property(atom(), state()) ->
{utf8, binary()} | undefined.
property(Property, #?MODULE{protocol = Proto,
data = Data}) ->
Proto:property(Property, Data);
property(_Property, _BasicMsg) ->
undefined.

-spec set_ttl(undefined | non_neg_integer(), state()) -> state().
set_ttl(Value, #?MODULE{annotations = Anns} = State) ->
State#?MODULE{annotations = maps:put(ttl, Value, Anns)};
Expand Down
19 changes: 4 additions & 15 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ property(message_id, #msg{properties = #'v1_0.properties'{message_id = MsgId}})
MsgId;
property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) ->
UserId;
property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) ->
Subject;
property(_Prop, #msg{}) ->
undefined.

Expand Down Expand Up @@ -178,13 +180,6 @@ get_property(priority, Msg) ->
_ ->
undefined
end
end;
get_property(subject, Msg) ->
case Msg of
#msg{properties = #'v1_0.properties'{subject = {utf8, Subject}}} ->
Subject;
_ ->
undefined
end.

convert_to(?MODULE, Msg, _Env) ->
Expand Down Expand Up @@ -430,10 +425,6 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
Priority = get_property(priority, Msg),
Timestamp = get_property(timestamp, Msg),
Ttl = get_property(ttl, Msg),
RoutingKeys = case get_property(subject, Msg) of
undefined -> undefined;
Subject -> [Subject]
end,

Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
{list, DeathMaps} ->
Expand All @@ -458,10 +449,8 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
maps_put_truthy(
ttl, Ttl,
maps_put_truthy(
?ANN_ROUTING_KEYS, RoutingKeys,
maps_put_truthy(
deaths, Deaths,
#{})))))),
deaths, Deaths,
#{}))))),
case MA of
[] ->
Anns;
Expand Down
41 changes: 21 additions & 20 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1562,14 +1562,10 @@ incoming_link_transfer(
[?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),
case rabbit_exchange_lookup(Exchange) of
{ok, X = #exchange{name = #resource{name = XNameBin}}} ->
Anns0 = #{?ANN_EXCHANGE => XNameBin},
Anns = case LinkRKey of
undefined -> Anns0;
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
end,
Anns = #{?ANN_EXCHANGE => XNameBin},
Mc0 = mc:init(mc_amqp, Sections, Anns),
Mc1 = rabbit_message_interceptor:intercept(Mc0),
{Mc, RoutingKey} = ensure_routing_key(Mc1),
{RoutingKey, Mc1} = ensure_routing_key(LinkRKey, Mc0),
Mc = rabbit_message_interceptor:intercept(Mc1),
check_user_id(Mc, User),
messages_received(Settled),
check_write_permitted_on_topic(X, User, RoutingKey),
Expand Down Expand Up @@ -1620,19 +1616,24 @@ rabbit_exchange_lookup(X = #exchange{}) ->
rabbit_exchange_lookup(XName = #resource{}) ->
rabbit_exchange:lookup(XName).

ensure_routing_key(Mc) ->
case mc:routing_keys(Mc) of
[RoutingKey] ->
{Mc, RoutingKey};
[] ->
%% Set the default routing key of AMQP 0.9.1 'basic.publish'{}.
%% For example, when the client attached to target /exchange/amq.fanout and sends a
%% message without setting a 'subject' in the message properties, the routing key is
%% ignored during routing, but receiving code paths still expect some routing key to be set.
DefaultRoutingKey = <<"">>,
Mc1 = mc:set_annotation(?ANN_ROUTING_KEYS, [DefaultRoutingKey], Mc),
{Mc1, DefaultRoutingKey}
end.
ensure_routing_key(LinkRKey, Mc0) ->
RKey = case LinkRKey of
undefined ->
case mc:property(subject, Mc0) of
undefined ->
%% Set the default routing key of AMQP 0.9.1 'basic.publish'{}.
%% For example, when the client attached to target /exchange/amq.fanout and sends a
%% message without setting a 'subject' in the message properties, the routing key is
%% ignored during routing, but receiving code paths still expect some routing key to be set.
<<"">>;
{utf8, Subject} ->
Subject
end;
_ ->
LinkRKey
end,
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc0),
{RKey, Mc}.

process_routing_confirm([], _SenderSettles = true, _, U) ->
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
Expand Down

0 comments on commit ef9ab12

Please sign in to comment.