Skip to content

Commit

Permalink
[ISSUE #1811]🤡Implement DefaultMQPushConsumerImpl#process_pop_result …
Browse files Browse the repository at this point in the history
…method function🚀
  • Loading branch information
mxsm committed Dec 16, 2024
1 parent f3000f5 commit 06305e5
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ use crate::consumer::mq_consumer_inner::MQConsumerInner;
use crate::consumer::mq_consumer_inner::MQConsumerInnerImpl;
use crate::consumer::pop_callback::DefaultPopCallback;
use crate::consumer::pop_result::PopResult;
use crate::consumer::pop_status::PopStatus;
use crate::consumer::pull_callback::DefaultPullCallback;
use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore;
use crate::consumer::store::offset_store::OffsetStore;
Expand All @@ -87,6 +88,7 @@ use crate::consumer::store::remote_broker_offset_store::RemoteBrokerOffsetStore;
use crate::factory::mq_client_instance::MQClientInstance;
use crate::hook::consume_message_context::ConsumeMessageContext;
use crate::hook::consume_message_hook::ConsumeMessageHook;
use crate::hook::filter_message_context::FilterMessageContext;
use crate::hook::filter_message_hook::FilterMessageHook;
use crate::implementation::communication_mode::CommunicationMode;
use crate::implementation::mq_client_manager::MQClientManager;
Expand Down Expand Up @@ -1110,12 +1112,54 @@ impl DefaultMQPushConsumerImpl {
}
}

