Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1037]Optimize DefaultMQPushConsumer #1038

Merged
merged 1 commit into from
Oct 6, 2024
Merged

[ISSUE #1037]Optimize DefaultMQPushConsumer #1038

merged 1 commit into from
Oct 6, 2024

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Oct 6, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1037

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced new asynchronous methods for enhanced message pulling and processing in the consumer module.
    • Added comprehensive methods in the MessageAccessor for managing message properties.
  • Improvements

    • Enhanced error handling and scheduled task management in the MQClientInstance.
    • Streamlined message handling process in the PullMessageService by consolidating channels.
  • Bug Fixes

    • Improved flow control logic for message pulling to prevent excessive requests.
  • Documentation

    • Added comments for clarity on new methods and scheduled tasks across various modules.

Copy link
Contributor

coderabbitai bot commented Oct 6, 2024

Walkthrough

The pull request introduces several modifications across multiple files in the rocketmq-client and rocketmq-common directories. Key changes include enhancements to the DefaultMQPushConsumerImpl for service state management and configuration checks, the addition of asynchronous message pulling capabilities in PullAPIWrapper, and the consolidation of message transmission channels in PullMessageService. Additionally, new methods for managing message properties are added in MessageAccessor. Overall, these changes improve the functionality and robustness of the message handling and validation processes.

Changes

File Path Change Summary
rocketmq-client/src/base/validators.rs Removed a formatting artifact line from Validators struct.
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs Enhanced DefaultMQPushConsumerImpl with new service state checks, configuration validations, and flow control logic.
rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs Added pull_kernel_impl method for asynchronous message pulling and a private method for filter server computation.
rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs Consolidated message channels into one, added pull_message and pop_message methods for handling requests.
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs Simplified topic collection in do_rebalance method and improved message queue handling in update_process_queue_table_in_rebalance.
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs Added put_subscription_data method for managing subscription data.
rocketmq-client/src/consumer/default_mq_push_consumer.rs Added subscription field and set_subscription method in ConsumerConfig.
rocketmq-client/src/factory/mq_client_instance.rs Enhanced error handling and added new scheduled tasks in MQClientInstance.
rocketmq-common/src/common/message/message_accessor.rs Introduced multiple new methods for accessing and modifying message properties in MessageAccessor.

Assessment against linked issues

Objective Addressed Explanation
Optimize DefaultMQPushConsumer (#1037)

Possibly related PRs

Suggested labels

approved, auto merge

Suggested reviewers

  • TeslaRustor
  • SpaceXCN

🐇 "In the code we hop and play,
With messages sent on their way.
From consumers to pulls,
We optimize the rules,
Making RocketMQ brighter each day!" 🐇


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@mxsm
Copy link
Owner Author

mxsm commented Oct 6, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link

codecov bot commented Oct 6, 2024

Codecov Report

Attention: Patch coverage is 0% with 9 lines in your changes missing coverage. Please review.

Project coverage is 19.52%. Comparing base (e07dbc4) to head (63b1a4c).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...src/consumer/consumer_impl/pull_message_service.rs 0.00% 9 Missing ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main    #1038   +/-   ##
=======================================
  Coverage   19.51%   19.52%           
=======================================
  Files         422      423    +1     
  Lines       35222    35223    +1     
=======================================
+ Hits         6873     6876    +3     
+ Misses      28349    28347    -2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

🧹 Outside diff range and nitpick comments (23)
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (1)

Line range hint 115-122: LGTM! Consider adding error handling.

The new put_subscription_data method is well-implemented and thread-safe. The use of #[inline] is appropriate for this small, potentially frequently called method.

Consider adding error handling for the case where acquiring the write lock fails. For example:

pub async fn put_subscription_data(
    &mut self,
    topic: &str,
    subscription_data: SubscriptionData,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut subscription_inner = self.rebalance_impl_inner.subscription_inner.write().await?;
    subscription_inner.insert(topic.to_string(), subscription_data);
    Ok(())
}

This change would allow callers to handle potential errors when acquiring the lock.

rocketmq-client/src/consumer/default_mq_push_consumer.rs (3)

65-65: Consider documenting the rationale for adding the subscription field.

The addition of the subscription field is noted. However, given that it's marked for removal in a future version, it would be beneficial to:

  1. Document the specific reason for adding this field.
  2. Outline the plan for its removal, including the expected timeline and any migration steps for users.
  3. Consider if this field is absolutely necessary, given its temporary nature.

This documentation will help future maintainers understand the context and manage the deprecation process more effectively.


247-252: Enhance documentation for the set_subscription method.

The addition of the set_subscription method is noted. To improve its implementation and usage:

  1. Add a deprecation warning using the #[deprecated] attribute to alert users about its planned removal.
  2. Provide usage examples in the method's documentation.
  3. Consider thread safety implications of setting this shared state and document any threading constraints.

Example:

#[deprecated(
    since = "0.1.0",
    note = "This method will be removed in a future version. Use X instead."
)]
/// Sets the subscription for the consumer.
///
/// # Examples
///
/// ```
/// let mut config = ConsumerConfig::default();
/// let subscription = ArcRefCellWrapper::new(HashMap::new());
/// config.set_subscription(subscription);
/// ```
///
/// # Thread Safety
///
/// This method modifies shared state. Ensure proper synchronization when called from multiple threads.
pub fn set_subscription(&mut self, subscription: ArcRefCellWrapper<HashMap<String, String>>) {
    self.subscription = subscription;
}

