-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #2161]🚀Implement BatchUnregistrationService💫 #2162
Conversation
WalkthroughThe pull request introduces a new Changes
Assessment against linked issues
Suggested Labels
Suggested Reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
🔊@mxsm 🚀Thanks for your contribution🎉! 💡CodeRabbit(AI) will review your code first🔥! Note 🚨The code review suggestions from CodeRabbit are to be used as a reference only, and the PR submitter can decide whether to make changes based on their own judgment. Ultimately, the project management personnel will conduct the final code review💥. |
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2162 +/- ##
==========================================
- Coverage 28.56% 28.54% -0.02%
==========================================
Files 497 498 +1
Lines 70961 70992 +31
==========================================
Hits 20267 20267
- Misses 50694 50725 +31 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
rocketmq-namesrv/src/route/batch_unregistration_service.rs (3)
30-41
: Add documentation for the constructor.Consider adding documentation comments explaining:
- The purpose of the service
- The significance of the channel capacity configuration
- The ownership model of the NameServerRuntimeInner
+/// Creates a new BatchUnregistrationService with the specified runtime configuration. +/// +/// # Arguments +/// * `name_server_runtime_inner` - Shared runtime state for the name server +/// +/// The channel capacity is configured based on the name server's unregister_broker_queue_capacity setting. pub(crate) fn new(name_server_runtime_inner: ArcMut<NameServerRuntimeInner>) -> Self {
43-49
: Enhance error handling in submit method.While the current implementation is functional, consider:
- Providing more context in the error message about why the submission failed
- Distinguishing between different error cases (full channel vs closed channel)
pub fn submit(&self, request: UnRegisterBrokerRequestHeader) -> bool { if let Err(e) = self.tx.try_send(request) { - warn!("submit unregister broker request failed: {:?}", e); + warn!( + "Failed to submit unregister request: {}. Reason: {}", + match e { + TrySendError::Full(_) => "channel full - server is busy", + TrySendError::Closed(_) => "service has been shut down", + }, + e + ); return false; } true }
23-66
: Add comprehensive test coverage.The service handles critical broker unregistration operations. Consider adding:
- Unit tests for different scenarios (success, channel full, shutdown)
- Integration tests with RouteInfoManager
- Load tests to verify batch processing behavior
Would you like me to help generate a comprehensive test suite for this service?
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
rocketmq-namesrv/src/route.rs
(1 hunks)rocketmq-namesrv/src/route/batch_unregistration_service.rs
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: build (windows-latest, nightly)
- GitHub Check: build (windows-latest, stable)
- GitHub Check: build (macos-latest, nightly)
- GitHub Check: build (macos-latest, stable)
- GitHub Check: build (ubuntu-latest, nightly)
- GitHub Check: test
- GitHub Check: build (ubuntu-latest, stable)
- GitHub Check: build
- GitHub Check: auto-approve
🔇 Additional comments (3)
rocketmq-namesrv/src/route.rs (1)
18-18
: LGTM! Module declaration is appropriate.The
pub(crate)
visibility modifier correctly restricts the module's accessibility to within the crate.rocketmq-namesrv/src/route/batch_unregistration_service.rs (2)
17-27
: Well-structured implementation with appropriate field visibility.The struct design effectively encapsulates the required components for batch processing using tokio channels.
51-65
:⚠️ Potential issueCritical improvements needed in start() implementation.
Several important concerns need to be addressed:
- No graceful shutdown mechanism
- Hard-coded batch limit
- Missing error handling
- No backpressure mechanism
Consider implementing these improvements:
+const DEFAULT_BATCH_LIMIT: usize = 10; + pub fn start(&mut self) { let mut name_server_runtime_inner = self.name_server_runtime_inner.clone(); let mut rx = self.rx.take().expect("rx is None"); - let limit = 10; + let limit = name_server_runtime_inner + .name_server_config() + .get_unregister_batch_size() + .unwrap_or(DEFAULT_BATCH_LIMIT); + + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + self.shutdown_sender = Some(shutdown_tx); + tokio::spawn(async move { loop { let mut unregistration_requests = Vec::with_capacity(limit); tokio::select! { + _ = &mut shutdown_rx => { + debug!("Batch unregistration service shutting down"); + break; + } _ = rx.recv_many(&mut unregistration_requests,limit) => { - name_server_runtime_inner.route_info_manager_mut().un_register_broker(unregistration_requests); + if let Err(e) = name_server_runtime_inner + .route_info_manager_mut() + .un_register_broker(unregistration_requests) + { + error!("Failed to unregister brokers: {}", e); + // Implement backoff/retry strategy here + tokio::time::sleep(Duration::from_secs(1)).await; + } } } } }); }Additionally:
- Add a corresponding
stop()
method for graceful shutdown- Consider implementing a backoff strategy for error cases
- Add metrics for monitoring batch processing performance
- Consider adding a timeout for batch collection to ensure timely processing even with partial batches
Let's verify the error handling in the route_info_manager:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which Issue(s) This PR Fixes(Closes)
Fixes #2161
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
Chores