Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1029]🚀Support order message consume for client🔥 #1034

Merged
merged 1 commit into from
Oct 5, 2024

Conversation

mxsm
Copy link
Owner

@mxsm mxsm commented Oct 5, 2024

Which Issue(s) This PR Fixes(Closes)

Fixes #1029

Brief Description

How Did You Test This Change?

Summary by CodeRabbit

  • New Features

    • Introduced a new example configuration for an orderly message consumer.
    • Added a MessageQueueLock struct to manage locks for message queues.
    • Implemented new methods for managing message consumption and service lifecycle in MQClientAPIImpl.
  • Bug Fixes

    • Updated method signatures to improve async handling and memory management across various services.
  • Refactor

    • Streamlined logging practices and removed unnecessary complexity in DefaultPullCallback.
    • Consolidated message consumption services into a unified structure.
    • Enhanced the organization and management of concurrent and orderly message processing.
  • Documentation

    • Enhanced clarity in method implementations and usage across the codebase.

Copy link
Contributor

coderabbitai bot commented Oct 5, 2024

Caution

Review failed

The pull request is closed.

Walkthrough

The changes in this pull request introduce a new example for orderly message consumption in the rocketmq-client package. It adds an example configuration in Cargo.toml and implements a new consumer example in ordermessage_consumer.rs. Additionally, several modifications are made to existing services and structures, including updates to method signatures and the introduction of a new module for message queue locking. These changes enhance the functionality and structure of the message consumption services, focusing on orderly processing and improved resource management.

Changes

File Change Summary
rocketmq-client/Cargo.toml Added example configuration for ordermessage-consumer.
rocketmq-client/examples/ordermessage/ordermessage_consumer.rs Implemented a new orderly message consumer with logging and message processing logic.
rocketmq-client/src/consumer.rs Introduced a new crate-private module message_queue_lock.
rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Changed ownership type of this parameter in several methods from ArcRefCellWrapper<Self> to WeakCellWrapper<Self>.
rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs Added new fields and methods to ConsumeMessageOrderlyService, including locking mechanisms.
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs Updated method signatures to use WeakCellWrapper<Self>.
rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs Updated method signatures to use WeakCellWrapper<Self>.
rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs Renamed ConsumeMessageConcurrentlyServiceGeneral to ConsumeMessageServiceGeneral and added new methods.
rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs Restructured fields to consolidate message consumption services.
rocketmq-client/src/consumer/consumer_impl/process_queue.rs Changed visibility of static variables and updated several methods to asynchronous.
rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs Added unlock_all method and modified locking logic.
rocketmq-client/src/consumer/default_mq_push_consumer.rs Changed register_message_listener_orderly from async to sync.
rocketmq-client/src/consumer/listener/message_listener_orderly.rs Updated consume_message method parameters for better borrowing semantics.
rocketmq-client/src/consumer/message_queue_lock.rs Introduced MessageQueueLock struct for managing message queue locks.
rocketmq-client/src/consumer/mq_push_consumer.rs Changed register_message_listener_orderly from async to sync.
rocketmq-client/src/consumer/pull_callback.rs Simplified DefaultPullCallback implementation and removed commented-out code.
rocketmq-client/src/implementation/client_remoting_processor.rs Replaced println! with info! for better logging.
rocketmq-client/src/implementation/mq_client_api_impl.rs Implemented various methods for message sending and consumer management.

Assessment against linked issues