This will provide clearer guidance to users of the API and help manage the deprecation process.


65-65: Consider a comprehensive review of subscription handling.

The addition of the subscription field and its setter method, both marked for future removal, suggests an ongoing refactoring of the subscription mechanism in the codebase. To ensure a smooth transition and maintain code quality:

  1. Document the overall plan for subscription handling changes in a central location (e.g., README or CHANGELOG).
  2. Review all usages of the new subscription field across the codebase to ensure consistency.
  3. Consider creating a tracking issue for the removal of these deprecated elements, including any necessary migration steps for users.
  4. Ensure that the new subscription handling mechanism (which will replace this one) is well-documented and thoroughly tested before removing these deprecated elements.

These steps will help manage the transition effectively and minimize potential issues for users of the library.

Also applies to: 247-252

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (3)

Line range hint 236-240: Consider improving error handling

The error handling for the compute_pull_from_where method could be improved. Currently, if next_offset is less than 0, it only logs a warning. Consider adding more robust error handling or recovery mechanisms in this case.

 if next_offset >= 0 {
     // ... existing code ...
 } else {
     warn!(
         "doRebalance, {:?}, add new mq failed, {}",
         self.consumer_group,
         mq.get_topic()
     );
+    // Consider adding error recovery logic here
 }

Line range hint 253-255: Clarify the usage of all_mq_locked

The all_mq_locked variable is set but only used to trigger a rebalance later. Consider adding a comment explaining the purpose of this variable and why a rebalance is scheduled when not all message queues are locked.

 if !all_mq_locked {
+    // Schedule a rebalance later if not all message queues were successfully locked
     self.client_instance.as_mut().unwrap().rebalance_later(500);
 }

Line range hint 165-307: Add comments to explain complex logic

The method contains complex logic, especially around queue management and locking. Consider adding comments to explain the purpose and functionality of different sections, particularly for the queue removal and addition processes. This will improve code maintainability and make it easier for other developers to understand the logic.

rocketmq-common/src/common/message/message_accessor.rs (2)

218-218: Simplify parameter name in set_consume_start_time_stamp

The parameter property_consume_start_time_stamp is verbose. Renaming it to consume_start_timestamp enhances readability.

Apply this diff:

 pub fn set_consume_start_time_stamp<T: MessageTrait>(
     msg: &mut T,
     property_consume_start_time_stamp: &str,
 ) {
     msg.put_property(
         MessageConst::PROPERTY_CONSUME_START_TIMESTAMP,
         property_consume_start_time_stamp,
     );
 }

to

+pub fn set_consume_start_time_stamp<T: MessageTrait>(
+    msg: &mut T,
+    consume_start_timestamp: &str,
+) {
+    msg.put_property(
+        MessageConst::PROPERTY_CONSUME_START_TIMESTAMP,
+        consume_start_timestamp,
+    );
+}

26-31: Enhance documentation with examples

The added methods have documentation comments, but including usage examples can greatly assist users in understanding how to use these methods effectively.

For example, add an example to the set_properties method:

