Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1811]🤡Implement DefaultMQPushConsumerImpl#process_pop_result method function🚀 #1814

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl;
use crate::consumer::pop_callback::DefaultPopCallback;
use crate::consumer::pop_result::PopResult;
use crate::consumer::pop_status::PopStatus;
use crate::consumer::pull_callback::DefaultPullCallback;
use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore;
use crate::consumer::store::offset_store::OffsetStore;
Expand All @@ -87,6 +88,7 @@
use crate::factory::mq_client_instance::MQClientInstance;
use crate::hook::consume_message_context::ConsumeMessageContext;
use crate::hook::consume_message_hook::ConsumeMessageHook;
use crate::hook::filter_message_context::FilterMessageContext;
use crate::hook::filter_message_hook::FilterMessageHook;
use crate::implementation::communication_mode::CommunicationMode;
use crate::implementation::mq_client_manager::MQClientManager;
Expand Down Expand Up @@ -1110,12 +1112,54 @@
}
}

pub(crate) fn process_pop_result(
pub(crate) async fn process_pop_result(

Check warning on line 1115 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1115

Added line #L1115 was not covered by tests
&mut self,
pop_result: &PopResult,
mut pop_result: PopResult,

Check warning on line 1117 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1117

Added line #L1117 was not covered by tests
subscription_data: &SubscriptionData,
) -> PopResult {
unimplemented!("processPopResult")
if pop_result.pop_status == PopStatus::Found {
let msg_vec = pop_result.msg_found_list.take().unwrap_or_default();
let msg_list_filter_again =
if !subscription_data.tags_set.is_empty() && !subscription_data.class_filter_mode {
let mut msg_vec_again = Vec::with_capacity(msg_vec.len());
for msg in msg_vec {
if let Some(ref tag) = msg.get_tags() {
if subscription_data.tags_set.contains(tag.as_str()) {
msg_vec_again.push(msg);
}
}

Check warning on line 1130 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1120-L1130

Added lines #L1120 - L1130 were not covered by tests
}
Comment on lines +1126 to +1131
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Include messages without tags during tag filtering if appropriate.

In the tag filtering logic, messages without tags are currently being excluded because msg.get_tags() returns None. If it is intended to process messages without tags when the subscription has tag filters, consider modifying the logic to include these messages. This ensures no valid messages are unintentionally skipped.

Apply this diff to include messages without tags when applicable:

 for msg in msg_vec {
     if let Some(ref tag) = msg.get_tags() {
         if subscription_data.tags_set.contains(tag.as_str()) {
             msg_vec_again.push(msg);
         }
+    } else {
+        // Include messages without tags if appropriate
+        msg_vec_again.push(msg);
     }
 }

Committable suggestion skipped: line range outside the PR's diff.

msg_vec_again

Check warning on line 1132 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1132

Added line #L1132 was not covered by tests
} else {
msg_vec

Check warning on line 1134 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1134

Added line #L1134 was not covered by tests
};
let msg_list_filter_again = msg_list_filter_again
.into_iter()
.map(MessageClientExt::new)
.collect::<Vec<_>>();
if !self.filter_message_hook_list.is_empty() {
let context = FilterMessageContext {
unit_mode: self.consumer_config.unit_mode,
msg_list: &msg_list_filter_again,
..Default::default()
};
for hook in self.filter_message_hook_list.iter() {
hook.filter_message(&context);
}
}
let mut final_msg_list = Vec::with_capacity(msg_list_filter_again.len());
for msg in msg_list_filter_again {
if msg.message_ext_inner.reconsume_times > self.get_max_reconsume_times() {
let consumer_group = self.consumer_config.consumer_group().clone();
self.ack_async(&msg.message_ext_inner, &consumer_group)
.await;
} else {
final_msg_list.push(msg.message_ext_inner);
}

Check warning on line 1158 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1136-L1158

Added lines #L1136 - L1158 were not covered by tests
}
pop_result.msg_found_list = Some(final_msg_list);
}
pop_result

Check warning on line 1162 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1160-L1162

Added lines #L1160 - L1162 were not covered by tests
}

#[inline]
Expand Down
12 changes: 9 additions & 3 deletions rocketmq-client/src/consumer/pop_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,16 @@
let subscription_data = self.subscription_data.take().unwrap();
let pop_request = self.pop_request.take().unwrap();

push_consumer_impl.process_pop_result(&pop_result, &subscription_data);
let pop_result = push_consumer_impl
.process_pop_result(pop_result, &subscription_data)
.await;

Check warning on line 108 in rocketmq-client/src/consumer/pop_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pop_callback.rs#L106-L108

Added lines #L106 - L108 were not covered by tests
match pop_result.pop_status {
PopStatus::Found => {
if pop_result.msg_found_list.is_empty() {
if pop_result
.msg_found_list
.as_ref()
.map_or(true, |value| value.is_empty())

Check warning on line 114 in rocketmq-client/src/consumer/pop_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pop_callback.rs#L111-L114

Added lines #L111 - L114 were not covered by tests
{
push_consumer_impl
.execute_pop_request_immediately(pop_request)
.await;
Expand All @@ -116,7 +122,7 @@
.as_mut()
.unwrap()
.submit_pop_consume_request(
pop_result.msg_found_list,
pop_result.msg_found_list.unwrap_or_default(),

Check warning on line 125 in rocketmq-client/src/consumer/pop_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pop_callback.rs#L125

Added line #L125 was not covered by tests
pop_request.get_pop_process_queue(),
pop_request.get_message_queue(),
)
Expand Down
12 changes: 6 additions & 6 deletions rocketmq-client/src/consumer/pop_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::consumer::pop_status::PopStatus;

#[derive(Default, Clone)]
pub struct PopResult {
pub msg_found_list: Vec<MessageExt>,
pub msg_found_list: Option<Vec<MessageExt>>,
pub pop_status: PopStatus,
pub pop_time: u64,
pub invisible_time: u64,
Expand All @@ -35,7 +35,7 @@ impl Display for PopResult {
f,
"PopResult [msg_found_list={}, pop_status={}, pop_time={}, invisible_time={}, \
rest_num={}]",
self.msg_found_list.len(),
self.msg_found_list.as_ref().map_or(0, |value| value.len()),
self.pop_status,
self.pop_time,
self.invisible_time,
Expand All @@ -58,7 +58,7 @@ mod tests {
#[test]
fn display_pop_result_with_empty_msg_list() {
let pop_result = PopResult {
msg_found_list: vec![],
msg_found_list: Some(vec![]),
pop_status: PopStatus::Found,
pop_time: 123456789,
invisible_time: 1000,
Expand All @@ -74,7 +74,7 @@ mod tests {
#[test]
fn display_pop_result_with_non_empty_msg_list() {
let pop_result = PopResult {
msg_found_list: vec![create_message_ext(), create_message_ext()],
msg_found_list: Some(vec![create_message_ext(), create_message_ext()]),
pop_status: PopStatus::NoNewMsg,
pop_time: 987654321,
invisible_time: 2000,
Expand All @@ -90,7 +90,7 @@ mod tests {
#[test]
fn display_pop_result_with_polling_full_status() {
let pop_result = PopResult {
msg_found_list: vec![create_message_ext()],
msg_found_list: Some(vec![create_message_ext()]),
pop_status: PopStatus::PollingFull,
pop_time: 111111111,
invisible_time: 3000,
Expand All @@ -106,7 +106,7 @@ mod tests {
#[test]
fn display_pop_result_with_polling_not_found_status() {
let pop_result = PopResult {
msg_found_list: vec![],
msg_found_list: Some(vec![]),
pop_status: PopStatus::PollingNotFound,
pop_time: 222222222,
invisible_time: 4000,
Expand Down
13 changes: 10 additions & 3 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@
};
let mut pop_result = PopResult {
pop_status,
msg_found_list,
msg_found_list: Some(msg_found_list),

Check warning on line 1474 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1474

Added line #L1474 was not covered by tests
..Default::default()
};
let response_header = response
Expand Down Expand Up @@ -1506,9 +1506,16 @@
.unwrap_or(&CheetahString::from_slice("")),
)
.map_err(RemotingError)?;
let sort_map = build_queue_offset_sorted_map(topic.as_str(), &pop_result.msg_found_list);
let sort_map = build_queue_offset_sorted_map(
topic.as_str(),
pop_result.msg_found_list.as_ref().map_or(&[], |v| v),
);

Check warning on line 1512 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1509-L1512

Added lines #L1509 - L1512 were not covered by tests
let mut map = HashMap::with_capacity(5);
for message in &mut pop_result.msg_found_list {
for message in pop_result
.msg_found_list
.as_mut()
.map_or(&mut vec![], |v| v)

Check warning on line 1517 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1514-L1517

Added lines #L1514 - L1517 were not covered by tests
{
Comment on lines +1514 to +1518
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix mutable reference to temporary value in loop over msg_found_list.

Creating a mutable reference to a temporary value using map_or(&mut vec![], |v| v) is not allowed in Rust and will result in a compilation error. To fix this issue, handle the None case separately to avoid taking a mutable reference to a temporary.

Apply this diff to correct the loop:

- for message in pop_result
-     .msg_found_list
-     .as_mut()
-     .map_or(&mut vec![], |v| v)
- {
+ if let Some(messages) = pop_result.msg_found_list.as_mut() {
+     for message in messages {
+         // Process each message
+     }
+ }

This ensures that you only iterate over the messages if msg_found_list is Some, preventing illegal mutable references to temporary values.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for message in pop_result
.msg_found_list
.as_mut()
.map_or(&mut vec![], |v| v)
{
if let Some(messages) = pop_result.msg_found_list.as_mut() {
for message in messages {

if start_offset_info.is_empty() {
let key = CheetahString::from_string(format!(
"{}{}",
Expand Down
Loading