Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of #12640 to v4.0.x #12641

Merged
merged 12 commits into from
Nov 4, 2024
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,8 @@ overview(#?STATE{consumers = Cons,
Conf = #{name => Cfg#cfg.name,
resource => Cfg#cfg.resource,
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
dead_letter_handler => Cfg#cfg.dead_letter_handler,
overflow_strategy => Cfg#cfg.overflow_strategy,
max_length => Cfg#cfg.max_length,
max_bytes => Cfg#cfg.max_bytes,
consumer_strategy => Cfg#cfg.consumer_strategy,
Expand Down
53 changes: 42 additions & 11 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,8 @@ declare_queue_error(Error, Queue, Leader, ActingUser) ->
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.

ra_machine_config(Q) when ?is_amqqueue(Q) ->
gather_policy_config(Q, IsQueueDeclaration) ->
QName = amqqueue:get_name(Q),
{Name, _} = amqqueue:get_pid(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q),
Expand All @@ -327,28 +326,42 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>,
fun resolve_delivery_limit/2, Q) of
undefined ->
rabbit_log:info("~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]),
case IsQueueDeclaration of
true ->
rabbit_log:info(
"~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]);
false ->
ok
end,
?DEFAULT_DELIVERY_LIMIT;
DL ->
DL
end,
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dead_letter_handler(Q, Overflow),
become_leader_handler => {?MODULE, become_leader, [QName]},
DeadLetterHandler = dead_letter_handler(Q, Overflow),
#{dead_letter_handler => DeadLetterHandler,
max_length => MaxLength,
max_bytes => MaxBytes,
single_active_consumer_on => single_active_consumer_on(Q),
delivery_limit => DeliveryLimit,
overflow_strategy => Overflow,
created => erlang:system_time(millisecond),
expires => Expires,
msg_ttl => MsgTTL
}.

ra_machine_config(Q) when ?is_amqqueue(Q) ->
PolicyConfig = gather_policy_config(Q, true),
QName = amqqueue:get_name(Q),
{Name, _} = amqqueue:get_pid(Q),
PolicyConfig#{
name => Name,
queue_resource => QName,
become_leader_handler => {?MODULE, become_leader, [QName]},
single_active_consumer_on => single_active_consumer_on(Q),
created => erlang:system_time(millisecond)
}.

resolve_delivery_limit(PolVal, ArgVal)
when PolVal < 0 orelse ArgVal < 0 ->
max(PolVal, ArgVal);
Expand Down Expand Up @@ -624,7 +637,9 @@ handle_tick(QName,
ok;
_ ->
ok
end
end,
maybe_apply_policies(Q, Overview),
ok
catch
_:Err ->
rabbit_log:debug("~ts: handle tick failed with ~p",
Expand Down Expand Up @@ -708,6 +723,21 @@ system_recover(quorum_queues) ->
ok
end.

maybe_apply_policies(Q, #{config := CurrentConfig}) ->
NewPolicyConfig = gather_policy_config(Q, false),

RelevantKeys = maps:keys(NewPolicyConfig),
CurrentPolicyConfig = maps:with(RelevantKeys, CurrentConfig),

ShouldUpdate = NewPolicyConfig =/= CurrentPolicyConfig,
case ShouldUpdate of
true ->
rabbit_log:debug("Re-applying policies to ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]),
policy_changed(Q),
ok;
false -> ok
end.

-spec recover(binary(), [amqqueue:amqqueue()]) ->
{[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
recover(_Vhost, Queues) ->
Expand Down Expand Up @@ -2064,3 +2094,4 @@ file_handle_other_reservation() ->

file_handle_release_reservation() ->
ok.

204 changes: 193 additions & 11 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ groups() ->
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member,
policy_repair,
gh_12635
]
++ all_tests()},
Expand Down Expand Up @@ -1303,20 +1304,175 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].


% Tests that, if the process of a QQ is dead in the moment of declaring a policy
% that affects such queue, when the process is made available again, the policy
% will eventually get applied. (https://github.com/rabbitmq/rabbitmq-server/issues/7863)
policy_repair(Config) ->
[Server0, _Server1, _Server2] = Servers =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),

QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
RaName = ra_name(QQ),
ExpectedMaxLength1 = 10,
Priority1 = 1,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_1">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}],
Priority1,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
QueryFun = fun rabbit_fifo:overview/1,
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength1}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength1 + some messages but after consuming all messages only
% MaxLength1 are retrieved.
% Checking twice to ensure consistency
publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1),
% +1 because QQs let one pass
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength1 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Set higher priority policy, allowing more messages
ExpectedMaxLength2 = 20,
Priority2 = 2,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_2">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}],
Priority2,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength2}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength2 + some messages but after consuming all messages only
% MaxLength2 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength2 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ),

