diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 728bc431174d..debb718c9a98 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -34,6 +34,8 @@ %%---------------------------------------------------------------------------- +-define(X_DEATH_HEADER, <<"x-death">>). + publish(Msg, Reason, X, RK, QName) -> DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), @@ -66,8 +68,7 @@ make_msg(Msg = #basic_message{content = Content, {<<"time">>, timestamp, TimeSec}, {<<"exchange">>, longstr, Exchange#resource.name}, {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, - HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, - Info, Headers)) + HeadersFun1(update_x_death_header(Info, Headers)) end, Content1 = #content{properties = Props} = rabbit_basic:map_headers(HeadersFun2, Content), @@ -78,6 +79,41 @@ make_msg(Msg = #basic_message{content = Content, routing_keys = DeathRoutingKeys, content = Content2}. +x_death_header(undefined) -> + undefined; +x_death_header([]) -> + undefined; +x_death_header(Headers) -> + case lists:keysearch(?X_DEATH_HEADER, 1, Headers) of + false -> undefined; + {value, Val} -> Val + end. + +x_death_event_queue(Info) -> + case lists:keysearch(<<"queue">>, 1, Info) of + false -> undefined; + {value, {<<"queue">>, longstr, Val}} -> Val + end. + +x_death_not_for_queue({table, Info}, Queue) -> + x_death_event_queue(Info) =/= Queue. + +x_deaths_from_header({?X_DEATH_HEADER, array, Table}) -> + Table. + +update_x_death_header(Info, Headers) -> + Q = x_death_event_queue(Info), + case x_death_header(Headers) of + undefined -> + rabbit_basic:prepend_table_header(?X_DEATH_HEADER, + Info, Headers); + Array -> + XDeaths = rabbit_misc:sort_field_table([{table, Info} | + [XD || XD <- x_deaths_from_header(Array), + x_death_not_for_queue(XD, Q)]]), + rabbit_misc:set_table_value(Headers, ?X_DEATH_HEADER, array, XDeaths) + end. + per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> []; per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> @@ -96,7 +132,7 @@ detect_cycles(_Reason, #basic_message{content = Content}, Queues) -> undefined -> NoCycles; _ -> - case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + case rabbit_misc:table_lookup(Headers, ?X_DEATH_HEADER) of {array, Deaths} -> {Cycling, NotCycling} = lists:partition(fun (#resource{name = Queue}) ->