From a40e982f9c9d94fb97f49ad3b9c30ee153550d95 Mon Sep 17 00:00:00 2001 From: mxsm Date: Sun, 24 Nov 2024 22:21:20 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1277]=E2=9A=A1=EF=B8=8FOptimize=20nam?= =?UTF-8?q?e=20server=20DefaultRequestProcessor#wipe=5Fwrite=5Fperm=5Fof?= =?UTF-8?q?=5Fbroker=5Fby=5Flock=20(#1278)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../processor/default_request_processor.rs | 4 +-- .../src/route/route_info_manager.rs | 29 ++++++++++++++----- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/rocketmq-namesrv/src/processor/default_request_processor.rs b/rocketmq-namesrv/src/processor/default_request_processor.rs index 08b0006c..6a80e472 100644 --- a/rocketmq-namesrv/src/processor/default_request_processor.rs +++ b/rocketmq-namesrv/src/processor/default_request_processor.rs @@ -320,7 +320,7 @@ impl DefaultRequestProcessor { .expect("decode WipeWritePermOfBrokerRequestHeader failed"); let wipe_topic_cnt = self .route_info_manager - .wipe_write_perm_of_broker_by_lock(request_header.broker_name.as_str()); + .wipe_write_perm_of_broker_by_lock(&request_header.broker_name); RemotingCommand::create_response_command() .set_command_custom_header(WipeWritePermOfBrokerResponseHeader::new(wipe_topic_cnt)) } @@ -331,7 +331,7 @@ impl DefaultRequestProcessor { .expect("decode AddWritePermOfBrokerRequestHeader failed"); let add_topic_cnt = self .route_info_manager - .add_write_perm_of_broker_by_lock(request_header.broker_name.as_str()); + .add_write_perm_of_broker_by_lock(&request_header.broker_name); RemotingCommand::create_response_command() .set_command_custom_header(AddWritePermOfBrokerResponseHeader::new(add_topic_cnt)) } diff --git a/rocketmq-namesrv/src/route/route_info_manager.rs b/rocketmq-namesrv/src/route/route_info_manager.rs index 3e56d8fb..ade1849a 100644 --- a/rocketmq-namesrv/src/route/route_info_manager.rs +++ b/rocketmq-namesrv/src/route/route_info_manager.rs @@ -675,22 +675,35 @@ impl RouteInfoManager { Some(group_member) } - pub(crate) fn wipe_write_perm_of_broker_by_lock(&mut self, broker_name: &str) -> i32 { - self.operate_write_perm_of_broker(broker_name, RequestCode::WipeWritePermOfBroker) + #[inline] + pub(crate) fn wipe_write_perm_of_broker_by_lock(&self, broker_name: &CheetahString) -> i32 { + let lock = self.lock.write(); + let cnt = + self.operate_write_perm_of_broker(broker_name, RequestCode::WipeWritePermOfBroker); + drop(lock); + cnt } - pub(crate) fn add_write_perm_of_broker_by_lock(&mut self, broker_name: &str) -> i32 { - self.operate_write_perm_of_broker(broker_name, RequestCode::AddWritePermOfBroker) + #[inline] + pub(crate) fn add_write_perm_of_broker_by_lock(&self, broker_name: &CheetahString) -> i32 { + let lock = self.lock.write(); + let cnt = self.operate_write_perm_of_broker(broker_name, RequestCode::AddWritePermOfBroker); + drop(lock); + cnt } fn operate_write_perm_of_broker( - &mut self, - broker_name: &str, + &self, + broker_name: &CheetahString, request_code: RequestCode, ) -> i32 { let mut topic_cnt = 0; - for (_topic, qd_map) in self.topic_queue_table.iter_mut() { - let qd = qd_map.get_mut(broker_name).unwrap(); + for (_topic, qd_map) in self.topic_queue_table.mut_from_ref().iter_mut() { + let qd = qd_map.get_mut(broker_name); + if qd.is_none() { + continue; + } + let qd = qd.unwrap(); let mut perm = qd.perm; match request_code { RequestCode::WipeWritePermOfBroker => {