% Ensure the queue process is unavailable
lists:foreach(fun(Srv) -> ensure_qq_proc_dead(Config, Srv, RaName) end, Servers),

% Add policy with higher priority, allowing even more messages.
ExpectedMaxLength3 = 30,
Priority3 = 3,
ok = rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_policy,
set,
[
<<"/">>,
<<QQ/binary, "_3">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}],
Priority3,
<<"quorum_queues">>,
<<"acting-user">>
]),

% Restart the queue process.
{ok, Queue} =
rabbit_ct_broker_helpers:rpc(
Config,
0,
rabbit_amqqueue,
lookup,
[{resource, <<"/">>, queue, QQ}]),
lists:foreach(
fun(Srv) ->
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
rabbit_quorum_queue,
recover,
[foo, [Queue]]
)
end,
Servers),

% Wait for the queue to be available again.
lists:foreach(fun(Srv) ->
rabbit_ct_helpers:await_condition(
fun () ->
is_pid(
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
erlang,
whereis,
[RaName]))
end)
end,
Servers),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength3 + some messages but after consuming all messages only
% MaxLength3 are retrieved.
% Checking twice to ensure consistency.
% + 1 because QQs let one pass
publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1),
wait_for_messages_ready(Servers, RaName, ExpectedMaxLength3 + 1),
fail = publish_confirm(Ch, QQ),
fail = publish_confirm(Ch, QQ),
consume_all(Ch, QQ).


gh_12635(Config) ->
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
[Server0, _Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
[rabbit, quorum_min_checkpoint_interval, 1]),
[rabbit, quorum_min_checkpoint_interval, 1]),

Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
QQ = ?config(queue_name, Config),
RaName = ra_name(QQ),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

%% stop member to simulate slow or down member
ok = rpc:call(Server2, ra, stop_server, [quorum_queues, {RaName, Server2}]),
Expand All @@ -1327,10 +1483,10 @@ gh_12635(Config) ->
%% force a checkpoint on leader
ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]),
rabbit_ct_helpers:await_condition(
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(latest_checkpoint_index, Log)
end),
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(latest_checkpoint_index, Log)
end),

%% publish 1 more message
publish_confirm(Ch0, QQ),
Expand All @@ -1346,10 +1502,10 @@ gh_12635(Config) ->
#'queue.purge_ok'{} = amqp_channel:call(Ch0, #'queue.purge'{queue = QQ}),

rabbit_ct_helpers:await_condition(
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(snapshot_index, Log)
end),
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(snapshot_index, Log)
end),
%% restart the down member
ok = rpc:call(Server2, ra, restart_server, [quorum_queues, {RaName, Server2}]),
Pid2 = rpc:call(Server2, erlang, whereis, [RaName]),
Expand All @@ -1359,11 +1515,12 @@ gh_12635(Config) ->
{'DOWN',Ref, process,_, _} ->
ct:fail("unexpected DOWN")
after 500 ->
ok
ok
end,
flush(1),
ok.


priority_queue_fifo(Config) ->
%% testing: if hi priority messages are published before lo priority
%% messages they are always consumed first (fifo)
Expand Down Expand Up @@ -4397,3 +4554,28 @@ lists_interleave([Item | Items], List)
{Left, Right} = lists:split(2, List),
Left ++ [Item | lists_interleave(Items, Right)].

publish_confirm_many(Ch, Queue, Count) ->
lists:foreach(fun(_) -> publish_confirm(Ch, Queue) end, lists:seq(1, Count)).

consume_all(Ch, QQ) ->
Consume = fun C(Acc) ->
case amqp_channel:call(Ch, #'basic.get'{queue = QQ}) of
{#'basic.get_ok'{}, Msg} ->
C([Msg | Acc]);
_ ->
Acc
end
end,
Consume([]).

ensure_qq_proc_dead(Config, Server, RaName) ->
case rabbit_ct_broker_helpers:rpc(Config, Server, erlang, whereis, [RaName]) of
undefined ->
ok;
Pid ->
rabbit_ct_broker_helpers:rpc(Config, Server, erlang, exit, [Pid, kill]),
%% Give some time for the supervisor to restart the process
timer:sleep(500),
ensure_qq_proc_dead(Config, Server, RaName)
end.

Loading