From dc9cda7f07ee4e29ba40f914900a6ff34579b3a7 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 10 May 2015 00:33:50 +0300 Subject: [PATCH 1/6] Support x-death event proplists from before #78 We group x-death header values before processing them to make sure there's only one per {queue, reason}. --- src/rabbit_dead_letter.erl | 77 +++++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 14 deletions(-) diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 010288024b56..b046db998e30 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -84,36 +84,85 @@ x_death_event_key(Info, Key, KeyType) -> {value, {Key, KeyType, Val}} -> Val end. +maybe_append_to_event_group(Info, _Queue, _Reason, []) -> + [Info]; +maybe_append_to_event_group(Info, Queue, Reason, Acc) -> + case lists:any(queue_and_reason_matcher(Queue, Reason), Acc) of + true -> Acc; + false -> [Info | Acc] + end. + +group_by_queue_and_reason(Tables) -> + Infos = [Info || {table, Info} <- Tables], + Grouped = + lists:foldl(fun (Info, Acc) -> + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + Matcher = queue_and_reason_matcher(Q, R), + {Matches, _} = lists:partition(Matcher, Infos), + case Matches of + [X] -> + maybe_append_to_event_group( + ensure_xdeath_event_count(X), Q, R, Acc); + [X|_] = Xs when is_list(Xs) -> + maybe_append_to_event_group( + ensure_xdeath_event_count(X, length(Xs)), Q, R, Acc) + end + end, [], Infos), + [{table, Info} || Info <- Grouped]. + update_x_death_header(Info, Headers) -> Q = x_death_event_key(Info, <<"queue">>, longstr), R = x_death_event_key(Info, <<"reason">>, longstr), case rabbit_basic:header(<<"x-death">>, Headers) of undefined -> rabbit_basic:prepend_table_header(<<"x-death">>, - [{<<"count">>, long, 1} | Info], Headers); + [{<<"count">>, long, 1} | Info], Headers); {<<"x-death">>, array, Tables} -> + %% group existing x-death headers in case we have some from + %% before rabbitmq-server#78 + GroupedTables = group_by_queue_and_reason(Tables), {Matches, Others} = lists:partition( - fun ({table, Info0}) -> - x_death_event_key(Info0, <<"queue">>, longstr) =:= Q - andalso x_death_event_key(Info0, <<"reason">>, longstr) =:= R - end, Tables), + queue_and_reason_matcher(Q, R), GroupedTables), Info1 = case Matches of [] -> [{<<"count">>, long, 1} | Info]; [{table, M}] -> - case x_death_event_key(M, <<"count">>, long) of - undefined -> - [{<<"count">>, long, 1} | M]; - N -> - lists:keyreplace( - <<"count">>, 1, M, - {<<"count">>, long, N + 1}) - end + increment_xdeath_event_count(M) end, rabbit_misc:set_table_value(Headers, <<"x-death">>, array, - [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + end. + +ensure_xdeath_event_count(Info) -> + ensure_xdeath_event_count(Info, 1). +ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 -> + case x_death_event_key(Info, <<"count">>, long) of + undefined -> + [{<<"count">>, long, InitialVal} | Info]; + _ -> + Info + end. + +increment_xdeath_event_count(Info) -> + case x_death_event_key(Info, <<"count">>, long) of + undefined -> + [{<<"count">>, long, 1} | Info]; + N -> + lists:keyreplace( + <<"count">>, 1, Info, + {<<"count">>, long, N + 1}) end. +queue_and_reason_matcher(Q, R) -> + fun({table, Info}) -> + x_death_event_key(Info, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R; + (Info) when is_list(Info) -> + x_death_event_key(Info, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R + end. + per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> []; per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> From a61069fe9fc5a7232c2492c44ba2367e364db04a Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 10 May 2015 14:53:04 +0300 Subject: [PATCH 2/6] Refactor --- src/rabbit_dead_letter.erl | 45 +++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index b046db998e30..e756d8a01157 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -84,32 +84,31 @@ x_death_event_key(Info, Key, KeyType) -> {value, {Key, KeyType, Val}} -> Val end. -maybe_append_to_event_group(Info, _Queue, _Reason, []) -> - [Info]; -maybe_append_to_event_group(Info, Queue, Reason, Acc) -> +maybe_append_to_event_group(Table, _Queue, _Reason, []) -> + [Table]; +maybe_append_to_event_group(Table, Queue, Reason, Acc) -> case lists:any(queue_and_reason_matcher(Queue, Reason), Acc) of true -> Acc; - false -> [Info | Acc] + false -> [Table | Acc] end. +group_by_queue_and_reason([]) -> + []; +group_by_queue_and_reason([Table]) -> + [Table]; group_by_queue_and_reason(Tables) -> - Infos = [Info || {table, Info} <- Tables], - Grouped = - lists:foldl(fun (Info, Acc) -> - Q = x_death_event_key(Info, <<"queue">>, longstr), - R = x_death_event_key(Info, <<"reason">>, longstr), - Matcher = queue_and_reason_matcher(Q, R), - {Matches, _} = lists:partition(Matcher, Infos), - case Matches of - [X] -> - maybe_append_to_event_group( - ensure_xdeath_event_count(X), Q, R, Acc); - [X|_] = Xs when is_list(Xs) -> - maybe_append_to_event_group( - ensure_xdeath_event_count(X, length(Xs)), Q, R, Acc) - end - end, [], Infos), - [{table, Info} || Info <- Grouped]. + lists:foldl(fun ({table, Info}, Acc) -> + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + Matcher = queue_and_reason_matcher(Q, R), + {Matches, _} = lists:partition(Matcher, Tables), + {Augmented, N} = case Matches of + [X] -> {X, 1}; + [X|_] = Xs -> {X, length(Xs)} + end, + maybe_append_to_event_group( + ensure_xdeath_event_count(Augmented, N), Q, R, Acc) + end, [], Tables). update_x_death_header(Info, Headers) -> Q = x_death_event_key(Info, <<"queue">>, longstr), @@ -134,8 +133,8 @@ update_x_death_header(Info, Headers) -> [{table, rabbit_misc:sort_field_table(Info1)} | Others]) end. -ensure_xdeath_event_count(Info) -> - ensure_xdeath_event_count(Info, 1). +ensure_xdeath_event_count({table, Info}, InitialVal) when InitialVal >= 1 -> + {table, ensure_xdeath_event_count(Info, InitialVal)}; ensure_xdeath_event_count(Info, InitialVal) when InitialVal >= 1 -> case x_death_event_key(Info, <<"count">>, long) of undefined -> From 9a8e7b0a243daa2b3778dbf342a0229f7cfd40f9 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 10 May 2015 21:04:52 +0300 Subject: [PATCH 3/6] Refactor --- src/rabbit_dead_letter.erl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index e756d8a01157..f013c2e984a7 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -154,12 +154,14 @@ increment_xdeath_event_count(Info) -> end. queue_and_reason_matcher(Q, R) -> - fun({table, Info}) -> - x_death_event_key(Info, <<"queue">>, longstr) =:= Q - andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R; - (Info) when is_list(Info) -> + F = fun(Info) -> x_death_event_key(Info, <<"queue">>, longstr) =:= Q andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R + end, + fun({table, Info}) -> + F(Info); + (Info) when is_list(Info) -> + F(Info) end. per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> From 54c63a2eeaccdd36381d12283e244e7133c6ab27 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 10 May 2015 21:15:43 +0300 Subject: [PATCH 4/6] Use a set to look up previously seen {queue, reason} keys For O(1) lookup complexity. --- src/rabbit_dead_letter.erl | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index f013c2e984a7..050e661b5acd 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -84,10 +84,10 @@ x_death_event_key(Info, Key, KeyType) -> {value, {Key, KeyType, Val}} -> Val end. -maybe_append_to_event_group(Table, _Queue, _Reason, []) -> +maybe_append_to_event_group(Table, _Key, _SeenKeys, []) -> [Table]; -maybe_append_to_event_group(Table, Queue, Reason, Acc) -> - case lists:any(queue_and_reason_matcher(Queue, Reason), Acc) of +maybe_append_to_event_group(Table, {_Queue, _Reason} = Key, SeenKeys, Acc) -> + case sets:is_element(Key, SeenKeys) of true -> Acc; false -> [Table | Acc] end. @@ -97,18 +97,22 @@ group_by_queue_and_reason([]) -> group_by_queue_and_reason([Table]) -> [Table]; group_by_queue_and_reason(Tables) -> - lists:foldl(fun ({table, Info}, Acc) -> - Q = x_death_event_key(Info, <<"queue">>, longstr), - R = x_death_event_key(Info, <<"reason">>, longstr), - Matcher = queue_and_reason_matcher(Q, R), - {Matches, _} = lists:partition(Matcher, Tables), - {Augmented, N} = case Matches of - [X] -> {X, 1}; - [X|_] = Xs -> {X, length(Xs)} - end, - maybe_append_to_event_group( - ensure_xdeath_event_count(Augmented, N), Q, R, Acc) - end, [], Tables). + {_, Grouped} = + lists:foldl(fun ({table, Info}, {SeenKeys, Acc}) -> + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + Matcher = queue_and_reason_matcher(Q, R), + {Matches, _} = lists:partition(Matcher, Tables), + {Augmented, N} = case Matches of + [X] -> {X, 1}; + [X|_] = Xs -> {X, length(Xs)} + end, + Key = {Q, R}, + Acc1 = maybe_append_to_event_group( + ensure_xdeath_event_count(Augmented, N), Key, SeenKeys, Acc), + {sets:add_element(Key, SeenKeys), Acc1} + end, {sets:new(), []}, Tables), + Grouped. update_x_death_header(Info, Headers) -> Q = x_death_event_key(Info, <<"queue">>, longstr), From 3f08241f866c01011170439b5f042f672e6f3b1a Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 11 May 2015 14:02:28 +0300 Subject: [PATCH 5/6] Re-format with Emacs --- src/rabbit_dead_letter.erl | 54 +++++++++++++++++++------------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 050e661b5acd..2e6e024ce752 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -99,19 +99,19 @@ group_by_queue_and_reason([Table]) -> group_by_queue_and_reason(Tables) -> {_, Grouped} = lists:foldl(fun ({table, Info}, {SeenKeys, Acc}) -> - Q = x_death_event_key(Info, <<"queue">>, longstr), - R = x_death_event_key(Info, <<"reason">>, longstr), - Matcher = queue_and_reason_matcher(Q, R), - {Matches, _} = lists:partition(Matcher, Tables), - {Augmented, N} = case Matches of - [X] -> {X, 1}; - [X|_] = Xs -> {X, length(Xs)} - end, - Key = {Q, R}, - Acc1 = maybe_append_to_event_group( - ensure_xdeath_event_count(Augmented, N), Key, SeenKeys, Acc), - {sets:add_element(Key, SeenKeys), Acc1} - end, {sets:new(), []}, Tables), + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + Matcher = queue_and_reason_matcher(Q, R), + {Matches, _} = lists:partition(Matcher, Tables), + {Augmented, N} = case Matches of + [X] -> {X, 1}; + [X|_] = Xs -> {X, length(Xs)} + end, + Key = {Q, R}, + Acc1 = maybe_append_to_event_group( + ensure_xdeath_event_count(Augmented, N), Key, SeenKeys, Acc), + {sets:add_element(Key, SeenKeys), Acc1} + end, {sets:new(), []}, Tables), Grouped. update_x_death_header(Info, Headers) -> @@ -120,13 +120,13 @@ update_x_death_header(Info, Headers) -> case rabbit_basic:header(<<"x-death">>, Headers) of undefined -> rabbit_basic:prepend_table_header(<<"x-death">>, - [{<<"count">>, long, 1} | Info], Headers); + [{<<"count">>, long, 1} | Info], Headers); {<<"x-death">>, array, Tables} -> %% group existing x-death headers in case we have some from %% before rabbitmq-server#78 GroupedTables = group_by_queue_and_reason(Tables), {Matches, Others} = lists:partition( - queue_and_reason_matcher(Q, R), GroupedTables), + queue_and_reason_matcher(Q, R), GroupedTables), Info1 = case Matches of [] -> [{<<"count">>, long, 1} | Info]; @@ -134,7 +134,7 @@ update_x_death_header(Info, Headers) -> increment_xdeath_event_count(M) end, rabbit_misc:set_table_value(Headers, <<"x-death">>, array, - [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + [{table, rabbit_misc:sort_field_table(Info1)} | Others]) end. ensure_xdeath_event_count({table, Info}, InitialVal) when InitialVal >= 1 -> @@ -153,20 +153,20 @@ increment_xdeath_event_count(Info) -> [{<<"count">>, long, 1} | Info]; N -> lists:keyreplace( - <<"count">>, 1, Info, - {<<"count">>, long, N + 1}) + <<"count">>, 1, Info, + {<<"count">>, long, N + 1}) end. queue_and_reason_matcher(Q, R) -> - F = fun(Info) -> - x_death_event_key(Info, <<"queue">>, longstr) =:= Q - andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R - end, - fun({table, Info}) -> - F(Info); - (Info) when is_list(Info) -> - F(Info) - end. + F = fun(Info) -> + x_death_event_key(Info, <<"queue">>, longstr) =:= Q + andalso x_death_event_key(Info, <<"reason">>, longstr) =:= R + end, + fun({table, Info}) -> + F(Info); + (Info) when is_list(Info) -> + F(Info) + end. per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> []; From 46cb4eaf14fcf7eff67f4bee0362e2b84ea620f7 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 11 May 2015 18:06:33 +0300 Subject: [PATCH 6/6] Re-format for 80 characters wide --- src/rabbit_dead_letter.erl | 45 +++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 2e6e024ce752..dbf38e41e780 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -98,20 +98,22 @@ group_by_queue_and_reason([Table]) -> [Table]; group_by_queue_and_reason(Tables) -> {_, Grouped} = - lists:foldl(fun ({table, Info}, {SeenKeys, Acc}) -> - Q = x_death_event_key(Info, <<"queue">>, longstr), - R = x_death_event_key(Info, <<"reason">>, longstr), - Matcher = queue_and_reason_matcher(Q, R), - {Matches, _} = lists:partition(Matcher, Tables), - {Augmented, N} = case Matches of - [X] -> {X, 1}; - [X|_] = Xs -> {X, length(Xs)} - end, - Key = {Q, R}, - Acc1 = maybe_append_to_event_group( - ensure_xdeath_event_count(Augmented, N), Key, SeenKeys, Acc), - {sets:add_element(Key, SeenKeys), Acc1} - end, {sets:new(), []}, Tables), + lists:foldl( + fun ({table, Info}, {SeenKeys, Acc}) -> + Q = x_death_event_key(Info, <<"queue">>, longstr), + R = x_death_event_key(Info, <<"reason">>, longstr), + Matcher = queue_and_reason_matcher(Q, R), + {Matches, _} = lists:partition(Matcher, Tables), + {Augmented, N} = case Matches of + [X] -> {X, 1}; + [X|_] = Xs -> {X, length(Xs)} + end, + Key = {Q, R}, + Acc1 = maybe_append_to_event_group( + ensure_xdeath_event_count(Augmented, N), + Key, SeenKeys, Acc), + {sets:add_element(Key, SeenKeys), Acc1} + end, {sets:new(), []}, Tables), Grouped. update_x_death_header(Info, Headers) -> @@ -119,22 +121,25 @@ update_x_death_header(Info, Headers) -> R = x_death_event_key(Info, <<"reason">>, longstr), case rabbit_basic:header(<<"x-death">>, Headers) of undefined -> - rabbit_basic:prepend_table_header(<<"x-death">>, - [{<<"count">>, long, 1} | Info], Headers); + rabbit_basic:prepend_table_header( + <<"x-death">>, + [{<<"count">>, long, 1} | Info], Headers); {<<"x-death">>, array, Tables} -> %% group existing x-death headers in case we have some from %% before rabbitmq-server#78 GroupedTables = group_by_queue_and_reason(Tables), {Matches, Others} = lists:partition( - queue_and_reason_matcher(Q, R), GroupedTables), + queue_and_reason_matcher(Q, R), + GroupedTables), Info1 = case Matches of [] -> [{<<"count">>, long, 1} | Info]; [{table, M}] -> increment_xdeath_event_count(M) end, - rabbit_misc:set_table_value(Headers, <<"x-death">>, array, - [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + rabbit_misc:set_table_value( + Headers, <<"x-death">>, array, + [{table, rabbit_misc:sort_field_table(Info1)} | Others]) end. ensure_xdeath_event_count({table, Info}, InitialVal) when InitialVal >= 1 -> @@ -232,7 +237,7 @@ log_cycle_once(Queues) -> true -> ok; undefined -> rabbit_log:warning( "Message dropped. Dead-letter queues cycle detected" ++ - ": ~p~nThis cycle will NOT be reported again.~n", + ": ~p~nThis cycle will NOT be reported again.~n", [Queues]), put(Key, true) end.