-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1813]🚀Implement DefaultMQPushConsumerImpl#ack_async method #1827
Conversation
WalkthroughThe pull request enhances the Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (1)
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (1)
1373-1395
: Refactor Repeated Error Handling CodeThe pattern of checking for an error, logging it, and returning is repeated multiple times (e.g., lines 1379–1382, 1385–1388, 1391–1394). Consider refactoring this repetitive code into a helper function or using the
?
operator to simplify error handling, reduce duplication, and enhance readability.Apply this diff to refactor using the
?
operator:pub(crate) async fn ack_async(&mut self, message: &MessageExt, consumer_group: &CheetahString) { let extra_info = message .get_property(&CheetahString::from_static_str( MessageConst::PROPERTY_POP_CK, )) .unwrap_or_default(); - let extra_info_strs = ExtraInfoUtil::split(extra_info.as_str()); - if extra_info_strs.is_err() { - error!("ackAsync error: {}", extra_info_strs.unwrap_err()); - return; - } - let extra_info_strs = extra_info_strs.unwrap(); + let extra_info_strs = match ExtraInfoUtil::split(extra_info.as_str()) { + Ok(value) => value, + Err(e) => { + error!("ackAsync error: {}", e); + return; + } + }; let queue_id = ExtraInfoUtil::get_queue_id(extra_info_strs.as_slice()); - if queue_id.is_err() { - error!("ackAsync error: {}", queue_id.unwrap_err()); - return; - } - let queue_id = queue_id.unwrap(); + let queue_id = match queue_id { + Ok(value) => value, + Err(e) => { + error!("ackAsync error: {}", e); + return; + } + }; let queue_offset = ExtraInfoUtil::get_queue_offset(extra_info_strs.as_slice()); - if queue_offset.is_err() { - error!("ackAsync error: {}", queue_offset.unwrap_err()); - return; - } - let queue_offset = queue_offset.unwrap(); + let queue_offset = match queue_offset { + Ok(value) => value, + Err(e) => { + error!("ackAsync error: {}", e); + return; + } + };Alternatively, adjust the function to return
Result<()>
and use the?
operator for error propagation.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs
(2 hunks)
🔇 Additional comments (1)
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (1)
44-44
: Import Statement Added
The addition of the AckMessageRequestHeader
import is appropriate for the implementation of the acknowledgment functionality.
} | ||
|
||
pub(crate) async fn ack_async(&mut self, message: &MessageExt, consumer_group: &CheetahString) { | ||
let extra_info = message | ||
.get_property(&CheetahString::from_static_str( | ||
MessageConst::PROPERTY_POP_CK, | ||
)) | ||
.unwrap_or_default(); | ||
let extra_info_strs = ExtraInfoUtil::split(extra_info.as_str()); | ||
if extra_info_strs.is_err() { | ||
error!("ackAsync error: {}", extra_info_strs.unwrap_err()); | ||
return; | ||
} | ||
let extra_info_strs = extra_info_strs.unwrap(); | ||
let queue_id = ExtraInfoUtil::get_queue_id(extra_info_strs.as_slice()); | ||
if queue_id.is_err() { | ||
error!("ackAsync error: {}", queue_id.unwrap_err()); | ||
return; | ||
} | ||
let queue_id = queue_id.unwrap(); | ||
let queue_offset = ExtraInfoUtil::get_queue_offset(extra_info_strs.as_slice()); | ||
if queue_offset.is_err() { | ||
error!("ackAsync error: {}", queue_offset.unwrap_err()); | ||
return; | ||
} | ||
let queue_offset = queue_offset.unwrap(); | ||
let broker_name = CheetahString::from( | ||
ExtraInfoUtil::get_broker_name(extra_info_strs.as_slice()).unwrap_or_default(), | ||
); | ||
let topic = message.get_topic(); | ||
|
||
let client_instance = self.client_instance.as_mut().unwrap(); | ||
let des_broker_name = if !broker_name.is_empty() | ||
&& broker_name.starts_with(mix_all::LOGICAL_QUEUE_MOCK_BROKER_PREFIX) | ||
{ | ||
let mq = self | ||
.client_config | ||
.queue_with_namespace(MessageQueue::from_parts( | ||
topic, | ||
broker_name.clone(), | ||
queue_id, | ||
)); | ||
client_instance | ||
.get_broker_name_from_message_queue(&mq) | ||
.await | ||
} else { | ||
broker_name.clone() | ||
}; | ||
|
||
let mut find_broker_result = client_instance | ||
.find_broker_address_in_subscribe(&des_broker_name, mix_all::MASTER_ID, true) | ||
.await; | ||
if find_broker_result.is_none() { | ||
client_instance | ||
.update_topic_route_info_from_name_server_topic(topic) | ||
.await; | ||
find_broker_result = client_instance | ||
.find_broker_address_in_subscribe(&des_broker_name, mix_all::MASTER_ID, true) | ||
.await; | ||
} | ||
if find_broker_result.is_none() { | ||
error!("The broker[{}] not exist", des_broker_name); | ||
return; | ||
} | ||
|
||
let request_header = AckMessageRequestHeader { | ||
consumer_group: consumer_group.clone(), | ||
topic: CheetahString::from_string( | ||
ExtraInfoUtil::get_real_topic(extra_info_strs.as_slice(), topic, consumer_group) | ||
.unwrap_or_default(), | ||
), | ||
queue_id, | ||
extra_info, | ||
offset: queue_offset, | ||
topic_request_header: Some(TopicRequestHeader { | ||
rpc_request_header: Some(RpcRequestHeader { | ||
broker_name: Some(broker_name.clone()), | ||
..Default::default() | ||
}), | ||
lo: None, | ||
}), | ||
}; | ||
//client_instance.mq_client_api_impl.as_mut().unwrap() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incomplete Implementation of ack_async
Function
The ack_async
function is currently incomplete, as it ends with unimplemented!("ackAsync");
. Please complete the implementation before merging to ensure the acknowledgment process functions correctly.
let extra_info = message | ||
.get_property(&CheetahString::from_static_str( | ||
MessageConst::PROPERTY_POP_CK, | ||
)) | ||
.unwrap_or_default(); | ||
let extra_info_strs = ExtraInfoUtil::split(extra_info.as_str()); | ||
if extra_info_strs.is_err() { | ||
error!("ackAsync error: {}", extra_info_strs.unwrap_err()); | ||
return; | ||
} | ||
let extra_info_strs = extra_info_strs.unwrap(); | ||
let queue_id = ExtraInfoUtil::get_queue_id(extra_info_strs.as_slice()); | ||
if queue_id.is_err() { | ||
error!("ackAsync error: {}", queue_id.unwrap_err()); | ||
return; | ||
} | ||
let queue_id = queue_id.unwrap(); | ||
let queue_offset = ExtraInfoUtil::get_queue_offset(extra_info_strs.as_slice()); | ||
if queue_offset.is_err() { | ||
error!("ackAsync error: {}", queue_offset.unwrap_err()); | ||
return; | ||
} | ||
let queue_offset = queue_offset.unwrap(); | ||
let broker_name = CheetahString::from( | ||
ExtraInfoUtil::get_broker_name(extra_info_strs.as_slice()).unwrap_or_default(), | ||
); | ||
let topic = message.get_topic(); | ||
|
||
let client_instance = self.client_instance.as_mut().unwrap(); | ||
let des_broker_name = if !broker_name.is_empty() | ||
&& broker_name.starts_with(mix_all::LOGICAL_QUEUE_MOCK_BROKER_PREFIX) | ||
{ | ||
let mq = self | ||
.client_config | ||
.queue_with_namespace(MessageQueue::from_parts( | ||
topic, | ||
broker_name.clone(), | ||
queue_id, | ||
)); | ||
client_instance | ||
.get_broker_name_from_message_queue(&mq) | ||
.await | ||
} else { | ||
broker_name.clone() | ||
}; | ||
|
||
let mut find_broker_result = client_instance | ||
.find_broker_address_in_subscribe(&des_broker_name, mix_all::MASTER_ID, true) | ||
.await; | ||
if find_broker_result.is_none() { | ||
client_instance | ||
.update_topic_route_info_from_name_server_topic(topic) | ||
.await; | ||
find_broker_result = client_instance | ||
.find_broker_address_in_subscribe(&des_broker_name, mix_all::MASTER_ID, true) | ||
.await; | ||
} | ||
if find_broker_result.is_none() { | ||
error!("The broker[{}] not exist", des_broker_name); | ||
return; | ||
} | ||
|
||
let request_header = AckMessageRequestHeader { | ||
consumer_group: consumer_group.clone(), | ||
topic: CheetahString::from_string( | ||
ExtraInfoUtil::get_real_topic(extra_info_strs.as_slice(), topic, consumer_group) | ||
.unwrap_or_default(), | ||
), | ||
queue_id, | ||
extra_info, | ||
offset: queue_offset, | ||
topic_request_header: Some(TopicRequestHeader { | ||
rpc_request_header: Some(RpcRequestHeader { | ||
broker_name: Some(broker_name.clone()), | ||
..Default::default() | ||
}), | ||
lo: None, | ||
}), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve Error Handling and Propagate Errors in ack_async
The ack_async
function logs errors and returns early upon encountering errors (e.g., lines 1380–1381, 1386–1387, 1392–1393, 1431–1432), but does not inform the caller of these failures. Consider changing the function signature to return Result<()>
to propagate errors to the caller, enabling better upstream error handling and reducing silent failures.
); | ||
let topic = message.get_topic(); | ||
|
||
let client_instance = self.client_instance.as_mut().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid Potential Panic by Handling Option
Properly
Using unwrap()
on self.client_instance
may lead to a panic if self.client_instance
is None
. To prevent potential runtime panics, consider handling the None
case explicitly or ensuring that self.client_instance
is always initialized before invoking this method.
Apply this diff to handle the Option
safely:
-let client_instance = self.client_instance.as_mut().unwrap();
+let client_instance = match self.client_instance.as_mut() {
+ Some(instance) => instance,
+ None => {
+ error!("client_instance is None in ack_async");
+ return;
+ }
+};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let client_instance = self.client_instance.as_mut().unwrap(); | |
let client_instance = match self.client_instance.as_mut() { | |
Some(instance) => instance, | |
None => { | |
error!("client_instance is None in ack_async"); | |
return; | |
} | |
}; |
let mut find_broker_result = client_instance | ||
.find_broker_address_in_subscribe(&des_broker_name, mix_all::MASTER_ID, true) | ||
.await; | ||
if find_broker_result.is_none() { | ||
client_instance | ||
.update_topic_route_info_from_name_server_topic(topic) | ||
.await; | ||
find_broker_result = client_instance | ||
.find_broker_address_in_subscribe(&des_broker_name, mix_all::MASTER_ID, true) | ||
.await; | ||
} | ||
if find_broker_result.is_none() { | ||
error!("The broker[{}] not exist", des_broker_name); | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Handle Missing Broker Address Gracefully
In the case where find_broker_result
is None
after attempting to update the topic route info (lines 1422–1433), the function logs an error and returns without notifying the caller. Consider returning an error to inform the caller about the missing broker, which would enable upstream handling of this situation.
Apply this diff to propagate the error:
if find_broker_result.is_none() {
error!("The broker[{}] not exist", des_broker_name);
- return;
+ return Err(MQClientError::new(format!(
+ "The broker [{}] does not exist",
+ des_broker_name
+ )));
}
And update the function signature:
-pub(crate) async fn ack_async(&mut self, message: &MessageExt, consumer_group: &CheetahString) {
+pub(crate) async fn ack_async(&mut self, message: &MessageExt, consumer_group: &CheetahString) -> Result<()> {
Committable suggestion skipped: line range outside the PR's diff.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1827 +/- ##
==========================================
- Coverage 28.29% 28.26% -0.04%
==========================================
Files 473 473
Lines 65794 65871 +77
==========================================
Hits 18617 18617
- Misses 47177 47254 +77 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #1813
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Bug Fixes