pub(crate) fn process_pop_result(
pub(crate) async fn process_pop_result(

Check warning on line 1115 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1115

Added line #L1115 was not covered by tests
&mut self,
pop_result: &PopResult,
mut pop_result: PopResult,

Check warning on line 1117 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1117

Added line #L1117 was not covered by tests
subscription_data: &SubscriptionData,
) -> PopResult {
unimplemented!("processPopResult")
if pop_result.pop_status == PopStatus::Found {
let msg_vec = pop_result.msg_found_list.take().unwrap_or_default();
let msg_list_filter_again =
if !subscription_data.tags_set.is_empty() && !subscription_data.class_filter_mode {
let mut msg_vec_again = Vec::with_capacity(msg_vec.len());
for msg in msg_vec {
if let Some(ref tag) = msg.get_tags() {
if subscription_data.tags_set.contains(tag.as_str()) {
msg_vec_again.push(msg);
}
}

Check warning on line 1130 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1120-L1130

Added lines #L1120 - L1130 were not covered by tests
}
msg_vec_again

Check warning on line 1132 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1132

Added line #L1132 was not covered by tests
} else {
msg_vec

Check warning on line 1134 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1134

Added line #L1134 was not covered by tests
};
let msg_list_filter_again = msg_list_filter_again
.into_iter()
.map(MessageClientExt::new)
.collect::<Vec<_>>();
if !self.filter_message_hook_list.is_empty() {
let context = FilterMessageContext {
unit_mode: self.consumer_config.unit_mode,
msg_list: &msg_list_filter_again,
..Default::default()
};
for hook in self.filter_message_hook_list.iter() {
hook.filter_message(&context);
}
}
let mut final_msg_list = Vec::with_capacity(msg_list_filter_again.len());
for msg in msg_list_filter_again {
if msg.message_ext_inner.reconsume_times > self.get_max_reconsume_times() {
let consumer_group = self.consumer_config.consumer_group().clone();
self.ack_async(&msg.message_ext_inner, &consumer_group)
.await;
} else {
final_msg_list.push(msg.message_ext_inner);
}

Check warning on line 1158 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1136-L1158

Added lines #L1136 - L1158 were not covered by tests
}
pop_result.msg_found_list = Some(final_msg_list);
}
pop_result

Check warning on line 1162 in rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs#L1160-L1162

Added lines #L1160 - L1162 were not covered by tests
}

#[inline]
Expand Down
12 changes: 9 additions & 3 deletions rocketmq-client/src/consumer/pop_callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,16 @@ impl PopCallback for DefaultPopCallback {
let subscription_data = self.subscription_data.take().unwrap();
let pop_request = self.pop_request.take().unwrap();

push_consumer_impl.process_pop_result(&pop_result, &subscription_data);
let pop_result = push_consumer_impl
.process_pop_result(pop_result, &subscription_data)
.await;

Check warning on line 108 in rocketmq-client/src/consumer/pop_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pop_callback.rs#L106-L108

Added lines #L106 - L108 were not covered by tests
match pop_result.pop_status {
PopStatus::Found => {
if pop_result.msg_found_list.is_empty() {
if pop_result
.msg_found_list
.as_ref()
.map_or(true, |value| value.is_empty())

Check warning on line 114 in rocketmq-client/src/consumer/pop_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pop_callback.rs#L111-L114

Added lines #L111 - L114 were not covered by tests
{
push_consumer_impl
.execute_pop_request_immediately(pop_request)
.await;
Expand All @@ -116,7 +122,7 @@ impl PopCallback for DefaultPopCallback {
.as_mut()
.unwrap()
.submit_pop_consume_request(
pop_result.msg_found_list,
pop_result.msg_found_list.unwrap_or_default(),

Check warning on line 125 in rocketmq-client/src/consumer/pop_callback.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/pop_callback.rs#L125

Added line #L125 was not covered by tests
pop_request.get_pop_process_queue(),
pop_request.get_message_queue(),
)
Expand Down
12 changes: 6 additions & 6 deletions rocketmq-client/src/consumer/pop_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::consumer::pop_status::PopStatus;

#[derive(Default, Clone)]
pub struct PopResult {
pub msg_found_list: Vec<MessageExt>,
pub msg_found_list: Option<Vec<MessageExt>>,
pub pop_status: PopStatus,
pub pop_time: u64,
pub invisible_time: u64,
Expand All @@ -35,7 +35,7 @@ impl Display for PopResult {
f,
"PopResult [msg_found_list={}, pop_status={}, pop_time={}, invisible_time={}, \
rest_num={}]",
self.msg_found_list.len(),
self.msg_found_list.as_ref().map_or(0, |value| value.len()),
self.pop_status,
self.pop_time,
self.invisible_time,
Expand All @@ -58,7 +58,7 @@ mod tests {
#[test]
fn display_pop_result_with_empty_msg_list() {
let pop_result = PopResult {
msg_found_list: vec![],
msg_found_list: Some(vec![]),
pop_status: PopStatus::Found,
pop_time: 123456789,
invisible_time: 1000,
Expand All @@ -74,7 +74,7 @@ mod tests {
#[test]
fn display_pop_result_with_non_empty_msg_list() {
let pop_result = PopResult {
msg_found_list: vec![create_message_ext(), create_message_ext()],
msg_found_list: Some(vec![create_message_ext(), create_message_ext()]),
pop_status: PopStatus::NoNewMsg,
pop_time: 987654321,
invisible_time: 2000,
Expand All @@ -90,7 +90,7 @@ mod tests {
#[test]
fn display_pop_result_with_polling_full_status() {
let pop_result = PopResult {
msg_found_list: vec![create_message_ext()],
msg_found_list: Some(vec![create_message_ext()]),
pop_status: PopStatus::PollingFull,
pop_time: 111111111,
invisible_time: 3000,
Expand All @@ -106,7 +106,7 @@ mod tests {
#[test]
fn display_pop_result_with_polling_not_found_status() {
let pop_result = PopResult {
msg_found_list: vec![],
msg_found_list: Some(vec![]),
pop_status: PopStatus::PollingNotFound,
pop_time: 222222222,
invisible_time: 4000,
Expand Down
13 changes: 10 additions & 3 deletions rocketmq-client/src/implementation/mq_client_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1471,7 +1471,7 @@ impl MQClientAPIImpl {
};
let mut pop_result = PopResult {
pop_status,
msg_found_list,
msg_found_list: Some(msg_found_list),

Check warning on line 1474 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1474

Added line #L1474 was not covered by tests
..Default::default()
};
let response_header = response
Expand Down Expand Up @@ -1506,9 +1506,16 @@ impl MQClientAPIImpl {
.unwrap_or(&CheetahString::from_slice("")),
)
.map_err(RemotingError)?;
let sort_map = build_queue_offset_sorted_map(topic.as_str(), &pop_result.msg_found_list);
let sort_map = build_queue_offset_sorted_map(
topic.as_str(),
pop_result.msg_found_list.as_ref().map_or(&[], |v| v),
);

Check warning on line 1512 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1509-L1512

Added lines #L1509 - L1512 were not covered by tests
let mut map = HashMap::with_capacity(5);
for message in &mut pop_result.msg_found_list {
for message in pop_result
.msg_found_list
.as_mut()
.map_or(&mut vec![], |v| v)

Check warning on line 1517 in rocketmq-client/src/implementation/mq_client_api_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/implementation/mq_client_api_impl.rs#L1514-L1517

Added lines #L1514 - L1517 were not covered by tests
{
if start_offset_info.is_empty() {
let key = CheetahString::from_string(format!(
"{}{}",
Expand Down

0 comments on commit 06305e5

Please sign in to comment.