From 64049473e91f0d8655d3cfb7cea49d6790e668c9 Mon Sep 17 00:00:00 2001 From: mxsm Date: Tue, 14 Jan 2025 06:05:17 +0000 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#2232]=F0=9F=90=9BFix=20Boker=20shutdo?= =?UTF-8?q?wn=20maybe=20register=20again?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-broker/src/broker_runtime.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/rocketmq-broker/src/broker_runtime.rs b/rocketmq-broker/src/broker_runtime.rs index 786e18b5..6885e1ad 100644 --- a/rocketmq-broker/src/broker_runtime.rs +++ b/rocketmq-broker/src/broker_runtime.rs @@ -149,7 +149,6 @@ pub(crate) struct BrokerRuntime { transactional_message_service: Option>>, broker_runtime: Option, - shutdown: Arc, shutdown_hook: Option, // receiver for shutdown signal pub(crate) shutdown_rx: Option>, @@ -188,6 +187,7 @@ impl BrokerRuntime { PopInflightMessageCounter::new(should_start_time.clone()); let mut inner = ArcMut::new(BrokerRuntimeInner:: { + shutdown: Arc::new(AtomicBool::new(false)), store_host, broker_config, message_store_config, @@ -240,7 +240,6 @@ impl BrokerRuntime { inner, transactional_message_service: None, broker_runtime: Some(runtime), - shutdown: Arc::new(AtomicBool::new(false)), shutdown_hook: None, shutdown_rx: None, } @@ -274,7 +273,7 @@ impl BrokerRuntime { } pub(crate) fn shutdown_basic_service(&mut self) { - self.shutdown.store(true, Ordering::SeqCst); + self.inner.shutdown.store(true, Ordering::SeqCst); if let Some(hook) = self.shutdown_hook.as_ref() { hook.before_shutdown(); @@ -988,15 +987,6 @@ impl BrokerRuntime { } } -/*#[derive(Clone)] -pub(crate) struct BrokerRuntimeInner { - pub(crate) broker_out_api: Arc, - pub(crate) broker_config: Arc, - pub(crate) message_store_config: Arc, - pub(crate) server_config: Arc, - pub(crate) topic_queue_mapping_manager: Arc, -}*/ - impl BrokerRuntimeInner { pub async fn register_single_topic_all(&self, topic_config: TopicConfig) { let mut topic_config = topic_config; @@ -1073,6 +1063,14 @@ impl BrokerRuntimeInner { oneway: bool, topic_config_wrapper: TopicConfigAndMappingSerializeWrapper, ) { + if this.shutdown.load(Ordering::Acquire) { + info!( + "BrokerRuntimeInner#do_register_broker_all: broker has shutdown, no need to \ + register any more." + ); + return; + } + let cluster_name = this .broker_config .broker_identity @@ -1180,6 +1178,7 @@ impl StateGetter for ConsumerStateGetter { } pub(crate) struct BrokerRuntimeInner { + shutdown: Arc, store_host: SocketAddr, broker_config: BrokerConfig, message_store_config: MessageStoreConfig,