Skip to content

Commit

Permalink
Support x-cc message annotation
Browse files Browse the repository at this point in the history
Support an `x-cc` message annotation in AMQP 1.0
similar to the [CC](https://www.rabbitmq.com/docs/sender-selected) header in AMQP 0.9.1.

The value of the `x-cc` message annotation must by an array of strings.
A message annotation is used since application properties allow only simple types.
  • Loading branch information
ansd committed Oct 22, 2024
1 parent 814d44d commit 715d421
Show file tree
Hide file tree
Showing 17 changed files with 621 additions and 115 deletions.
6 changes: 6 additions & 0 deletions deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,12 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "topic_permission_SUITE",
size = "medium",
additional_beam = [
":test_amqp_utils_beam",
],
runtime_deps = [
"//deps/rabbitmq_amqp_client:erlang_app",
],
)

rabbitmq_integration_suite(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1559,7 +1559,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
outs = ["test/topic_permission_SUITE.beam"],
app_name = "rabbit",
erlc_opts = "//:test_erlc_opts",
deps = ["//deps/amqp_client:erlang_app"],
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app"],
)
erlang_bytecode(
name = "transactions_SUITE_beam_files",
Expand Down
30 changes: 28 additions & 2 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
priority/1,
set_ttl/2,
x_header/2,
x_headers/1,
routing_headers/2,
exchange/1,
routing_keys/1,
Expand Down Expand Up @@ -88,6 +89,7 @@
{timestamp, non_neg_integer()} |
{list, [tagged_value()]} |
{map, [{tagged_value(), tagged_value()}]} |
{array, atom(), [tagged_value()]} |
null |
undefined.

Expand All @@ -104,11 +106,16 @@
{MetadataSize :: non_neg_integer(),
PayloadSize :: non_neg_integer()}.

%% retrieve and x- header from the protocol data
%% retrieve an x- header from the protocol data
%% the return value should be tagged with an AMQP 1.0 type
-callback x_header(binary(), proto_state()) ->
tagged_value().

%% retrieve x- headers from the protocol data
%% the return values should be tagged with an AMQP 1.0 type
-callback x_headers(proto_state()) ->
#{binary() => tagged_value()}.

%% retrieve a property field from the protocol data
%% e.g. message_id, correlation_id
-callback property(atom(), proto_state()) ->
Expand Down Expand Up @@ -148,7 +155,7 @@ init(Proto, Data, Anns) ->
-spec init(protocol(), term(), annotations(), environment()) -> state().
init(Proto, Data, Anns0, Env) ->
{ProtoData, ProtoAnns} = Proto:init(Data),
Anns1 = case map_size(Env) == 0 of
Anns1 = case map_size(Env) =:= 0 of
true -> Anns0;
false -> Anns0#{env => Env}
end,
Expand Down Expand Up @@ -214,6 +221,25 @@ x_header(Key, #?MODULE{protocol = Proto,
x_header(Key, BasicMsg) ->
mc_compat:x_header(Key, BasicMsg).

-spec x_headers(state()) ->
#{binary() => tagged_value()}.
x_headers(#?MODULE{protocol = Proto,
annotations = Anns,
data = Data}) ->
%% x-headers may be have been added to the annotations map.
New = maps:filtermap(
fun(Key, Val) ->
case mc_util:is_x_header(Key) of
true ->
{true, mc_util:infer_type(Val)};
false ->
false
end
end, Anns),
maps:merge(Proto:x_headers(Data), New);
x_headers(BasicMsg) ->
mc_compat:x_headers(BasicMsg).

-spec routing_headers(state(), [x_headers | complex_types]) ->
#{binary() => property_value()}.
routing_headers(#?MODULE{protocol = Proto,
Expand Down
47 changes: 13 additions & 34 deletions deps/rabbit/src/mc_amqp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
init/1,
size/1,
x_header/2,
x_headers/1,
property/2,
routing_headers/2,
convert_to/3,
Expand Down Expand Up @@ -125,6 +126,9 @@ size(#v1{message_annotations = MA,
x_header(Key, Msg) ->
message_annotation(Key, Msg, undefined).

x_headers(Msg) ->
#{K => V || {{_T, K}, V} <- message_annotations(Msg)}.

property(_Prop, #msg_body_encoded{properties = undefined}) ->
undefined;
property(Prop, #msg_body_encoded{properties = Props}) ->
Expand Down Expand Up @@ -618,41 +622,16 @@ encode_deaths(Deaths) ->
{map, Map}
end, Deaths).

essential_properties(#msg_body_encoded{message_annotations = MA} = Msg) ->
essential_properties(Msg) ->
Durable = get_property(durable, Msg),
Priority = get_property(priority, Msg),
Timestamp = get_property(timestamp, Msg),
Ttl = get_property(ttl, Msg),
Anns0 = #{?ANN_DURABLE => Durable},
Anns = maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
?ANN_TIMESTAMP, Timestamp,
maps_put_truthy(
ttl, Ttl,
Anns0))),
case MA of
[] ->
Anns;
_ ->
lists:foldl(
fun ({{symbol, <<"x-routing-key">>},
{utf8, Key}}, Acc) ->
maps:update_with(?ANN_ROUTING_KEYS,
fun(L) -> [Key | L] end,
[Key],
Acc);
({{symbol, <<"x-cc">>},
{list, CCs0}}, Acc) ->
CCs = [CC || {_T, CC} <- CCs0],
maps:update_with(?ANN_ROUTING_KEYS,
fun(L) -> L ++ CCs end,
CCs,
Acc);
({{symbol, <<"x-exchange">>},
{utf8, Exchange}}, Acc) ->
Acc#{?ANN_EXCHANGE => Exchange};
(_, Acc) ->
Acc
end, Anns, MA)
end.
Anns = #{?ANN_DURABLE => Durable},
maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
?ANN_TIMESTAMP, Timestamp,
maps_put_truthy(
ttl, Ttl,
Anns))).
59 changes: 55 additions & 4 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
init/1,
size/1,
x_header/2,
x_headers/1,
routing_headers/2,
convert_to/3,
convert_from/3,
Expand Down Expand Up @@ -273,6 +274,23 @@ x_header(Key, #content{properties = none} = Content0) ->
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
x_header(Key, Content).

x_headers(#content{properties = #'P_basic'{headers = undefined}}) ->
#{};
x_headers(#content{properties = #'P_basic'{headers = Headers}}) ->
L = lists:filtermap(
fun({Name, Type, Val}) ->
case mc_util:is_x_header(Name) of
true ->
{true, {Name, from_091(Type, Val)}};
false ->
false
end
end, Headers),
maps:from_list(L);
x_headers(#content{properties = none} = Content0) ->
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
x_headers(Content).

property(Prop, Content) ->
mc_util:infer_type(mc_compat:get_property(Prop, Content)).

Expand Down Expand Up @@ -690,10 +708,23 @@ from_091(binary, V) -> {binary, V};
from_091(timestamp, V) -> {timestamp, V * 1000};
from_091(byte, V) -> {byte, V};
from_091(void, _V) -> null;
from_091(array, L) ->
{list, [from_091(T, V) || {T, V} <- L]};
from_091(table, L) ->
{map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]}.
{map, [{wrap(symbol, K), from_091(T, V)} || {K, T, V} <- L]};
from_091(array, []) ->
{list, []};
from_091(array, L0 = [{T0, _} | _]) ->
{L = [{T1, _} | _], {Monomorphic, _}} =
lists:mapfoldl(fun({T, V}, {Mono0, PrevType}) ->
Mono = case Mono0 of
false -> false;
true -> T =:= PrevType
end,
{from_091(T, V), {Mono, T}}
end, {true, T0}, L0),
case Monomorphic of
true -> {array, T1, L};
false -> {list, L}
end.

map_add(_T, _Key, _Type, undefined, Acc) ->
Acc;
Expand All @@ -707,7 +738,6 @@ supported_header_value_type(table) ->
supported_header_value_type(_) ->
true.


amqp10_map_get(_K, []) ->
undefined;
amqp10_map_get(K, Tuples) ->
Expand Down Expand Up @@ -857,3 +887,24 @@ amqp10_section_header(Header, Headers) ->

amqp_encoded_binary(Section) ->
iolist_to_binary(amqp10_framing:encode_bin(Section)).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

from_091_array_test() ->
{list, []} = from_091(array, []),
{array, utf8, [{utf8, <<"e1">>}]} = from_091(array, [{longstr, <<"e1">>}]),
{array, utf8, [{utf8, <<"e1">>},
{utf8, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>},
{longstr, <<"e2">>}]),
{list, [{utf8, <<"e1">>},
{binary, <<"e2">>}]} = from_091(array, [{longstr, <<"e1">>},
{binary, <<"e2">>}]),
{list, [{utf8, <<"e1">>},
{binary, <<"e2">>},
{utf8, <<"e3">>},
{utf8, <<"e4">>}]} = from_091(array, [{longstr, <<"e1">>},
{binary, <<"e2">>},
{longstr, <<"e3">>},
{longstr, <<"e4">>}]).
-endif.
4 changes: 4 additions & 0 deletions deps/rabbit/src/mc_compat.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
priority/1,
set_ttl/2,
x_header/2,
x_headers/1,
routing_headers/2,
%%%
convert_to/2,
Expand Down Expand Up @@ -138,6 +139,9 @@ set_ttl(Value, #basic_message{content = Content0} = Msg) ->
x_header(Key,#basic_message{content = Content}) ->
mc_amqpl:x_header(Key, Content).

x_headers(#basic_message{content = Content}) ->
mc_amqpl:x_headers(Content).

routing_headers(#basic_message{content = Content}, Opts) ->
mc_amqpl:routing_headers(Content, Opts).

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/mc_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ utf8_string_is_ascii(UTF8String) ->
amqp_map_get(Key, {map, List}, Default) ->
amqp_map_get(Key, List, Default);
amqp_map_get(Key, List, Default) when is_list(List) ->
case lists:search(fun ({{_, K}, _}) -> K == Key end, List) of
case lists:search(fun ({{_, K}, _}) -> K =:= Key end, List) of
{value, {_K, V}} ->
V;
false ->
Expand Down
Loading

0 comments on commit 715d421

Please sign in to comment.