diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 010288024b56..dbf38e41e780 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -84,36 +84,95 @@ x_death_event_key(Info, Key, KeyType) -> {value, {Key, KeyType, Val}} -> Val end. +maybe_append_to_event_group(Table, _Key, _SeenKeys, []) -> + [Table]; +maybe_append_to_event_group(Table, {_Queue, _Reason} = Key, SeenKeys, Acc) -> + case sets:is_element(Key, SeenKeys) of + true -> Acc; + false -> [Table | Acc] + end. + +group_by_queue_and_reason([]) -> + []; +group_by_queue_and_reason([Table]) -> + [Table]; +group_by_queue_and_reason(Tables) -> + {_, Grouped} = + lists:foldl( + fun ({table, Info}, {SeenKeys, Acc}) -> + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + Matcher = queue_and_reason_matcher(Q, R), + {Matches, _} = lists:partition(Matcher, Tables), + {Augmented, N} = case Matches of + [X] -> {X, 1}; + [X|_] = Xs -> {X, length(Xs)} + end, + Key = {Q, R}, + Acc1 = maybe_append_to_event_group( + ensure_xdeath_event_count(Augmented, N), + Key, SeenKeys, Acc), + {sets:add_element(Key, SeenKeys), Acc1} + end, {sets:new(), []}, Tables), + Grouped. + update_x_death_header(Info, Headers) -> Q = x_death_event_key(Info, <<"queue">>, longstr), R = x_death_event_key(Info, <<"reason">>, longstr), case rabbit_basic:header(<<"x-death">>, Headers) of undefined -> - rabbit_basic:prepend_table_header(<<"x-death">>, + rabbit_basic:prepend_table_header( + <<"x-death">>, [{<<"count">>, long, 1} | Info], Headers); {<<"x-death">>, array, Tables} -> + %% group existing x-death headers in case we have some from + %% before rabbitmq-server#78 + GroupedTables = group_by_queue_and_reason(Tables), {Matches, Others} = lists:partition( - fun ({table, Info0}) -> - x_death_event_key(Info0, <<"queue">>, longstr) =:= Q - andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R - end, Tables), + queue_and_reason_matcher(Q, R), + GroupedTables), Info1 = case Matches of [] -> [{<<"count">>, long, 1} | Info]; [{table, M}] -> - case x_death_event_key(M, <<"count">>, long) of - undefined -> - [{<<"count">>, long, 1} | M]; - N -> - lists:keyreplace( - <<"count">>, 1, M, - {<<"count">>, long, N + 1}) - end + increment_xdeath_event_count(M) end, - rabbit_misc:set_table_value(Headers, <<"x-death">>, array, + rabbit_misc:set_table_value( + Headers, <<"x-death">>, array, [{table, rabbit_misc:sort_field_table(Info1)} | Others]) end. +ensure_xdeath_event_count({table, Info}, InitialVal) when InitialVal >= 1 -> + {table, ensure_xdeath_event_count(Info, InitialVal)}; +ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 -> + case x_death_event_key(Info, <<"count">>, long) of + undefined -> + [{<<"count">>, long, InitialVal} | Info]; + _ -> + Info + end. + +increment_xdeath_event_count(Info) -> + case x_death_event_key(Info, <<"count">>, long) of + undefined -> + [{<<"count">>, long, 1} | Info]; + N -> + lists:keyreplace( + <<"count">>, 1, Info, + {<<"count">>, long, N + 1}) + end. + +queue_and_reason_matcher(Q, R) -> + F = fun(Info) -> + x_death_event_key(Info, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R + end, + fun({table, Info}) -> + F(Info); + (Info) when is_list(Info) -> + F(Info) + end. + per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> []; per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> @@ -178,7 +237,7 @@ log_cycle_once(Queues) -> true -> ok; undefined -> rabbit_log:warning( "Message dropped. Dead-letter queues cycle detected" ++ - ": ~p~nThis cycle will NOT be reported again.~n", + ": ~p~nThis cycle will NOT be reported again.~n", [Queues]), put(Key, true) end.