Skip to content

Commit

Permalink
Limit x-death growth to one entry per queue
Browse files Browse the repository at this point in the history
Otherwise the list can grow forever in some cases.

Fixes #78.
  • Loading branch information
michaelklishin committed Mar 20, 2015
1 parent 8e2ae9d commit 8124e3c
Showing 1 changed file with 39 additions and 3 deletions.
42 changes: 39 additions & 3 deletions src/rabbit_dead_letter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

%%----------------------------------------------------------------------------

-define(X_DEATH_HEADER, <<"x-death">>).

publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
Expand Down Expand Up @@ -66,8 +68,7 @@ make_msg(Msg = #basic_message{content = Content,
{<<"time">>, timestamp, TimeSec},
{<<"exchange">>, longstr, Exchange#resource.name},
{<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
Info, Headers))
HeadersFun1(update_x_death_header(Info, Headers))
end,
Content1 = #content{properties = Props} =
rabbit_basic:map_headers(HeadersFun2, Content),
Expand All @@ -78,6 +79,41 @@ make_msg(Msg = #basic_message{content = Content,
routing_keys = DeathRoutingKeys,
content = Content2}.

x_death_header(undefined) ->
undefined;
x_death_header([]) ->
undefined;
x_death_header(Headers) ->
case lists:keysearch(?X_DEATH_HEADER, 1, Headers) of
false -> undefined;
{value, Val} -> Val
end.

x_death_event_queue(Info) ->
case lists:keysearch(<<"queue">>, 1, Info) of
false -> undefined;
{value, {<<"queue">>, longstr, Val}} -> Val
end.

x_death_not_for_queue({table, Info}, Queue) ->
x_death_event_queue(Info) =/= Queue.

x_deaths_from_header({?X_DEATH_HEADER, array, Table}) ->
Table.

update_x_death_header(Info, Headers) ->
Q = x_death_event_queue(Info),

This comment has been minimized.

Copy link
@dumbbell

dumbbell Mar 20, 2015

Member

This line could be moved to the Array case clause as it is only useful there.

case x_death_header(Headers) of
undefined ->
rabbit_basic:prepend_table_header(?X_DEATH_HEADER,
Info, Headers);
Array ->
XDeaths = rabbit_misc:sort_field_table([{table, Info} |
[XD || XD <- x_deaths_from_header(Array),

This comment has been minimized.

Copy link
@dumbbell

dumbbell Mar 20, 2015

Member

Instead of using x_deaths_from_header(Array), you could use a pattern matching in the case clause:

case x_death_header(Headers) of
    % ...
    {?X_DEATH_HEADER, array, Table} ->
        XDeaths = rabbit_misc:sort_field_table([{table, Info} |
                                                [XD || XD <- Table,

This comment has been minimized.

Copy link
@simonmacmullen

simonmacmullen Mar 20, 2015

Contributor

Agreed; the "accessor" functions x_deaths_from_header/1 and x_death_not_for_queue/2 obscure more than they clarify for me.

x_death_not_for_queue(XD, Q)]]),
rabbit_misc:set_table_value(Headers, ?X_DEATH_HEADER, array, XDeaths)
end.

per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
[];
per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
Expand All @@ -96,7 +132,7 @@ detect_cycles(_Reason, #basic_message{content = Content}, Queues) ->
undefined ->
NoCycles;
_ ->
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
case rabbit_misc:table_lookup(Headers, ?X_DEATH_HEADER) of
{array, Deaths} ->
{Cycling, NotCycling} =
lists:partition(fun (#resource{name = Queue}) ->
Expand Down

0 comments on commit 8124e3c

Please sign in to comment.