Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1154]🚀Optimize RemotingCommand method🎨 #1155

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@
None => Some(
response
.set_code(ResponseCode::ConsumerNotOnline)
.set_remark(Some(format!(
.set_remark(format!(

Check warning on line 93 in rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs#L93

Added line #L93 was not covered by tests
"the consumer group[{}] not online",
request_header.get_consumer_group()
))),
)),

Check warning on line 96 in rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs#L96

Added line #L96 was not covered by tests
),
}
}
Expand Down Expand Up @@ -234,7 +234,7 @@
Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some("No consumer offset in this broker".to_string())),
.set_remark("No consumer offset in this broker"),

Check warning on line 237 in rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/consumer_request_handler.rs#L237

Added line #L237 was not covered by tests
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@
if !mapping_context.is_leader() {
return Some(
RemotingCommand::create_response_command_with_code(ResponseCode::NotLeaderForQueue)
.set_remark(Some(format!(
.set_remark(format!(

Check warning on line 126 in rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs#L126

Added line #L126 was not covered by tests
"{}-{:?} does not exit in request process of current broker {:?}",
mapping_context.topic,
mapping_context.global_id,
mapping_detail.topic_queue_mapping_info.bname
))),
)),

Check warning on line 131 in rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs#L131

Added line #L131 was not covered by tests
);
}

Expand Down Expand Up @@ -157,7 +157,7 @@
if let Err(e) = rpc_response {
return Some(
RemotingCommand::create_response_command_with_code(ResponseCode::SystemError)
.set_remark(Some(format!("{}", e))),
.set_remark(format!("{}", e)),

Check warning on line 160 in rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs#L160

Added line #L160 was not covered by tests
);
} else {
match rpc_response
Expand All @@ -169,7 +169,7 @@
RemotingCommand::create_response_command_with_code(
ResponseCode::SystemError,
)
.set_remark(Some("Rpc response header is None".to_string())),
.set_remark("Rpc response header is None"),

Check warning on line 172 in rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs#L172

Added line #L172 was not covered by tests
);
}
Some(offset_response_header) => offset_response_header.offset,
Expand All @@ -192,12 +192,12 @@
if !mapping_context.is_leader() {
return Some(
RemotingCommand::create_response_command_with_code(ResponseCode::NotLeaderForQueue)
.set_remark(Some(format!(
.set_remark(format!(

Check warning on line 195 in rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs#L195

Added line #L195 was not covered by tests
"{}-{:?} does not exit in request process of current broker {:?}",
mapping_context.topic,
mapping_context.global_id,
mapping_detail.topic_queue_mapping_info.bname
))),
)),

Check warning on line 200 in rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs#L200

Added line #L200 was not covered by tests
);
}

Expand Down Expand Up @@ -229,7 +229,7 @@
if let Err(e) = rpc_response {
return Some(
RemotingCommand::create_response_command_with_code(ResponseCode::SystemError)
.set_remark(Some(format!("{}", e))),
.set_remark(format!("{}", e)),

Check warning on line 232 in rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs#L232

Added line #L232 was not covered by tests
);
} else {
match rpc_response
Expand All @@ -241,7 +241,7 @@
RemotingCommand::create_response_command_with_code(
ResponseCode::SystemError,
)
.set_remark(Some("Rpc response header is None".to_string())),
.set_remark("Rpc response header is None"),

Check warning on line 244 in rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/offset_request_handler.rs#L244

Added line #L244 was not covered by tests
);
}
Some(offset_response_header) => offset_response_header.offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(result.remark().to_string())),
.set_remark(result.remark()),

Check warning on line 90 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L90

Added line #L90 was not covered by tests
);
}
if self
Expand All @@ -99,10 +99,10 @@
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(format!(
.set_remark(format!(

Check warning on line 102 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L102

Added line #L102 was not covered by tests
"The topic[{}] is conflict with system topic.",
topic
))),
)),

Check warning on line 105 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L105

Added line #L105 was not covered by tests
);
}

Expand All @@ -115,11 +115,7 @@
) {
Ok(value) => value,
Err(err) => {
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(err)),
);
return Some(response.set_code(ResponseCode::SystemError).set_remark(err));

Check warning on line 118 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L118

Added line #L118 was not covered by tests
}
};
let mut topic_config = TopicConfig {
Expand All @@ -142,7 +138,7 @@
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some("MIXED message type is not supported.".to_string())),
.set_remark("MIXED message type is not supported."),

Check warning on line 141 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L141

Added line #L141 was not covered by tests
);
}

Expand Down Expand Up @@ -217,7 +213,7 @@
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(result.remark().to_string())),
.set_remark(result.remark()),

Check warning on line 216 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L216

Added line #L216 was not covered by tests
);
}
if self
Expand All @@ -229,10 +225,10 @@
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(format!(
.set_remark(format!(

Check warning on line 228 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L228

Added line #L228 was not covered by tests
"The topic[{}] is conflict with system topic.",
topic
))),
)),

Check warning on line 231 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L231

Added line #L231 was not covered by tests
);
}
if topic_config.get_topic_message_type() == TopicMessageType::Mixed
Expand All @@ -241,7 +237,7 @@
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some("MIXED message type is not supported.".to_string())),
.set_remark("MIXED message type is not supported.".to_string()),

Check warning on line 240 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L240

Added line #L240 was not covered by tests
);
}
let topic_config_origin = self
Expand Down Expand Up @@ -313,7 +309,7 @@
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some("he specified topic is blank.".to_string())),
.set_remark("he specified topic is blank."),

