[ISSUE #1004]🚀optimize and improve consume logic⚡️ #1008

Merged · 3 commits · Sep 27, 2024
Changes from 2 commits
14 changes: 12 additions & 2 deletions rocketmq-client/src/consumer/consumer_impl/process_queue.rs
@@ -30,6 +30,7 @@
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::process_queue_info::ProcessQueueInfo;
use rocketmq_rust::RocketMQTokioRwLock;
use tokio::sync::RwLock;

use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl;
@@ -56,7 +57,7 @@
Arc<RwLock<std::collections::BTreeMap<i64, ArcRefCellWrapper<MessageClientExt>>>>,
pub(crate) msg_count: Arc<AtomicU64>,
pub(crate) msg_size: Arc<AtomicU64>,
pub(crate) consume_lock: Arc<RwLock<()>>,
pub(crate) consume_lock: Arc<RocketMQTokioRwLock<()>>,

⚠️ Codecov / codecov/patch: added line #L60 in rocketmq-client/src/consumer/consumer_impl/process_queue.rs was not covered by tests.
pub(crate) consuming_msg_orderly_tree_map:
Arc<RwLock<std::collections::BTreeMap<i64, ArcRefCellWrapper<MessageClientExt>>>>,
pub(crate) try_unlock_times: Arc<AtomicI64>,
@@ -77,7 +78,7 @@
msg_tree_map: Arc::new(RwLock::new(std::collections::BTreeMap::new())),
msg_count: Arc::new(AtomicU64::new(0)),
msg_size: Arc::new(AtomicU64::new(0)),
consume_lock: Arc::new(RwLock::new(())),
consume_lock: Arc::new(RocketMQTokioRwLock::new(())),

⚠️ Codecov / codecov/patch: added line #L81 in rocketmq-client/src/consumer/consumer_impl/process_queue.rs was not covered by tests.
consuming_msg_orderly_tree_map: Arc::new(
RwLock::new(std::collections::BTreeMap::new()),
),
@@ -124,6 +125,10 @@
> *REBALANCE_LOCK_MAX_LIVE_TIME
}

pub(crate) fn inc_try_unlock_times(&self) {
self.try_unlock_times.fetch_add(1, Ordering::AcqRel);
}

⚠️ Codecov / codecov/patch: added lines #L128 - L130 in rocketmq-client/src/consumer/consumer_impl/process_queue.rs were not covered by tests.

pub(crate) async fn clean_expired_msg(
&self,
push_consumer: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
@@ -308,6 +313,11 @@
.store(last_pull_timestamp, std::sync::atomic::Ordering::Release);
}

pub(crate) fn set_last_lock_timestamp(&self, last_lock_timestamp: u64) {
self.last_lock_timestamp
.store(last_lock_timestamp, std::sync::atomic::Ordering::Release);
}

⚠️ Codecov / codecov/patch: added lines #L316 - L319 in rocketmq-client/src/consumer/consumer_impl/process_queue.rs were not covered by tests.

pub fn msg_count(&self) -> u64 {
self.msg_count.load(std::sync::atomic::Ordering::Acquire)
}
15 changes: 14 additions & 1 deletion rocketmq-client/src/consumer/consumer_impl/re_balance.rs
@@ -44,6 +44,7 @@ pub trait RebalanceLocal {
) -> bool;

fn remove_unnecessary_pop_message_queue(&self, mq: MessageQueue, pq: ProcessQueue) -> bool;

fn remove_unnecessary_pop_message_queue_pop(
&self,
_mq: MessageQueue,
@@ -53,22 +54,34 @@ pub trait RebalanceLocal {
}

fn consume_type(&self) -> ConsumeType;

Contributor · ⚠️ Potential issue

Consider changing &self to &mut self in remove_dirty_offset

The method remove_dirty_offset may modify internal state when removing offsets. If this is the case, it should take a mutable reference &mut self instead of an immutable reference &self to reflect that it mutates the receiver.
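
A minimal sketch of the suggested signature (hypothetical; whether the implementations actually mutate state determines if this change is warranted):

```rust
// Hypothetical signature if the method does mutate the receiver:
async fn remove_dirty_offset(&mut self, mq: &MessageQueue);
```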

async fn remove_dirty_offset(&self, mq: &MessageQueue);

async fn compute_pull_from_where_with_exception(&mut self, mq: &MessageQueue) -> Result<i64>;

async fn compute_pull_from_where(&mut self, mq: &MessageQueue) -> i64;

fn get_consume_init_mode(&self) -> i32;

async fn dispatch_pull_request(&self, pull_request_list: Vec<PullRequest>, delay: u64);

fn dispatch_pop_pull_request(&self, pull_request_list: Vec<PopRequest>, delay: u64);

fn create_process_queue(&self) -> ProcessQueue;

fn create_pop_process_queue(&self) -> PopProcessQueue;

async fn remove_process_queue(&mut self, mq: &MessageQueue);
fn unlock(&self, mq: MessageQueue, oneway: bool);

async fn unlock(&mut self, mq: &MessageQueue, oneway: bool);

fn lock_all(&self);

fn unlock_all(&self, oneway: bool);

async fn do_rebalance(&mut self, is_order: bool) -> bool;

fn client_rebalance(&mut self, topic: &str) -> bool;

fn destroy(&mut self);
}
204 changes: 167 additions & 37 deletions rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
@@ -17,12 +17,15 @@

use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::DerefMut;
use std::sync::Arc;

use rocketmq_common::common::message::message_queue::MessageQueue;
use rocketmq_common::common::mix_all;
use rocketmq_common::ArcRefCellWrapper;
use rocketmq_common::TimeUtils::get_current_millis;
Contributor · ⚠️ Potential issue

Unused import get_current_millis

The get_current_millis function from TimeUtils is imported but not used in this context. Ensure that it's necessary; otherwise, consider removing it to keep the code clean.

Apply this diff to remove the unused import:

-use rocketmq_common::TimeUtils::get_current_millis;

use rocketmq_common::WeakCellWrapper;
use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;
Contributor · ⚠️ Potential issue

Unused import LockBatchRequestBody

The import LockBatchRequestBody is added but not used in the code provided. If it's intended for future use, consider adding a comment explaining its purpose. Otherwise, remove the unused import.

Apply this diff to remove the unused import:

-use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody;

use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType;
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel;
use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData;
@@ -173,45 +176,55 @@ where
let mut changed = false;
let mut remove_queue_map = HashMap::new();
let process_queue_table_cloned = self.process_queue_table.clone();
let mut process_queue_table = process_queue_table_cloned.write().await;
// Drop process queues no longer belong to me
for (mq, pq) in process_queue_table.iter() {
if mq.get_topic() == topic {
if !mq_set.contains(mq) {
pq.set_dropped(true);
remove_queue_map.insert(mq.clone(), pq.clone());
} else if pq.is_pull_expired() {
if let Some(sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade()
{
if sub_rebalance.consume_type() == ConsumeType::ConsumePassively {
pq.set_dropped(true);
remove_queue_map.insert(mq.clone(), pq.clone());
error!(
"[BUG]doRebalance, {:?}, try remove unnecessary mq, {}, because \
pull is pause, so try to fixed it",
self.consumer_group,
mq.get_topic()
);
{
let process_queue_table = process_queue_table_cloned.read().await;
// Drop process queues no longer belong to me
for (mq, pq) in process_queue_table.iter() {
if mq.get_topic() == topic {
if !mq_set.contains(mq) {
pq.set_dropped(true);
remove_queue_map.insert(mq.clone(), pq.clone());
} else if pq.is_pull_expired() {
if let Some(sub_rebalance) =
self.sub_rebalance_impl.as_mut().unwrap().upgrade()
{
if sub_rebalance.consume_type() == ConsumeType::ConsumePassively {
pq.set_dropped(true);
remove_queue_map.insert(mq.clone(), pq.clone());
error!(
"[BUG]doRebalance, {:?}, try remove unnecessary mq, {}, \
because pull is pause, so try to fixed it",
self.consumer_group,
mq.get_topic()
);
}
Comment on lines +179 to +200
Contributor · ⚠️ Potential issue

Possible deadlock due to nested await calls

Within the asynchronous block starting at line 179, there is a read lock on process_queue_table_cloned. Later, at line 189, there's an upgrade of self.sub_rebalance_impl which may attempt to acquire a lock internally. Be cautious of potential deadlocks caused by holding a lock while awaiting on futures that may also require locks.

Consider restructuring the code to avoid holding the read lock across the await calls. For example, collect necessary data before the async calls or minimize the locking scope.
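
As one possible shape for that restructuring, the loop could work on a snapshot taken under a short-lived read guard (a rough sketch only, assuming MessageQueue is Clone and that cloning the Arc<ProcessQueue> handles is cheap; the error! logging from the original loop is omitted for brevity):

```rust
// Sketch: snapshot the relevant entries, drop the read guard, then run the
// logic that may await or take other locks without holding the table lock.
let snapshot: Vec<(MessageQueue, Arc<ProcessQueue>)> = {
    let table = process_queue_table_cloned.read().await;
    table
        .iter()
        .filter(|(mq, _)| mq.get_topic() == topic)
        .map(|(mq, pq)| (mq.clone(), pq.clone()))
        .collect()
}; // read guard is dropped here

for (mq, pq) in snapshot {
    if !mq_set.contains(&mq) {
        pq.set_dropped(true);
        remove_queue_map.insert(mq, pq);
    } else if pq.is_pull_expired() {
        // upgrade() and consume_type() now run without the table lock held
        if let Some(sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade() {
            if sub_rebalance.consume_type() == ConsumeType::ConsumePassively {
                pq.set_dropped(true);
                remove_queue_map.insert(mq, pq);
            }
        }
    }
}
```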

}
}
}
}
}

// Remove message queues no longer belong to me
for (mq, pq) in remove_queue_map {
if let Some(mut sub_rebalance) = self.sub_rebalance_impl.as_mut().unwrap().upgrade() {
if sub_rebalance
.remove_unnecessary_message_queue(&mq, &pq)
.await
{
process_queue_table.remove(&mq);
changed = true;
info!(
"doRebalance, {:?}, remove unnecessary mq, {}",
self.consumer_group,
mq.get_topic()
);
{
if !remove_queue_map.is_empty() {
let mut process_queue_table = process_queue_table_cloned.write().await;
// Remove message queues no longer belong to me
for (mq, pq) in remove_queue_map {
if let Some(mut sub_rebalance) =
self.sub_rebalance_impl.as_mut().unwrap().upgrade()
{
if sub_rebalance
.remove_unnecessary_message_queue(&mq, &pq)
.await
{
process_queue_table.remove(&mq);
changed = true;
info!(
"doRebalance, {:?}, remove unnecessary mq, {}",
self.consumer_group,
mq.get_topic()
);
}
}
}
}
}
@@ -223,9 +236,10 @@
return false;
}
let mut sub_rebalance_impl = sub_rebalance_impl.unwrap();
let mut process_queue_table = process_queue_table_cloned.write().await;
for mq in mq_set {
if !process_queue_table.contains_key(mq) {
if is_order && !self.lock(mq) {
if is_order && !self.lock(mq, process_queue_table.deref_mut()).await {
warn!(
"doRebalance, {:?}, add a new mq failed, {}, because lock failed",
self.consumer_group,
Expand All @@ -236,7 +250,7 @@ where
}

sub_rebalance_impl.remove_dirty_offset(mq).await;
let pq = Arc::new(ProcessQueue::new());
let pq = Arc::new(sub_rebalance_impl.create_process_queue());
pq.set_locked(true);
let next_offset = sub_rebalance_impl.compute_pull_from_where(mq).await;
if next_offset >= 0 {
@@ -404,7 +418,123 @@
queue_set
}

pub fn lock(&self, mq: &MessageQueue) -> bool {
unimplemented!()
pub async fn lock(
&mut self,
mq: &MessageQueue,
process_queue_table: &mut HashMap<MessageQueue, Arc<ProcessQueue>>,
) -> bool {
let client = self.client_instance.as_mut().unwrap();
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let find_broker_result = client
Comment on lines +427 to +428
Contributor · 🛠️ Refactor suggestion

Inefficient broker name retrieval

At line 427, get_broker_name_from_message_queue(mq).await is called, which may involve an asynchronous operation. If mq contains the broker name or if it can be retrieved synchronously, consider optimizing this call to reduce unnecessary awaits.

If possible, modify MessageQueue to include broker information or cache the broker names to avoid redundant asynchronous calls.
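
One hedged sketch of the caching idea (the broker_name_cache field and this helper are hypothetical and not part of this PR; it assumes the lookup returns an owned String):

```rust
// Hypothetical field on the rebalance struct:
//     broker_name_cache: HashMap<MessageQueue, String>,
async fn broker_name_for(&mut self, mq: &MessageQueue) -> String {
    // Fast path: reuse a previously resolved broker name.
    if let Some(name) = self.broker_name_cache.get(mq) {
        return name.clone();
    }
    // Slow path: ask the client instance once, then remember the answer.
    let client = self.client_instance.as_mut().unwrap();
    let name = client.get_broker_name_from_message_queue(mq).await;
    self.broker_name_cache.insert(mq.clone(), name.clone());
    name
}
```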

.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
.await;
if let Some(find_broker_result) = find_broker_result {
Comment on lines +426 to +431
Contributor · ⚠️ Potential issue

Potential unwrap() on None in lock method

At line 426, the code uses self.client_instance.as_mut().unwrap(). Ensure that client_instance is always Some; otherwise, this will cause a panic.

Consider handling the None case explicitly or using expect() with a meaningful message.

let client = self.client_instance.as_mut().expect("client_instance should not be None");

let mut request_body = LockBatchRequestBody {
consumer_group: Some(self.consumer_group.clone().unwrap()),
client_id: Some(client.client_id.clone()),
..Default::default()
};
request_body.mq_set.insert(mq.clone());
let result = client
.mq_client_api_impl
.as_mut()
.unwrap()
.lock_batch_mq(find_broker_result.broker_addr.as_str(), request_body, 1_000)
.await;
match result {
Ok(locked_mq) => {
for mq in &locked_mq {
if let Some(pq) = process_queue_table.get(mq) {
pq.set_locked(true);
pq.set_last_pull_timestamp(get_current_millis());
}
}
let lock_ok = locked_mq.contains(mq);
info!(
"message queue lock {}, {:?} {}",
lock_ok, self.consumer_group, mq
);
lock_ok
}
Comment on lines +445 to +458
Contributor · 🛠️ Refactor suggestion

Inefficient handling of locked message queues

When processing the locked_mq set, the code at lines 445-458 iterates over all locked queues and updates their process queues. If locked_mq is large, this could be inefficient.

Consider processing only the relevant message queues or optimizing the data structures for faster lookups.
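
Since this lock call only requests a single queue, a narrower variant (sketch only, equivalent under the assumption that the broker returns at most the queues that were requested) would touch just that entry:

```rust
// Sketch: only the requested queue needs its ProcessQueue refreshed.
let lock_ok = locked_mq.contains(mq);
if lock_ok {
    if let Some(pq) = process_queue_table.get(mq) {
        pq.set_locked(true);
        pq.set_last_pull_timestamp(get_current_millis());
    }
}
```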

Err(e) => {
error!("lockBatchMQ exception {},{}", mq, e);
false
}
}
} else {
false
}
}

pub async fn lock_all(&mut self) {
let broker_mqs = self.build_process_queue_table_by_broker_name().await;
for (broker_name, mqs) in broker_mqs {
if mqs.is_empty() {
continue;
}
let client = self.client_instance.as_mut().unwrap();
let find_broker_result = client
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
.await;
if let Some(find_broker_result) = find_broker_result {
let request_body = LockBatchRequestBody {
consumer_group: Some(self.consumer_group.clone().unwrap()),
client_id: Some(client.client_id.clone()),
mq_set: mqs.clone(),
..Default::default()
};
let result = client
.mq_client_api_impl
.as_mut()
.unwrap()
.lock_batch_mq(find_broker_result.broker_addr.as_str(), request_body, 1_000)
.await;
match result {
Ok(lock_okmqset) => {
let process_queue_table = self.process_queue_table.read().await;
for mq in &mqs {
if let Some(pq) = process_queue_table.get(mq) {
if lock_okmqset.contains(mq) {
if pq.is_locked() {
info!(
"the message queue locked OK, Group: {:?} {}",
self.consumer_group, mq
);
}
pq.set_locked(true);
pq.set_last_lock_timestamp(get_current_millis());
} else {
pq.set_locked(false);
warn!(
"the message queue locked Failed, Group: {:?} {}",
self.consumer_group, mq
);
}
}
}
}
Err(e) => {
error!("lockBatchMQ exception {}", e);
}
}
Comment on lines +562 to +589
Contributor · ⚠️ Potential issue

Missing error handling when message queues fail to lock

In the lock_all method, after attempting to lock message queues, there's no handling for the situation where queues fail to lock. This could lead to processing messages from queues that are not properly locked.

Ensure that when a queue fails to lock, appropriate action is taken, such as marking the queue as dropped or preventing further message consumption from it.
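
For illustration, the failure branch could actively fence the queue off (a hedged sketch; whether set_dropped is the right signal here depends on the surrounding rebalance flow):

```rust
// Inside the `else` (lock failed) branch — sketch only:
pq.set_locked(false);
// Hypothetical follow-up: stop consuming from a queue that could not be locked.
pq.set_dropped(true);
warn!(
    "the message queue locked Failed, Group: {:?} {}",
    self.consumer_group, mq
);
```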

}
}
Contributor · 🛠️ Refactor suggestion

Duplicate code in lock_all for handling lock_batch_mq result

The code handling the result of lock_batch_mq in lock_all is similar to that in the lock method. This duplication could be refactored.

Extract a common method for handling lock_batch_mq results to reduce code duplication and improve maintainability.
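
A possible shape for that helper (names, signature, and the concrete collection type of the lock response are illustrative only):

```rust
// Sketch: shared post-processing for a lock_batch_mq response.
fn apply_lock_result(
    process_queue_table: &HashMap<MessageQueue, Arc<ProcessQueue>>,
    requested: &HashSet<MessageQueue>,
    locked: &HashSet<MessageQueue>,
) {
    for mq in requested {
        if let Some(pq) = process_queue_table.get(mq) {
            if locked.contains(mq) {
                pq.set_locked(true);
                pq.set_last_lock_timestamp(get_current_millis());
            } else {
                pq.set_locked(false);
            }
        }
    }
}
```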

}
Contributor · 🛠️ Refactor suggestion

Possible parallelization of lock_all operations

The lock_all method processes brokers sequentially, which could lead to delays if there are many brokers. Consider running lock operations concurrently to improve performance.

Use futures::future::join_all or similar to execute lock requests in parallel.

let lock_futures = broker_mqs.into_iter().map(|(broker_name, mqs)| async move {
    // existing locking code
});
futures::future::join_all(lock_futures).await;


async fn build_process_queue_table_by_broker_name(
&self,
) -> HashMap<String /* brokerName */, HashSet<MessageQueue>> {
let mut result = HashMap::new();
let process_queue_table = self.process_queue_table.read().await;
let client = self.client_instance.as_ref().unwrap();
for (mq, pq) in process_queue_table.iter() {
if pq.is_dropped() {
continue;
}
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let entry = result.entry(broker_name).or_insert(HashSet::new());
entry.insert(mq.clone());
}
result
Contributor · 🛠️ Refactor suggestion

Inefficient cloning in build_process_queue_table_by_broker_name

At line 536, message queues are cloned and inserted into a HashSet. If MessageQueue implements Copy, cloning can be avoided. If not, consider if it's necessary to clone here or if references can be used instead.

Adjust the code to avoid unnecessary cloning:

- entry.insert(mq.clone());
+ entry.insert(mq.to_owned());

Note that to_owned() simply calls clone() for Clone types, so this diff alone does not avoid the copy; avoiding it would require storing references in the map (or MessageQueue being a cheap Copy type).


}
}