Skip to content

Commit

Permalink
[ISSUE #1281]⚡️Optimize name server DefaultRequestProcessor#delete_to…
Browse files Browse the repository at this point in the history
…pic_in_name_srv
  • Loading branch information
mxsm committed Nov 24, 2024
1 parent 4ed0d3c commit d4072c5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
6 changes: 2 additions & 4 deletions rocketmq-namesrv/src/processor/default_request_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,8 @@ impl DefaultRequestProcessor {
let request_header = request
.decode_command_custom_header::<DeleteTopicFromNamesrvRequestHeader>()
.expect("decode DeleteTopicFromNamesrvRequestHeader failed");
self.route_info_manager.delete_topic(
request_header.topic.as_str(),
request_header.cluster_name.clone(),
);
self.route_info_manager
.delete_topic(request_header.topic, request_header.cluster_name);

Check warning on line 353 in rocketmq-namesrv/src/processor/default_request_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/processor/default_request_processor.rs#L352-L353

Added lines #L352 - L353 were not covered by tests
RemotingCommand::create_response_command()
}

Expand Down
19 changes: 10 additions & 9 deletions rocketmq-namesrv/src/route/route_info_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,29 +736,30 @@ impl RouteInfoManager {

pub(crate) fn delete_topic(
&mut self,
topic: impl Into<String>,
cluster_name: Option<impl Into<String>>,
topic: CheetahString,
cluster_name: Option<CheetahString>,

Check warning on line 740 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L739-L740

Added lines #L739 - L740 were not covered by tests
) {
let topic_inner = topic.into();
if cluster_name.is_some() {
let cluster_name_inner = cluster_name.map(|s| s.into()).unwrap();
let broker_names = self.cluster_addr_table.get(cluster_name_inner.as_str());
let lock = self.lock.write();
if cluster_name.as_ref().is_some_and(|inner| !inner.is_empty()) {
let cluster_name_inner = cluster_name.unwrap();
let broker_names = self.cluster_addr_table.get(&cluster_name_inner);

Check warning on line 745 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L742-L745

Added lines #L742 - L745 were not covered by tests
if broker_names.is_none() || broker_names.unwrap().is_empty() {
return;
}
if let Some(queue_data_map) = self.topic_queue_table.get_mut(topic_inner.as_str()) {
if let Some(queue_data_map) = self.topic_queue_table.mut_from_ref().get_mut(&topic) {

Check warning on line 749 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L749

Added line #L749 was not covered by tests
for broker_name in broker_names.unwrap() {
if let Some(remove_qd) = queue_data_map.remove(broker_name) {
info!(
"deleteTopic, remove one broker's topic {} {} {:?}",
broker_name, &topic_inner, remove_qd
broker_name, &topic, remove_qd

Check warning on line 754 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L754

Added line #L754 was not covered by tests
)
}
}
}
} else {
self.topic_queue_table.remove(topic_inner.as_str());
self.topic_queue_table.mut_from_ref().remove(&topic);

Check warning on line 760 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L760

Added line #L760 was not covered by tests
}
drop(lock)

Check warning on line 762 in rocketmq-namesrv/src/route/route_info_manager.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-namesrv/src/route/route_info_manager.rs#L762

Added line #L762 was not covered by tests
}

pub(crate) fn register_topic(&mut self, topic: CheetahString, queue_data_vec: Vec<QueueData>) {
Expand Down

0 comments on commit d4072c5

Please sign in to comment.