From 189a452d61eae952f86ae0c8a03857730d6b7f87 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Fri, 19 Jan 2018 16:50:19 +0000 Subject: [PATCH 1/2] Make sure rabbit_misc:pmap callbacks do not throw. rabbit_misc:pmap/2 can hang if a callback throws an error. This can cause plugin startup to hang when enabling channel interceptors. Made it log errors and continue. [#153846585] --- src/rabbit_channel.erl | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c671438ce862..1de367e70ca5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -361,13 +361,29 @@ emit_info(PidList, InfoItems, Ref, AggregatorPid) -> refresh_config_local() -> rabbit_misc:upmap( - fun (C) -> gen_server2:call(C, refresh_config, infinity) end, + fun (C) -> + try + gen_server2:call(C, refresh_config, infinity) + catch _:Reason -> + rabbit_log:error("Failed to refresh channel config " + "for channel ~p. Reason ~p", + [C, Reason]) + end + end, list_local()), ok. refresh_interceptors() -> rabbit_misc:upmap( - fun (C) -> gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) end, + fun (C) -> + try + gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) + catch _:Reason -> + rabbit_log:error("Failed to refresh channel interceptors " + "for channel ~p. Reason ~p", + [C, Reason]) + end + end, list_local()), ok. From 5570b18f5e26096927adffd0c6f2cbcc747ed346 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Wed, 24 Jan 2018 12:44:40 +0000 Subject: [PATCH 2/2] Add a test to check channel interceptor hang. The test can hang without channel interceptor fix and will fail with timetrap timeout [#153846585] --- test/channel_interceptor_SUITE.erl | 14 +++++++++----- test/failing_dummy_interceptor.erl | 27 +++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 5 deletions(-) create mode 100644 test/failing_dummy_interceptor.erl diff --git a/test/channel_interceptor_SUITE.erl b/test/channel_interceptor_SUITE.erl index 0e4948ea3c4f..4081086f4f2f 100644 --- a/test/channel_interceptor_SUITE.erl +++ b/test/channel_interceptor_SUITE.erl @@ -29,7 +29,8 @@ all() -> groups() -> [ {non_parallel_tests, [], [ - register_interceptor + register_interceptor, + register_failing_interceptors ]} ]. @@ -71,9 +72,9 @@ end_per_testcase(Testcase, Config) -> register_interceptor(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, register_interceptor1, [Config]). + ?MODULE, register_interceptor1, [Config, dummy_interceptor]). -register_interceptor1(Config) -> +register_interceptor1(Config, Interceptor) -> PredefinedChannels = rabbit_channel:list(), Ch1 = rabbit_ct_client_helpers:open_channel(Config, 0), @@ -89,8 +90,8 @@ register_interceptor1(Config) -> ok = rabbit_registry:register(channel_interceptor, <<"dummy interceptor">>, - dummy_interceptor), - [{interceptors, [{dummy_interceptor, undefined}]}] = + Interceptor), + [{interceptors, [{Interceptor, undefined}]}] = rabbit_channel:info(ChannelProc, [interceptors]), check_send_receive(Ch1, QName, <<"bar">>, <<"">>), @@ -102,6 +103,9 @@ register_interceptor1(Config) -> check_send_receive(Ch1, QName, <<"bar">>, <<"bar">>), passed. +register_failing_interceptors(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]). check_send_receive(Ch1, QName, Send, Receive) -> amqp_channel:call(Ch1, diff --git a/test/failing_dummy_interceptor.erl b/test/failing_dummy_interceptor.erl new file mode 100644 index 000000000000..62669e7f1f94 --- /dev/null +++ b/test/failing_dummy_interceptor.erl @@ -0,0 +1,27 @@ +-module(failing_dummy_interceptor). + +-behaviour(rabbit_channel_interceptor). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit_framing.hrl"). + + +-compile(export_all). + +init(_Ch) -> + timer:sleep(15500), + undefined. + +description() -> + [{description, + <<"Empties payload on publish">>}]. + +intercept(#'basic.publish'{} = Method, Content, _IState) -> + Content2 = Content#content{payload_fragments_rev = []}, + {Method, Content2}; + +intercept(Method, Content, _VHost) -> + {Method, Content}. + +applies_to() -> + ['basic.publish'].