Skip to content

Commit

Permalink
Delete stream replica even if node is down.
Browse files Browse the repository at this point in the history
Otherwise we can't forget replicas on nodes that are no longer
cluster members.

Fixes #9282
  • Loading branch information
mkuratczyk committed Sep 6, 2023
1 parent 3783388 commit 2dee5fe
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 19 deletions.
2 changes: 1 addition & 1 deletion MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ bazel_dep(

bazel_dep(
name = "rabbitmq_osiris",
version = "1.6.4",
version = "1.6.5",
repo_name = "osiris",
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop
PLT_APPS += mnesia

dep_syslog = git https://github.com/schlagert/syslog 4.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.4
dep_osiris = git https://github.com/rabbitmq/osiris v1.6.5
dep_systemd = hex 0.6.1
dep_seshat = hex 0.4.0

Expand Down
11 changes: 3 additions & 8 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -924,14 +924,9 @@ delete_replica(VHost, Name, Node) ->
{ok, Q} when ?amqqueue_is_quorum(Q) ->
{error, quorum_queue_not_supported};
{ok, Q} when ?amqqueue_is_stream(Q) ->
case lists:member(Node, rabbit_nodes:list_running()) of
false ->
{error, node_not_running};
true ->
#{name := StreamId} = amqqueue:get_type_state(Q),
{ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node),
Reply
end;
#{name := StreamId} = amqqueue:get_type_state(Q),
{ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node),
Reply;
E ->
E
end.
Expand Down
29 changes: 20 additions & 9 deletions deps/rabbit/test/rabbit_stream_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
-compile(nowarn_export_all).
-compile(export_all).

-define(WAIT, 5000).

suite() ->
[{timetrap, 15 * 60000}].

Expand Down Expand Up @@ -583,7 +585,7 @@ delete_replica(Config) ->
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
check_leader_and_replicas(Config, [Server0, Server1, Server2]),
%% Not a member of the cluster, what would happen?
?assertEqual({error, node_not_running},
?assertEqual(ok,
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, 'zen@rabbit'])),
?assertEqual(ok,
Expand Down Expand Up @@ -725,17 +727,20 @@ delete_down_replica(Config) ->
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
check_leader_and_replicas(Config, [Server0, Server1, Server2]),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
?assertEqual({error, node_not_running},
?assertEqual(ok,
rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])),
%% check it isn't gone
check_leader_and_replicas(Config, [Server0, Server1, Server2], members),
%% check it's gone
check_leader_and_replicas(Config, [Server0, Server2], members),
ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
rabbit_ct_helpers:await_condition(
fun() ->
ok == rpc:call(Server0, rabbit_stream_queue, delete_replica,
[<<"/">>, Q, Server1])
end),
check_leader_and_replicas(Config, [Server0, Server2], members),
%% check the folder was deleted
QName = rabbit_misc:r(<<"/">>, queue, Q),
StreamId = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_stream_id, [QName]),
Server1DataDir = rabbit_ct_broker_helpers:get_node_config(Config, 1, data_dir),
DeletedReplicaDir = filename:join([Server1DataDir, "stream", StreamId]),
timer:sleep(1000),
?awaitMatch(false, filelib:is_dir(DeletedReplicaDir), ?WAIT),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).

publish_coordinator_unavailable(Config) ->
Expand Down Expand Up @@ -1294,6 +1299,12 @@ get_leader_info(QName) ->
{error, not_found}
end.

get_stream_id(QName) ->
{ok, Q} = rabbit_amqqueue:lookup(QName),
QState = amqqueue:get_type_state(Q),
#{name := StreamId} = QState,
StreamId.

kill_process(Config, Node, Pid) ->
rabbit_ct_broker_helpers:rpc(Config, Node, ?MODULE, do_kill_process,
[Pid]).
Expand Down

0 comments on commit 2dee5fe

Please sign in to comment.