Skip to content

Commit

Permalink
Support x-death event proplists from before #78
Browse files Browse the repository at this point in the history
We group x-death header values before processing them
to make sure there's only one per {queue, reason}.
  • Loading branch information
michaelklishin committed May 9, 2015
1 parent 18fcb1a commit dc9cda7
Showing 1 changed file with 63 additions and 14 deletions.
77 changes: 63 additions & 14 deletions src/rabbit_dead_letter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,36 +84,85 @@ x_death_event_key(Info, Key, KeyType) ->
{value, {Key, KeyType, Val}} -> Val
end.

maybe_append_to_event_group(Info, _Queue, _Reason, []) ->
[Info];
maybe_append_to_event_group(Info, Queue, Reason, Acc) ->
case lists:any(queue_and_reason_matcher(Queue, Reason), Acc) of
true -> Acc;
false -> [Info | Acc]
end.

group_by_queue_and_reason(Tables) ->
Infos = [Info || {table, Info} <- Tables],
Grouped =
lists:foldl(fun (Info, Acc) ->
Q = x_death_event_key(Info, <<"queue">>, longstr),
R = x_death_event_key(Info, <<"reason">>, longstr),
Matcher = queue_and_reason_matcher(Q, R),
{Matches, _} = lists:partition(Matcher, Infos),
case Matches of
[X] ->
maybe_append_to_event_group(
ensure_xdeath_event_count(X), Q, R, Acc);
[X|_] = Xs when is_list(Xs) ->
maybe_append_to_event_group(
ensure_xdeath_event_count(X, length(Xs)), Q, R, Acc)
end
end, [], Infos),
[{table, Info} || Info <- Grouped].

update_x_death_header(Info, Headers) ->
Q = x_death_event_key(Info, <<"queue">>, longstr),
R = x_death_event_key(Info, <<"reason">>, longstr),
case rabbit_basic:header(<<"x-death">>, Headers) of
undefined ->
rabbit_basic:prepend_table_header(<<"x-death">>,
[{<<"count">>, long, 1} | Info], Headers);
[{<<"count">>, long, 1} | Info], Headers);
{<<"x-death">>, array, Tables} ->
%% group existing x-death headers in case we have some from
%% before rabbitmq-server#78
GroupedTables = group_by_queue_and_reason(Tables),
{Matches, Others} = lists:partition(
fun ({table, Info0}) ->
x_death_event_key(Info0, <<"queue">>, longstr) =:= Q
andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R
end, Tables),
queue_and_reason_matcher(Q, R), GroupedTables),
Info1 = case Matches of
[] ->
[{<<"count">>, long, 1} | Info];
[{table, M}] ->
case x_death_event_key(M, <<"count">>, long) of
undefined ->
[{<<"count">>, long, 1} | M];
N ->
lists:keyreplace(
<<"count">>, 1, M,
{<<"count">>, long, N + 1})
end
increment_xdeath_event_count(M)
end,
rabbit_misc:set_table_value(Headers, <<"x-death">>, array,
[{table, rabbit_misc:sort_field_table(Info1)} | Others])
[{table, rabbit_misc:sort_field_table(Info1)} | Others])
end.

ensure_xdeath_event_count(Info) ->
ensure_xdeath_event_count(Info, 1).
ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 ->
case x_death_event_key(Info, <<"count">>, long) of
undefined ->
[{<<"count">>, long, InitialVal} | Info];
_ ->
Info
end.

increment_xdeath_event_count(Info) ->
case x_death_event_key(Info, <<"count">>, long) of
undefined ->
[{<<"count">>, long, 1} | Info];
N ->
lists:keyreplace(
<<"count">>, 1, Info,
{<<"count">>, long, N + 1})
end.

queue_and_reason_matcher(Q, R) ->
fun({table, Info}) ->
x_death_event_key(Info, <<"queue">>, longstr) =:= Q
andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R;
(Info) when is_list(Info) ->
x_death_event_key(Info, <<"queue">>, longstr) =:= Q
andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R
end.

per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
[];
per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
Expand Down

0 comments on commit dc9cda7

Please sign in to comment.