Skip to content

Commit

Permalink
Merge pull request #1477 from rabbitmq/rabbitmq-server-channel-interc…
Browse files Browse the repository at this point in the history
…eptor-hang

Make sure rabbit_misc:pmap callbacks do not throw.
  • Loading branch information
michaelklishin authored Jan 24, 2018
2 parents affb941 + 5570b18 commit cef9d18
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 7 deletions.
20 changes: 18 additions & 2 deletions src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,29 @@ emit_info(PidList, InfoItems, Ref, AggregatorPid) ->

refresh_config_local() ->
rabbit_misc:upmap(
fun (C) -> gen_server2:call(C, refresh_config, infinity) end,
fun (C) ->
try
gen_server2:call(C, refresh_config, infinity)
catch _:Reason ->
rabbit_log:error("Failed to refresh channel config "
"for channel ~p. Reason ~p",
[C, Reason])
end
end,
list_local()),
ok.

refresh_interceptors() ->
rabbit_misc:upmap(
fun (C) -> gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) end,
fun (C) ->
try
gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
catch _:Reason ->
rabbit_log:error("Failed to refresh channel interceptors "
"for channel ~p. Reason ~p",
[C, Reason])
end
end,
list_local()),
ok.

Expand Down
14 changes: 9 additions & 5 deletions test/channel_interceptor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ all() ->
groups() ->
[
{non_parallel_tests, [], [
register_interceptor
register_interceptor,
register_failing_interceptors
]}
].

Expand Down Expand Up @@ -71,9 +72,9 @@ end_per_testcase(Testcase, Config) ->

register_interceptor(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, register_interceptor1, [Config]).
?MODULE, register_interceptor1, [Config, dummy_interceptor]).

register_interceptor1(Config) ->
register_interceptor1(Config, Interceptor) ->
PredefinedChannels = rabbit_channel:list(),

Ch1 = rabbit_ct_client_helpers:open_channel(Config, 0),
Expand All @@ -89,8 +90,8 @@ register_interceptor1(Config) ->

ok = rabbit_registry:register(channel_interceptor,
<<"dummy interceptor">>,
dummy_interceptor),
[{interceptors, [{dummy_interceptor, undefined}]}] =
Interceptor),
[{interceptors, [{Interceptor, undefined}]}] =
rabbit_channel:info(ChannelProc, [interceptors]),

check_send_receive(Ch1, QName, <<"bar">>, <<"">>),
Expand All @@ -102,6 +103,9 @@ register_interceptor1(Config) ->
check_send_receive(Ch1, QName, <<"bar">>, <<"bar">>),
passed.

register_failing_interceptors(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, register_interceptor1, [Config, failing_dummy_interceptor]).

check_send_receive(Ch1, QName, Send, Receive) ->
amqp_channel:call(Ch1,
Expand Down
27 changes: 27 additions & 0 deletions test/failing_dummy_interceptor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-module(failing_dummy_interceptor).

-behaviour(rabbit_channel_interceptor).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").


-compile(export_all).

init(_Ch) ->
timer:sleep(15500),
undefined.

description() ->
[{description,
<<"Empties payload on publish">>}].

intercept(#'basic.publish'{} = Method, Content, _IState) ->
Content2 = Content#content{payload_fragments_rev = []},
{Method, Content2};

intercept(Method, Content, _VHost) ->
{Method, Content}.

applies_to() ->
['basic.publish'].

0 comments on commit cef9d18

Please sign in to comment.