Skip to content

Commit

Permalink
[ISSUE #1277]⚡️Optimize name server DefaultRequestProcessor#wipe_writ…
Browse files Browse the repository at this point in the history
…e_perm_of_broker_by_lock (#1278)
  • Loading branch information
mxsm authored Nov 24, 2024
1 parent 4e333ce commit a40e982
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
4 changes: 2 additions & 2 deletions rocketmq-namesrv/src/processor/default_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl DefaultRequestProcessor {
.expect("decode WipeWritePermOfBrokerRequestHeader failed");
let wipe_topic_cnt = self
.route_info_manager
.wipe_write_perm_of_broker_by_lock(request_header.broker_name.as_str());
.wipe_write_perm_of_broker_by_lock(&request_header.broker_name);
RemotingCommand::create_response_command()
.set_command_custom_header(WipeWritePermOfBrokerResponseHeader::new(wipe_topic_cnt))
}
Expand All @@ -331,7 +331,7 @@ impl DefaultRequestProcessor {
.expect("decode AddWritePermOfBrokerRequestHeader failed");
let add_topic_cnt = self
.route_info_manager
.add_write_perm_of_broker_by_lock(request_header.broker_name.as_str());
.add_write_perm_of_broker_by_lock(&request_header.broker_name);
RemotingCommand::create_response_command()
.set_command_custom_header(AddWritePermOfBrokerResponseHeader::new(add_topic_cnt))
}
Expand Down
29 changes: 21 additions & 8 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,22 +675,35 @@ impl RouteInfoManager {
Some(group_member)
}

pub(crate) fn wipe_write_perm_of_broker_by_lock(&mut self, broker_name: &str) -> i32 {
self.operate_write_perm_of_broker(broker_name, RequestCode::WipeWritePermOfBroker)
#[inline]
pub(crate) fn wipe_write_perm_of_broker_by_lock(&self, broker_name: &CheetahString) -> i32 {
let lock = self.lock.write();
let cnt =
self.operate_write_perm_of_broker(broker_name, RequestCode::WipeWritePermOfBroker);
drop(lock);
cnt
}

pub(crate) fn add_write_perm_of_broker_by_lock(&mut self, broker_name: &str) -> i32 {
self.operate_write_perm_of_broker(broker_name, RequestCode::AddWritePermOfBroker)
#[inline]
pub(crate) fn add_write_perm_of_broker_by_lock(&self, broker_name: &CheetahString) -> i32 {
let lock = self.lock.write();
let cnt = self.operate_write_perm_of_broker(broker_name, RequestCode::AddWritePermOfBroker);
drop(lock);
cnt
}

fn operate_write_perm_of_broker(
&mut self,
broker_name: &str,
&self,
broker_name: &CheetahString,
request_code: RequestCode,
) -> i32 {
let mut topic_cnt = 0;
for (_topic, qd_map) in self.topic_queue_table.iter_mut() {
let qd = qd_map.get_mut(broker_name).unwrap();
for (_topic, qd_map) in self.topic_queue_table.mut_from_ref().iter_mut() {
let qd = qd_map.get_mut(broker_name);
if qd.is_none() {
continue;
}
let qd = qd.unwrap();
let mut perm = qd.perm;
match request_code {
RequestCode::WipeWritePermOfBroker => {
Expand Down

0 comments on commit a40e982

Please sign in to comment.