Objective Addressed Explanation
Support order message consume for client (#1029)

Possibly related PRs

Suggested reviewers

  • TeslaRustor

Poem

Hop, hop, in the code we play,
New messages in an orderly way.
With locks and queues, we dance and twirl,
RocketMQ's magic begins to unfurl!
So let’s consume, with joy and cheer,
Orderly messages, we hold dear! 🐰✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@github-actions github-actions bot requested a review from SpaceXCN October 5, 2024 05:36
@mxsm
Copy link
Owner Author

mxsm commented Oct 5, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

Copy link

codecov bot commented Oct 5, 2024

Codecov Report

Attention: Patch coverage is 0% with 489 lines in your changes missing coverage. Please review.

Project coverage is 19.52%. Comparing base (5c08587) to head (81e4a59).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
...r/consumer_impl/consume_message_orderly_service.rs 0.00% 374 Missing ⚠️
...client/src/consumer/consumer_impl/process_queue.rs 0.00% 53 Missing ⚠️
rocketmq-client/src/consumer/message_queue_lock.rs 0.00% 23 Missing ⚠️
...sumer_impl/consume_message_concurrently_service.rs 0.00% 20 Missing ⚠️
...onsumer/consumer_impl/re_balance/rebalance_impl.rs 0.00% 8 Missing ⚠️
.../consumer/consumer_impl/consume_message_service.rs 0.00% 3 Missing ⚠️
...mer/consumer_impl/default_mq_push_consumer_impl.rs 0.00% 3 Missing ⚠️
...r_impl/consume_message_pop_concurrently_service.rs 0.00% 2 Missing ⚠️
...nsumer_impl/consume_message_pop_orderly_service.rs 0.00% 2 Missing ⚠️
rocketmq-client/src/consumer/pull_callback.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1034      +/-   ##
==========================================
- Coverage   19.85%   19.52%   -0.34%     
==========================================
  Files         420      422       +2     
  Lines       34624    35222     +598     
==========================================
  Hits         6876     6876              
- Misses      27748    28346     +598     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 31

🧹 Outside diff range and nitpick comments (17)
rocketmq-client/src/consumer.rs (1)

23-23: LGTM! Consider adding documentation for the new module.

The addition of the message_queue_lock module is a good step towards implementing ordered message consumption for the client, which aligns with the PR objectives. The pub(crate) visibility is appropriate for maintaining encapsulation while allowing usage within the crate.

Consider adding a brief documentation comment above the module declaration to explain its purpose and how it contributes to ordered message consumption. This will help other developers understand the module's role in the overall architecture.

Example:

/// Provides locking mechanisms for message queues to ensure ordered message consumption.
pub(crate) mod message_queue_lock;
rocketmq-client/src/consumer/listener/message_listener_orderly.rs (1)

Line range hint 34-36: Update MessageListenerOrderlyFn type definition to match new method signature

The MessageListenerOrderlyFn type definition is now inconsistent with the updated consume_message method signature in the MessageListenerOrderly trait. This inconsistency could lead to errors or unexpected behavior in the codebase.

Please update the MessageListenerOrderlyFn type definition to match the new method signature:

 pub type MessageListenerOrderlyFn = Arc<
-    dyn Fn(Vec<MessageExt>, ConsumeOrderlyContext) -> Result<ConsumeOrderlyStatus> + Send + Sync,
+    dyn Fn(&[&MessageExt], &mut ConsumeOrderlyContext) -> Result<ConsumeOrderlyStatus> + Send + Sync,
 >;

This change ensures consistency with the updated MessageListenerOrderly trait and prevents potential issues in the codebase.

rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs (5)

23-23: Approved: New import added for WeakCellWrapper

The addition of WeakCellWrapper import suggests a shift in the ownership strategy. This change likely aims to prevent potential reference cycles and improve memory management.

Consider documenting the rationale behind this change in the codebase or commit message to help future maintainers understand the design decision.


33-33: Approved: Updated start method signature, but implementation is missing

The change from ArcRefCellWrapper<Self> to WeakCellWrapper<Self> is consistent with the new import and likely improves memory management. However, the method implementation is empty.

Consider adding a TODO comment or implementing the method to ensure it's not overlooked:

-    fn start(&mut self, this: WeakCellWrapper<Self>) {}
+    fn start(&mut self, this: WeakCellWrapper<Self>) {
+        // TODO: Implement start method
+        unimplemented!("start method not yet implemented");
+    }

35-37: Approved: shutdown method is now async, but implementation is missing

The change to make shutdown an async method is a good improvement, likely to handle I/O or other potentially blocking operations during the shutdown process. However, the implementation is still missing.

Consider replacing the todo!() with a more informative comment or placeholder implementation:

     async fn shutdown(&mut self, await_terminate_millis: u64) {
-        todo!()
+        // TODO: Implement async shutdown logic
+        // - Stop accepting new messages
+        // - Wait for ongoing message processing to complete (up to await_terminate_millis)
+        // - Release resources
+        unimplemented!("Async shutdown not yet implemented");
     }

Line range hint 65-71: Approved: Updated submit_consume_request signature, but implementation is missing

The change from ArcRefCellWrapper<Self> to WeakCellWrapper<Self> for the this parameter is consistent with the previous changes and likely improves memory management. However, the method implementation is still missing.

Consider replacing the todo!() with a more informative comment or placeholder implementation:

     ) {
-        todo!()
+        // TODO: Implement submit_consume_request logic
+        // - Validate input parameters
+        // - Create and submit consume task to the thread pool
+        // - Update process queue status
+        unimplemented!("submit_consume_request not yet implemented");
     }

Remaining ArcRefCellWrapper Usage Found

The script identified one remaining usage of ArcRefCellWrapper in default_mq_push_consumer_impl.rs. This indicates that the transition to WeakCellWrapper is not fully consistent across the codebase.

Affected File:

  • rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs

Next Steps:

  1. Replace ArcRefCellWrapper with WeakCellWrapper in the identified file.
  2. Ensure that the change maintains proper memory management and ownership semantics.
  3. Update any related tests to accommodate the change.
🔗 Analysis chain

Line range hint 1-83: Overall review: Architectural changes introduced, but implementations are missing

The changes in this file consistently move towards using WeakCellWrapper instead of ArcRefCellWrapper, which is likely an improvement in memory management and ownership strategies. However, most of the key method implementations are missing or use todo!().

Key points:

  1. The shift to WeakCellWrapper is a significant architectural change that may affect the entire codebase.
  2. The start, shutdown, and submit_consume_request methods lack implementations.
  3. The shutdown method is now asynchronous, which is a good improvement for handling potentially blocking operations.

Consider the following next steps:

  1. Implement the missing methods or add more detailed TODO comments explaining the expected behavior.
  2. Update the PR description to clarify if this is a work in progress or if implementations will be added in separate PRs.
  3. Ensure that all usages of ArcRefCellWrapper related to this struct are updated throughout the codebase to maintain consistency.
  4. Add tests to verify the behavior of the new WeakCellWrapper usage, especially in concurrent scenarios.
  5. Update documentation to reflect the architectural changes and the rationale behind moving to WeakCellWrapper.

To ensure consistency across the codebase, run the following script to check for any remaining ArcRefCellWrapper usages related to ConsumeMessagePopOrderlyService:

This will help identify any places where the change from ArcRefCellWrapper to WeakCellWrapper might have been missed.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for ArcRefCellWrapper usages related to ConsumeMessagePopOrderlyService
rg --type rust "ArcRefCellWrapper.*ConsumeMessagePopOrderlyService" .

Length of output: 234

rocketmq-client/examples/ordermessage/ordermessage_consumer.rs (2)

34-34: Update CONSUMER_GROUP to a unique group name

The CONSUMER_GROUP constant is currently set to "please_rename_unique_group_name_3", which appears to be a placeholder. Please replace it with a meaningful and unique consumer group name to avoid potential conflicts and ensure proper consumer identification within the RocketMQ cluster.


80-81: Remove redundant println! statement for consistent logging

Both println! and info! are used to log the received messages, leading to duplicate logs and inconsistent logging practices. Since you have initialized the logger and are using info! for logging, it's advisable to remove the println! statement.

Apply this diff to remove the redundant print statement:

-            println!("Receive message: {:?}", msg);
             info!("Receive message: {:?}", msg);
rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs (1)

170-187: Potential duplication in start method implementations

The start method in both ConsumeMessageServiceGeneral and ConsumeMessagePopServiceGeneral has similar logic. This might indicate code duplication.

Consider abstracting the common logic into a helper function or using generics to reduce duplication and improve maintainability.

