Skip to content

Commit

Permalink
Merge pull request #153 from rabbitmq/rabbitmq-server-152
Browse files Browse the repository at this point in the history
Support x-death event values from before #78
  • Loading branch information
videlalvaro committed May 11, 2015
2 parents 6fc2e9e + 46cb4ea commit ac896b7
Showing 1 changed file with 74 additions and 15 deletions.
89 changes: 74 additions & 15 deletions src/rabbit_dead_letter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,36 +84,95 @@ x_death_event_key(Info, Key, KeyType) ->
{value, {Key, KeyType, Val}} -> Val
end.

maybe_append_to_event_group(Table, _Key, _SeenKeys, []) ->
[Table];
maybe_append_to_event_group(Table, {_Queue, _Reason} = Key, SeenKeys, Acc) ->
case sets:is_element(Key, SeenKeys) of
true -> Acc;
false -> [Table | Acc]
end.

group_by_queue_and_reason([]) ->
[];
group_by_queue_and_reason([Table]) ->
[Table];
group_by_queue_and_reason(Tables) ->
{_, Grouped} =
lists:foldl(
fun ({table, Info}, {SeenKeys, Acc}) ->
Q = x_death_event_key(Info, <<"queue">>, longstr),
R = x_death_event_key(Info, <<"reason">>, longstr),
Matcher = queue_and_reason_matcher(Q, R),
{Matches, _} = lists:partition(Matcher, Tables),
{Augmented, N} = case Matches of
[X] -> {X, 1};
[X|_] = Xs -> {X, length(Xs)}
end,
Key = {Q, R},
Acc1 = maybe_append_to_event_group(
ensure_xdeath_event_count(Augmented, N),
Key, SeenKeys, Acc),
{sets:add_element(Key, SeenKeys), Acc1}
end, {sets:new(), []}, Tables),
Grouped.

update_x_death_header(Info, Headers) ->
Q = x_death_event_key(Info, <<"queue">>, longstr),
R = x_death_event_key(Info, <<"reason">>, longstr),
case rabbit_basic:header(<<"x-death">>, Headers) of
undefined ->
rabbit_basic:prepend_table_header(<<"x-death">>,
rabbit_basic:prepend_table_header(
<<"x-death">>,
[{<<"count">>, long, 1} | Info], Headers);
{<<"x-death">>, array, Tables} ->
%% group existing x-death headers in case we have some from
%% before rabbitmq-server#78
GroupedTables = group_by_queue_and_reason(Tables),
{Matches, Others} = lists:partition(
fun ({table, Info0}) ->
x_death_event_key(Info0, <<"queue">>, longstr) =:= Q
andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R
end, Tables),
queue_and_reason_matcher(Q, R),
GroupedTables),
Info1 = case Matches of
[] ->
[{<<"count">>, long, 1} | Info];
[{table, M}] ->
case x_death_event_key(M, <<"count">>, long) of
undefined ->
[{<<"count">>, long, 1} | M];
N ->
lists:keyreplace(
<<"count">>, 1, M,
{<<"count">>, long, N + 1})
end
increment_xdeath_event_count(M)
end,
rabbit_misc:set_table_value(Headers, <<"x-death">>, array,
rabbit_misc:set_table_value(
Headers, <<"x-death">>, array,
[{table, rabbit_misc:sort_field_table(Info1)} | Others])
end.

ensure_xdeath_event_count({table, Info}, InitialVal) when InitialVal >= 1 ->
{table, ensure_xdeath_event_count(Info, InitialVal)};
ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 ->
case x_death_event_key(Info, <<"count">>, long) of
undefined ->
[{<<"count">>, long, InitialVal} | Info];
_ ->
Info
end.

increment_xdeath_event_count(Info) ->
case x_death_event_key(Info, <<"count">>, long) of
undefined ->
[{<<"count">>, long, 1} | Info];
N ->
lists:keyreplace(
<<"count">>, 1, Info,
{<<"count">>, long, N + 1})
end.

queue_and_reason_matcher(Q, R) ->
F = fun(Info) ->
x_death_event_key(Info, <<"queue">>, longstr) =:= Q
andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R
end,
fun({table, Info}) ->
F(Info);
(Info) when is_list(Info) ->
F(Info)
end.

per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
[];
per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
Expand Down Expand Up @@ -178,7 +237,7 @@ log_cycle_once(Queues) ->
true -> ok;
undefined -> rabbit_log:warning(
"Message dropped. Dead-letter queues cycle detected" ++
": ~p~nThis cycle will NOT be reported again.~n",
": ~p~nThis cycle will NOT be reported again.~n",
[Queues]),
put(Key, true)
end.

0 comments on commit ac896b7

Please sign in to comment.