-
Notifications
You must be signed in to change notification settings - Fork 111
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ISSUE #1029]🚀Support order message consume for client🔥 #1034
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
use std::sync::atomic::AtomicI64; | ||
use std::sync::Arc; | ||
|
||
use rocketmq_client::consumer::default_mq_push_consumer::DefaultMQPushConsumer; | ||
use rocketmq_client::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus; | ||
use rocketmq_client::consumer::listener::consume_orderly_context::ConsumeOrderlyContext; | ||
use rocketmq_client::consumer::listener::consume_orderly_status::ConsumeOrderlyStatus; | ||
use rocketmq_client::consumer::listener::message_listener_orderly::MessageListenerOrderly; | ||
use rocketmq_client::consumer::mq_push_consumer::MQPushConsumer; | ||
use rocketmq_client::Result; | ||
use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; | ||
use rocketmq_common::common::message::message_ext::MessageExt; | ||
use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; | ||
use rocketmq_rust::rocketmq; | ||
use tracing::info; | ||
|
||
pub const MESSAGE_COUNT: usize = 1; | ||
pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_3"; | ||
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876"; | ||
pub const TOPIC: &str = "TopicTest"; | ||
pub const TAG: &str = "*"; | ||
|
||
#[rocketmq::main] | ||
pub async fn main() -> Result<()> { | ||
//init logger | ||
rocketmq_common::log::init_logger(); | ||
|
||
// create a producer builder with default configuration | ||
let builder = DefaultMQPushConsumer::builder(); | ||
|
||
let mut consumer = builder | ||
.consumer_group(CONSUMER_GROUP.to_string()) | ||
.name_server_addr(DEFAULT_NAMESRVADDR.to_string()) | ||
.message_model(MessageModel::Clustering) | ||
.build(); | ||
consumer.subscribe(TOPIC, TAG)?; | ||
consumer.set_consume_from_where(ConsumeFromWhere::ConsumeFromFirstOffset); | ||
consumer.register_message_listener_orderly(MyMessageListener::new()); | ||
consumer.start().await?; | ||
let _ = tokio::signal::ctrl_c().await; | ||
Ok(()) | ||
} | ||
|
||
pub struct MyMessageListener { | ||
consume_times: Arc<AtomicI64>, | ||
} | ||
|
||
impl MyMessageListener { | ||
pub fn new() -> Self { | ||
Self { | ||
consume_times: Arc::new(AtomicI64::new(0)), | ||
} | ||
} | ||
} | ||
|
||
impl MessageListenerOrderly for MyMessageListener { | ||
fn consume_message( | ||
&self, | ||
msgs: &[&MessageExt], | ||
context: &mut ConsumeOrderlyContext, | ||
) -> Result<ConsumeOrderlyStatus> { | ||
context.set_auto_commit(true); | ||
for msg in msgs { | ||
println!("Receive message: {:?}", msg); | ||
info!("Receive message: {:?}", msg); | ||
} | ||
if self | ||
.consume_times | ||
.load(std::sync::atomic::Ordering::Acquire) | ||
% 2 | ||
== 0 | ||
{ | ||
return Ok(ConsumeOrderlyStatus::Success); | ||
} else if self | ||
.consume_times | ||
.load(std::sync::atomic::Ordering::Acquire) | ||
% 5 | ||
== 0 | ||
{ | ||
context.set_suspend_current_queue_time_millis(3000); | ||
return Ok(ConsumeOrderlyStatus::SuspendCurrentQueueAMoment); | ||
} | ||
Ok(ConsumeOrderlyStatus::Success) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,7 +103,7 @@ | |
|
||
async fn process_consume_result( | ||
&mut self, | ||
this: ArcRefCellWrapper<Self>, | ||
this: WeakCellWrapper<Self>, | ||
Check warning on line 106 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L106
|
||
status: ConsumeConcurrentlyStatus, | ||
context: &ConsumeConcurrentlyContext, | ||
consume_request: &mut ConsumeRequest, | ||
|
@@ -138,6 +138,7 @@ | |
if !consume_request | ||
.process_queue | ||
.contains_message(&msg.message_ext_inner) | ||
.await | ||
Check warning on line 141 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L141
|
||
{ | ||
/*info!("Message is not found in its process queue; skip send-back-procedure, topic={}, " | ||
+ "brokerName={}, queueId={}, queueOffset={}", msg.get_topic(), msg.get_broker_name(), | ||
|
@@ -191,15 +192,17 @@ | |
fn submit_consume_request_later( | ||
&self, | ||
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>, | ||
this: ArcRefCellWrapper<Self>, | ||
this: WeakCellWrapper<Self>, | ||
process_queue: Arc<ProcessQueue>, | ||
message_queue: MessageQueue, | ||
) { | ||
self.consume_runtime.get_handle().spawn(async move { | ||
tokio::time::sleep(Duration::from_secs(5)).await; | ||
let this_ = this.clone(); | ||
this.submit_consume_request(this_, msgs, process_queue, message_queue, true) | ||
.await; | ||
if let Some(this) = this.upgrade() { | ||
this.submit_consume_request(this_, msgs, process_queue, message_queue, true) | ||
.await; | ||
} | ||
Check warning on line 205 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L202-L205
|
||
}); | ||
} | ||
|
||
|
@@ -232,19 +235,21 @@ | |
} | ||
|
||
impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { | ||
fn start(&mut self, mut this: ArcRefCellWrapper<Self>) { | ||
fn start(&mut self, this: WeakCellWrapper<Self>) { | ||
Check warning on line 238 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L238
|
||
self.consume_runtime.get_handle().spawn(async move { | ||
let timeout = this.consumer_config.consume_timeout; | ||
let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60)); | ||
interval.tick().await; | ||
loop { | ||
if let Some(mut this) = this.upgrade() { | ||
let timeout = this.consumer_config.consume_timeout; | ||
let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60)); | ||
Check warning on line 242 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L240-L242
|
||
interval.tick().await; | ||
this.clean_expire_msg().await; | ||
loop { | ||
interval.tick().await; | ||
this.clean_expire_msg().await; | ||
Check warning on line 246 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L245-L246
|
||
} | ||
Comment on lines
+238
to
+247
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Provide a shutdown mechanism for the spawned task in The infinite loop inside the spawned task in Consider implementing a cancellation token or checking a shutdown flag to gracefully exit the loop when shutting down the consumer. Would you like assistance in implementing a shutdown mechanism for this loop? 🧰 Tools🪛 GitHub Check: codecov/patch
|
||
} | ||
}); | ||
} | ||
|
||
fn shutdown(&mut self, await_terminate_millis: u64) { | ||
async fn shutdown(&mut self, await_terminate_millis: u64) { | ||
Check warning on line 252 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L252
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implement the The Please provide an implementation for the Would you like help drafting the implementation for 🧰 Tools🪛 GitHub Check: codecov/patch
|
||
// todo!() | ||
} | ||
|
||
|
@@ -274,7 +279,7 @@ | |
|
||
async fn submit_consume_request( | ||
&self, | ||
this: ArcRefCellWrapper<Self>, | ||
this: WeakCellWrapper<Self>, | ||
Check warning on line 282 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L282
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Upgrade In Add an upgrade check at the beginning of the method: if let Some(this) = this.upgrade() {
// Proceed with using `this`
} else {
// Handle invalid reference
return;
} 🧰 Tools🪛 GitHub Check: codecov/patch
|
||
msgs: Vec<ArcRefCellWrapper<MessageClientExt>>, | ||
process_queue: Arc<ProcessQueue>, | ||
message_queue: MessageQueue, | ||
|
@@ -341,9 +346,7 @@ | |
impl ConsumeRequest { | ||
async fn run( | ||
&mut self, | ||
mut consume_message_concurrently_service: ArcRefCellWrapper< | ||
ConsumeMessageConcurrentlyService, | ||
>, | ||
consume_message_concurrently_service: WeakCellWrapper<ConsumeMessageConcurrentlyService>, | ||
Check warning on line 349 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L349
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Upgrade In the Ensure an upgrade check is performed: if let Some(consume_service) = consume_message_concurrently_service.upgrade() {
// Use `consume_service` safely
} else {
// Handle the case where the service is no longer available
return;
} 🧰 Tools🪛 GitHub Check: codecov/patch
|
||
) { | ||
if self.process_queue.is_dropped() { | ||
info!( | ||
|
@@ -465,9 +468,11 @@ | |
); | ||
} else { | ||
let this = consume_message_concurrently_service.clone(); | ||
consume_message_concurrently_service | ||
.process_consume_result(this, status.unwrap(), &context, self) | ||
.await; | ||
if let Some(mut consume_message_concurrently_service) = this.upgrade() { | ||
consume_message_concurrently_service | ||
.process_consume_result(this, status.unwrap(), &context, self) | ||
.await; | ||
} | ||
Check warning on line 475 in rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs Codecov / codecov/patchrocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs#L471-L475
|
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Upgrade 'this' before use in
process_consume_result
The parameter
this
is now aWeakCellWrapper<Self>
, but there is no upgrade check before it's used withinprocess_consume_result
. This could lead to a panic if the weak reference has been dropped.Please add an upgrade check to ensure
this
is valid before use:🧰 Tools
🪛 GitHub Check: codecov/patch
Increase test coverage for new code changes
The following lines are not covered by tests:
To ensure reliability and prevent future regressions, please add unit tests that cover these new code paths.
Would you like assistance in creating tests for these areas?
Also applies to: 141-141, 202-205, 238-238, 240-242, 245-246, 252-252, 282-282, 349-349, 471-475
🧰 Tools
🪛 GitHub Check: codecov/patch