-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #722]🚀Support pull message result handle-4 #728
Conversation
🚀Thanks for your contribution🎉. CodeRabbit(AI) will review your code first🔥 |
WalkthroughThis update introduces a new feature to the RocketMQ Broker, primarily focused on enhancing the handling of pull message results. Key changes include the addition of new imports, updates to the Changes
Sequence Diagram(s)sequenceDiagram
participant BrokerRuntime
participant PullRequestHoldService
participant DefaultMessageStore
participant Listener
BrokerRuntime->>PullRequestHoldService: Initialize with BrokerConfig
PullRequestHoldService->>DefaultMessageStore: Set message_arriving_listener
BrokerRuntime->>DefaultMessageStore: Notify message arrival
DefaultMessageStore->>Listener: Message arrival notification
Assessment against linked issues
Poem
Tip Early access features: enabledWe are currently testing the following features in early access:
Note:
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files selected for processing (12)
- rocketmq-broker/src/broker_runtime.rs (9 hunks)
- rocketmq-broker/src/coldctr/cold_data_pull_request_hold_service.rs (1 hunks)
- rocketmq-broker/src/long_polling.rs (1 hunks)
- rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (8 hunks)
- rocketmq-broker/src/long_polling/notify_message_arriving_listener.rs (1 hunks)
- rocketmq-broker/src/long_polling/pull_request.rs (1 hunks)
- rocketmq-broker/src/processor/pull_message_processor.rs (6 hunks)
- rocketmq-broker/src/util/hook_utils.rs (1 hunks)
- rocketmq-remoting/src/runtime/server.rs (1 hunks)
- rocketmq-store/src/base.rs (1 hunks)
- rocketmq-store/src/log_file.rs (2 hunks)
- rocketmq-store/src/message_store/default_message_store.rs (7 hunks)
Files skipped from review due to trivial changes (1)
- rocketmq-broker/src/util/hook_utils.rs
Additional comments not posted (29)
rocketmq-broker/src/coldctr/cold_data_pull_request_hold_service.rs (1)
17-17
: Constant Definition:NO_SUSPEND_KEY
The constant
NO_SUSPEND_KEY
is appropriately defined and named. Ensure it is used consistently throughout the codebase.rocketmq-broker/src/long_polling.rs (1)
Line range hint
8-8
: New Module:notify_message_arriving_listener
The new module
notify_message_arriving_listener
is correctly added. Ensure that all references to the oldmessage_arriving_listener
module are updated to use this new module.rocketmq-store/src/base.rs (1)
27-27
: New Module:message_arriving_listener
The new module
message_arriving_listener
is correctly added. Ensure that it is used appropriately for handling message arrival events.rocketmq-broker/src/long_polling/notify_message_arriving_listener.rs (4)
19-19
: Import Path UpdateThe import path for
MessageArrivingListener
has been updated torocketmq_store::base::message_arriving_listener
. Ensure that all references are updated accordingly.
21-21
: New Struct:NotifyMessageArrivingListener
The
NotifyMessageArrivingListener
struct is correctly defined with thepull_request_hold_service
field. Ensure that the field is correctly initialized and used.
Line range hint
23-29
: Method:new
The
new
method correctly initializes theNotifyMessageArrivingListener
struct with thepull_request_hold_service
field.
Line range hint
31-47
: Implementation ofMessageArrivingListener
The implementation of the
MessageArrivingListener
trait forNotifyMessageArrivingListener
is correct. Thearriving
method correctly delegates to thenotify_message_arriving_ext
method ofpull_request_hold_service
.rocketmq-broker/src/long_polling/pull_request.rs (1)
92-94
: LGTM!The new method
connection_handler_context
correctly returns a reference to thectx
field.rocketmq-store/src/log_file.rs (1)
112-112
: LGTM!The new method
notify_message_arrive_if_necessary
is correctly added to theRocketMQMessageStore
trait.rocketmq-broker/src/long_polling/long_polling_service/pull_request_hold_service.rs (5)
Line range hint
40-56
: LGTM!The
PullRequestHoldService::new
method correctly initializes thebroker_config
field.
66-89
: LGTM!The
PullRequestHoldService::start
method correctly implements the polling logic based on thebroker_config
.
108-108
: LGTM!The
check_hold_request
method correctly callsnotify_message_arriving
.
162-162
: LGTM!The
notify_message_arriving
method correctly callsexecute_request_when_wakeup
with the new context parameter.
201-201
: LGTM!The
notify_master_online
method correctly callsexecute_request_when_wakeup
with the new context parameter.rocketmq-broker/src/processor/pull_message_processor.rs (4)
53-53
: New Import:NO_SUSPEND_KEY
.The new import
NO_SUSPEND_KEY
appears to be used in the updated methods.
75-75
: Refactor: UseArc
formessage_store
.The
message_store
field is now wrapped inArc
, improving thread safety and shared ownership.
324-324
: Refactor: Asynchronousprocess_request
method.The
process_request
method has been updated to be asynchronous and now accepts additional parameters includingctx
.
788-818
: New Method: Asynchronousexecute_request_when_wakeup
.The
execute_request_when_wakeup
method now includes asynchronous processing and usesNO_SUSPEND_KEY
.rocketmq-broker/src/broker_runtime.rs (6)
57-58
: New Imports:PullRequestHoldService
andNotifyMessageArrivingListener
.The new imports are used for handling pull requests and notifying message arrivals.
108-109
: New Field:pull_request_hold_service
.The
BrokerRuntime
struct now includes an optionalpull_request_hold_service
field, which is initialized and used within the struct's implementation.
141-141
: Clone Implementation:pull_request_hold_service
.The
pull_request_hold_service
field is correctly cloned in theClone
implementation ofBrokerRuntime
.
215-215
: Initialization:pull_request_hold_service
.The
pull_request_hold_service
field is correctly initialized toNone
in thenew
method.
Line range hint
364-407
: Initialization:pull_request_hold_service
andmessage_arriving_listener
.The
init_processor
method correctly initializespull_request_hold_service
and setsmessage_arriving_listener
.
598-600
: Startpull_request_hold_service
.The
pull_request_hold_service
is correctly started if it is initialized.rocketmq-store/src/message_store/default_message_store.rs (5)
59-59
: Import Addition Approved.The import of
MessageArrivingListener
is necessary for the new functionality.
111-112
: Field Addition Approved.The addition of the
message_arriving_listener
field and its inclusion in theClone
implementation andnew
method is necessary to support the new functionality for handling pull message results.Also applies to: 139-139, 216-216
454-461
: Method Addition Approved.The
set_message_arriving_listener
method allows setting the listener for message arrivals, which is necessary for the new functionality.
975-989
: Method Addition Approved.The
notify_message_arrive_if_necessary
method handles the message arrival notifications, which is necessary for the new functionality.
1015-1017
: Method Addition Approved.The
notify_message_arrive4multi_queue
method is necessary for the new functionality.
pub async fn write(&mut self, cmd: RemotingCommand) { | ||
match self.connection.framed.send(cmd).await { | ||
Ok(_) => {} | ||
Err(error) => { | ||
error!("send response failed: {}", error); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure proper error handling in write
method.
The write
method logs an error if sending the response fails but does not handle the error beyond logging. Consider adding error handling logic to ensure robustness.
pub async fn write(&mut self, cmd: RemotingCommand) -> Result<(), Box<dyn std::error::Error>> {
match self.connection.framed.send(cmd).await {
Ok(_) => Ok(()),
Err(error) => {
error!("send response failed: {}", error);
Err(Box::new(error))
}
}
}
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #728 +/- ##
==========================================
- Coverage 27.15% 26.86% -0.29%
==========================================
Files 263 264 +1
Lines 20755 20844 +89
==========================================
- Hits 5636 5600 -36
- Misses 15119 15244 +125 ☔ View full report in Codecov by Sentry. |
Which Issue(s) This PR Fixes(Closes)
Fixes #722
Brief Description
How Did You Test This Change?
Summary by CodeRabbit
New Features
pull_request_hold_service
to enhance message processing hold capabilities.NO_SUSPEND_KEY
constant for improved message handling control.write
method inConnectionHandlerContextWrapper
for asynchronous command handling.message_arriving_listener
toDefaultMessageStore
for enhanced message arrival notifications.Improvements
execute_request_when_wakeup
function for asynchronous processing.