diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index e4484128..6f20278b 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -79,6 +79,7 @@ use crate::consumer::mq_consumer_inner::MQConsumerInner; use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl; use crate::consumer::pop_callback::DefaultPopCallback; use crate::consumer::pop_result::PopResult; +use crate::consumer::pop_status::PopStatus; use crate::consumer::pull_callback::DefaultPullCallback; use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore; use crate::consumer::store::offset_store::OffsetStore; @@ -87,6 +88,7 @@ use crate::consumer::store::remote_broker_offset_store::RemoteBrokerOffsetStore; use crate::factory::mq_client_instance::MQClientInstance; use crate::hook::consume_message_context::ConsumeMessageContext; use crate::hook::consume_message_hook::ConsumeMessageHook; +use crate::hook::filter_message_context::FilterMessageContext; use crate::hook::filter_message_hook::FilterMessageHook; use crate::implementation::communication_mode::CommunicationMode; use crate::implementation::mq_client_manager::MQClientManager; @@ -1110,12 +1112,54 @@ impl DefaultMQPushConsumerImpl { } } - pub(crate) fn process_pop_result( + pub(crate) async fn process_pop_result( &mut self, - pop_result: &PopResult, + mut pop_result: PopResult, subscription_data: &SubscriptionData, ) -> PopResult { - unimplemented!("processPopResult") + if pop_result.pop_status == PopStatus::Found { + let msg_vec = pop_result.msg_found_list.take().unwrap_or_default(); + let msg_list_filter_again = + if !subscription_data.tags_set.is_empty() && !subscription_data.class_filter_mode { + let mut msg_vec_again = Vec::with_capacity(msg_vec.len()); + for msg in msg_vec { + if let Some(ref tag) = msg.get_tags() { + if subscription_data.tags_set.contains(tag.as_str()) { + msg_vec_again.push(msg); + } + } + } + msg_vec_again + } else { + msg_vec + }; + let msg_list_filter_again = msg_list_filter_again + .into_iter() + .map(MessageClientExt::new) + .collect::>(); + if !self.filter_message_hook_list.is_empty() { + let context = FilterMessageContext { + unit_mode: self.consumer_config.unit_mode, + msg_list: &msg_list_filter_again, + ..Default::default() + }; + for hook in self.filter_message_hook_list.iter() { + hook.filter_message(&context); + } + } + let mut final_msg_list = Vec::with_capacity(msg_list_filter_again.len()); + for msg in msg_list_filter_again { + if msg.message_ext_inner.reconsume_times > self.get_max_reconsume_times() { + let consumer_group = self.consumer_config.consumer_group().clone(); + self.ack_async(&msg.message_ext_inner, &consumer_group) + .await; + } else { + final_msg_list.push(msg.message_ext_inner); + } + } + pop_result.msg_found_list = Some(final_msg_list); + } + pop_result } #[inline] diff --git a/rocketmq-client/src/consumer/pop_callback.rs b/rocketmq-client/src/consumer/pop_callback.rs index 86c8f3ea..606dd8f8 100644 --- a/rocketmq-client/src/consumer/pop_callback.rs +++ b/rocketmq-client/src/consumer/pop_callback.rs @@ -103,10 +103,16 @@ impl PopCallback for DefaultPopCallback { let subscription_data = self.subscription_data.take().unwrap(); let pop_request = self.pop_request.take().unwrap(); - push_consumer_impl.process_pop_result(&pop_result, &subscription_data); + let pop_result = push_consumer_impl + .process_pop_result(pop_result, &subscription_data) + .await; match pop_result.pop_status { PopStatus::Found => { - if pop_result.msg_found_list.is_empty() { + if pop_result + .msg_found_list + .as_ref() + .map_or(true, |value| value.is_empty()) + { push_consumer_impl .execute_pop_request_immediately(pop_request) .await; @@ -116,7 +122,7 @@ impl PopCallback for DefaultPopCallback { .as_mut() .unwrap() .submit_pop_consume_request( - pop_result.msg_found_list, + pop_result.msg_found_list.unwrap_or_default(), pop_request.get_pop_process_queue(), pop_request.get_message_queue(), ) diff --git a/rocketmq-client/src/consumer/pop_result.rs b/rocketmq-client/src/consumer/pop_result.rs index 1b7de0cf..b371ce96 100644 --- a/rocketmq-client/src/consumer/pop_result.rs +++ b/rocketmq-client/src/consumer/pop_result.rs @@ -22,7 +22,7 @@ use crate::consumer::pop_status::PopStatus; #[derive(Default, Clone)] pub struct PopResult { - pub msg_found_list: Vec, + pub msg_found_list: Option>, pub pop_status: PopStatus, pub pop_time: u64, pub invisible_time: u64, @@ -35,7 +35,7 @@ impl Display for PopResult { f, "PopResult [msg_found_list={}, pop_status={}, pop_time={}, invisible_time={}, \ rest_num={}]", - self.msg_found_list.len(), + self.msg_found_list.as_ref().map_or(0, |value| value.len()), self.pop_status, self.pop_time, self.invisible_time, @@ -58,7 +58,7 @@ mod tests { #[test] fn display_pop_result_with_empty_msg_list() { let pop_result = PopResult { - msg_found_list: vec![], + msg_found_list: Some(vec![]), pop_status: PopStatus::Found, pop_time: 123456789, invisible_time: 1000, @@ -74,7 +74,7 @@ mod tests { #[test] fn display_pop_result_with_non_empty_msg_list() { let pop_result = PopResult { - msg_found_list: vec![create_message_ext(), create_message_ext()], + msg_found_list: Some(vec![create_message_ext(), create_message_ext()]), pop_status: PopStatus::NoNewMsg, pop_time: 987654321, invisible_time: 2000, @@ -90,7 +90,7 @@ mod tests { #[test] fn display_pop_result_with_polling_full_status() { let pop_result = PopResult { - msg_found_list: vec![create_message_ext()], + msg_found_list: Some(vec![create_message_ext()]), pop_status: PopStatus::PollingFull, pop_time: 111111111, invisible_time: 3000, @@ -106,7 +106,7 @@ mod tests { #[test] fn display_pop_result_with_polling_not_found_status() { let pop_result = PopResult { - msg_found_list: vec![], + msg_found_list: Some(vec![]), pop_status: PopStatus::PollingNotFound, pop_time: 222222222, invisible_time: 4000, diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 93f2aa3d..af08728b 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -1471,7 +1471,7 @@ impl MQClientAPIImpl { }; let mut pop_result = PopResult { pop_status, - msg_found_list, + msg_found_list: Some(msg_found_list), ..Default::default() }; let response_header = response @@ -1506,9 +1506,16 @@ impl MQClientAPIImpl { .unwrap_or(&CheetahString::from_slice("")), ) .map_err(RemotingError)?; - let sort_map = build_queue_offset_sorted_map(topic.as_str(), &pop_result.msg_found_list); + let sort_map = build_queue_offset_sorted_map( + topic.as_str(), + pop_result.msg_found_list.as_ref().map_or(&[], |v| v), + ); let mut map = HashMap::with_capacity(5); - for message in &mut pop_result.msg_found_list { + for message in pop_result + .msg_found_list + .as_mut() + .map_or(&mut vec![], |v| v) + { if start_offset_info.is_empty() { let key = CheetahString::from_string(format!( "{}{}",