Skip to content

Commit

Permalink
Test predeclared dest queue
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcialRosales authored and michaelklishin committed Jul 24, 2024
1 parent 04164df commit a888c7b
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 19 deletions.
23 changes: 16 additions & 7 deletions deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
%% from and can break with the next upgrade. It should not be used by
%% another one that the one who created it or survive a node restart.
%% Thus, function references have been replace by the following MFA.
-export([dest_decl/4, src_decl_exchange/4, src_decl_queue/4,src_check_queue/4,
-export([dest_decl/4, dest_check/4, src_decl_exchange/4, src_decl_queue/4,src_check_queue/4,
fields_fun/5, props_fun/9]).

-import(rabbit_misc, [pget/2, pget/3, pset/3]).
Expand Down Expand Up @@ -331,7 +331,12 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
DestQ = pget(<<"dest-queue">>, Def, none),
DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
DestDeclFun = {?MODULE, dest_decl, [DestQ, DestQArgs]},
Predeclared = pget(<<"dest-predeclared">>, Def, false),
DestDeclFun = case Predeclared of
true -> {?MODULE, dest_check, [DestQ, DestQArgs]};
false -> {?MODULE, dest_decl, [DestQ, DestQArgs]}
end,

{X, Key} = case DestQ of
none -> {DestX, DestXKey};
_ -> {<<>>, DestQ}
Expand All @@ -352,7 +357,7 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
AddTimestampHeaderLegacy = pget(<<"add-timestamp-header">>, Def, false),
AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def,
AddTimestampHeaderLegacy),
Predeclared = pget(<<"dest-predeclared">>, Def, false),

%% Details are only used for status report in rabbitmqctl, as vhost is not
%% available to query the runtime parameters.
Details = maps:from_list([{K, V} || {K, V} <- [{dest_exchange, DestX},
Expand All @@ -365,8 +370,7 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
fields_fun => {?MODULE, fields_fun, [X, Key]},
props_fun => {?MODULE, props_fun, [Table0, Table2, SetProps,
AddHeaders, SourceHeaders,
AddTimestampHeader]},
predeclared => Predeclared
AddTimestampHeader]}
}, Details).

fields_fun(X, Key, _SrcURI, _DestURI, P0) ->
Expand Down Expand Up @@ -398,6 +402,11 @@ dest_decl(DestQ, DestQArgs, Conn, _Ch) ->
none -> ok;
_ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
end.
dest_check(DestQ, DestQArgs, Conn, _Ch) ->
case DestQ of
none -> ok;
_ -> check_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
end.

parse_amqp10_source(Def) ->
Uris = deobfuscated_uris(<<"src-uri">>, Def),
Expand Down Expand Up @@ -498,13 +507,13 @@ ensure_queue(Conn, Queue, XArgs) ->
after
catch amqp_channel:close(Ch)
end.
check_queue(Conn, Queue, XArgs) ->
check_queue(Conn, Queue, _XArgs) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
try
rabbit_log:debug("Check if queue ~p exists", [Queue]),
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
passive = true}),
rabbit_log:debug("Check if queue ~p does exist", [Queue])
rabbit_log:debug("Checked queue ~p does exist", [Queue])
after
catch amqp_channel:close(Ch)
end.
Expand Down
67 changes: 55 additions & 12 deletions deps/rabbitmq_shovel/test/dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ groups() ->
dest_resource_alarm_on_confirm,
dest_resource_alarm_on_publish,
dest_resource_alarm_no_ack,
predeclared_missing_src_queue
predeclared_missing_src_queue,
predeclared_missing_dest_queue
]},

{quorum_queue_tests, [], [
Expand Down Expand Up @@ -291,18 +292,50 @@ predeclared_missing_src_queue(Config) ->

with_newch(Config,
fun(Ch2) ->
amqp_channel:call(
Ch2, #'queue.declare'{queue = <<"src">>,
durable = true}),
ct:log("Declare queue"),
amqp_channel:call(
Ch2, #'queue.bind'{queue = <<"src">>,
exchange = <<"amq.direct">>,
routing_key = <<"src-key">>}),
shovel_test_utils:await_shovel(Config, 0, <<"test">>, running),
amqp_channel:call(
Ch2, #'queue.declare'{queue = <<"src">>,
durable = true}),
ct:log("Declare queue"),
amqp_channel:call(
Ch2, #'queue.bind'{queue = <<"src">>,
exchange = <<"amq.direct">>,
routing_key = <<"src-key">>}),
shovel_test_utils:await_shovel(Config, 0, <<"test">>, running),

publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, <<"dest">>, <<"hello!">>)
end)
end).


predeclared_missing_dest_queue(Config) ->
with_ch(Config,
fun (Ch) ->
amqp_channel:call(
Ch, #'queue.declare'{queue = <<"src">>,
durable = true}),
amqp_channel:call(
Ch, #'queue.bind'{queue = <<"src">>,
exchange = <<"amq.direct">>,
routing_key = <<"src-key">>}),

shovel_test_utils:set_param_nowait(Config,
<<"test">>, [{<<"src-queue">>, <<"src">>},
{<<"dest-predeclared">>, true},
{<<"dest-queue">>, <<"dest">>},
{<<"src-prefetch-count">>, 1}]),
shovel_test_utils:await_shovel(Config, 0, <<"test">>, terminated),
expect_missing_queue(Ch, <<"dest">>),

with_newch(Config,
fun(Ch2) ->
amqp_channel:call(
Ch2, #'queue.declare'{queue = <<"dest">>,
durable = true}),

shovel_test_utils:await_shovel(Config, 0, <<"test">>, running),

publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, <<"dest">>, <<"hello!">>)
end)
publish_expect(Ch2, <<"amq.direct">>, <<"src-key">>, <<"dest">>, <<"hello!">>)
end)
end).

missing_dest_exchange(Config) ->
Expand Down Expand Up @@ -781,6 +814,16 @@ expect_missing_queue(Ch, Q) ->
ct:log("Queue ~p does not exist", [Q]),
ok
end.
expect_missing_exchange(Ch, X) ->
try
amqp_channel:call(Ch, #'exchange.declare'{exchange = X,
passive = true}),
ct:log("Exchange ~p still exists", [X]),
ct:fail(exchange_still_exists)
catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
ct:log("Exchange ~p does not exist", [X]),
ok
end.

publish_count(Ch, X, Key, M, Count) ->
[begin
Expand Down

0 comments on commit a888c7b

Please sign in to comment.