-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1694]🚀Rocketmq-broker supports EscapeBridge functions🔥 #1716
Conversation
WalkthroughThe pull request introduces significant changes to the Changes
Assessment against linked issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
🔊@mxsm 🚀Thanks for your contribution 🎉. CodeRabbit(AI) will review your code first 🔥 |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #1716 +/- ##
==========================================
- Coverage 27.86% 27.84% -0.03%
==========================================
Files 473 473
Lines 63928 63984 +56
==========================================
Hits 17816 17816
- Misses 46112 46168 +56 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (3)
rocketmq-broker/src/failover/escape_bridge.rs (1)
84-85
: Document the optionality of fieldsThe fields
escape_bridge_runtime
andmessage_store
are marked as optional, but there's no documentation explaining under what conditions these fields would be None vs Some. Consider adding documentation to clarify the valid states and usage patterns.rocketmq-broker/src/broker_runtime.rs (2)
140-141
: Document feature flag implicationsThe
escape_bridge
field is conditionally compiled with thelocal_file_store
feature flag, but there's no documentation explaining why this dependency exists or what happens when the feature is disabled.Consider adding documentation explaining the feature flag dependency and its implications.
237-241
: Consider initialization order dependenciesThe
escape_bridge
is initialized with dependencies onbroker_config
,topic_route_info_manager
, andbroker_outer_api
. However, there's no explicit documentation or validation of the initialization order requirements.Consider:
- Adding documentation about initialization order requirements
- Implementing a builder pattern to enforce correct initialization order
- Adding validation to ensure dependencies are properly initialized
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 233-241: rocketmq-broker/src/broker_runtime.rs#L233-L241
Added lines #L233 - L241 were not covered by tests
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
📒 Files selected for processing (2)
rocketmq-broker/src/broker_runtime.rs
(15 hunks)rocketmq-broker/src/failover/escape_bridge.rs
(5 hunks)
🧰 Additional context used
🪛 GitHub Check: codecov/patch
rocketmq-broker/src/broker_runtime.rs
[warning] 160-160: rocketmq-broker/src/broker_runtime.rs#L160
Added line #L160 was not covered by tests
[warning] 181-181: rocketmq-broker/src/broker_runtime.rs#L181
Added line #L181 was not covered by tests
[warning] 233-241: rocketmq-broker/src/broker_runtime.rs#L233-L241
Added lines #L233 - L241 were not covered by tests
[warning] 259-259: rocketmq-broker/src/broker_runtime.rs#L259
Added line #L259 was not covered by tests
[warning] 279-280: rocketmq-broker/src/broker_runtime.rs#L279-L280
Added lines #L279 - L280 were not covered by tests
[warning] 293-293: rocketmq-broker/src/broker_runtime.rs#L293
Added line #L293 was not covered by tests
[warning] 490-490: rocketmq-broker/src/broker_runtime.rs#L490
Added line #L490 was not covered by tests
[warning] 535-535: rocketmq-broker/src/broker_runtime.rs#L535
Added line #L535 was not covered by tests
[warning] 770-771: rocketmq-broker/src/broker_runtime.rs#L770-L771
Added lines #L770 - L771 were not covered by tests
[warning] 777-777: rocketmq-broker/src/broker_runtime.rs#L777
Added line #L777 was not covered by tests
[warning] 782-782: rocketmq-broker/src/broker_runtime.rs#L782
Added line #L782 was not covered by tests
[warning] 800-800: rocketmq-broker/src/broker_runtime.rs#L800
Added line #L800 was not covered by tests
[warning] 862-862: rocketmq-broker/src/broker_runtime.rs#L862
Added line #L862 was not covered by tests
[warning] 1009-1010: rocketmq-broker/src/broker_runtime.rs#L1009-L1010
Added lines #L1009 - L1010 were not covered by tests
rocketmq-broker/src/failover/escape_bridge.rs
[warning] 92-115: rocketmq-broker/src/failover/escape_bridge.rs#L92-L115
Added lines #L92 - L115 were not covered by tests
[warning] 117-126: rocketmq-broker/src/failover/escape_bridge.rs#L117-L126
Added lines #L117 - L126 were not covered by tests
[warning] 138-142: rocketmq-broker/src/failover/escape_bridge.rs#L138-L142
Added lines #L138 - L142 were not covered by tests
[warning] 222-222: rocketmq-broker/src/failover/escape_bridge.rs#L222
Added line #L222 was not covered by tests
[warning] 268-272: rocketmq-broker/src/failover/escape_bridge.rs#L268-L272
Added lines #L268 - L272 were not covered by tests
[warning] 317-321: rocketmq-broker/src/failover/escape_bridge.rs#L317-L321
Added lines #L317 - L321 were not covered by tests
self.message_store | ||
.as_mut() | ||
.unwrap() | ||
.put_message(message_ext) | ||
.await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace unsafe unwraps with proper error handling
Multiple methods use unwrap()
on message_store
which could panic if the store wasn't initialized. This is particularly risky in the put_message
, async_put_message
, and put_message_to_specific_queue
methods.
Consider using proper error handling:
-self.message_store
- .as_mut()
- .unwrap()
- .put_message(message_ext)
- .await
+self.message_store
+ .as_mut()
+ .ok_or(PutMessageStatus::ServiceNotAvailable)?
+ .put_message(message_ext)
+ .await
Also applies to: 268-272, 317-321
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 138-142: rocketmq-broker/src/failover/escape_bridge.rs#L138-L142
Added lines #L138 - L142 were not covered by tests
impl<MS> EscapeBridge<MS> { | ||
pub fn new( | ||
broker_config: Arc<BrokerConfig>, | ||
topic_route_info_manager: Arc<TopicRouteInfoManager>, | ||
broker_outer_api: Arc<BrokerOuterAPI>, | ||
) -> Self { | ||
let inner_producer_group_name = CheetahString::from_string(format!( | ||
"InnerProducerGroup_{}_{}", | ||
broker_config.broker_name, broker_config.broker_identity.broker_id | ||
)); | ||
let inner_consumer_group_name = CheetahString::from_string(format!( | ||
"InnerConsumerGroup_{}_{}", | ||
broker_config.broker_name, broker_config.broker_identity.broker_id | ||
)); | ||
|
||
Self { | ||
inner_producer_group_name, | ||
inner_consumer_group_name, | ||
escape_bridge_runtime: None, | ||
message_store: None, | ||
broker_config, | ||
topic_route_info_manager, | ||
broker_outer_api, | ||
} | ||
} | ||
|
||
pub fn start(&mut self, message_store: Option<ArcMut<MS>>) { | ||
if self.broker_config.enable_slave_acting_master && self.broker_config.enable_remote_escape | ||
{ | ||
self.escape_bridge_runtime = Some(RocketMQRuntime::new_multi( | ||
num_cpus::get(), | ||
"AsyncEscapeBridgeExecutor", | ||
)); | ||
self.message_store = message_store; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve test coverage for critical functionality
The core functionality of EscapeBridge, including initialization and message handling, lacks test coverage. Only the transform_send_result2put_result
helper function is currently tested.
Consider adding tests for:
- Bridge initialization with different config combinations
- Message handling with both initialized and uninitialized states
- Error handling scenarios
- Start method behavior
Would you like me to help generate a comprehensive test suite?
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 92-115: rocketmq-broker/src/failover/escape_bridge.rs#L92-L115
Added lines #L92 - L115 were not covered by tests
[warning] 117-126: rocketmq-broker/src/failover/escape_bridge.rs#L117-L126
Added lines #L117 - L126 were not covered by tests
pub fn start(&mut self, message_store: Option<ArcMut<MS>>) { | ||
if self.broker_config.enable_slave_acting_master && self.broker_config.enable_remote_escape | ||
{ | ||
self.escape_bridge_runtime = Some(RocketMQRuntime::new_multi( | ||
num_cpus::get(), | ||
"AsyncEscapeBridgeExecutor", | ||
)); | ||
self.message_store = message_store; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for uninitialized fields
The start
method only initializes fields if enable_slave_acting_master
and enable_remote_escape
are true. However, these fields are later unwrapped without checking if they were initialized, which could lead to runtime panics.
Consider adding proper error handling:
-pub fn start(&mut self, message_store: Option<ArcMut<MS>>) {
+pub fn start(&mut self, message_store: Option<ArcMut<MS>>) -> Result<(), &'static str> {
if self.broker_config.enable_slave_acting_master && self.broker_config.enable_remote_escape
{
self.escape_bridge_runtime = Some(RocketMQRuntime::new_multi(
num_cpus::get(),
"AsyncEscapeBridgeExecutor",
));
self.message_store = message_store;
+ Ok(())
+ } else {
+ Err("Bridge not started: required features not enabled")
}
}
Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 117-126: rocketmq-broker/src/failover/escape_bridge.rs#L117-L126
Added lines #L117 - L126 were not covered by tests
|
||
self.escape_bridge.start(self.message_store.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for escape_bridge start
The start_basic_service
method calls escape_bridge.start()
without handling potential errors. Given that the start method interacts with the message store, it should include proper error handling.
Consider adding error handling:
-self.escape_bridge.start(self.message_store.clone());
+self.escape_bridge
+ .start(self.message_store.clone())
+ .map_err(|e| {
+ warn!("Failed to start escape bridge: {}", e);
+ // Handle the error appropriately
+ })?;
Committable suggestion skipped: line range outside the PR's diff.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 770-771: rocketmq-broker/src/broker_runtime.rs#L770-L771
Added lines #L770 - L771 were not covered by tests
Which Issue(s) This PR Fixes(Closes)
Fixes #1694
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
escape_bridge
field to enhance service integration.EscapeBridge
struct to allow optional initialization of critical components.Improvements
EscapeBridge
struct for greater flexibility.Bug Fixes
broker_out_api
tobroker_outer_api
.