-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1899]🚨PopMessageProcessor supports process_request handle🚀 #1906
Conversation
WalkthroughThe pull request introduces significant modifications to the RocketMQ broker's message processing components, primarily focusing on enhancing the Changes
Assessment against linked issues
Possibly related PRs
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (5)
rocketmq-broker/src/processor/pop_message_processor.rs (3)
17-17
: Consider removing the unused variables allowance.
Currently, "#![allow(unused_variables)]" might hide potential issues if variables remain unused. Unless you have a specific reason to keep it, removing it will ensure the compiler catches dead code or unused variables.
133-143
: Parameterize the 500ms threshold for timeouts.
The 500ms constant is hard-coded when checking for timeouts. Exposing it as a configuration property or constant, rather than an embedded literal, would make it more flexible and testable.
379-379
: Provide an implementation for the unimplemented process_request function.
Leaving the method unimplemented will lead to runtime panic if invoked. If it’s intentionally left blank for future work, consider adding a TODO comment or an appropriate fallback response to avoid unwelcome failures in production.rocketmq-remoting/src/protocol/header/pop_message_request_header.rs (1)
56-58
: Make the 500ms timeout threshold configurable.
"500" in the expression “get_current_millis() - self.born_time - self.poll_time > 500” acts as a magic number. Consider externalizing it in a constant or struct field for clarity and future flexibility.rocketmq-broker/src/processor/change_invisible_time_processor.rs (1)
294-294
: Optimize repeated references to PopMessageProcessor::::gen_*_unique_id.
Repeatedly referencing “PopMessageProcessor::::gen_*_unique_id” might be streamlined by aliasing the function or storing it in a local closure, thus enhancing code brevity.Also applies to: 361-361
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
rocketmq-broker/src/broker_runtime.rs
(1 hunks)rocketmq-broker/src/processor.rs
(2 hunks)rocketmq-broker/src/processor/ack_message_processor.rs
(5 hunks)rocketmq-broker/src/processor/change_invisible_time_processor.rs
(4 hunks)rocketmq-broker/src/processor/pop_message_processor.rs
(2 hunks)rocketmq-common/src/common/broker/broker_config.rs
(2 hunks)rocketmq-remoting/src/protocol/header/pop_message_request_header.rs
(2 hunks)rocketmq-remoting/src/protocol/remoting_command.rs
(1 hunks)
🔇 Additional comments (8)
rocketmq-broker/src/processor.rs (2)
68-68
: Switching to a generic PopMessageProcessor field.
Using the generic parameter “PopMessageProcessor” promotes type safety and flexibility across different message stores. This design choice seems solid.
187-194
: Implementation looks consistent with the existing request flow.
The new match arm for “RequestCode::PopMessage” delegates processing correctly to the pop_message_processor. This keeps responsibilities well-separated in line with other request codes.
rocketmq-broker/src/processor/change_invisible_time_processor.rs (1)
Line range hint 68-81
: Adopting the generic PopMessageProcessor field.
Introducing “ArcMut<PopMessageProcessor>” aligns with the new generic design, ensuring all references share a consistent message store type.
rocketmq-common/src/common/broker/broker_config.rs (1)
183-183
: Verify the usage of the new retry topic feature flag.
The new enable_retry_topic_v2
field appears to be a feature flag for a new retry topic implementation. Let's verify how this flag is used in the codebase.
Also applies to: 272-272
✅ Verification successful
Let me gather more context about the retry topic implementation to understand how this flag affects the behavior.
Let me check one more thing to understand the key builder implementation for retry topics.
Based on the comprehensive analysis of the codebase, I can now provide the final response:
The enable_retry_topic_v2
flag controls the naming format of retry topics and is properly integrated
The flag is correctly implemented and used throughout the codebase:
- It determines whether to use v1 or v2 format for retry topic names via
KeyBuilder::build_pop_retry_topic
- The feature is used in critical paths:
- Pop message processing (
pop_message_processor.rs
) - Topic administration (
topic_request_handler.rs
) - Consumer rebalancing (
rebalance_impl.rs
)
- Pop message processing (
- Default value is
false
to maintain backward compatibility - Both v1 and v2 formats are supported with proper migration handling
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Search for usages of the new retry topic feature flag
# Test: Look for references to enable_retry_topic_v2 in the codebase
rg -A 5 "enable_retry_topic_v2"
# Test: Look for any retry topic v2 related code
ast-grep --pattern 'retry_topic_v2'
Length of output: 2171
Script:
#!/bin/bash
# Look for retry topic related code in pop_message_processor.rs with more context
rg -A 10 "retry_topic" rocketmq-broker/src/processor/pop_message_processor.rs
# Search for retry topic related functions and implementations
ast-grep --pattern 'fn $_($$$) {
$$$
retry_topic
$$$
}'
# Look for any retry topic related constants or configurations
rg "RETRY_TOPIC" -A 2
Length of output: 7317
Script:
#!/bin/bash
# Look for the KeyBuilder implementation to understand retry topic naming
rg -A 10 "build_pop_retry_topic"
# Check for any migration or version-specific code
rg -A 5 "retry_topic.*v[12]"
Length of output: 25363
rocketmq-broker/src/processor/ack_message_processor.rs (2)
72-72
: LGTM! Type safety improvement with generic parameter.
The addition of the generic type parameter MS
to pop_message_processor
field and constructor parameter improves type safety by ensuring consistent message store types throughout the processing chain.
Also applies to: 87-87
397-399
: LGTM! Consistent generic type usage in function calls.
The changes properly specify the generic type parameter MS
in calls to PopMessageProcessor
static methods, maintaining type safety throughout the acknowledgment process.
Also applies to: 408-410, 418-420
rocketmq-remoting/src/protocol/remoting_command.rs (1)
732-742
: LGTM! Well-implemented extension field addition.
The new add_ext_field_if_not_exist
method is well-implemented:
- Uses idiomatic Rust with the entry API
- Safely handles the None case for ext_fields
- Thread-safe implementation requiring mutable access
rocketmq-broker/src/broker_runtime.rs (1)
564-572
: LGTM! Verify initialization order of dependencies.
The PopMessageProcessor initialization is well-structured with proper thread-safe dependency injection. All required components are correctly passed using appropriate Arc wrappers.
Let's verify the initialization order of dependencies:
✅ Verification successful
Dependencies initialization order verified and correct
Based on the code analysis, all required dependencies for PopMessageProcessor are properly initialized in the correct order within BrokerRuntime::new():
- broker_config is initialized first
- consumer_manager is initialized with broker_config
- message_store is initialized in initialize_message_store()
- topic_config_manager is initialized early with broker_config
- subscription_group_manager and consumer_filter_manager are loaded during initialize_metadata()
The initialization sequence ensures all dependencies are available before PopMessageProcessor is constructed.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that all required dependencies are initialized before PopMessageProcessor
# Check initialization order in the file
rg -B 20 "PopMessageProcessor::new" | rg "let .*(consumer_manager|broker_config|message_store|message_store_config|topic_config_manager|subscription_group_manager|consumer_filter_manager)"
Length of output: 188
Script:
#!/bin/bash
# Let's try a different approach to verify the initialization order
# First, let's check the constructor of PopMessageProcessor
ast-grep --pattern 'impl PopMessageProcessor {
$$$
new($$$) {
$$$
}
$$$
}'
# Then, let's check the broader context of where it's initialized
rg -B 30 "let pop_message_processor = ArcMut::new\(PopMessageProcessor::new\("
Length of output: 2818
Script:
#!/bin/bash
# Let's check the initialization of the dependencies in BrokerRuntime
# Search for where these components are initialized
rg "impl BrokerRuntime" -A 50
# Also check where the dependencies are declared
rg "struct BrokerRuntime" -A 30
Length of output: 17898
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🔭 Outside diff range comments (1)
rocketmq-broker/src/processor/pop_message_processor.rs (1)
Several critical methods in PopMessageProcessor are unimplemented and need attention
The following methods in
rocketmq-broker/src/processor/pop_message_processor.rs
are unimplemented:
process_request
: Core functionality for processing pop message requestsqueue_lock_manager
: Method to get the queue lock managernotify_message_arriving
: Method to handle message arrival notificationsThis indicates incomplete implementation of essential pop message processing functionality that could affect message consumption reliability.
🔗 Analysis chain
Line range hint
1-742
: Verify all unimplemented methods in the codebase.Let's check for any remaining unimplemented methods that need to be addressed.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Search for unimplemented methods in the codebase echo "Searching for unimplemented! macros..." rg "unimplemented!\(" --type rust echo "Searching for TODO comments..." rg "TODO:" --type rust echo "Searching for methods that might need implementation..." ast-grep --pattern 'fn $FNAME($$$) { unimplemented!($$$) }'Length of output: 11303
🧹 Nitpick comments (2)
rocketmq-broker/src/processor/pop_message_processor.rs (2)
106-122
: Consider simplifying the born time initialization logic.The current implementation has redundant checks and assignments for the born time.
- request.add_ext_field_if_not_exist( - CheetahString::from_static_str(BORN_TIME), - begin_time_mills.to_string(), - ); - //need optimize - let old = request - .get_ext_fields() - .map_or(CheetahString::empty(), |fields| { - fields.get(BORN_TIME).cloned().unwrap_or_default() - }); - if "0" == old { - request.add_ext_field( - CheetahString::from_static_str(BORN_TIME), - begin_time_mills.to_string(), - ); - } + if request.get_ext_fields() + .and_then(|fields| fields.get(BORN_TIME)) + .map_or(true, |v| v == "0") { + request.add_ext_field( + CheetahString::from_static_str(BORN_TIME), + begin_time_mills.to_string(), + ); + }
17-17
: Consider removing the allow(unused_variables) attribute.The
#![allow(unused_variables)]
attribute is too broad and might hide potential issues. Consider addressing unused variables explicitly or using more targeted allows.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
rocketmq-broker/src/processor/pop_message_processor.rs
(6 hunks)
🔇 Additional comments (4)
rocketmq-broker/src/processor/pop_message_processor.rs (4)
63-71
: LGTM: Well-structured generic implementation with proper dependencies.
The PopMessageProcessor<MS>
struct is well-designed with:
- Generic type parameter for MessageStore flexibility
- Thread-safe field types using Arc/ArcMut
- Clear separation of concerns with distinct manager types
73-92
: LGTM: Clean constructor implementation.
The new
method properly initializes all fields and follows Rust's idiomatic constructor pattern.
Line range hint 581-642
: LGTM: Comprehensive test coverage for ID generation.
The test cases thoroughly verify the ID generation logic for different message types:
- Regular ACK messages
- Batch ACK messages
- Checkpoint messages
Line range hint 644-742
: LGTM: Well-structured lock management tests.
The test suite provides good coverage for the TimedLock
and QueueLockManager
implementations:
- Lock state transitions
- Concurrent operations
- Cleanup behavior
); | ||
(subscription_data, None) | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical: Implement the unimplemented process_request method.
The process_request
method ends with unimplemented!()
macro, which will panic at runtime.
Would you like me to help implement this method? I can provide a proper implementation that handles the message processing logic.
&retry_topic, | ||
&CheetahString::from_static_str(SubscriptionData::SUB_ALL), | ||
request_header.exp_type.clone(), | ||
) { | ||
Ok(value) => value, | ||
Err(_) => { | ||
return Ok(Some( | ||
RemotingCommand::create_response_command_with_code_remark( | ||
ResponseCode::SubscriptionParseFailed, | ||
"parse the consumer's subscription failed", | ||
), | ||
)); | ||
} | ||
}; | ||
self.consumer_manager.compensate_subscribe_data( | ||
&request_header.consumer_group, | ||
&retry_topic, | ||
&retry_subscription_data, | ||
); | ||
let message_filter = | ||
if !ExpressionType::is_tag_type(Some(subscription_data.expression_type.as_str())) { | ||
let consumer_filter_data = ConsumerFilterManager::build( | ||
request_header.consumer_group.clone(), | ||
request_header.topic.clone(), | ||
request_header.exp.clone(), | ||
request_header.exp_type.clone(), | ||
get_current_millis(), | ||
); | ||
if consumer_filter_data.is_none() { | ||
warn!( | ||
"Parse the consumer's subscription[{:?}] failed, group: {}", | ||
request_header.exp, request_header.consumer_group | ||
); | ||
return Ok(Some( | ||
RemotingCommand::create_response_command_with_code_remark( | ||
ResponseCode::SubscriptionParseFailed, | ||
"parse the consumer's subscription failed", | ||
), | ||
)); | ||
} | ||
let consumer_filter_data = consumer_filter_data.unwrap(); | ||
let message_filter: Box<dyn MessageFilter> = | ||
Box::new(ExpressionMessageFilter::new( | ||
Some(subscription_data.clone()), | ||
Some(consumer_filter_data.clone()), | ||
self.consumer_filter_manager.clone(), | ||
)); | ||
Some(message_filter) | ||
} else { | ||
None | ||
}; | ||
(subscription_data, message_filter) | ||
} else { | ||
let subscription_data = match FilterAPI::build( | ||
&request_header.topic, | ||
&request_header.exp.clone().unwrap_or_default(), | ||
request_header.exp_type.clone(), | ||
) { | ||
Ok(value) => value, | ||
Err(_) => { | ||
return Ok(Some( | ||
RemotingCommand::create_response_command_with_code_remark( | ||
ResponseCode::SubscriptionParseFailed, | ||
"parse the consumer's subscription failed", | ||
), | ||
)); | ||
} | ||
}; | ||
self.consumer_manager.compensate_subscribe_data( | ||
&request_header.consumer_group, | ||
&request_header.topic, | ||
&subscription_data, | ||
); | ||
let retry_topic = CheetahString::from_string(KeyBuilder::build_pop_retry_topic( | ||
&request_header.topic, | ||
&request_header.consumer_group, | ||
self.broker_config.enable_retry_topic_v2, | ||
)); | ||
let retry_subscription_data = match FilterAPI::build( | ||
&retry_topic, | ||
&CheetahString::from_static_str(SubscriptionData::SUB_ALL), | ||
request_header.exp_type.clone(), | ||
) { | ||
Ok(value) => value, | ||
Err(_) => { | ||
return Ok(Some( | ||
RemotingCommand::create_response_command_with_code_remark( | ||
ResponseCode::SubscriptionParseFailed, | ||
"parse the consumer's subscription failed", | ||
), | ||
)); | ||
} | ||
}; | ||
self.consumer_manager.compensate_subscribe_data( | ||
&request_header.consumer_group, | ||
&retry_topic, | ||
&retry_subscription_data, | ||
); | ||
(subscription_data, None) | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Consider extracting subscription data building into a separate method.
The subscription data building logic is duplicated in both branches of the if-else statement. This increases maintenance overhead and the risk of inconsistencies.
fn build_subscription_data(
&self,
request_header: &PopMessageRequestHeader,
) -> Result<(SubscriptionData, Option<Box<dyn MessageFilter>>), RemotingCommand> {
let subscription_data = FilterAPI::build(
&request_header.topic,
&request_header.exp.clone().unwrap_or_default(),
request_header.exp_type.clone(),
).map_err(|_| {
RemotingCommand::create_response_command_with_code_remark(
ResponseCode::SubscriptionParseFailed,
"parse the consumer's subscription failed",
)
})?;
// ... rest of the logic ...
Ok((subscription_data, message_filter))
}
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1906 +/- ##
==========================================
- Coverage 28.54% 28.42% -0.13%
==========================================
Files 477 477
Lines 67031 67335 +304
==========================================
+ Hits 19137 19139 +2
- Misses 47894 48196 +302 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #1899
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
enable_retry_topic_v2
in the BrokerConfig.is_timeout_too_much
to the PopMessageRequestHeader for timeout checks.add_ext_field_if_not_exist
in RemotingCommand for managing extension fields.Bug Fixes
Documentation
Refactor