Check warning on line 312 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L312

Added line #L312 was not covered by tests
);
}
if self
Expand All @@ -325,10 +321,10 @@
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(format!(
.set_remark(format!(

Check warning on line 324 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L324

Added line #L324 was not covered by tests
"The topic[{}] is conflict with system topic.",
topic
))),
)),

Check warning on line 327 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L327

Added line #L327 was not covered by tests
);
}
let groups = self
Expand Down Expand Up @@ -437,7 +433,7 @@
return Some(
response
.set_code(ResponseCode::TopicNotExist)
.set_remark(Some(format!("The topic[{}] not exist.", topic))),
.set_remark(format!("The topic[{}] not exist.", topic)),

Check warning on line 436 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L436

Added line #L436 was not covered by tests
);
}
let topic_config = topic_config.unwrap();
Expand Down Expand Up @@ -498,7 +494,7 @@
return Some(
response
.set_code(ResponseCode::TopicNotExist)
.set_remark(Some(format!("No topic in this broker. topic: {}", topic))),
.set_remark(format!("No topic in this broker. topic: {}", topic)),

Check warning on line 497 in rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/admin_broker_processor/topic_request_handler.rs#L497

Added line #L497 was not covered by tests
);
}
let mut topic_queue_mapping_detail: Option<TopicQueueMappingDetail> = None;
Expand Down
28 changes: 10 additions & 18 deletions rocketmq-broker/src/processor/consumer_manage_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@
}
Some(
response
.set_remark(Some(format!(
.set_remark(format!(

Check warning on line 150 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L150

Added line #L150 was not covered by tests
"no consumer for this group, {}",
request_header.consumer_group
)))
))

Check warning on line 153 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L153

Added line #L153 was not covered by tests
.set_code(ResponseCode::SystemError),
)
}
Expand Down Expand Up @@ -187,30 +187,30 @@
return Some(
response
.set_code(ResponseCode::SubscriptionGroupNotExist)
.set_remark(Some(format!("subscription group not exist, {}", group))),
.set_remark(format!("subscription group not exist, {}", group)),

Check warning on line 190 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L190

Added line #L190 was not covered by tests
);
}

if !self.topic_config_manager.contains_topic(topic) {
return Some(
response
.set_code(ResponseCode::TopicNotExist)
.set_remark(Some(format!("topic not exist, {}", topic))),
.set_remark(format!("topic not exist, {}", topic)),

Check warning on line 198 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L198

Added line #L198 was not covered by tests
);
}

if queue_id.is_none() {
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(format!("QueueId is null, topic is {}", topic))),
.set_remark(format!("QueueId is null, topic is {}", topic)),

Check warning on line 206 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L206

Added line #L206 was not covered by tests
);
}
if offset.is_none() {
return Some(
response
.set_code(ResponseCode::SystemError)
.set_remark(Some(format!("Offset is null, topic is {}", topic))),
.set_remark(format!("Offset is null, topic is {}", topic)),

Check warning on line 213 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L213

Added line #L213 was not covered by tests
);
}
if self.broker_config.use_server_side_reset_offset
Expand All @@ -226,7 +226,7 @@
queue_id.unwrap(),
offset.unwrap()
);
return Some(response.set_remark(Some("Offset has been previously reset".to_string())));
return Some(response.set_remark("Offset has been previously reset"));

Check warning on line 229 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L229

Added line #L229 was not covered by tests
}
self.consumer_offset_manager.commit_offset(
channel.remote_address(),
Expand Down Expand Up @@ -272,10 +272,7 @@
if !value {
response = response
.set_code(ResponseCode::QueryNotFound)
.set_remark(Some(
"Not found, do not set to zero, maybe this group boot first"
.to_string(),
));
.set_remark("Not found, do not set to zero, maybe this group boot first");

Check warning on line 275 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L275

Added line #L275 was not covered by tests
}
} else if min_offset <= 0
&& self.message_store.check_in_mem_by_consume_offset(
Expand All @@ -289,10 +286,7 @@
} else {
response = response
.set_code(ResponseCode::QueryNotFound)
.set_remark(Some(
"Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first"
.to_string(),
));
.set_remark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");

Check warning on line 289 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L289

Added line #L289 was not covered by tests
}
}
if let Some(result) = self.rewrite_response_for_static_topic(
Expand Down Expand Up @@ -393,9 +387,7 @@
} else {
response = response
.set_code(ResponseCode::QueryNotFound)
.set_remark(Some(
"Not found, maybe this group consumer boot first".to_string(),
));
.set_remark("Not found, maybe this group consumer boot first");

Check warning on line 390 in rocketmq-broker/src/processor/consumer_manage_processor.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/consumer_manage_processor.rs#L390

Added line #L390 was not covered by tests
}
let rewrite_response_result = self.rewrite_response_for_static_topic(
request_header,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@
client_address: &str,
) {
let mut response_header = PullMessageResponseHeader::default();
response.set_remark_mut(Some(format!("{:?}", get_message_result.status())));
response.set_remark_mut(format!("{:?}", get_message_result.status()));

Check warning on line 423 in rocketmq-broker/src/processor/default_pull_message_result_handler.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/default_pull_message_result_handler.rs#L423

Added line #L423 was not covered by tests
response_header.next_begin_offset = Some(get_message_result.next_begin_offset());
response_header.min_offset = Some(get_message_result.min_offset());
response_header.max_offset = Some(get_message_result.max_offset());
Expand Down
Loading
Loading