Skip to content

Commit

Permalink
Merge pull request #12636 from rabbitmq/gh-12635
Browse files Browse the repository at this point in the history
QQ: handle case where a stale read request results in member crash.
  • Loading branch information
michaelklishin authored Nov 1, 2024
2 parents 889fdc9 + 94e6779 commit ef2c8df
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 9 deletions.
32 changes: 24 additions & 8 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1126,8 +1126,11 @@ handle_aux(_, _, garbage_collection, Aux, RaAux) ->
handle_aux(_RaState, _, force_checkpoint,
#?AUX{last_checkpoint = Check0} = Aux, RaAux) ->
Ts = erlang:system_time(millisecond),
#?STATE{cfg = #cfg{resource = QR}} = ra_aux:machine_state(RaAux),
rabbit_log:debug("~ts: rabbit_fifo: forcing checkpoint at ~b",
[rabbit_misc:rs(QR), ra_aux:last_applied(RaAux)]),
{Check, Effects} = do_checkpoints(Ts, Check0, RaAux, true),
{no_reply, Aux#?AUX{last_checkpoint= Check}, RaAux, Effects};
{no_reply, Aux#?AUX{last_checkpoint = Check}, RaAux, Effects};
handle_aux(RaState, _, {dlx, _} = Cmd, Aux0, RaAux) ->
#?STATE{dlx = DlxState,
cfg = #cfg{dead_letter_handler = DLH,
Expand Down Expand Up @@ -2052,25 +2055,38 @@ delivery_effect(ConsumerKey, [{MsgId, ?MSG(Idx, Header)}],
{CTag, CPid} = consumer_id(ConsumerKey, State),
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
?DELIVERY_SEND_MSG_OPTS};
delivery_effect(ConsumerKey, Msgs, State) ->
delivery_effect(ConsumerKey, Msgs,
#?STATE{cfg = #cfg{resource = QR}} = State) ->
{CTag, CPid} = consumer_id(ConsumerKey, State),
RaftIdxs = lists:foldr(fun ({_, ?MSG(I, _)}, Acc) ->
[I | Acc]
end, [], Msgs),
{RaftIdxs, Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
{[I | Acc], N+1}
end, {[], 0}, Msgs),
{log, RaftIdxs,
fun(Log) ->
fun (Commands)
when length(Commands) < Num ->
%% the mandatory length/1 guard is a bit :(
rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
"indexes ~w but only ~b were returned. "
"This is most likely a stale read request "
"and can be ignored",
[rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
length(Commands)]),
[];
(Commands) ->
DelMsgs = lists:zipwith(
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
{MsgId, {Header, get_msg(Cmd)}}
end, Log, Msgs),
end, Commands, Msgs),
[{send_msg, CPid, {delivery, CTag, DelMsgs},
?DELIVERY_SEND_MSG_OPTS}]
end,
{local, node(CPid)}}.

reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
{log, [RaftIdx],
fun ([Cmd]) ->
fun ([]) ->
[];
([Cmd]) ->
[{reply, From, {wrap_reply,
{dequeue, {MsgId, {Header, get_msg(Cmd)}}, Ready}}}]
end}.
Expand Down
64 changes: 63 additions & 1 deletion deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ groups() ->
single_active_consumer_priority,
force_shrink_member_to_current_member,
force_all_queues_shrink_member_to_current_member,
force_vhost_queues_shrink_member_to_current_member
force_vhost_queues_shrink_member_to_current_member,
gh_12635
]
++ all_tests()},
{cluster_size_5, [], [start_queue,
Expand Down Expand Up @@ -1300,6 +1301,67 @@ force_vhost_queues_shrink_member_to_current_member(Config) ->
?assertEqual(3, length(Nodes0))
end || Q <- QQs, VHost <- VHosts].

gh_12635(Config) ->
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
[Server0, _Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env,
[rabbit, quorum_min_checkpoint_interval, 1]),

Ch0 = rabbit_ct_client_helpers:open_channel(Config, Server0),
#'confirm.select_ok'{} = amqp_channel:call(Ch0, #'confirm.select'{}),
QQ = ?config(queue_name, Config),
RaName = ra_name(QQ),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch0, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),

%% stop member to simulate slow or down member
ok = rpc:call(Server2, ra, stop_server, [quorum_queues, {RaName, Server2}]),

publish_confirm(Ch0, QQ),
publish_confirm(Ch0, QQ),

%% force a checkpoint on leader
ok = rpc:call(Server0, ra, cast_aux_command, [{RaName, Server0}, force_checkpoint]),
rabbit_ct_helpers:await_condition(
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(latest_checkpoint_index, Log)
end),

%% publish 1 more message
publish_confirm(Ch0, QQ),

Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2),
%% subscribe then cancel, this will assign the messages against the consumer
%% but as the member is down they will not be delivered
qos(Ch2, 100, false),
subscribe(Ch2, QQ, false),
rabbit_ct_client_helpers:close_channel(Ch2),
flush(100),
%% purge
#'queue.purge_ok'{} = amqp_channel:call(Ch0, #'queue.purge'{queue = QQ}),

rabbit_ct_helpers:await_condition(
fun () ->
{ok, #{log := Log}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]),
undefined =/= maps:get(snapshot_index, Log)
end),
%% restart the down member
ok = rpc:call(Server2, ra, restart_server, [quorum_queues, {RaName, Server2}]),
Pid2 = rpc:call(Server2, erlang, whereis, [RaName]),
?assert(is_pid(Pid2)),
Ref = erlang:monitor(process, Pid2),
receive
{'DOWN',Ref, process,_, _} ->
ct:fail("unexpected DOWN")
after 500 ->
ok
end,
flush(1),
ok.

priority_queue_fifo(Config) ->
%% testing: if hi priority messages are published before lo priority
%% messages they are always consumed first (fifo)
Expand Down

0 comments on commit ef2c8df

Please sign in to comment.