/// Sets the properties of a message.
///
/// # Arguments
///
/// * `msg` - A mutable reference to a message implementing the `MessageTrait`.
/// * `properties` - A `HashMap` containing the properties to set.
///
/// # Examples
///
/// ```rust
/// let mut msg = Message::new();
/// let mut properties = HashMap::new();
/// properties.insert("key".to_string(), "value".to_string());
/// MessageAccessor::set_properties(&mut msg, properties);
/// ```

Also applies to: 37-44, 49-55, 60-66, 85-91, 110-116, 135-141, 160-166, 185-191, 213-219

rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs (6)

Line range hint 145-174: Handle messages without tags to prevent unintended exclusion

In the message filtering logic, messages without tags (msg.get_tags() returning None) are currently excluded from msg_list_filter_again. If the intention is to include messages without tags when the subscription does not specify any tags, consider adjusting the logic to handle None values appropriately. This ensures that messages without tags are not unintentionally filtered out.

Apply this fix to include messages without tags when appropriate:

 let mut msg_list_filter_again =
     if !subscription_data.tags_set.is_empty() && !subscription_data.class_filter_mode {
         let mut msg_vec_again = Vec::with_capacity(msg_vec.len());
         for msg in msg_vec {
             if let Some(ref tag) = msg.get_tags() {
                 if subscription_data.tags_set.contains(tag) {
                     msg_vec_again.push(msg);
                 }
+            } else {
+                // Include messages without tags
+                msg_vec_again.push(msg);
             }
         }
         msg_vec_again
     } else {
         msg_vec
     };

Line range hint 222-324: Prevent potential panic due to unhandled None values

In the pull_kernel_impl method, when find_broker_result is None after updating the topic route info, the code eventually returns an error. However, there's a nested block where find_broker_result is unwrapped without checking for None, which can lead to a panic.

Ensure that find_broker_result is properly checked before usage to prevent a potential panic at runtime.

Apply this fix to handle None values safely:

 if let Some(find_broker_result) = find_broker_result {
     // Existing logic
 } else {
     Err(MQClientErr(
         -1,
         format!("The broker[{}] does not exist", mq.get_broker_name()),
     ))
 }

Line range hint 326-341: Avoid potential panic by handling missing topic route data

In the compute_pull_from_which_filter_server method, using topic_route_data.unwrap() can result in a panic if topic_route_data is None. It's safer to handle this case explicitly to prevent runtime errors.

Apply this fix to handle missing topic route data:

 let topic_route_data = topic_route_table.get(topic);
- let vec = topic_route_data
-     .unwrap()
-     .filter_server_table
-     .get(broker_addr);
+ if let Some(topic_route_data) = topic_route_data {
+     if let Some(vec) = topic_route_data.filter_server_table.get(broker_addr) {
+         return vec.get(random_num() as usize % vec.len()).map_or(
+             Err(MQClientErr(
+                 -1,
+                 format!(
+                     "Find Filter Server Failed, Broker Addr: {}, topic:{}",
+                     broker_addr, topic
+                 ),
+             )),
+             |v| Ok(v.clone()),
+         );
+     }
+ }
+ Err(MQClientErr(
+     -1,
+     format!(
+         "Find Filter Server Failed, Broker Addr: {}, topic:{}",
+         broker_addr, topic
+     ),
+ ))

Line range hint 222-324: Enhance error handling with the ? operator for clarity

In the pull_kernel_impl method, error propagation can be improved by utilizing the ? operator, which can make the code more concise and readable by reducing the need for explicit match or if let statements when handling Result or Option types.

Consider refactoring parts of the method where appropriate:

- let broker_name = self
-     .client_instance
-     .get_broker_name_from_message_queue(mq)
-     .await;
+ let broker_name = self
+     .client_instance
+     .get_broker_name_from_message_queue(mq)
+     .await?;

 // Similarly for other asynchronous calls that return Results

This change assumes that the called methods return Result types, and propagating errors using ? operator is suitable in the context.


231-235: Add logging for easier debugging of broker discovery issues

When find_broker_result is None, the code attempts to update the topic route info from the name server. If the broker is still not found, adding logging statements can help with diagnosing issues related to broker discovery and connectivity.

Consider adding a log statement:

if find_broker_result.is_none() {
    // Add logging here
    log::warn!(
        "Broker not found for broker name: {}, attempting to update topic route info",
        broker_name
    );
    // Existing code to update topic route info
}

Add Send trait to the generic bounds for PullCallback

