From 2dee5fef9c4d0d669f00cb11e10bed3598895ded Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Tue, 5 Sep 2023 11:04:31 +0200 Subject: [PATCH] Delete stream replica even if node is down. Otherwise we can't forget replicas on nodes that are no longer cluster members. Fixes https://github.com/rabbitmq/rabbitmq-server/issues/9282 --- MODULE.bazel | 2 +- deps/rabbit/Makefile | 2 +- deps/rabbit/src/rabbit_stream_queue.erl | 11 ++----- .../rabbit/test/rabbit_stream_queue_SUITE.erl | 29 +++++++++++++------ 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/MODULE.bazel b/MODULE.bazel index d7ab55d5374..fae33481fa6 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -43,7 +43,7 @@ bazel_dep( bazel_dep( name = "rabbitmq_osiris", - version = "1.6.4", + version = "1.6.5", repo_name = "osiris", ) diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index bb2fb67eb49..b6dd9f12854 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -149,7 +149,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop PLT_APPS += mnesia dep_syslog = git https://github.com/schlagert/syslog 4.0.0 -dep_osiris = git https://github.com/rabbitmq/osiris v1.6.4 +dep_osiris = git https://github.com/rabbitmq/osiris v1.6.5 dep_systemd = hex 0.6.1 dep_seshat = hex 0.4.0 diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index b9d9a72e474..a5433bc1e86 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -924,14 +924,9 @@ delete_replica(VHost, Name, Node) -> {ok, Q} when ?amqqueue_is_quorum(Q) -> {error, quorum_queue_not_supported}; {ok, Q} when ?amqqueue_is_stream(Q) -> - case lists:member(Node, rabbit_nodes:list_running()) of - false -> - {error, node_not_running}; - true -> - #{name := StreamId} = amqqueue:get_type_state(Q), - {ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node), - Reply - end; + #{name := StreamId} = amqqueue:get_type_state(Q), + {ok, Reply, _} = rabbit_stream_coordinator:delete_replica(StreamId, Node), + Reply; E -> E end. diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index e31444bb1ac..05394839c50 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -16,6 +16,8 @@ -compile(nowarn_export_all). -compile(export_all). +-define(WAIT, 5000). + suite() -> [{timetrap, 15 * 60000}]. @@ -583,7 +585,7 @@ delete_replica(Config) -> declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), check_leader_and_replicas(Config, [Server0, Server1, Server2]), %% Not a member of the cluster, what would happen? - ?assertEqual({error, node_not_running}, + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, 'zen@rabbit'])), ?assertEqual(ok, @@ -725,17 +727,20 @@ delete_down_replica(Config) -> declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), check_leader_and_replicas(Config, [Server0, Server1, Server2]), ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), - ?assertEqual({error, node_not_running}, + ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, Server1])), - %% check it isn't gone - check_leader_and_replicas(Config, [Server0, Server1, Server2], members), + %% check it's gone + check_leader_and_replicas(Config, [Server0, Server2], members), ok = rabbit_ct_broker_helpers:start_node(Config, Server1), - rabbit_ct_helpers:await_condition( - fun() -> - ok == rpc:call(Server0, rabbit_stream_queue, delete_replica, - [<<"/">>, Q, Server1]) - end), + check_leader_and_replicas(Config, [Server0, Server2], members), + %% check the folder was deleted + QName = rabbit_misc:r(<<"/">>, queue, Q), + StreamId = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, get_stream_id, [QName]), + Server1DataDir = rabbit_ct_broker_helpers:get_node_config(Config, 1, data_dir), + DeletedReplicaDir = filename:join([Server1DataDir, "stream", StreamId]), + timer:sleep(1000), + ?awaitMatch(false, filelib:is_dir(DeletedReplicaDir), ?WAIT), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). publish_coordinator_unavailable(Config) -> @@ -1294,6 +1299,12 @@ get_leader_info(QName) -> {error, not_found} end. +get_stream_id(QName) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + QState = amqqueue:get_type_state(Q), + #{name := StreamId} = QState, + StreamId. + kill_process(Config, Node, Pid) -> rabbit_ct_broker_helpers:rpc(Config, Node, ?MODULE, do_kill_process, [Pid]).