Skip to content

Commit

Permalink
[ISSUE #2232]🐛Fix Boker shutdown maybe register again
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 14, 2025
1 parent 1807c33 commit 6404947
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ pub(crate) struct BrokerRuntime {
transactional_message_service:
Option<ArcMut<DefaultTransactionalMessageService<DefaultMessageStore>>>,
broker_runtime: Option<RocketMQRuntime>,
shutdown: Arc<AtomicBool>,
shutdown_hook: Option<BrokerShutdownHook>,
// receiver for shutdown signal
pub(crate) shutdown_rx: Option<tokio::sync::broadcast::Receiver<()>>,
Expand Down Expand Up @@ -188,6 +187,7 @@ impl BrokerRuntime {
PopInflightMessageCounter::new(should_start_time.clone());

let mut inner = ArcMut::new(BrokerRuntimeInner::<DefaultMessageStore> {
shutdown: Arc::new(AtomicBool::new(false)),

Check warning on line 190 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L190

Added line #L190 was not covered by tests
store_host,
broker_config,
message_store_config,
Expand Down Expand Up @@ -240,7 +240,6 @@ impl BrokerRuntime {
inner,
transactional_message_service: None,
broker_runtime: Some(runtime),
shutdown: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
shutdown_rx: None,
}
Expand Down Expand Up @@ -274,7 +273,7 @@ impl BrokerRuntime {
}

pub(crate) fn shutdown_basic_service(&mut self) {
self.shutdown.store(true, Ordering::SeqCst);
self.inner.shutdown.store(true, Ordering::SeqCst);

Check warning on line 276 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L276

Added line #L276 was not covered by tests

if let Some(hook) = self.shutdown_hook.as_ref() {
hook.before_shutdown();
Expand Down Expand Up @@ -988,15 +987,6 @@ impl BrokerRuntime {
}
}

/*#[derive(Clone)]
pub(crate) struct BrokerRuntimeInner {
pub(crate) broker_out_api: Arc<BrokerOuterAPI>,
pub(crate) broker_config: Arc<BrokerConfig>,
pub(crate) message_store_config: Arc<MessageStoreConfig>,
pub(crate) server_config: Arc<ServerConfig>,
pub(crate) topic_queue_mapping_manager: Arc<TopicQueueMappingManager>,
}*/

impl<MS: MessageStore> BrokerRuntimeInner<MS> {
pub async fn register_single_topic_all(&self, topic_config: TopicConfig) {
let mut topic_config = topic_config;
Expand Down Expand Up @@ -1073,6 +1063,14 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
oneway: bool,
topic_config_wrapper: TopicConfigAndMappingSerializeWrapper,
) {
if this.shutdown.load(Ordering::Acquire) {
info!(
"BrokerRuntimeInner#do_register_broker_all: broker has shutdown, no need to \
register any more."

Check warning on line 1069 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1066-L1069

Added lines #L1066 - L1069 were not covered by tests
);
return;
}

Check warning on line 1073 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1071-L1073

Added lines #L1071 - L1073 were not covered by tests
let cluster_name = this
.broker_config
.broker_identity
Expand Down Expand Up @@ -1180,6 +1178,7 @@ impl<MS: MessageStore> StateGetter for ConsumerStateGetter<MS> {
}

pub(crate) struct BrokerRuntimeInner<MS> {
shutdown: Arc<AtomicBool>,
store_host: SocketAddr,
broker_config: BrokerConfig,
message_store_config: MessageStoreConfig,
Expand Down

0 comments on commit 6404947

Please sign in to comment.