Skip to content

Commit

Permalink
Convert array from AMQP 1.0 to AMQP 0.9.1
Browse files Browse the repository at this point in the history
Fix the following crash when an AMQP 0.9.1 client consumes an AMQP 1.0
encoded message that contains an array value in message annotations:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.685.0>
  registered_name: []
  exception exit: {function_clause,
                      [{mc_amqpl,to_091,
                           [<<"x-array">>,
                            {array,utf8,[{utf8,<<"e1">>},{utf8,<<"e2">>}]}],
                           [{file,"mc_amqpl.erl"},{line,737}]},
                       {mc_amqpl,'-convert_from/3-fun-3-',1,
                           [{file,"mc_amqpl.erl"},{line,168}]},
                       {lists,filtermap_1,2,
                           [{file,"lists.erl"},{line,2279}]},
                       {mc_amqpl,convert_from,3,
                           [{file,"mc_amqpl.erl"},{line,158}]},
                       {mc,convert,3,[{file,"mc.erl"},{line,332}]},
                       {rabbit_channel,handle_deliver0,4,
                           [{file,"rabbit_channel.erl"},{line,2619}]},
                       {lists,foldl_1,3,[{file,"lists.erl"},{line,2151}]},
                       {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}]}
```
  • Loading branch information
ansd committed Oct 22, 2024
1 parent d7c4e94 commit 814d44d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 6 deletions.
8 changes: 5 additions & 3 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ set_delivery_annotations(
Anns1 = #'v1_0.delivery_annotations'{content = maps:to_list(Anns)},
Msg#amqp10_msg{delivery_annotations = Anns1}.

-spec set_message_annotations(#{binary() => binary() | integer() | string()},
amqp10_msg()) -> amqp10_msg().
-spec set_message_annotations(#{binary() => binary() | number() | string() | tuple()},
amqp10_msg()) -> amqp10_msg().
set_message_annotations(Props,
#amqp10_msg{message_annotations = undefined} =
Msg) ->
Expand Down Expand Up @@ -436,7 +436,9 @@ wrap_ap_value(V) when is_integer(V) ->
end;
wrap_ap_value(V) when is_number(V) ->
%% AMQP double and Erlang float are both 64-bit.
{double, V}.
{double, V};
wrap_ap_value(TaggedValue) when is_tuple(TaggedValue) ->
TaggedValue.

%% LOCAL
header_value(durable, undefined) -> false;
Expand Down
9 changes: 7 additions & 2 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -754,9 +754,14 @@ to_091(Key, false) -> {Key, bool, false};
to_091(Key, undefined) -> {Key, void, undefined};
to_091(Key, null) -> {Key, void, undefined};
to_091(Key, {list, L}) ->
{Key, array, [to_091(V) || V <- L]};
to_091_array(Key, L);
to_091(Key, {map, M}) ->
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]}.
{Key, table, [to_091(unwrap(K), V) || {K, V} <- M]};
to_091(Key, {array, _T, L}) ->
to_091_array(Key, L).

to_091_array(Key, L) ->
{Key, array, [to_091(V) || V <- L]}.

to_091({utf8, V}) -> {longstr, V};
to_091({symbol, V}) -> {longstr, V};
Expand Down
14 changes: 14 additions & 0 deletions deps/rabbit/test/amqp_client_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,13 @@ amqp_amqpl(QType, Config) ->
message_format = {uint, 0}},
Body1,
Footer])),
%% Send with an array value in message annotations.
ok = amqp10_client:send_msg(
Sender,
amqp10_msg:set_message_annotations(
#{<<"x-array">> => {array, utf8, [{utf8, <<"e1">>},
{utf8, <<"e2">>}]}},
amqp10_msg:new(<<>>, Body1, true))),

ok = amqp10_client:detach_link(Sender),
flush(detached),
Expand Down Expand Up @@ -1418,6 +1425,13 @@ amqp_amqpl(QType, Config) ->
?assertEqual([Body1, Footer], amqp10_framing:decode_bin(Payload10))
after 5000 -> ct:fail({missing_deliver, ?LINE})
end,
receive {_, #amqp_msg{payload = Payload11,
props = #'P_basic'{headers = Headers11}}} ->
?assertEqual([Body1], amqp10_framing:decode_bin(Payload11)),
?assertEqual({array, [{longstr, <<"e1">>}, {longstr, <<"e2">>}]},
rabbit_misc:table_lookup(Headers11, <<"x-array">>))
after 5000 -> ct:fail({missing_deliver, ?LINE})
end,

ok = rabbit_ct_client_helpers:close_channel(Ch),
{ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName),
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbit/test/mc_unit_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,8 @@ amqp_amqpl(_Config) ->
MAC = [
{{symbol, <<"x-stream-filter">>}, {utf8, <<"apple">>}},
thead2('x-list', list, [utf8(<<"l">>)]),
thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}])
thead2('x-map', map, [{utf8(<<"k">>), utf8(<<"v">>)}]),
{{symbol, <<"x-array">>}, {array, utf8, [{utf8, <<"a">>}]}}
],
M = #'v1_0.message_annotations'{content = MAC},
P = #'v1_0.properties'{content_type = {symbol, <<"ctype">>},
Expand Down Expand Up @@ -598,6 +599,7 @@ amqp_amqpl(_Config) ->
?assertMatch({_, longstr, <<"apple">>}, header(<<"x-stream-filter">>, HL)),
?assertMatch({_ ,array, [{longstr,<<"l">>}]}, header(<<"x-list">>, HL)),
?assertMatch({_, table, [{<<"k">>,longstr,<<"v">>}]}, header(<<"x-map">>, HL)),
?assertMatch({_, array, [{longstr, <<"a">>}]}, header(<<"x-array">>, HL)),

?assertMatch({_, long, 5}, header(<<"long">>, HL)),
?assertMatch({_, long, 5}, header(<<"ulong">>, HL)),
Expand Down

0 comments on commit 814d44d

Please sign in to comment.