The current generic bound PCB: PullCallback + 'static does not include Send. To ensure thread safety and proper asynchronous behavior when using await, you should include Send in the bounds. Update the generic bounds as follows:

 where
-    PCB: PullCallback + 'static,
+    PCB: PullCallback + Send + Sync + 'static,
🔗 Analysis chain

Line range hint 245-257: Clarify the generic bounds for PullCallback

The generic bound PCB: PullCallback + 'static may not be sufficient if PCB needs to be Send and Sync for asynchronous operations, especially when using await. Ensure that the PullCallback implementation meets the necessary trait bounds.

Run the following script to check for required trait implementations:

If PullCallback needs to be Send and Sync, update the generic bounds accordingly:

 where
-    PCB: PullCallback + 'static,
+    PCB: PullCallback + Send + Sync + 'static,
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify if PullCallback implementations are Send and Sync

# Search for the PullCallback trait definition
rg --type rust 'trait PullCallback' -A 5

# Check where PullCallback is implemented and ensure Send + Sync are derived
rg --type rust 'impl PullCallback' -A 5

Length of output: 1145

rocketmq-client/src/factory/mq_client_instance.rs (7)

Line range hint 344-354: Handle potential None value to prevent panics

In the start_scheduled_task method, using unwrap() on self.mq_client_api_impl.as_ref() may cause a panic if mq_client_api_impl is None. Consider handling the None case to prevent potential runtime errors.

You can use expect to provide a clearer error message or handle the None case gracefully:

-            let mq_client_api_impl = self.mq_client_api_impl.as_ref().unwrap().clone();
+            let mq_client_api_impl = self
+                .mq_client_api_impl
+                .as_ref()
+                .expect("mq_client_api_impl is None in start_scheduled_task")
+                .clone();

This ensures that if mq_client_api_impl is None, a descriptive error message is provided.


346-346: Remove unnecessary mut declaration

The variable mq_client_api_impl is not mutated after its declaration. You can remove the mut keyword to improve code clarity.

-            let mut mq_client_api_impl = self
+            let mq_client_api_impl = self
                 .mq_client_api_impl
                 .as_ref()
                 .expect("mq_client_api_impl is None in start_scheduled_task")
                 .clone();

Line range hint 344-354: Simplify scheduled task using tokio::time::interval

The manual calculation of delays in the scheduled task can be simplified using tokio::time::interval, which provides a cleaner approach for periodic tasks.