rocketmq-client/src/consumer/pull_callback.rs (3)

Line range hint 71-75: Avoid potential panics by checking for None before unwrapping pull_api_wrapper

In the on_success method, pull_api_wrapper is accessed using as_mut().unwrap() without verifying its presence. If pull_api_wrapper is None, this will result in a panic.

Consider adding a check to ensure pull_api_wrapper is Some before unwrapping, or handle the None case appropriately.

Here's a suggested change:

-            push_consumer_impl
-                .pull_api_wrapper
-                .as_mut()
-                .unwrap()
-                .process_pull_result(
+            if let Some(pull_api_wrapper) = push_consumer_impl.pull_api_wrapper.as_mut() {
+                pull_api_wrapper.process_pull_result(
                     &message_queue_inner,
                     &mut pull_result_ext,
                     &subscription_data,
                 );
+            } else {
+                warn!("pull_api_wrapper is None");
+                // Handle the None case, possibly by returning early
+                return;
+            }

Line range hint 137-141: Handle possible None value when accessing offset_store to prevent panics

In the on_success method, offset_store is obtained using as_mut().unwrap() without checking if it's Some. This can cause a panic if offset_store is None.

Consider safely handling the Option by checking its value before unwrapping.

Suggested refactor:

-            let offset_store = push_consumer_impl.offset_store.as_mut().unwrap();
-            offset_store
-                .update_and_freeze_offset(
-                    pull_request.get_message_queue(),
-                    pull_request.next_offset,
-                )
-                .await;
+            if let Some(offset_store) = push_consumer_impl.offset_store.as_mut() {
+                offset_store
+                    .update_and_freeze_offset(
+                        pull_request.get_message_queue(),
+                        pull_request.next_offset,
+                    )
+                    .await;
+            } else {
+                warn!("offset_store is None");
+                // Handle the None case appropriately
+            }

Line range hint 164-169: Consistent error handling for client_instance to prevent potential panics

In the on_success method, client_instance is accessed via as_ref().unwrap() without checking if it is Some. If client_instance is None, this will cause a panic.

It's important to handle the Option safely to maintain application stability.

Here's how you can refactor the code:

-            push_consumer_impl
-                .rebalance_impl
-                .rebalance_impl_inner
-                .client_instance
-                .as_ref()
-                .unwrap()
-                .re_balance_immediately()
+            if let Some(client_instance) = push_consumer_impl
+                .rebalance_impl
+                .rebalance_impl_inner
+                .client_instance
+                .as_ref()
+            {
+                client_instance.re_balance_immediately()
+            } else {
+                warn!("client_instance is None");
+                // Handle the None case appropriately
+            }
rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs (1)

202-205: Optimize the cloning of this and ensure correct upgrade

In submit_consume_request_later, this is cloned before the upgrade check. Cloning a WeakCellWrapper before upgrading may be unnecessary and can be optimized.

Consider revising the code as follows to clone after ensuring this is valid:

self.consume_runtime.get_handle().spawn(async move {
    tokio::time::sleep(Duration::from_secs(5)).await;
-   let this_ = this.clone();
    if let Some(this) = this.upgrade() {
+       let this_ = this.clone();
        this.submit_consume_request(this_, msgs, process_queue, message_queue, true)
            .await;
    }
});
🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 202-205: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L202-L205
Added lines #L202 - L205 were not covered by tests

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (2)

Line range hint 458-479: Handle Potential None Value for client_instance

Using unwrap() on client_instance can cause a panic if it is None. It's safer to handle this case explicitly.

Modify the code to handle the None case gracefully:

pub async fn lock_with(
    &mut self,
    mq: &MessageQueue,
    process_queue_table: &HashMap<MessageQueue, Arc<ProcessQueue>>,
) -> bool {
-    let client = self.client_instance.as_mut().unwrap();
+    let client = match self.client_instance.as_mut() {
+        Some(client) => client,
+        None => {
+            error!("Client instance is None in lock_with");
+            return false;
+        }
+    };
    // Rest of the code...
}

631-678: Add Logging When Broker Address Is Not Found in unlock_all

In the unlock_all method, if find_broker_result is None, it silently continues. Logging this scenario would aid in debugging.

Add a warning log:

if let Some(find_broker_result) = find_broker_result {
    // Existing code...
} else {
+   warn!(
+       "unlock_all: Broker address not found for broker_name: {}",
+       broker_name
+   );
}
rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs (1)

316-397: Clarify Handling of Different Consumption Statuses

In process_consume_result, the handling of various ConsumeOrderlyStatus cases is complex and might benefit from clarification.

Suggestion:

  • Refactor the match statements to improve readability.
  • Possibly extract some logic into helper methods to handle each status case.
  • Ensure that offset commits and message requeueing are correctly performed for each status.

Benefits:

  • Enhances maintainability and reduces the potential for bugs in consumption state management.
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 5c08587 and 910029d.

📒 Files selected for processing (18)
  • rocketmq-client/Cargo.toml (1 hunks)
  • rocketmq-client/examples/ordermessage/ordermessage_consumer.rs (1 hunks)
  • rocketmq-client/src/consumer.rs (1 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs (7 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs (3 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs (2 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs (3 hunks)
  • rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs (2 hunks)
  • rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (6 hunks)
  • rocketmq-client/src/consumer/consumer_impl/process_queue.rs (3 hunks)
  • rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (5 hunks)
  • rocketmq-client/src/consumer/default_mq_push_consumer.rs (1 hunks)
  • rocketmq-client/src/consumer/listener/message_listener_orderly.rs (1 hunks)
  • rocketmq-client/src/consumer/message_queue_lock.rs (1 hunks)
  • rocketmq-client/src/consumer/mq_push_consumer.rs (1 hunks)
  • rocketmq-client/src/consumer/pull_callback.rs (2 hunks)
  • rocketmq-client/src/implementation/client_remoting_processor.rs (0 hunks)
  • rocketmq-client/src/implementation/mq_client_api_impl.rs (0 hunks)
💤 Files with no reviewable changes (2)
  • rocketmq-client/src/implementation/client_remoting_processor.rs
  • rocketmq-client/src/implementation/mq_client_api_impl.rs
🧰 Additional context used
🪛 GitHub Check: codecov/patch
rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs

[warning] 106-106: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L106
Added line #L106 was not covered by tests


[warning] 141-141: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L141
Added line #L141 was not covered by tests


[warning] 202-205: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L202-L205
Added lines #L202 - L205 were not covered by tests


[warning] 238-238: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L238
Added line #L238 was not covered by tests


[warning] 240-242: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L240-L242
Added lines #L240 - L242 were not covered by tests


[warning] 245-246: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L245-L246
Added lines #L245 - L246 were not covered by tests


[warning] 252-252: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L252
Added line #L252 was not covered by tests


[warning] 282-282: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L282
Added line #L282 was not covered by tests


[warning] 349-349: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L349
Added line #L349 was not covered by tests


[warning] 471-475: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L471-L475
Added lines #L471 - L475 were not covered by tests

rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs

[warning] 55-57: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L55-L57
Added lines #L55 - L57 were not covered by tests


[warning] 60-60: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L60
Added line #L60 was not covered by tests


[warning] 75-75: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L75
Added line #L75 was not covered by tests


[warning] 82-92: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L82-L92
Added lines #L82 - L92 were not covered by tests


[warning] 94-98: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L94-L98
Added lines #L94 - L98 were not covered by tests


[warning] 100-102: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L100-L102
Added lines #L100 - L102 were not covered by tests


[warning] 105-105: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L105
Added line #L105 was not covered by tests


[warning] 109-109: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L109
Added line #L109 was not covered by tests


[warning] 111-111: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L111
Added line #L111 was not covered by tests


[warning] 115-118: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L115-L118
Added lines #L115 - L118 were not covered by tests


[warning] 120-120: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L120
Added line #L120 was not covered by tests


[warning] 135-135: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L135
Added line #L135 was not covered by tests


[warning] 137-149: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L137-L149
Added lines #L137 - L149 were not covered by tests


[warning] 151-153: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L151-L153
Added lines #L151 - L153 were not covered by tests


[warning] 155-157: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L155-L157
Added lines #L155 - L157 were not covered by tests


[warning] 159-160: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L159-L160
Added lines #L159 - L160 were not covered by tests


[warning] 162-164: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L162-L164
Added lines #L162 - L164 were not covered by tests


[warning] 166-167: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L166-L167
Added lines #L166 - L167 were not covered by tests


[warning] 169-171: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L169-L171
Added lines #L169 - L171 were not covered by tests


[warning] 173-175: rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs#L173-L175
Added lines #L173 - L175 were not covered by tests

🔇 Additional comments (19)
rocketmq-client/src/consumer/listener/message_listener_orderly.rs (1)

28-29: Approve changes with caution: Breaking changes in trait method signature

The modifications to the consume_message method signature are beneficial for performance and align well with Rust's borrowing rules:

  1. Changing msgs: Vec<MessageExt> to msgs: &[&MessageExt] allows for more efficient message passing without unnecessary copying.
  2. Modifying context: ConsumeOrderlyContext to context: &mut ConsumeOrderlyContext provides flexibility to mutate the context if needed.

However, these are breaking changes that will require updates in all implementations of this trait throughout the codebase.

To ensure all implementations are updated, run the following script:

Please review the script output to ensure all implementations are updated accordingly.

✅ Verification successful

Verification Successful: All Implementations Comply with the New Method Signature

All implementations of the MessageListenerOrderly trait have been updated to use the new consume_message method signature. No outdated method signatures were found in the codebase.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find all implementations of MessageListenerOrderly trait
# and verify they use the new method signature.

# Search for trait implementations
echo "Searching for MessageListenerOrderly implementations:"
rg --type rust "impl.*MessageListenerOrderly.*for" -A 10

# Search for potential outdated method signatures
echo "Checking for potential outdated method signatures:"
rg --type rust "fn\s+consume_message.*Vec<MessageExt>.*ConsumeOrderlyContext[^&]"

Length of output: 1445

rocketmq-client/Cargo.toml (2)

Line range hint 1-84: Overall, the changes look good and align with the PR objectives.

The addition of the "ordermessage-consumer" example configuration is consistent with the existing structure and supports the goal of implementing ordered message consumption for the client.

To ensure the implementation is complete, it would be beneficial to verify the contents of the ordermessage_consumer.rs file:

This will help confirm that the actual implementation of the ordered message consumer is in place and follows the expected patterns for RocketMQ Rust clients.


81-84: LGTM! New example configuration added correctly.

The new example configuration for "ordermessage-consumer" has been added correctly and aligns with the PR objectives to support order message consumption for the client. The format and placement are consistent with other example configurations in the file.

A few observations:

  1. The path examples/ordermessage/ordermessage_consumer.rs follows the established pattern for other examples.
  2. This addition complements the existing "ordermessage-producer" example, providing a complete set for ordered message handling.

To ensure the referenced file exists and follows the expected naming convention, let's run the following verification:

✅ Verification successful

Verification Successful!

The file examples/ordermessage/ordermessage_consumer.rs exists and follows the established naming conventions. The consistency within the ordermessage directory confirms that the addition aligns with the project's patterns.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Verify the existence of the ordermessage_consumer.rs file
fd -p examples/ordermessage/ordermessage_consumer.rs

# List all files in the ordermessage directory to check for consistency
fd -p examples/ordermessage/

Length of output: 274

rocketmq-client/src/consumer/default_mq_push_consumer.rs (2)

578-590: Approved: Improved implementation for ordered message consumption

The changes to register_message_listener_orderly method look good and align with the PR objective of supporting ordered message consumption for clients. Key improvements:

  1. Changed from async to sync, simplifying the implementation.
  2. Directly creates and registers a MessageListener with the orderly consumption logic.
  3. Added 'static lifetime bound to the ML type parameter, ensuring the message listener can be stored and used for the entire lifetime of the consumer.

These changes should provide a more efficient and straightforward way to handle ordered message consumption.


578-590: Verify impact on async operations and message ordering

While the changes to register_message_listener_orderly are localized and look good, it's important to verify that this sync implementation doesn't negatively impact any async operations elsewhere in the codebase. Additionally, ensure that this change properly supports the ordered message consumption feature as intended.

To verify the impact, you can run the following script:

✅ Verification successful

Verification Successful: No conflicts found in async operations and message ordering

After reviewing the codebase, the synchronous register_message_listener_orderly implementation does not interfere with existing asynchronous operations. The changes support the ordered message consumption feature as intended.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for any remaining async calls to register_message_listener_orderly
# and verify the usage of MessageListenerOrderly

echo "Checking for async calls to register_message_listener_orderly:"
rg --type rust 'register_message_listener_orderly\(' -C 3

echo "Verifying usage of MessageListenerOrderly:"
rg --type rust 'MessageListenerOrderly' -C 3

echo "Checking for potential conflicts with async operations:"
rg --type rust 'async.*register_message_listener_orderly' -C 3

Length of output: 14609

rocketmq-client/examples/ordermessage/ordermessage_consumer.rs (1)

83-100: ⚠️ Potential issue

Increment consume_times to correctly track consumption attempts

In the consume_message method, the consume_times counter is used to control message consumption behavior based on the number of times messages have been consumed. However, consume_times is never incremented, so it always remains at its initial value of 0. This means that the conditional logic leveraging consume_times will not function as intended.

Apply this fix to increment consume_times after processing the messages:

             }
+            self.consume_times.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
             if self
                 .consume_times
                 .load(std::sync::atomic::Ordering::Acquire)

Please verify that the consume_times counter now increments correctly and that the message consumption behavior aligns with the expected logic.

rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs (1)

105-128: Ensure consistency in using weak references

When passing ArcRefCellWrapper::downgrade(consume_message_concurrently_service) and ArcRefCellWrapper::downgrade(consume_message_orderly_service) to submit_consume_request, ensure that the services are alive to prevent dangling references.

It's important to verify that the services remain valid during the asynchronous operations. Consider checking the strong count or handling the Weak references appropriately to prevent issues.

rocketmq-client/src/consumer/pull_callback.rs (1)

Line range hint 215-225: Ensure consistent time delay assignment in error handling

In the on_exception method, the time delay for re-executing the pull request is determined differently based on the error. Ensure that the logic consistently assigns time_delay to avoid unintentional fall-through or incorrect delay times.

Review the match arms to confirm that all possible error codes are properly handled and that time_delay is set as intended.

To verify the handling of all ResponseCode variants, you can check if all relevant codes are covered:

This script will help ensure that all necessary ResponseCode variants are considered in the match expressions.

rocketmq-client/src/consumer/consumer_impl/process_queue.rs (1)

40-47: Constants made public for external access

The constants REBALANCE_LOCK_MAX_LIVE_TIME and REBALANCE_LOCK_INTERVAL have been changed to pub, making them accessible outside the module. Ensure that this change is necessary and aligns with the intended encapsulation of your module.

rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs (2)

471-475: Handle potential None value when upgrading consume_message_concurrently_service

Within ConsumeRequest::run, after cloning consume_message_concurrently_service, an upgrade is performed. Ensure that the None case is properly handled to avoid panics.

Review the existing upgrade check to confirm it correctly handles the None case.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 471-475: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L471-L475
Added lines #L471 - L475 were not covered by tests


141-141: Ensure process queue contains message before proceeding

The contains_message method is awaited without handling the result in line 141. Verify that the message existence check is properly handled.

Run the following script to check for error handling in message existence verification:

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 141-141: rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L141
Added line #L141 was not covered by tests

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs (2)

20-20: Import Deref Trait for Method Usage

The addition of use std::ops::Deref; is appropriate since the deref() method is used later in the code, specifically when dereferencing process_queue_table.


29-29: Import UnlockBatchRequestBody for Unlocking Message Queues

The import of UnlockBatchRequestBody is necessary for the implementation of the new unlock_all method, which handles the unlocking of message queues.

rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs (6)

61-62: Good use of necessary imports

The addition of ConsumeMessagePopServiceGeneral and ConsumeMessageServiceGeneral imports is appropriate for the new message consumption services.


114-125: Addition of new fields for message consumption services

The new fields consume_message_service and consume_message_pop_service help unify the message consumption logic. Their implementation appears consistent and appropriate.


161-162: Initialize new consumption service fields in constructor

The initialization of consume_message_service and consume_message_pop_service to None in the constructor is correct and maintains consistency with the rest of the struct initialization.


182-184: Ensure safe handling of optional values

When accessing get_consume_message_concurrently_service() and setting default_mqpush_consumer_impl, please confirm that any None values are properly handled to prevent potential panics.


327-335: Verify that both consume services are started

In the start method, only consume_message_service and consume_message_pop_service are being started. Ensure that both services are necessary to start in all scenarios and confirm that no additional services need to be initialized.


108-108: Confirm the necessity of changing client_instance visibility

The client_instance field has been changed to pub(crate). Please verify if this increased visibility is required and does not expose internal implementation details unnecessarily.

Run the following script to check if client_instance is accessed outside its original scope:

✅ Verification successful

Client Instance Visibility Confirmation

After reviewing the codebase, it is confirmed that changing client_instance to pub(crate) is necessary for its usage across multiple modules, ensuring proper functionality without unnecessarily exposing internal implementation details. Key areas where client_instance is utilized include:

  • implementation/mq_admin_impl.rs
  • implementation/mq_client_api_impl.rs
  • implementation/mq_client_manager.rs
  • implementation/client_remoting_processor.rs
  • producer/producer_impl/default_mq_producer_impl.rs
  • consumer/store/local_file_offset_store.rs
  • consumer/store/remote_broker_offset_store.rs
  • consumer/consumer_impl/re_balance/rebalance_impl.rs
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for external accesses to `client_instance` outside `default_mq_push_consumer_impl.rs`

# Search for usages of `client_instance` in the codebase, excluding the current file
rg --type rust 'client_instance' --glob '!rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs'

Length of output: 13878

Comment on lines +78 to +80
fn register_message_listener_orderly<ML>(&mut self, message_listener: ML)
where
ML: MessageListenerOrderly + Send + Sync;
ML: MessageListenerOrderly + Send + Sync + 'static;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Inconsistency Detected in Listener Registration Methods

The register_message_listener_concurrently method does not reflect the changes applied to register_message_listener_orderly. This discrepancy may lead to inconsistent behaviors in message consumption.

🔗 Analysis chain

Approve changes to register_message_listener_orderly with considerations

The modifications to the register_message_listener_orderly method align with the PR objective of supporting ordered message consumption for clients. The changes appear to be appropriate and beneficial:

  1. Changing from async fn to fn:

    • This simplifies the API by making it synchronous, which is suitable if the method is just registering a listener without performing any I/O operations.
    • It may improve performance by reducing async overhead for this specific operation.
  2. Adding the 'static bound to ML:

    • This ensures that the message listener can be stored and used for the entire lifetime of the consumer, preventing potential lifetime issues with references in the listener.

These changes contribute to a more robust and efficient implementation of ordered message consumption. However, consider the following:

To ensure consistency, let's verify if similar changes have been applied to the concurrent message listener registration method:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if the concurrent listener registration method has similar changes
rg --type rust "fn register_message_listener_concurrently<ML>" rocketmq-client/src/consumer/mq_push_consumer.rs

Length of output: 196

use tokio::sync::Mutex;

type LockObject = Arc<Mutex<()>>;
type LockTable = Arc<Mutex<HashMap<MessageQueue, Arc<Mutex<HashMap<i32, LockObject>>>>>>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider using DashMap for concurrent access to improve performance

The current implementation uses Mutex locks around HashMaps, which can lead to performance bottlenecks due to serialized access. By utilizing a concurrent data structure like DashMap, you can achieve thread-safe concurrent access without the overhead of locking the entire map, improving performance in high-concurrency scenarios.

Here's how you can refactor the code:

  1. Add dashmap to your Cargo.toml dependencies:

    [dependencies]
    dashmap = "5.1"
  2. Update the imports and type definitions:

    -use std::collections::HashMap;
    -use std::sync::Arc;
    +use dashmap::DashMap;
    +use std::sync::Arc;
    
    -type LockObject = Arc<Mutex<()>>;
    -type LockTable = Arc<Mutex<HashMap<MessageQueue, Arc<Mutex<HashMap<i32, LockObject>>>>>>;
    +type LockObject = Arc<Mutex<()>>;
    +type LockTable = DashMap<MessageQueue, DashMap<i32, LockObject>>;
  3. Modify the MessageQueueLock struct:

     #[derive(Default)]
     pub(crate) struct MessageQueueLock {
    -    mq_lock_table: LockTable,
    +    mq_lock_table: LockTable,
     }
  4. Refactor the fetch_lock_object_with_sharding_key method:

     pub async fn fetch_lock_object_with_sharding_key(
         &self,
         mq: &MessageQueue,
         sharding_key_index: i32,
     ) -> Arc<Mutex<()>> {
    -    let mut mq_lock_table = self.mq_lock_table.lock().await;
    -    let obj_map = mq_lock_table
    -        .entry(mq.clone())
    -        .or_insert_with(|| Arc::new(Mutex::new(HashMap::new())));
    -    let mut obj_map = obj_map.lock().await;
    -    let lock = obj_map
    -        .entry(sharding_key_index)
    -        .or_insert_with(|| Arc::new(Mutex::new(())));
    -    lock.clone()
    +    let obj_map = self.mq_lock_table
    +        .entry(mq.clone())
    +        .or_insert_with(DashMap::new);
    +    let lock = obj_map
    +        .entry(sharding_key_index)
    +        .or_insert_with(|| Arc::new(Mutex::new(())))
    +        .clone();
    +    lock
     }

This refactoring reduces the overhead of locking and can enhance performance in concurrent environments.

Also applies to: 47-54

Comment on lines +31 to +56
impl MessageQueueLock {
pub fn new() -> Self {
MessageQueueLock {
mq_lock_table: Arc::new(Mutex::new(HashMap::new())),
}
}

pub async fn fetch_lock_object(&self, mq: &MessageQueue) -> Arc<Mutex<()>> {
self.fetch_lock_object_with_sharding_key(mq, -1).await
}

pub async fn fetch_lock_object_with_sharding_key(
&self,
mq: &MessageQueue,
sharding_key_index: i32,
) -> Arc<Mutex<()>> {
let mut mq_lock_table = self.mq_lock_table.lock().await;
let obj_map = mq_lock_table
.entry(mq.clone())
.or_insert_with(|| Arc::new(Mutex::new(HashMap::new())));
let mut obj_map = obj_map.lock().await;
let lock = obj_map
.entry(sharding_key_index)
.or_insert_with(|| Arc::new(Mutex::new(())));
lock.clone()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add unit tests for MessageQueueLock functionality

To ensure the correctness and reliability of the MessageQueueLock implementation, please consider adding unit tests that cover:

  • Fetching lock objects for different MessageQueues and verifying that they are distinct.
  • Fetching lock objects with various sharding keys to ensure correct behavior.
  • Simulating concurrent access to test for race conditions and potential deadlocks.

Would you like assistance in writing these unit tests?

Comment on lines +47 to +54
let mut mq_lock_table = self.mq_lock_table.lock().await;
let obj_map = mq_lock_table
.entry(mq.clone())
.or_insert_with(|| Arc::new(Mutex::new(HashMap::new())));
let mut obj_map = obj_map.lock().await;
let lock = obj_map
.entry(sharding_key_index)
.or_insert_with(|| Arc::new(Mutex::new(())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Minimize the scope of locks to reduce potential for deadlocks

The current implementation holds a lock on mq_lock_table while also locking obj_map. Holding multiple locks simultaneously increases the risk of deadlocks and can impact performance. Consider restructuring the code to minimize the time each lock is held and avoid nested locking.

One approach is to:

  • Reduce the scope of each lock to the minimal necessary.
  • Retrieve or insert obj_map without holding the lock longer than needed.
  • Use lock-free data structures where possible (as suggested with DashMap).

Implementing these changes can improve the safety and efficiency of your concurrency control.

Comment on lines +60 to 62
fn start(&mut self, this: WeakCellWrapper<Self>) {
//todo!()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential failure when upgrading WeakCellWrapper in start method

The start method now accepts this: WeakCellWrapper<Self>. Ensure that within the method, you appropriately upgrade the weak reference using this.upgrade(). It's crucial to handle the case where upgrade() might return None, which can occur if the original strong reference has been dropped. Failing to do so may lead to runtime errors when attempting to access a None value.

Comment on lines +62 to +72
pub struct ConsumeMessageOrderlyService {
pub(crate) default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
pub(crate) consumer_group: Arc<String>,
pub(crate) message_listener: ArcBoxMessageListenerOrderly,
pub(crate) consume_runtime: RocketMQRuntime,
pub(crate) stopped: AtomicBool,
pub(crate) global_lock: Arc<RocketMQTokioMutex<()>>,
pub(crate) message_queue_lock: MessageQueueLock,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Redundant Optional Field in Struct Definition

The field default_mqpush_consumer_impl is wrapped with Option<WeakCellWrapper<...>>, but it seems this field is expected to be always present after initialization.

Suggestion:

  • If default_mqpush_consumer_impl is mandatory, remove the Option to simplify code and avoid unnecessary unwrap calls.
  • This change reduces potential for None cases and clarifies the intent.
-pub(crate) default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
+pub(crate) default_mqpush_consumer_impl: WeakCellWrapper<DefaultMQPushConsumerImpl>,

Note: Ensure all usages are updated accordingly.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub struct ConsumeMessageOrderlyService {
pub(crate) default_mqpush_consumer_impl: Option<WeakCellWrapper<DefaultMQPushConsumerImpl>>,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
pub(crate) consumer_group: Arc<String>,
pub(crate) message_listener: ArcBoxMessageListenerOrderly,
pub(crate) consume_runtime: RocketMQRuntime,
pub(crate) stopped: AtomicBool,
pub(crate) global_lock: Arc<RocketMQTokioMutex<()>>,
pub(crate) message_queue_lock: MessageQueueLock,
}
pub struct ConsumeMessageOrderlyService {
pub(crate) default_mqpush_consumer_impl: WeakCellWrapper<DefaultMQPushConsumerImpl>,
pub(crate) client_config: ArcRefCellWrapper<ClientConfig>,
pub(crate) consumer_config: ArcRefCellWrapper<ConsumerConfig>,
pub(crate) consumer_group: Arc<String>,
pub(crate) message_listener: ArcBoxMessageListenerOrderly,
pub(crate) consume_runtime: RocketMQRuntime,
pub(crate) stopped: AtomicBool,
pub(crate) global_lock: Arc<RocketMQTokioMutex<()>>,
pub(crate) message_queue_lock: MessageQueueLock,
}

Comment on lines +401 to +415
fn start(&mut self, this: WeakCellWrapper<Self>) {
if MessageModel::Clustering == self.consumer_config.message_model {
self.consume_runtime.get_handle().spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(1_000)).await;
loop {
if let Some(mut this) = this.upgrade() {
this.lock_mqperiodically().await;
tokio::time::sleep(tokio::time::Duration::from_millis(
*REBALANCE_LOCK_INTERVAL,
))
.await;
}
}
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Missed Await in Async Function

In the start method, the spawned task does not include a condition to exit the loop when the service is stopped.

Suggestion:

  • Add a check for self.stopped within the loop to allow graceful shutdown.
  • Prevent potential infinite loops after the service is intended to stop.
loop {
    if let Some(mut this) = this.upgrade() {
+       if this.stopped.load(std::sync::atomic::Ordering::Acquire) {
+           break;
+       }
        this.lock_mqperiodically().await;

Committable suggestion was skipped due to low confidence.

Comment on lines +480 to +723
break;
}
let vec = msgs
.iter()
.map(|msg| &msg.message_ext_inner)
.collect::<Vec<&MessageExt>>();

match consume_message_orderly_service_inner
.message_listener
.consume_message(&vec, &mut context)
{
Ok(value) => {
status = Some(value);
}
Err(_) => {
has_exception = true;
}
}
drop(consume_lock);
if status.is_none()
|| *status.as_ref().unwrap() == ConsumeOrderlyStatus::Rollback
|| *status.as_ref().unwrap() == ConsumeOrderlyStatus::SuspendCurrentQueueAMoment
{
warn!(
"consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
self.consumer_group.as_ref(),
msgs.len(),
self.message_queue,
);
}
let consume_rt = begin_timestamp.elapsed().as_millis() as u64;
if status.is_none() {
if has_exception {
return_type = ConsumeReturnType::Exception;
} else {
return_type = ConsumeReturnType::ReturnNull;
}
} else if consume_rt
>= default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000
{
return_type = ConsumeReturnType::TimeOut;
} else if *status.as_ref().unwrap()
== ConsumeOrderlyStatus::SuspendCurrentQueueAMoment
{
return_type = ConsumeReturnType::Failed;
} else if *status.as_ref().unwrap() == ConsumeOrderlyStatus::Success {
return_type = ConsumeReturnType::Success;
}
if default_mqpush_consumer_impl.has_hook() {
consume_message_context.as_mut().unwrap().props.insert(
mix_all::CONSUME_CONTEXT_TYPE.to_string(),
return_type.to_string(),
);
}
if status.is_none() {
status = Some(ConsumeOrderlyStatus::SuspendCurrentQueueAMoment);
}
if default_mqpush_consumer_impl.has_hook() {
let status = *status.as_ref().unwrap();
consume_message_context.as_mut().unwrap().success = status
== ConsumeOrderlyStatus::Success
|| status == ConsumeOrderlyStatus::Commit;
consume_message_context.as_mut().unwrap().status = status.to_string();
default_mqpush_consumer_impl.execute_hook_after(&mut consume_message_context);
}
let continue_consume = consume_message_orderly_service_inner
.process_consume_result(
msgs,
consume_message_orderly_service.clone(),
status.unwrap(),
&context,
self,
)
.await;
if !continue_consume {
break;
}
}
} else {
if self.process_queue.is_dropped() {
warn!(
"the message queue not be able to consume, because it's dropped. {}",
self.message_queue
);
return;
}
let consume_message_orderly_service_weak =
ArcRefCellWrapper::downgrade(&consume_message_orderly_service_inner);
consume_message_orderly_service_inner
.try_lock_later_and_reconsume(
consume_message_orderly_service_weak,
&self.message_queue,
self.process_queue.clone(),
100,
)
.await;
}
drop(locked);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Potential issue

Potential Deadlock Confirmed

The analysis shows that a lock acquired at line 22 in ConsumeRequest::run is held across an await point, which may lead to deadlocks.

🔗 Analysis chain

Potential Deadlock Due to Locks in ConsumeRequest::run

The method run acquires multiple locks which may lead to deadlocks if not carefully managed.

Suggestion:

  • Review the locking order and scope to ensure that locks are held for the minimal required duration.
  • Consider using asynchronous locking primitives or redesigning the logic to avoid holding locks across await points.

Specific Points:

  • The consume_lock is held during message consumption; releasing it earlier might improve concurrency.
  • Ensure that any shared state modification is thread-safe and doesn't introduce race conditions.

To confirm the potential for deadlocks, you can analyze the locking sequences:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Analyze lock usage in `ConsumeRequest::run`

# Find all instances of locks being acquired
rg --type rust 'lock\(\)' -A 5 rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs

# Check for locks held across await points
# Expectation: Locks should not be held across await points to prevent deadlocks

Length of output: 789


Script:

#!/bin/bash
# Description: Check if locks acquired with `lock().await` are held across await points in `ConsumeRequest::run`

# Navigate to the target file
cd rocketmq-client/src/consumer/consumer_impl/

# Extract the lines of the run method
rg --type rust 'async fn run' -A 500 consume_message_orderly_service.rs > run_method.rs

# Search for `lock().await` followed by another `await` before the lock is dropped
rg -n 'lock\(\).await' run_method.rs | while read -r line; do
    line_number=$(echo "$line" | cut -d':' -f1)
    # Check the next 50 lines for another `await`
    tail -n +"$line_number" run_method.rs | head -n 50 | grep -q 'await' && echo "Potential hold across await at line $line_number"
done

Length of output: 420

Comment on lines +270 to +295
let consume_message_concurrently_service =
ArcRefCellWrapper::new(ConsumeMessageConcurrentlyService::new(
self.client_config.clone(),
self.consumer_config.clone(),
self.consumer_config.consumer_group.clone(),
listener.clone().expect("listener is None"),
self.default_mqpush_consumer_impl.clone(),
));
self.consume_message_service =
Some(ArcRefCellWrapper::new(ConsumeMessageServiceGeneral::new(
Some(consume_message_concurrently_service),
None,
)));
let consume_message_pop_concurrently_service =
ArcRefCellWrapper::new(ConsumeMessagePopConcurrentlyService::new(
self.client_config.clone(),
self.consumer_config.clone(),
self.consumer_config.consumer_group.clone(),
listener.expect("listener is None"),
));

self.consume_message_pop_service = Some(ArcRefCellWrapper::new(
ConsumeMessagePopServiceGeneral::new(
Some(consume_message_pop_concurrently_service),
None,
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor to eliminate code duplication in service initialization

The initialization code for consume_message_service and consume_message_pop_service is duplicated in both the concurrent and orderly message listener branches. Consider refactoring this code to reduce duplication and enhance maintainability.

Suggested Approach:

  • Extract common initialization steps into a helper function that accepts parameters specific to each listener type.
  • This will make future maintenance easier and reduce the potential for errors in duplicated code.

Also applies to: 298-321

Comment on lines +404 to +408
self.consume_message_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
.shutdown(await_terminate_millis)
.await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ensure both consume services are properly shut down

The shutdown method currently only calls shutdown on consume_message_service. To ensure all resources are cleaned up correctly, consume_message_pop_service should also be shut down.

Apply this diff to include the shutdown of consume_message_pop_service:

 if let Some(consume_message_concurrently_service) =
     self.consume_message_service.as_mut()
 {
     consume_message_concurrently_service
         .shutdown(await_terminate_millis)
         .await;
 }

+if let Some(consume_message_pop_service) = self.consume_message_pop_service.as_mut() {
+    consume_message_pop_service
+        .shutdown(await_terminate_millis)
+        .await;
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
self.consume_message_service.as_mut()
{
consume_message_concurrently_service
.consume_message_concurrently_service
.shutdown(await_terminate_millis);
.shutdown(await_terminate_millis)
.await;
self.consume_message_service.as_mut()
{
consume_message_concurrently_service
.shutdown(await_terminate_millis)
.await;
}
if let Some(consume_message_pop_service) = self.consume_message_pop_service.as_mut() {
consume_message_pop_service
.shutdown(await_terminate_millis)
.await;
}

@mxsm
Copy link
Owner Author

mxsm commented Oct 5, 2024

🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥

@SpaceXCN SpaceXCN added approved PR has approved and removed ready to review labels Oct 5, 2024
@mxsm mxsm merged commit e07dbc4 into main Oct 5, 2024
13 of 15 checks passed
@mxsm mxsm deleted the feature-1029 branch October 5, 2024 12:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved PR has approved auto merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature🚀] Support order message consume for client
2 participants