Refactor the code as follows:

             let mq_client_api_impl = self
                 .mq_client_api_impl
                 .as_ref()
                 .expect("mq_client_api_impl is None in start_scheduled_task")
                 .clone();
             self.instance_runtime.get_handle().spawn(async move {
                 info!("ScheduledTask fetchNameServerAddr started");
-                tokio::time::sleep(Duration::from_secs(10)).await;
-                loop {
-                    let current_execution_time = tokio::time::Instant::now();
+                let mut interval = tokio::time::interval(Duration::from_secs(120));
+                interval.tick().await; // Initial delay
+                loop {
                     mq_client_api_impl.fetch_name_server_addr().await;
-                    let next_execution_time = current_execution_time + Duration::from_secs(120);
-                    let delay = next_execution_time.saturating_duration_since(tokio::time::Instant::now());
-                    tokio::time::sleep(delay).await;
+                    interval.tick().await;
                 }
             });

This change improves readability and ensures consistent execution intervals.


Line range hint 360-370: Remove unnecessary mut declaration and simplify timing logic

The variable client_instance does not need to be mutable. Additionally, simplifying the timing logic using tokio::time::interval can enhance code maintainability.

Remove mut and refactor the code:

-            let mut client_instance = this.clone();
+            let client_instance = this.clone();
             let poll_name_server_interval = Duration::from_millis(self.client_config.poll_name_server_interval as u64);
             self.instance_runtime.get_handle().spawn(async move {
                 info!("ScheduledTask update_topic_route_info_from_name_server started");
-                tokio::time::sleep(Duration::from_millis(10)).await;
-                loop {
-                    let current_execution_time = tokio::time::Instant::now();
+                let mut interval = tokio::time::interval(poll_name_server_interval);
+                interval.tick().await; // Initial delay
+                loop {
                     client_instance.update_topic_route_info_from_name_server().await;
-                    let next_execution_time = current_execution_time + poll_name_server_interval;
-                    let delay = next_execution_time.saturating_duration_since(tokio::time::Instant::now());
-                    tokio::time::sleep(delay).await;
+                    interval.tick().await;
                 }
             });

This refactoring reduces complexity and aligns the timing mechanism with idiomatic async Rust patterns.


Line range hint 379-390: Remove unnecessary mut declaration and use tokio::time::interval

The variable client_instance is not mutated. Also, using tokio::time::interval simplifies the timing logic for the scheduled task.

Make the following changes:

-            let mut client_instance = this.clone();
+            let client_instance = this.clone();
             let heartbeat_broker_interval = Duration::from_millis(self.client_config.heartbeat_broker_interval as u64);
             self.instance_runtime.get_handle().spawn(async move {
                 info!("ScheduledTask clean_offline_broker started");
-                tokio::time::sleep(Duration::from_secs(1)).await;
-                loop {
-                    let current_execution_time = tokio::time::Instant::now();
+                let mut interval = tokio::time::interval(heartbeat_broker_interval);
+                interval.tick().await; // Initial delay
+                loop {
                     client_instance.clean_offline_broker().await;
                     client_instance.send_heartbeat_to_all_broker_with_lock().await;
-                    let next_execution_time = current_execution_time + heartbeat_broker_interval;
-                    let delay = next_execution_time.saturating_duration_since(tokio::time::Instant::now());
-                    tokio::time::sleep(delay).await;
+                    interval.tick().await;
                 }
             });

This improves code clarity and ensures consistent timing intervals.


Line range hint 399-410: Correct variable usage and simplify timing logic

The assignment let mut client_instance = this; may lead to ownership issues. Additionally, simplifying the timing logic enhances readability.

Update the code as follows:

-            let mut client_instance = this;
+            let client_instance = this.clone();
             let persist_consumer_offset_interval = Duration::from_millis(self.client_config.persist_consumer_offset_interval as u64);
             self.instance_runtime.get_handle().spawn(async move {
                 info!("ScheduledTask persistAllConsumerOffset started");
-                tokio::time::sleep(Duration::from_secs(10)).await;
-                loop {
-                    let current_execution_time = tokio::time::Instant::now();
+                let mut interval = tokio::time::interval(persist_consumer_offset_interval);
+                interval.tick().await; // Initial delay
+                loop {
                     client_instance.persist_all_consumer_offset().await;
-                    let next_execution_time = current_execution_time + persist_consumer_offset_interval;
-                    let delay = next_execution_time.saturating_duration_since(tokio::time::Instant::now());
-                    tokio::time::sleep(delay).await;
+                    interval.tick().await;
                 }
             });

By cloning this, you avoid potential ownership and borrowing issues. The use of tokio::time::interval simplifies the periodic execution.


Line range hint 399-410: Consistent initial delay across scheduled tasks

Ensure that all scheduled tasks have consistent initial delays to prevent potential race conditions or unintended behavior at startup.

In the persist consumer offset task, consider adding an initial delay similar to other tasks:

             self.instance_runtime.get_handle().spawn(async move {
                 info!("ScheduledTask persistAllConsumerOffset started");
+                interval.tick().await; // Initial delay
                 loop {
                     client_instance.persist_all_consumer_offset().await;
                     interval.tick().await;
                 }
             });

This maintains consistency and allows for any necessary setup before the first execution.

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (1)

206-208: Clarify or remove misleading comment

The comment //copy_subscription is can be removed suggests that the copy_subscription method may no longer be necessary. However, the method self.copy_subscription().await?; is still being called immediately after this comment. To avoid confusion, please clarify the intent of this comment or remove it if the method call is required.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between e07dbc4 and 63b1a4c.

📒 Files selected for processing (9)
  • rocketmq-client/src/base/validators.rs (0 hunks)
  • rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (3 hunks)
  • rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs (2 hunks)
  • rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs (2 hunks)
  • rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (1 hunks)
  • rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs (1 hunks)
  • rocketmq-client/src/consumer/default_mq_push_consumer.rs (2 hunks)
  • rocketmq-client/src/factory/mq_client_instance.rs (5 hunks)
  • rocketmq-common/src/common/message/message_accessor.rs (2 hunks)
💤 Files with no reviewable changes (1)
  • rocketmq-client/src/base/validators.rs
🧰 Additional context used
🔇 Additional comments (5)
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (2)

97-97: Improved efficiency in topic collection

The change to directly clone keys into a HashSet<String> is a good optimization. It simplifies the code and potentially improves performance by avoiding an unnecessary intermediate step.


Line range hint 165-307: Improved message queue handling logic

The changes in this method significantly enhance the handling of message queues. The new implementation more efficiently removes unnecessary queues, improves the logic for checking if a queue should be dropped, and streamlines the process of adding new message queues. These improvements should lead to better performance and more robust queue management.

rocketmq-client/src/consumer/consumer_impl/pull_message_service.rs (1)

82-84: ⚠️ Potential issue

Potential panic due to unwrap() on tx field

In the methods execute_pull_request_later and execute_pull_request_immediately, using unwrap() on self.tx.as_ref() can cause a panic if tx is None. This might happen if these methods are called before the start method initializes tx.

Ensure that tx is always initialized before calling these methods or handle the None case gracefully to prevent panics.

Verification:

Confirm that start is always called before invoking execute_pull_request_later and execute_pull_request_immediately. If there's a possibility of tx being None, consider returning a Result or logging a warning instead of unwrapping.

Also applies to: 89-91

rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs (1)

228-230: ⚠️ Potential issue

Correct the error message to reflect the expression type

In the error message when the broker does not support the specified expression type, the message mentions filtering by tag, even if the expression type is different.

Apply this fix to make the error message accurate:

 return Err(MQClientErr(
     -1,
     format!(
-        "The broker[{}],[{}] does not support consumer to filter message by \
-         tag[{}]",
+        "The broker[{}],[{}] does not support consumer to filter messages by \
+         expression type [{}]",
         mq.get_broker_name(),
         find_broker_result.broker_version,
         expression_type
     ),
 ));

Likely invalid or redundant comment.

rocketmq-client/src/factory/mq_client_instance.rs (1)

Line range hint 379-390: Handle potential errors when sending heartbeats

In the scheduled task for sending heartbeats, consider handling potential errors from the send_heartbeat_to_all_broker_with_lock method to ensure robustness.

Add error handling to log or manage any issues that may arise during the heartbeat process.

                     client_instance.clean_offline_broker().await;
-                    client_instance.send_heartbeat_to_all_broker_with_lock().await;
+                    if let Err(e) = client_instance.send_heartbeat_to_all_broker_with_lock().await {
+                        warn!("Failed to send heartbeat: {}", e);
+                    }

This ensures that any failures in sending heartbeats are appropriately logged.

Comment on lines +42 to +50
while let Some(request) = rx.recv().await {
if request.get_message_request_mode() == MessageRequestMode::Pull {
let pull_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PullRequest) };
PullMessageService::pull_message(pull_request, instance.as_mut()).await;
} else {
warn!(
"No matched consumer for the PullRequest {},drop it",
request
)
let pop_request =
unsafe { *Box::from_raw(Box::into_raw(request) as *mut PopRequest) };
PullMessageService::pop_message(pop_request, instance.as_mut()).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Avoid unsafe downcasting to prevent undefined behavior

The use of unsafe blocks to downcast Box<dyn MessageRequest> to PullRequest or PopRequest using pointer casting is risky and can lead to undefined behavior if the actual type does not match the expected type. This might cause application crashes or security vulnerabilities.

Consider implementing safe downcasting by extending the MessageRequest trait with the Any trait and providing an as_any method for downcasting.

Apply these changes to enable safe downcasting:

  1. Extend MessageRequest with Any and add as_any method:

    +use std::any::Any;
    
    -pub trait MessageRequest {
    +pub trait MessageRequest: Any {
        // Existing methods...
    +   fn as_any(&self) -> &dyn Any;
    }
  2. Implement as_any in PullRequest and PopRequest:

    impl MessageRequest for PullRequest {
        // Existing implementations...
        fn as_any(&self) -> &dyn Any {
            self
        }
    }
    
    impl MessageRequest for PopRequest {
        // Existing implementations...
        fn as_any(&self) -> &dyn Any {
            self
        }
    }
  3. Modify the message handling loop to use safe downcasting:

    while let Some(request) = rx.recv().await {
        if let Some(pull_request) = request.as_any().downcast_ref::<PullRequest>() {
            Self::pull_message(pull_request.clone(), instance.as_mut()).await;
        } else if let Some(pop_request) = request.as_any().downcast_ref::<PopRequest>() {
            Self::pop_message(pop_request.clone(), instance.as_mut()).await;
        } else {
            warn!("Received unknown MessageRequest type");
        }
    }

This approach eliminates the need for unsafe code and ensures type safety when downcasting.

Comment on lines +98 to 100
if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pop_request)).await {
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Incorrect error messages for pop requests

In execute_pop_pull_request_later and execute_pop_pull_request_immediately, the error messages incorrectly mention "pull request" and "pull_tx" when they should refer to "pop request" and "tx".

Update the error messages to accurately reflect the operation and current channel name.

Apply this diff to correct the error messages:

 if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pop_request)).await {
-    warn!("Failed to send pull request to pull_tx, error: {:?}", e);
+    warn!("Failed to send pop request to tx, error: {:?}", e);
 }

Repeat the change for both methods to maintain consistency.

Also applies to: 105-107

Comment on lines +82 to 84
if let Err(e) = this.tx.as_ref().unwrap().send(Box::new(pull_request)).await {
warn!("Failed to send pull request to pull_tx, error: {:?}", e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Outdated channel name in error messages for pull requests

In execute_pull_request_later and execute_pull_request_immediately, the error messages reference pull_tx, which no longer exists after consolidating channels into tx.

Update the error messages to reflect the current channel name to avoid confusion.

Apply this diff to update the error messages:

 if let Err(e) = self.tx.as_ref().unwrap().send(Box::new(pull_request)).await {
-    warn!("Failed to send pull request to pull_tx, error: {:?}", e);
+    warn!("Failed to send pull request to tx, error: {:?}", e);
 }

Ensure all mentions of pull_tx are updated accordingly.

Also applies to: 89-91

Comment on lines +71 to +80
/// Gets the transfer flag of a message.
///
/// # Arguments
///
/// * `msg` - A reference to a message implementing the `MessageTrait`.
///
/// # Returns
///
/// * `Option<String>` - The transfer flag value if it exists.
#[inline]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider returning specific types in get_ methods

The get_ methods currently return Option<String>, even when the properties represent numeric values like timestamps or counts. Consider parsing these strings into appropriate numeric types (e.g., u32, u64) to improve usability and reduce the need for parsing at the call site.

Example for get_reconsume_time method:

 pub fn get_reconsume_time<T: MessageTrait>(msg: &T) -> Option<String> {
     msg.get_property(MessageConst::PROPERTY_RECONSUME_TIME)
 }

to

+pub fn get_reconsume_time<T: MessageTrait>(msg: &T) -> Option<u32> {
+    msg.get_property(MessageConst::PROPERTY_RECONSUME_TIME)
+        .and_then(|value| value.parse::<u32>().ok())
+}

Apply similar changes to other get_ methods where applicable.

Also applies to: 96-105, 121-130, 146-155, 171-179, 199-208, 230-239

Comment on lines +49 to +55
/// Clears a property from a message.
///
/// # Arguments
///
/// * `msg` - A mutable reference to a message implementing the `MessageTrait`.
/// * `name` - The name of the property to clear.
#[inline]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for missing properties in clear_property

When clearing a property using clear_property, it might be helpful to handle the case where the property does not exist. Consider returning a result or indicating whether the property was found and cleared, to provide better feedback to the caller.

/// # Arguments
///
/// * `msg` - A mutable reference to a message implementing the `MessageTrait`.
/// * `unit` - The transfer flag value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Rename parameter 'unit' to 'transfer_flag' in set_transfer_flag

The parameter name unit in the set_transfer_flag method may be misleading. Renaming it to transfer_flag would improve clarity and better represent its purpose.

Apply this diff to rename the parameter:

 pub fn set_transfer_flag<T: MessageTrait>(msg: &mut T, unit: &str) {
     msg.put_property(MessageConst::PROPERTY_TRANSFER_FLAG, unit);
 }

to

+pub fn set_transfer_flag<T: MessageTrait>(msg: &mut T, transfer_flag: &str) {
+    msg.put_property(MessageConst::PROPERTY_TRANSFER_FLAG, transfer_flag);
+}

Committable suggestion was skipped due to low confidence.

///
/// * `msg` - A mutable reference to a message implementing the `MessageTrait`.
/// * `unit` - The correction flag value.
#[inline]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Rename parameter 'unit' to 'correction_flag' in set_correction_flag

Similarly, in the set_correction_flag method, the parameter unit should be renamed to correction_flag for better readability.

Apply this diff:

 pub fn set_correction_flag<T: MessageTrait>(msg: &mut T, unit: &str) {
     msg.put_property(MessageConst::PROPERTY_CORRECTION_FLAG, unit);
 }

to

+pub fn set_correction_flag<T: MessageTrait>(msg: &mut T, correction_flag: &str) {
+    msg.put_property(MessageConst::PROPERTY_CORRECTION_FLAG, correction_flag);
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[inline]
#[inline]
pub fn set_correction_flag<T: MessageTrait>(msg: &mut T, correction_flag: &str) {
msg.put_property(MessageConst::PROPERTY_CORRECTION_FLAG, correction_flag);
}

Comment on lines +285 to +294
.expect("mq_client_api_impl is None")
.fetch_name_server_addr()
.await;
}
// Start request-response channel
self.mq_client_api_impl.as_mut().unwrap().start().await;
self.mq_client_api_impl
.as_mut()
.expect("mq_client_api_impl is None")
.start()
.await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor repeated as_mut().expect() calls for clarity

In the start method, the repeated calls to self.mq_client_api_impl.as_mut().expect("mq_client_api_impl is None") can be refactored to improve readability and reduce redundancy.

Consider assigning the unwrapped mq_client_api_impl to a variable:

+            let mq_client_api_impl = self.mq_client_api_impl.as_mut().expect("mq_client_api_impl is None");
             if self.client_config.namesrv_addr.is_none() {
-                self.mq_client_api_impl
-                    .as_mut()
-                    .expect("mq_client_api_impl is None")
-                    .fetch_name_server_addr()
-                    .await;
+                mq_client_api_impl.fetch_name_server_addr().await;
             }
-            self.mq_client_api_impl
-                .as_mut()
-                .expect("mq_client_api_impl is None")
-                .start()
-                .await;
+            mq_client_api_impl.start().await;

This change reduces code repetition and enhances readability.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.expect("mq_client_api_impl is None")
.fetch_name_server_addr()
.await;
}
// Start request-response channel
self.mq_client_api_impl.as_mut().unwrap().start().await;
self.mq_client_api_impl
.as_mut()
.expect("mq_client_api_impl is None")
.start()
.await;
let mq_client_api_impl = self.mq_client_api_impl.as_mut().expect("mq_client_api_impl is None");
if self.client_config.namesrv_addr.is_none() {
mq_client_api_impl.fetch_name_server_addr().await;
}
// Start request-response channel
mq_client_api_impl.start().await;

Comment on lines +509 to +516
.is_none()
&& self
.consumer_config
.message_listener
.as_ref()
.unwrap()
.message_listener_concurrently
.is_some()
.is_none()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify conditional checks to improve readability

The conditional check uses multiple unwrap() calls on message_listener, which can be simplified for better readability and to avoid potential panics. Consider using pattern matching or if let constructs to handle the Option safely.

For example:

if let Some(message_listener) = &self.consumer_config.message_listener {
    if message_listener.message_listener_orderly.is_none()
        && message_listener.message_listener_concurrently.is_none()
    {
        return Err(MQClientError::MQClientErr(
            -1,
            format!(
                "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently{}",
                FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
            ),
        ));
    }
} else {
    return Err(MQClientError::MQClientErr(
        -1,
        format!(
            "messageListener is null{}",
            FAQUrl::suggest_todo(FAQUrl::CLIENT_PARAMETER_CHECK_URL)
        ),
    ));
}

Comment on lines +682 to +693
for (topic, sub_expression) in sub.as_ref() {
let subscription_data = FilterAPI::build_subscription_data(topic, sub_expression)
.map_err(|e| {
MQClientError::MQClientErr(
-1,
format!("buildSubscriptionData exception, {}", e),
)
})?;
self.rebalance_impl
.put_subscription_data(topic, subscription_data)
.await;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify error handling for improved readability

The error handling within the map_err closure can be streamlined for better clarity. Instead of the multi-line closure, consider a more concise expression.

For example:

let subscription_data = FilterAPI::build_subscription_data(topic, sub_expression)
    .map_err(|e| MQClientError::MQClientErr(-1, format!("buildSubscriptionData exception, {}", e)))?;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved PR has approved auto merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Enhancement⚡️] Optimize DefaultMQPushConsumer
2 participants