diff --git a/rocketmq-client/Cargo.toml b/rocketmq-client/Cargo.toml index 8eb949a1..4fda147c 100644 --- a/rocketmq-client/Cargo.toml +++ b/rocketmq-client/Cargo.toml @@ -78,3 +78,7 @@ path = "examples/broadcast/push_consumer.rs" [[example]] name = "ordermessage-producer" path = "examples/ordermessage/ordermessage_producer.rs" + +[[example]] +name = "ordermessage-consumer" +path = "examples/ordermessage/ordermessage_consumer.rs" \ No newline at end of file diff --git a/rocketmq-client/examples/ordermessage/ordermessage_consumer.rs b/rocketmq-client/examples/ordermessage/ordermessage_consumer.rs new file mode 100644 index 00000000..47b5f4ac --- /dev/null +++ b/rocketmq-client/examples/ordermessage/ordermessage_consumer.rs @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::sync::atomic::AtomicI64; +use std::sync::Arc; + +use rocketmq_client::consumer::default_mq_push_consumer::DefaultMQPushConsumer; +use rocketmq_client::consumer::listener::consume_concurrently_status::ConsumeConcurrentlyStatus; +use rocketmq_client::consumer::listener::consume_orderly_context::ConsumeOrderlyContext; +use rocketmq_client::consumer::listener::consume_orderly_status::ConsumeOrderlyStatus; +use rocketmq_client::consumer::listener::message_listener_orderly::MessageListenerOrderly; +use rocketmq_client::consumer::mq_push_consumer::MQPushConsumer; +use rocketmq_client::Result; +use rocketmq_common::common::consumer::consume_from_where::ConsumeFromWhere; +use rocketmq_common::common::message::message_ext::MessageExt; +use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; +use rocketmq_rust::rocketmq; +use tracing::info; + +pub const MESSAGE_COUNT: usize = 1; +pub const CONSUMER_GROUP: &str = "please_rename_unique_group_name_3"; +pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876"; +pub const TOPIC: &str = "TopicTest"; +pub const TAG: &str = "*"; + +#[rocketmq::main] +pub async fn main() -> Result<()> { + //init logger + rocketmq_common::log::init_logger(); + + // create a producer builder with default configuration + let builder = DefaultMQPushConsumer::builder(); + + let mut consumer = builder + .consumer_group(CONSUMER_GROUP.to_string()) + .name_server_addr(DEFAULT_NAMESRVADDR.to_string()) + .message_model(MessageModel::Clustering) + .build(); + consumer.subscribe(TOPIC, TAG)?; + consumer.set_consume_from_where(ConsumeFromWhere::ConsumeFromFirstOffset); + consumer.register_message_listener_orderly(MyMessageListener::new()); + consumer.start().await?; + let _ = tokio::signal::ctrl_c().await; + Ok(()) +} + +pub struct MyMessageListener { + consume_times: Arc, +} + +impl MyMessageListener { + pub fn new() -> Self { + Self { + consume_times: Arc::new(AtomicI64::new(0)), + } + } +} + +impl MessageListenerOrderly for MyMessageListener { + fn consume_message( + &self, + msgs: &[&MessageExt], + context: &mut ConsumeOrderlyContext, + ) -> Result { + context.set_auto_commit(true); + for msg in msgs { + println!("Receive message: {:?}", msg); + info!("Receive message: {:?}", msg); + } + if self + .consume_times + .load(std::sync::atomic::Ordering::Acquire) + % 2 + == 0 + { + return Ok(ConsumeOrderlyStatus::Success); + } else if self + .consume_times + .load(std::sync::atomic::Ordering::Acquire) + % 5 + == 0 + { + context.set_suspend_current_queue_time_millis(3000); + return Ok(ConsumeOrderlyStatus::SuspendCurrentQueueAMoment); + } + Ok(ConsumeOrderlyStatus::Success) + } +} diff --git a/rocketmq-client/src/consumer.rs b/rocketmq-client/src/consumer.rs index aa1e64c2..0de68b9a 100644 --- a/rocketmq-client/src/consumer.rs +++ b/rocketmq-client/src/consumer.rs @@ -20,6 +20,7 @@ pub mod default_mq_push_consumer; pub mod default_mq_push_consumer_builder; pub mod listener; pub mod message_queue_listener; +pub(crate) mod message_queue_lock; pub mod message_selector; pub mod mq_consumer; pub(crate) mod mq_consumer_inner; diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs index 959b81bf..93a44cf3 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_concurrently_service.rs @@ -103,7 +103,7 @@ impl ConsumeMessageConcurrentlyService { async fn process_consume_result( &mut self, - this: ArcRefCellWrapper, + this: WeakCellWrapper, status: ConsumeConcurrentlyStatus, context: &ConsumeConcurrentlyContext, consume_request: &mut ConsumeRequest, @@ -138,6 +138,7 @@ impl ConsumeMessageConcurrentlyService { if !consume_request .process_queue .contains_message(&msg.message_ext_inner) + .await { /*info!("Message is not found in its process queue; skip send-back-procedure, topic={}, " + "brokerName={}, queueId={}, queueOffset={}", msg.get_topic(), msg.get_broker_name(), @@ -191,15 +192,17 @@ impl ConsumeMessageConcurrentlyService { fn submit_consume_request_later( &self, msgs: Vec>, - this: ArcRefCellWrapper, + this: WeakCellWrapper, process_queue: Arc, message_queue: MessageQueue, ) { self.consume_runtime.get_handle().spawn(async move { tokio::time::sleep(Duration::from_secs(5)).await; let this_ = this.clone(); - this.submit_consume_request(this_, msgs, process_queue, message_queue, true) - .await; + if let Some(this) = this.upgrade() { + this.submit_consume_request(this_, msgs, process_queue, message_queue, true) + .await; + } }); } @@ -232,19 +235,21 @@ impl ConsumeMessageConcurrentlyService { } impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { - fn start(&mut self, mut this: ArcRefCellWrapper) { + fn start(&mut self, this: WeakCellWrapper) { self.consume_runtime.get_handle().spawn(async move { - let timeout = this.consumer_config.consume_timeout; - let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60)); - interval.tick().await; - loop { + if let Some(mut this) = this.upgrade() { + let timeout = this.consumer_config.consume_timeout; + let mut interval = tokio::time::interval(Duration::from_secs(timeout * 60)); interval.tick().await; - this.clean_expire_msg().await; + loop { + interval.tick().await; + this.clean_expire_msg().await; + } } }); } - fn shutdown(&mut self, await_terminate_millis: u64) { + async fn shutdown(&mut self, await_terminate_millis: u64) { // todo!() } @@ -274,7 +279,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessageConcurrentlyService { async fn submit_consume_request( &self, - this: ArcRefCellWrapper, + this: WeakCellWrapper, msgs: Vec>, process_queue: Arc, message_queue: MessageQueue, @@ -341,9 +346,7 @@ struct ConsumeRequest { impl ConsumeRequest { async fn run( &mut self, - mut consume_message_concurrently_service: ArcRefCellWrapper< - ConsumeMessageConcurrentlyService, - >, + consume_message_concurrently_service: WeakCellWrapper, ) { if self.process_queue.is_dropped() { info!( @@ -465,9 +468,11 @@ impl ConsumeRequest { ); } else { let this = consume_message_concurrently_service.clone(); - consume_message_concurrently_service - .process_consume_result(this, status.unwrap(), &context, self) - .await; + if let Some(mut consume_message_concurrently_service) = this.upgrade() { + consume_message_concurrently_service + .process_consume_result(this, status.unwrap(), &context, self) + .await; + } } } } diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs index 5da430d9..f79f498d 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_orderly_service.rs @@ -14,27 +14,411 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; +use once_cell::sync::Lazy; use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::common::message::message_single::Message; +use rocketmq_common::common::message::MessageConst; +use rocketmq_common::common::message::MessageTrait; +use rocketmq_common::common::mix_all; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::MessageAccessor::MessageAccessor; +use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; +use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; +use rocketmq_runtime::RocketMQRuntime; +use rocketmq_rust::RocketMQTokioMutex; +use tracing::warn; +use crate::base::client_config::ClientConfig; use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; +use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue; use crate::consumer::consumer_impl::process_queue::ProcessQueue; +use crate::consumer::consumer_impl::process_queue::REBALANCE_LOCK_INTERVAL; +use crate::consumer::default_mq_push_consumer::ConsumerConfig; +use crate::consumer::listener::consume_orderly_context::ConsumeOrderlyContext; +use crate::consumer::listener::consume_orderly_status::ConsumeOrderlyStatus; +use crate::consumer::listener::consume_return_type::ConsumeReturnType; +use crate::consumer::listener::message_listener_orderly::ArcBoxMessageListenerOrderly; +use crate::consumer::message_queue_lock::MessageQueueLock; +use crate::consumer::mq_consumer_inner::MQConsumerInnerLocal; +use crate::hook::consume_message_context::ConsumeMessageContext; +use crate::producer::mq_producer::MQProducer; -pub struct ConsumeMessageOrderlyService; +static MAX_TIME_CONSUME_CONTINUOUSLY: Lazy = Lazy::new(|| { + std::env::var("rocketmq.client.maxTimeConsumeContinuously") + .unwrap_or("60000".to_string()) + .parse() + .unwrap_or(60000) +}); + +pub struct ConsumeMessageOrderlyService { + pub(crate) default_mqpush_consumer_impl: Option>, + pub(crate) client_config: ArcRefCellWrapper, + pub(crate) consumer_config: ArcRefCellWrapper, + pub(crate) consumer_group: Arc, + pub(crate) message_listener: ArcBoxMessageListenerOrderly, + pub(crate) consume_runtime: RocketMQRuntime, + pub(crate) stopped: AtomicBool, + pub(crate) global_lock: Arc>, + pub(crate) message_queue_lock: MessageQueueLock, +} + +impl ConsumeMessageOrderlyService { + pub fn new( + client_config: ArcRefCellWrapper, + consumer_config: ArcRefCellWrapper, + consumer_group: String, + message_listener: ArcBoxMessageListenerOrderly, + default_mqpush_consumer_impl: Option>, + ) -> Self { + let consume_thread = consumer_config.consume_thread_max; + let consumer_group_tag = format!("{}_{}", "ConsumeMessageThread_", consumer_group); + Self { + default_mqpush_consumer_impl, + client_config, + consumer_config, + consumer_group: Arc::new(consumer_group), + message_listener, + consume_runtime: RocketMQRuntime::new_multi( + consume_thread as usize, + consumer_group_tag.as_str(), + ), + stopped: AtomicBool::new(false), + global_lock: Arc::new(Default::default()), + message_queue_lock: Default::default(), + } + } + + pub async fn lock_mqperiodically(&mut self) { + let lock = self.global_lock.lock().await; + if self.stopped.load(std::sync::atomic::Ordering::Acquire) { + return; + } + if let Some(mut default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + default_mqpush_consumer_impl + .rebalance_impl + .rebalance_impl_inner + .lock_all() + .await; + } + drop(lock); + } + + pub async fn unlock_all_mq(&mut self) { + let lock = self.global_lock.lock().await; + if let Some(mut default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + default_mqpush_consumer_impl + .rebalance_impl + .rebalance_impl_inner + .unlock_all(false) + .await; + } + drop(lock); + } + + pub async fn try_lock_later_and_reconsume( + &mut self, + consume_message_orderly_service: WeakCellWrapper, + message_queue: &MessageQueue, + process_queue: Arc, + delay_mills: u64, + ) { + let consume_message_orderly_service_cloned = consume_message_orderly_service.clone(); + let message_queue = message_queue.clone(); + self.consume_runtime.get_handle().spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(delay_mills)).await; + if let Some(mut consume_message_orderly_service) = + consume_message_orderly_service.upgrade() + { + if consume_message_orderly_service + .lock_one_mq(&message_queue) + .await + { + consume_message_orderly_service.submit_consume_request_later( + process_queue, + message_queue.clone(), + 10, + consume_message_orderly_service_cloned, + ); + } else { + consume_message_orderly_service.submit_consume_request_later( + process_queue, + message_queue.clone(), + 3_000, + consume_message_orderly_service_cloned, + ); + } + } + }); + } + + pub async fn lock_one_mq(&self, message_queue: &MessageQueue) -> bool { + if self.stopped.load(std::sync::atomic::Ordering::Acquire) { + return false; + } + if let Some(mut default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + return default_mqpush_consumer_impl + .rebalance_impl + .rebalance_impl_inner + .lock(message_queue) + .await; + } + false + } + + fn submit_consume_request_later( + &mut self, + process_queue: Arc, + message_queue: MessageQueue, + suspend_time_millis: i64, + this: WeakCellWrapper, + ) { + let mut time_millis = suspend_time_millis; + if time_millis == -1 { + time_millis = if let Some(default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + default_mqpush_consumer_impl + .consumer_config + .suspend_current_queue_time_millis as i64 + } else { + 10 + }; + } + + time_millis = time_millis.clamp(10, 30000); + + let delay = Duration::from_millis(time_millis as u64); + + tokio::spawn(async move { + tokio::time::sleep(delay).await; + // Call the submit_consume_request function here + // ConsumeMessageOrderlyService::submit_consume_request(None, process_queue_clone, + // message_queue_clone, true).await; + let this_ = this.clone(); + if let Some(this) = this.upgrade() { + this.submit_consume_request(this_, vec![], process_queue, message_queue, true) + .await; + } + }); + } + + #[inline] + fn get_max_reconsume_times(&self) -> i32 { + if self.consumer_config.max_reconsume_times == -1 { + i32::MAX + } else { + self.consumer_config.max_reconsume_times + } + } + + pub async fn send_message_back(&mut self, msg: &MessageExt) -> bool { + let mut new_msg = Message::new( + mix_all::get_retry_topic(self.consumer_group.as_str()), + msg.get_body().unwrap(), + ); + MessageAccessor::set_properties(&mut new_msg, msg.get_properties().clone()); + let origin_msg_id = + MessageAccessor::get_origin_message_id(msg).unwrap_or(msg.msg_id.clone()); + MessageAccessor::set_origin_message_id(&mut new_msg, origin_msg_id.as_str()); + new_msg.set_flag(msg.get_flag()); + MessageAccessor::put_property( + &mut new_msg, + MessageConst::PROPERTY_RETRY_TOPIC, + msg.get_topic(), + ); + MessageAccessor::set_reconsume_time( + &mut new_msg, + (msg.reconsume_times() + 1).to_string().as_str(), + ); + MessageAccessor::set_max_reconsume_times( + &mut new_msg, + self.get_max_reconsume_times().to_string().as_str(), + ); + MessageAccessor::clear_property(&mut new_msg, MessageConst::PROPERTY_TRANSACTION_PREPARED); + new_msg.set_delay_time_level(3 + msg.reconsume_times()); + if let Some(mut default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + let result = default_mqpush_consumer_impl + .client_instance + .as_mut() + .unwrap() + .default_producer + .send(new_msg) + .await; + result.is_ok() + } else { + false + } + } + + async fn check_reconsume_times( + &mut self, + msgs: &mut [ArcRefCellWrapper], + ) -> bool { + let mut suspend = false; + if !msgs.is_empty() { + for msg in msgs { + let reconsume_times = msg.message_ext_inner.reconsume_times; + if reconsume_times >= self.get_max_reconsume_times() { + MessageAccessor::set_reconsume_time( + &mut msg.message_ext_inner, + reconsume_times.to_string().as_ref(), + ); + if !self.send_message_back(&msg.message_ext_inner).await { + suspend = true; + msg.message_ext_inner.reconsume_times = reconsume_times + 1; + } + } else { + suspend = true; + msg.message_ext_inner.reconsume_times = reconsume_times + 1; + } + } + } + suspend + } + + #[allow(deprecated)] + async fn process_consume_result( + &mut self, + mut msgs: Vec>, + this: WeakCellWrapper, + status: ConsumeOrderlyStatus, + context: &ConsumeOrderlyContext, + consume_request: &mut ConsumeRequest, + ) -> bool { + let (continue_consume, commit_offset) = if context.is_auto_commit() { + match status { + ConsumeOrderlyStatus::Success + | ConsumeOrderlyStatus::Rollback + | ConsumeOrderlyStatus::Commit => { + (true, consume_request.process_queue.commit().await) + } + ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => { + if self.check_reconsume_times(&mut msgs).await { + consume_request + .process_queue + .make_message_to_consume_again(&msgs) + .await; + self.submit_consume_request_later( + consume_request.process_queue.clone(), + consume_request.message_queue.clone(), + context.get_suspend_current_queue_time_millis(), + this, + ); + (false, -1) + } else { + (true, consume_request.process_queue.commit().await) + } + } + } + } else { + match status { + ConsumeOrderlyStatus::Success => (true, -1), + ConsumeOrderlyStatus::Rollback => { + (true, consume_request.process_queue.commit().await) + } + ConsumeOrderlyStatus::Commit => { + consume_request.process_queue.rollback().await; + self.submit_consume_request_later( + consume_request.process_queue.clone(), + consume_request.message_queue.clone(), + context.get_suspend_current_queue_time_millis(), + this, + ); + (false, -1) + } + ConsumeOrderlyStatus::SuspendCurrentQueueAMoment => { + if self.check_reconsume_times(&mut msgs).await { + consume_request + .process_queue + .make_message_to_consume_again(&msgs) + .await; + self.submit_consume_request_later( + consume_request.process_queue.clone(), + consume_request.message_queue.clone(), + context.get_suspend_current_queue_time_millis(), + this, + ); + (false, -1) + } else { + (true, -1) + } + } + } + }; + + if commit_offset >= 0 && consume_request.process_queue.is_dropped() { + if let Some(mut default_mqpush_consumer_impl) = self + .default_mqpush_consumer_impl + .as_ref() + .unwrap() + .upgrade() + { + default_mqpush_consumer_impl + .offset_store + .as_mut() + .unwrap() + .update_offset(&consume_request.message_queue, commit_offset, false) + .await; + } + } + continue_consume + } +} impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService { - fn start(&mut self, this: ArcRefCellWrapper) { - todo!() + fn start(&mut self, this: WeakCellWrapper) { + if MessageModel::Clustering == self.consumer_config.message_model { + self.consume_runtime.get_handle().spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(1_000)).await; + loop { + if let Some(mut this) = this.upgrade() { + this.lock_mqperiodically().await; + tokio::time::sleep(tokio::time::Duration::from_millis( + *REBALANCE_LOCK_INTERVAL, + )) + .await; + } + } + }); + } } - fn shutdown(&mut self, await_terminate_millis: u64) { - unimplemented!("shutdown") + async fn shutdown(&mut self, await_terminate_millis: u64) { + if MessageModel::Clustering == self.consumer_config.message_model { + self.unlock_all_mq().await; + } } fn update_core_pool_size(&self, core_pool_size: usize) { @@ -63,13 +447,24 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService { async fn submit_consume_request( &self, - this: ArcRefCellWrapper, + this: WeakCellWrapper, msgs: Vec>, process_queue: Arc, message_queue: MessageQueue, dispatch_to_consume: bool, ) { - todo!() + if !dispatch_to_consume { + return; + } + let mut consume_request = ConsumeRequest { + process_queue, + message_queue, + default_mqpush_consumer_impl: self.default_mqpush_consumer_impl.clone(), + consumer_group: self.consumer_group.clone(), + }; + self.consume_runtime.get_handle().spawn(async move { + consume_request.run(this).await; + }); } async fn submit_pop_consume_request( @@ -81,3 +476,248 @@ impl ConsumeMessageServiceTrait for ConsumeMessageOrderlyService { todo!() } } + +struct ConsumeRequest { + process_queue: Arc, + message_queue: MessageQueue, + default_mqpush_consumer_impl: Option>, + consumer_group: Arc, +} + +impl ConsumeRequest { + #[allow(deprecated)] + async fn run( + &mut self, + consume_message_orderly_service: WeakCellWrapper, + ) { + if self.process_queue.is_dropped() { + warn!( + "run, the message queue not be able to consume, because it's dropped. {}", + self.message_queue + ); + return; + } + let consume_message_orderly_service_op = consume_message_orderly_service.upgrade(); + if consume_message_orderly_service_op.is_none() { + warn!("run, consume_message_concurrently_service is none."); + return; + } + let mut consume_message_orderly_service_inner = consume_message_orderly_service_op.unwrap(); + let lock = consume_message_orderly_service_inner + .message_queue_lock + .fetch_lock_object(&self.message_queue) + .await; + let locked = lock.lock().await; + let default_mqpush_consumer_impl_op = self + .default_mqpush_consumer_impl + .as_mut() + .unwrap() + .upgrade(); + if default_mqpush_consumer_impl_op.is_none() { + warn!("run, default_mqpush_consumer_impl is none."); + return; + } + let mut default_mqpush_consumer_impl = default_mqpush_consumer_impl_op.unwrap(); + if MessageModel::Broadcasting == default_mqpush_consumer_impl.message_model() + || self.process_queue.is_locked() && !self.process_queue.is_lock_expired() + { + let begin_time = Instant::now(); + loop { + if self.process_queue.is_dropped() { + warn!( + "the message queue not be able to consume, because it's dropped. {}", + self.message_queue + ); + break; + } + if MessageModel::Clustering == default_mqpush_consumer_impl.message_model() + && !self.process_queue.is_locked() + { + warn!( + "the message queue not be able to consume, because it's not locked. {}", + self.message_queue + ); + consume_message_orderly_service_inner + .try_lock_later_and_reconsume( + consume_message_orderly_service.clone(), + &self.message_queue, + self.process_queue.clone(), + 10, + ) + .await; + break; + } + + if MessageModel::Clustering == default_mqpush_consumer_impl.message_model() + && self.process_queue.is_lock_expired() + { + warn!( + "the message queue lock expired, so consume later {}", + self.message_queue + ); + consume_message_orderly_service_inner + .try_lock_later_and_reconsume( + consume_message_orderly_service.clone(), + &self.message_queue, + self.process_queue.clone(), + 10, + ) + .await; + break; + } + let interval = begin_time.elapsed().as_millis() as u64; + if interval > *MAX_TIME_CONSUME_CONTINUOUSLY { + consume_message_orderly_service_inner + .try_lock_later_and_reconsume( + consume_message_orderly_service.clone(), + &self.message_queue, + self.process_queue.clone(), + 10, + ) + .await; + break; + } + let consume_batch_size = consume_message_orderly_service_inner + .consumer_config + .consume_message_batch_max_size; + let mut msgs = self.process_queue.take_messages(consume_batch_size).await; + default_mqpush_consumer_impl.reset_retry_and_namespace( + &mut msgs, + consume_message_orderly_service_inner + .consumer_group + .as_ref(), + ); + if msgs.is_empty() { + break; + } + let mut context = ConsumeOrderlyContext::new(self.message_queue.clone()); + let mut consume_message_context = None; + let mut status = None; + if default_mqpush_consumer_impl.has_hook() { + let queue = self.message_queue.clone(); + consume_message_context = Some(ConsumeMessageContext { + consumer_group: self.consumer_group.as_ref().clone(), + msg_list: &msgs, + mq: Some(queue), + success: false, + status: "".to_string(), + mq_trace_context: None, + props: Default::default(), + namespace: default_mqpush_consumer_impl + .client_config + .get_namespace() + .unwrap_or("".to_string()), + access_channel: Default::default(), + }); + default_mqpush_consumer_impl.execute_hook_before(&mut consume_message_context); + } + let begin_timestamp = Instant::now(); + let mut return_type = ConsumeReturnType::Success; + let mut has_exception = false; + let consume_lock = self.process_queue.consume_lock.write().await; + if self.process_queue.is_dropped() { + warn!( + "consumeMessage, the message queue not be able to consume, because it's \ + dropped. {}", + self.message_queue + ); + break; + } + let vec = msgs + .iter() + .map(|msg| &msg.message_ext_inner) + .collect::>(); + + match consume_message_orderly_service_inner + .message_listener + .consume_message(&vec, &mut context) + { + Ok(value) => { + status = Some(value); + } + Err(_) => { + has_exception = true; + } + } + drop(consume_lock); + if status.is_none() + || *status.as_ref().unwrap() == ConsumeOrderlyStatus::Rollback + || *status.as_ref().unwrap() == ConsumeOrderlyStatus::SuspendCurrentQueueAMoment + { + warn!( + "consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", + self.consumer_group.as_ref(), + msgs.len(), + self.message_queue, + ); + } + let consume_rt = begin_timestamp.elapsed().as_millis() as u64; + if status.is_none() { + if has_exception { + return_type = ConsumeReturnType::Exception; + } else { + return_type = ConsumeReturnType::ReturnNull; + } + } else if consume_rt + >= default_mqpush_consumer_impl.consumer_config.consume_timeout * 60 * 1000 + { + return_type = ConsumeReturnType::TimeOut; + } else if *status.as_ref().unwrap() + == ConsumeOrderlyStatus::SuspendCurrentQueueAMoment + { + return_type = ConsumeReturnType::Failed; + } else if *status.as_ref().unwrap() == ConsumeOrderlyStatus::Success { + return_type = ConsumeReturnType::Success; + } + if default_mqpush_consumer_impl.has_hook() { + consume_message_context.as_mut().unwrap().props.insert( + mix_all::CONSUME_CONTEXT_TYPE.to_string(), + return_type.to_string(), + ); + } + if status.is_none() { + status = Some(ConsumeOrderlyStatus::SuspendCurrentQueueAMoment); + } + if default_mqpush_consumer_impl.has_hook() { + let status = *status.as_ref().unwrap(); + consume_message_context.as_mut().unwrap().success = status + == ConsumeOrderlyStatus::Success + || status == ConsumeOrderlyStatus::Commit; + consume_message_context.as_mut().unwrap().status = status.to_string(); + default_mqpush_consumer_impl.execute_hook_after(&mut consume_message_context); + } + let continue_consume = consume_message_orderly_service_inner + .process_consume_result( + msgs, + consume_message_orderly_service.clone(), + status.unwrap(), + &context, + self, + ) + .await; + if !continue_consume { + break; + } + } + } else { + if self.process_queue.is_dropped() { + warn!( + "the message queue not be able to consume, because it's dropped. {}", + self.message_queue + ); + return; + } + let consume_message_orderly_service_weak = + ArcRefCellWrapper::downgrade(&consume_message_orderly_service_inner); + consume_message_orderly_service_inner + .try_lock_later_and_reconsume( + consume_message_orderly_service_weak, + &self.message_queue, + self.process_queue.clone(), + 100, + ) + .await; + } + drop(locked); + } +} diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs index 62501f92..c3cde044 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_concurrently_service.rs @@ -57,11 +57,11 @@ impl ConsumeMessagePopConcurrentlyService { } impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService { - fn start(&mut self, this: ArcRefCellWrapper) { + fn start(&mut self, this: WeakCellWrapper) { //todo!() } - fn shutdown(&mut self, await_terminate_millis: u64) { + async fn shutdown(&mut self, await_terminate_millis: u64) { todo!() } @@ -91,7 +91,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopConcurrentlyService { async fn submit_consume_request( &self, - this: ArcRefCellWrapper, + this: WeakCellWrapper, msgs: Vec>, process_queue: Arc, message_queue: MessageQueue, diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs index 6691aa6a..d3ab931e 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_pop_orderly_service.rs @@ -20,6 +20,7 @@ use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; @@ -29,11 +30,9 @@ use crate::consumer::consumer_impl::process_queue::ProcessQueue; pub struct ConsumeMessagePopOrderlyService; impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService { - fn start(&mut self, this: ArcRefCellWrapper) { - todo!() - } + fn start(&mut self, this: WeakCellWrapper) {} - fn shutdown(&mut self, await_terminate_millis: u64) { + async fn shutdown(&mut self, await_terminate_millis: u64) { todo!() } @@ -63,7 +62,7 @@ impl ConsumeMessageServiceTrait for ConsumeMessagePopOrderlyService { async fn submit_consume_request( &self, - this: ArcRefCellWrapper, + this: WeakCellWrapper, msgs: Vec>, process_queue: Arc, message_queue: MessageQueue, diff --git a/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs b/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs index 3b5980cb..eebc017d 100644 --- a/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs +++ b/rocketmq-client/src/consumer/consumer_impl/consume_message_service.rs @@ -20,32 +20,224 @@ use rocketmq_common::common::message::message_client_ext::MessageClientExt; use rocketmq_common::common::message::message_ext::MessageExt; use rocketmq_common::common::message::message_queue::MessageQueue; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::consume_message_directly_result::ConsumeMessageDirectlyResult; use crate::consumer::consumer_impl::pop_process_queue::PopProcessQueue; use crate::consumer::consumer_impl::process_queue::ProcessQueue; -pub struct ConsumeMessageConcurrentlyServiceGeneral +pub struct ConsumeMessageServiceGeneral { + consume_message_concurrently_service: Option>, + consume_message_orderly_service: Option>, +} +impl ConsumeMessageServiceGeneral { + pub fn new( + consume_message_concurrently_service: Option>, + consume_message_orderly_service: Option>, + ) -> Self { + Self { + consume_message_concurrently_service, + consume_message_orderly_service, + } + } + + pub fn get_consume_message_concurrently_service_weak(&self) -> WeakCellWrapper { + ArcRefCellWrapper::downgrade(self.consume_message_concurrently_service.as_ref().unwrap()) + } +} + +impl ConsumeMessageServiceGeneral where T: ConsumeMessageServiceTrait, K: ConsumeMessageServiceTrait, { - pub consume_message_concurrently_service: ArcRefCellWrapper, - pub consume_message_pop_concurrently_service: ArcRefCellWrapper, + pub fn start(&mut self) { + if let Some(consume_message_concurrently_service) = + &mut self.consume_message_concurrently_service + { + let consume_message_concurrently_service_weak = + ArcRefCellWrapper::downgrade(consume_message_concurrently_service); + consume_message_concurrently_service.start(consume_message_concurrently_service_weak); + } + + if let Some(consume_message_orderly_service) = &mut self.consume_message_orderly_service { + let consume_message_pop_concurrently_service_weak = + ArcRefCellWrapper::downgrade(consume_message_orderly_service); + consume_message_orderly_service.start(consume_message_pop_concurrently_service_weak); + } + } + + pub async fn shutdown(&mut self, await_terminate_millis: u64) { + todo!() + } + + pub fn update_core_pool_size(&self, core_pool_size: usize) { + todo!() + } + + pub fn inc_core_pool_size(&self) { + todo!() + } + + pub fn dec_core_pool_size(&self) { + todo!() + } + + pub fn get_core_pool_size(&self) -> usize { + todo!() + } + + pub async fn consume_message_directly( + &self, + msg: &MessageExt, + broker_name: &str, + ) -> ConsumeMessageDirectlyResult { + todo!() + } + + pub async fn submit_consume_request( + &self, + msgs: Vec>, + process_queue: Arc, + message_queue: MessageQueue, + dispatch_to_consume: bool, + ) { + if let Some(consume_message_concurrently_service) = + &self.consume_message_concurrently_service + { + consume_message_concurrently_service + .submit_consume_request( + ArcRefCellWrapper::downgrade(consume_message_concurrently_service), + msgs, + process_queue, + message_queue, + dispatch_to_consume, + ) + .await; + } else if let Some(consume_message_orderly_service) = &self.consume_message_orderly_service + { + consume_message_orderly_service + .submit_consume_request( + ArcRefCellWrapper::downgrade(consume_message_orderly_service), + msgs, + process_queue, + message_queue, + dispatch_to_consume, + ) + .await; + } + } + + pub async fn submit_pop_consume_request( + &self, + msgs: Vec, + process_queue: &PopProcessQueue, + message_queue: &MessageQueue, + ) { + unimplemented!("ConsumeMessageServiceGeneral not support submit_pop_consume_request") + } + + pub fn get_consume_message_concurrently_service(&self) -> ArcRefCellWrapper { + self.consume_message_concurrently_service + .as_ref() + .unwrap() + .clone() + } +} + +pub struct ConsumeMessagePopServiceGeneral { + consume_message_pop_concurrently_service: Option>, + consume_message_pop_orderly_service: Option>, +} + +impl ConsumeMessagePopServiceGeneral { + pub fn new( + consume_message_pop_concurrently_service: Option>, + consume_message_pop_orderly_service: Option>, + ) -> Self { + Self { + consume_message_pop_concurrently_service, + consume_message_pop_orderly_service, + } + } } -pub struct ConsumeMessageOrderlyServiceGeneral + +impl ConsumeMessagePopServiceGeneral where T: ConsumeMessageServiceTrait, K: ConsumeMessageServiceTrait, { - pub consume_message_orderly_service: ArcRefCellWrapper, - pub consume_message_pop_orderly_service: ArcRefCellWrapper, + pub fn start(&mut self) { + if let Some(consume_message_pop_concurrently_service) = + &mut self.consume_message_pop_concurrently_service + { + let consume_message_pop_concurrently_service_weak = + ArcRefCellWrapper::downgrade(consume_message_pop_concurrently_service); + consume_message_pop_concurrently_service + .start(consume_message_pop_concurrently_service_weak); + } + + if let Some(consume_message_pop_orderly_service) = + &mut self.consume_message_pop_orderly_service + { + let consume_message_pop_orderly_service_weak = + ArcRefCellWrapper::downgrade(consume_message_pop_orderly_service); + consume_message_pop_orderly_service.start(consume_message_pop_orderly_service_weak); + } + } + + pub async fn shutdown(&mut self, await_terminate_millis: u64) { + todo!() + } + + fn update_core_pool_size(&self, core_pool_size: usize) { + todo!() + } + + fn inc_core_pool_size(&self) { + todo!() + } + + fn dec_core_pool_size(&self) { + todo!() + } + + fn get_core_pool_size(&self) -> usize { + todo!() + } + + async fn consume_message_directly( + &self, + msg: &MessageExt, + broker_name: &str, + ) -> ConsumeMessageDirectlyResult { + todo!() + } + + pub async fn submit_consume_request( + &self, + msgs: Vec>, + process_queue: Arc, + message_queue: MessageQueue, + dispatch_to_consume: bool, + ) { + unimplemented!("submit_consume_request") + } + + async fn submit_pop_consume_request( + &self, + msgs: Vec, + process_queue: &PopProcessQueue, + message_queue: &MessageQueue, + ) { + todo!() + } } pub trait ConsumeMessageServiceTrait { - fn start(&mut self, this: ArcRefCellWrapper); + fn start(&mut self, this: WeakCellWrapper); - fn shutdown(&mut self, await_terminate_millis: u64); + async fn shutdown(&mut self, await_terminate_millis: u64); fn update_core_pool_size(&self, core_pool_size: usize); @@ -63,7 +255,7 @@ pub trait ConsumeMessageServiceTrait { async fn submit_consume_request( &self, - this: ArcRefCellWrapper, + this: WeakCellWrapper, msgs: Vec>, process_queue: Arc, message_queue: MessageQueue, diff --git a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs index c32dcc02..9305352c 100644 --- a/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/default_mq_push_consumer_impl.rs @@ -58,9 +58,8 @@ use crate::consumer::consumer_impl::consume_message_concurrently_service::Consum use crate::consumer::consumer_impl::consume_message_orderly_service::ConsumeMessageOrderlyService; use crate::consumer::consumer_impl::consume_message_pop_concurrently_service::ConsumeMessagePopConcurrentlyService; use crate::consumer::consumer_impl::consume_message_pop_orderly_service::ConsumeMessagePopOrderlyService; -use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageConcurrentlyServiceGeneral; -use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageOrderlyServiceGeneral; -use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; +use crate::consumer::consumer_impl::consume_message_service::ConsumeMessagePopServiceGeneral; +use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceGeneral; use crate::consumer::consumer_impl::pop_request::PopRequest; use crate::consumer::consumer_impl::pull_api_wrapper::PullAPIWrapper; use crate::consumer::consumer_impl::pull_request::PullRequest; @@ -106,24 +105,24 @@ pub struct DefaultMQPushConsumerImpl { consume_message_hook_list: Vec>>, rpc_hook: Option>>, service_state: ArcRefCellWrapper, - client_instance: Option>, + pub(crate) client_instance: Option>, pub(crate) pull_api_wrapper: Option>, pause: Arc, consume_orderly: bool, message_listener: Option>, pub(crate) offset_store: Option>, - pub(crate) consume_message_concurrently_service: Option< + pub(crate) consume_message_service: Option< ArcRefCellWrapper< - ConsumeMessageConcurrentlyServiceGeneral< + ConsumeMessageServiceGeneral< ConsumeMessageConcurrentlyService, - ConsumeMessagePopConcurrentlyService, + ConsumeMessageOrderlyService, >, >, >, - consume_message_orderly_service: Option< + consume_message_pop_service: Option< ArcRefCellWrapper< - ConsumeMessageOrderlyServiceGeneral< - ConsumeMessageOrderlyService, + ConsumeMessagePopServiceGeneral< + ConsumeMessagePopConcurrentlyService, ConsumeMessagePopOrderlyService, >, >, @@ -159,8 +158,8 @@ impl DefaultMQPushConsumerImpl { consume_orderly: false, message_listener: None, offset_store: None, - consume_message_concurrently_service: None, - consume_message_orderly_service: None, + consume_message_service: None, + consume_message_pop_service: None, queue_flow_control_times: 0, queue_max_span_flow_control_times: 0, pop_delay_level: Arc::new([ @@ -180,11 +179,9 @@ impl DefaultMQPushConsumerImpl { self.rebalance_impl .set_default_mqpush_consumer_impl(default_mqpush_consumer_impl.clone()); self.default_mqpush_consumer_impl = Some(default_mqpush_consumer_impl.clone()); - if let Some(ref mut consume_message_concurrently_service) = - self.consume_message_concurrently_service - { + if let Some(ref mut consume_message_concurrently_service) = self.consume_message_service { consume_message_concurrently_service - .consume_message_concurrently_service + .get_consume_message_concurrently_service() .default_mqpush_consumer_impl = Some(default_mqpush_consumer_impl); } } @@ -270,77 +267,72 @@ impl DefaultMQPushConsumerImpl { .clone() .unwrap(); self.consume_orderly = false; - self.consume_message_concurrently_service = Some(ArcRefCellWrapper::new( - ConsumeMessageConcurrentlyServiceGeneral { - consume_message_concurrently_service: ArcRefCellWrapper::new( - ConsumeMessageConcurrentlyService::new( - self.client_config.clone(), - self.consumer_config.clone(), - self.consumer_config.consumer_group.clone(), - listener.clone().expect("listener is None"), - self.default_mqpush_consumer_impl.clone(), - ), - ), - - consume_message_pop_concurrently_service: ArcRefCellWrapper::new( - ConsumeMessagePopConcurrentlyService::new( - self.client_config.clone(), - self.consumer_config.clone(), - self.consumer_config.consumer_group.clone(), - listener.expect("listener is None"), - ), - ), - }, + let consume_message_concurrently_service = + ArcRefCellWrapper::new(ConsumeMessageConcurrentlyService::new( + self.client_config.clone(), + self.consumer_config.clone(), + self.consumer_config.consumer_group.clone(), + listener.clone().expect("listener is None"), + self.default_mqpush_consumer_impl.clone(), + )); + self.consume_message_service = + Some(ArcRefCellWrapper::new(ConsumeMessageServiceGeneral::new( + Some(consume_message_concurrently_service), + None, + ))); + let consume_message_pop_concurrently_service = + ArcRefCellWrapper::new(ConsumeMessagePopConcurrentlyService::new( + self.client_config.clone(), + self.consumer_config.clone(), + self.consumer_config.consumer_group.clone(), + listener.expect("listener is None"), + )); + + self.consume_message_pop_service = Some(ArcRefCellWrapper::new( + ConsumeMessagePopServiceGeneral::new( + Some(consume_message_pop_concurrently_service), + None, + ), )); } else if message_listener.message_listener_orderly.is_some() { + let (listener, _) = + message_listener.message_listener_orderly.clone().unwrap(); self.consume_orderly = true; - self.consume_message_orderly_service = Some(ArcRefCellWrapper::new( - ConsumeMessageOrderlyServiceGeneral { - consume_message_orderly_service: ArcRefCellWrapper::new( - ConsumeMessageOrderlyService, - ), - consume_message_pop_orderly_service: ArcRefCellWrapper::new( - ConsumeMessagePopOrderlyService, - ), - }, + let consume_message_orderly_service = + ArcRefCellWrapper::new(ConsumeMessageOrderlyService::new( + self.client_config.clone(), + self.consumer_config.clone(), + self.consumer_config.consumer_group.clone(), + listener.clone().expect("listener is None"), + self.default_mqpush_consumer_impl.clone(), + )); + self.consume_message_service = + Some(ArcRefCellWrapper::new(ConsumeMessageServiceGeneral::new( + None, + Some(consume_message_orderly_service), + ))); + + let consume_message_pop_orderly_service = + ArcRefCellWrapper::new(ConsumeMessagePopOrderlyService); + self.consume_message_pop_service = Some(ArcRefCellWrapper::new( + ConsumeMessagePopServiceGeneral::new( + None, + Some(consume_message_pop_orderly_service), + ), )); } } if let Some(consume_message_concurrently_service) = - self.consume_message_concurrently_service.as_mut() + self.consume_message_service.as_mut() { - let this = consume_message_concurrently_service - .consume_message_concurrently_service - .clone(); - consume_message_concurrently_service - .consume_message_concurrently_service - .start(this); - - let wrapper = consume_message_concurrently_service - .consume_message_pop_concurrently_service - .clone(); - consume_message_concurrently_service - .consume_message_pop_concurrently_service - .start(wrapper); + consume_message_concurrently_service.start(); } if let Some(consume_message_orderly_service) = - self.consume_message_orderly_service.as_mut() + self.consume_message_pop_service.as_mut() { - let wrapper = consume_message_orderly_service - .consume_message_orderly_service - .clone(); - consume_message_orderly_service - .consume_message_orderly_service - .start(wrapper); - - let wrapper = consume_message_orderly_service - .consume_message_pop_orderly_service - .clone(); - consume_message_orderly_service - .consume_message_pop_orderly_service - .start(wrapper); + consume_message_orderly_service.start(); } self.client_instance .as_mut() @@ -409,11 +401,11 @@ impl DefaultMQPushConsumerImpl { } ServiceState::Running => { if let Some(consume_message_concurrently_service) = - self.consume_message_concurrently_service.as_mut() + self.consume_message_service.as_mut() { consume_message_concurrently_service - .consume_message_concurrently_service - .shutdown(await_terminate_millis); + .shutdown(await_terminate_millis) + .await; } self.persist_consumer_offset().await; let client = self.client_instance.as_mut().unwrap(); diff --git a/rocketmq-client/src/consumer/consumer_impl/process_queue.rs b/rocketmq-client/src/consumer/consumer_impl/process_queue.rs index 72c9bea9..aeca9dbf 100644 --- a/rocketmq-client/src/consumer/consumer_impl/process_queue.rs +++ b/rocketmq-client/src/consumer/consumer_impl/process_queue.rs @@ -19,6 +19,7 @@ use std::sync::atomic::AtomicI64; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Instant; use once_cell::sync::Lazy; use rocketmq_common::common::message::message_client_ext::MessageClientExt; @@ -36,14 +37,14 @@ use tokio::sync::RwLock; use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; use crate::consumer::consumer_impl::PULL_MAX_IDLE_TIME; -static REBALANCE_LOCK_MAX_LIVE_TIME: Lazy = Lazy::new(|| { +pub static REBALANCE_LOCK_MAX_LIVE_TIME: Lazy = Lazy::new(|| { std::env::var("rocketmq.client.rebalance.lockMaxLiveTime") .unwrap_or_else(|_| "30000".into()) .parse() .unwrap_or(30000) }); -static REBALANCE_LOCK_INTERVAL: Lazy = Lazy::new(|| { +pub static REBALANCE_LOCK_INTERVAL: Lazy = Lazy::new(|| { std::env::var("rocketmq.client.rebalance.lockInterval") .unwrap_or_else(|_| "20000".into()) .parse() @@ -280,28 +281,85 @@ impl ProcessQueue { result } - pub(crate) fn rollback(&self) { - unimplemented!("rollback") + pub(crate) async fn rollback(&self) { + let mut msg_tree_map = self.msg_tree_map.write().await; + let mut consuming_msg_orderly_tree_map = self.consuming_msg_orderly_tree_map.write().await; + consuming_msg_orderly_tree_map.iter().for_each(|(k, v)| { + msg_tree_map.insert(*k, v.clone()); + }); + consuming_msg_orderly_tree_map.clear(); } - pub(crate) fn commit(&self) -> u64 { - unimplemented!("commit") + pub(crate) async fn commit(&self) -> i64 { + let mut consuming_msg_orderly_tree_map = self.consuming_msg_orderly_tree_map.write().await; + let key_value = consuming_msg_orderly_tree_map.last_key_value(); + let offset = if let Some((key, _)) = key_value { + *key + 1 + } else { + -1 + }; + self.msg_count.fetch_sub( + consuming_msg_orderly_tree_map.len() as u64, + Ordering::AcqRel, + ); + if self.msg_count.load(Ordering::Acquire) == 0 { + self.msg_size.store(0, Ordering::Release); + } else { + for message in consuming_msg_orderly_tree_map.values() { + self.msg_size.fetch_sub( + message.message_ext_inner.body().as_ref().unwrap().len() as u64, + Ordering::AcqRel, + ); + } + } + consuming_msg_orderly_tree_map.clear(); + offset } - pub(crate) fn make_message_to_consume_again(&self, messages: Vec) { - unimplemented!("make_message_to_consume_again") + pub(crate) async fn make_message_to_consume_again( + &self, + messages: &[ArcRefCellWrapper], + ) { + let mut consuming_msg_orderly_tree_map = self.consuming_msg_orderly_tree_map.write().await; + let mut msg_tree_map = self.msg_tree_map.write().await; + for message in messages { + consuming_msg_orderly_tree_map.remove(&message.message_ext_inner.queue_offset); + msg_tree_map.insert(message.message_ext_inner.queue_offset, message.clone()); + } } - pub(crate) fn take_messages(&self, batch_size: u32) -> Vec { - unimplemented!("take_messages") + pub(crate) async fn take_messages( + &self, + batch_size: u32, + ) -> Vec> { + let mut messages = Vec::with_capacity(batch_size as usize); + let now = Instant::now(); + let mut msg_tree_map = self.msg_tree_map.write().await; + if !msg_tree_map.is_empty() { + for _ in 0..batch_size { + if let Some((_, message)) = msg_tree_map.pop_first() { + messages.push(message); + } else { + break; + } + } + } + messages } - pub(crate) fn contains_message(&self, message_ext: &MessageExt) -> bool { - unimplemented!("contains_message") + pub(crate) async fn contains_message(&self, message_ext: &MessageExt) -> bool { + let msg_tree_map = self.msg_tree_map.read().await; + msg_tree_map.contains_key(&message_ext.queue_offset) } - pub(crate) fn clear(&self) { - unimplemented!("clear") + pub(crate) async fn clear(&self) { + let lock = self.tree_map_lock.write().await; + self.msg_tree_map.write().await.clear(); + self.consuming_msg_orderly_tree_map.write().await.clear(); + self.msg_count.store(0, Ordering::Release); + self.msg_size.store(0, Ordering::Release); + self.queue_offset_max.store(0, Ordering::Release); + drop(lock); } pub(crate) fn fill_process_queue_info(&self, info: ProcessQueueInfo) { diff --git a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs index 97a8834e..ceed2504 100644 --- a/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs +++ b/rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::collections::HashSet; -use std::ops::DerefMut; +use std::ops::Deref; use std::sync::Arc; use rocketmq_common::common::message::message_queue::MessageQueue; @@ -26,6 +26,7 @@ use rocketmq_common::ArcRefCellWrapper; use rocketmq_common::TimeUtils::get_current_millis; use rocketmq_common::WeakCellWrapper; use rocketmq_remoting::protocol::body::request::lock_batch_request_body::LockBatchRequestBody; +use rocketmq_remoting::protocol::body::unlock_batch_request_body::UnlockBatchRequestBody; use rocketmq_remoting::protocol::heartbeat::consume_type::ConsumeType; use rocketmq_remoting::protocol::heartbeat::message_model::MessageModel; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; @@ -239,7 +240,7 @@ where let mut process_queue_table = process_queue_table_cloned.write().await; for mq in mq_set { if !process_queue_table.contains_key(mq) { - if is_order && !self.lock(mq, process_queue_table.deref_mut()).await { + if is_order && !self.lock_with(mq, process_queue_table.deref()).await { warn!( "doRebalance, {:?}, add a new mq failed, {}, because lock failed", self.consumer_group, @@ -447,10 +448,17 @@ where queue_set } - pub async fn lock( + pub async fn lock(&mut self, mq: &MessageQueue) -> bool { + let process_queue_table_ = self.process_queue_table.clone(); + let process_queue_table = process_queue_table_.read().await; + let table = process_queue_table.deref(); + self.lock_with(mq, table).await + } + + pub async fn lock_with( &mut self, mq: &MessageQueue, - process_queue_table: &mut HashMap>, + process_queue_table: &HashMap>, ) -> bool { let client = self.client_instance.as_mut().unwrap(); let broker_name = client.get_broker_name_from_message_queue(mq).await; @@ -620,6 +628,55 @@ where }*/ } + pub async fn unlock_all(&mut self, oneway: bool) { + let broker_mqs = self.build_process_queue_table_by_broker_name().await; + for (broker_name, mqs) in broker_mqs { + if mqs.is_empty() { + continue; + } + let client = self.client_instance.as_mut().unwrap(); + let find_broker_result = client + .find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true) + .await; + if let Some(find_broker_result) = find_broker_result { + let request_body = UnlockBatchRequestBody { + consumer_group: Some(self.consumer_group.clone().unwrap()), + client_id: Some(client.client_id.clone()), + mq_set: mqs.clone(), + ..Default::default() + }; + let result = client + .mq_client_api_impl + .as_mut() + .unwrap() + .unlock_batch_mq( + find_broker_result.broker_addr.as_str(), + request_body, + 1_000, + oneway, + ) + .await; + match result { + Ok(_) => { + let process_queue_table = self.process_queue_table.read().await; + for mq in &mqs { + if let Some(pq) = process_queue_table.get(mq) { + pq.set_locked(false); + info!( + "the message queue unlock OK, Group: {:?} {}", + self.consumer_group, mq + ); + } + } + } + Err(e) => { + error!("unlockBatchMQ exception {}", e); + } + } + } + } + } + async fn build_process_queue_table_by_broker_name( &self, ) -> HashMap> { diff --git a/rocketmq-client/src/consumer/default_mq_push_consumer.rs b/rocketmq-client/src/consumer/default_mq_push_consumer.rs index bc757a96..84f64963 100644 --- a/rocketmq-client/src/consumer/default_mq_push_consumer.rs +++ b/rocketmq-client/src/consumer/default_mq_push_consumer.rs @@ -575,11 +575,19 @@ impl MQPushConsumer for DefaultMQPushConsumer { todo!() } - async fn register_message_listener_orderly(&mut self, message_listener: ML) + fn register_message_listener_orderly(&mut self, message_listener: ML) where - ML: MessageListenerOrderly + Send + Sync, + ML: MessageListenerOrderly + Send + Sync + 'static, { - todo!() + let message_listener = MessageListener { + message_listener_concurrently: None, + message_listener_orderly: Some((Some(Arc::new(Box::new(message_listener))), None)), + }; + self.consumer_config.message_listener = Some(ArcRefCellWrapper::new(message_listener)); + self.default_mqpush_consumer_impl + .as_mut() + .unwrap() + .register_message_listener(self.consumer_config.message_listener.clone()); } fn subscribe(&mut self, topic: &str, sub_expression: &str) -> crate::Result<()> { diff --git a/rocketmq-client/src/consumer/listener/message_listener_orderly.rs b/rocketmq-client/src/consumer/listener/message_listener_orderly.rs index ba6e7dea..39993a9a 100644 --- a/rocketmq-client/src/consumer/listener/message_listener_orderly.rs +++ b/rocketmq-client/src/consumer/listener/message_listener_orderly.rs @@ -25,8 +25,8 @@ use crate::Result; pub trait MessageListenerOrderly: Sync + Send { fn consume_message( &self, - msgs: Vec, - context: ConsumeOrderlyContext, + msgs: &[&MessageExt], + context: &mut ConsumeOrderlyContext, ) -> Result; } diff --git a/rocketmq-client/src/consumer/message_queue_lock.rs b/rocketmq-client/src/consumer/message_queue_lock.rs new file mode 100644 index 00000000..165028d3 --- /dev/null +++ b/rocketmq-client/src/consumer/message_queue_lock.rs @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; +use std::sync::Arc; + +use rocketmq_common::common::message::message_queue::MessageQueue; +use tokio::sync::Mutex; + +type LockObject = Arc>; +type LockTable = Arc>>>>>; + +#[derive(Default)] +pub(crate) struct MessageQueueLock { + mq_lock_table: LockTable, +} + +impl MessageQueueLock { + pub fn new() -> Self { + MessageQueueLock { + mq_lock_table: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn fetch_lock_object(&self, mq: &MessageQueue) -> Arc> { + self.fetch_lock_object_with_sharding_key(mq, -1).await + } + + pub async fn fetch_lock_object_with_sharding_key( + &self, + mq: &MessageQueue, + sharding_key_index: i32, + ) -> Arc> { + let mut mq_lock_table = self.mq_lock_table.lock().await; + let obj_map = mq_lock_table + .entry(mq.clone()) + .or_insert_with(|| Arc::new(Mutex::new(HashMap::new()))); + let mut obj_map = obj_map.lock().await; + let lock = obj_map + .entry(sharding_key_index) + .or_insert_with(|| Arc::new(Mutex::new(()))); + lock.clone() + } +} diff --git a/rocketmq-client/src/consumer/mq_push_consumer.rs b/rocketmq-client/src/consumer/mq_push_consumer.rs index 24e78c71..99cf3b26 100644 --- a/rocketmq-client/src/consumer/mq_push_consumer.rs +++ b/rocketmq-client/src/consumer/mq_push_consumer.rs @@ -75,9 +75,9 @@ pub trait MQPushConsumerLocal: MQConsumer { + Send + Sync; - async fn register_message_listener_orderly(&mut self, message_listener: ML) + fn register_message_listener_orderly(&mut self, message_listener: ML) where - ML: MessageListenerOrderly + Send + Sync; + ML: MessageListenerOrderly + Send + Sync + 'static; /// Subscribes to a topic with a subscription expression. /// diff --git a/rocketmq-client/src/consumer/pull_callback.rs b/rocketmq-client/src/consumer/pull_callback.rs index 6fc11b8d..56bd2d89 100644 --- a/rocketmq-client/src/consumer/pull_callback.rs +++ b/rocketmq-client/src/consumer/pull_callback.rs @@ -24,7 +24,6 @@ use rocketmq_remoting::code::response_code::ResponseCode; use rocketmq_remoting::protocol::heartbeat::subscription_data::SubscriptionData; use tracing::warn; -use crate::consumer::consumer_impl::consume_message_service::ConsumeMessageServiceTrait; use crate::consumer::consumer_impl::default_mq_push_consumer_impl::DefaultMQPushConsumerImpl; use crate::consumer::consumer_impl::default_mq_push_consumer_impl::PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL; use crate::consumer::consumer_impl::pull_request::PullRequest; @@ -49,192 +48,6 @@ pub(crate) struct DefaultPullCallback { pub(crate) pull_request: Option, } -impl DefaultPullCallback { - /*fn kkk(){ - let pull_callback = - |pull_result_ext: Option, - err: Option>| { - tokio::spawn(async move { - if let Some(mut pull_result_ext) = pull_result_ext { - this.pull_api_wrapper.as_mut().unwrap().process_pull_result( - &message_queue_inner, - &mut pull_result_ext, - subscription_data.as_ref().unwrap(), - ); - match pull_result_ext.pull_result.pull_status { - PullStatus::Found => { - let prev_request_offset = pull_request.next_offset; - pull_request.set_next_offset( - pull_result_ext.pull_result.next_begin_offset as i64, - ); - /*let pull_rt = get_current_millis() - begin_timestamp.elapsed().as_millis() as u64; - self.client_instance.as_mut().unwrap().*/ - let mut first_msg_offset = i64::MAX; - if pull_result_ext.pull_result.msg_found_list.is_empty() { - this.execute_pull_request_immediately(pull_request).await; - } else { - first_msg_offset = pull_result_ext - .pull_result - .msg_found_list() - .first() - .unwrap() - .message_ext_inner - .queue_offset; - // DefaultMQPushConsumerImpl.this.getConsumerStatsManager(). - // incPullTPS(pullRequest.getConsumerGroup(), - // - // pullRequest.getMessageQueue().getTopic(), - // pullResult.getMsgFoundList().size()); - let vec = pull_result_ext - .pull_result - .msg_found_list - .clone() - .into_iter() - .map(|msg| msg.message_ext_inner) - .collect(); - let dispatch_to_consume = - pull_request.process_queue.put_message(vec).await; - this.consume_message_concurrently_service - .as_mut() - .unwrap() - .consume_message_concurrently_service - .submit_consume_request( - pull_result_ext.pull_result.msg_found_list, - pull_request.get_process_queue().clone(), - pull_request.get_message_queue().clone(), - dispatch_to_consume, - ) - .await; - if this.consumer_config.pull_interval > 0 { - this.execute_pull_request_later( - pull_request, - this.consumer_config.pull_interval, - ); - } else { - this.execute_pull_request_immediately(pull_request).await; - } - } - if pull_result_ext.pull_result.next_begin_offset - < prev_request_offset as u64 - || first_msg_offset < prev_request_offset - { - warn!( - "[BUG] pull message result maybe data wrong, \ - nextBeginOffset: {} firstMsgOffset: {} \ - prevRequestOffset: {}", - pull_result_ext.pull_result.next_begin_offset, - prev_request_offset, - prev_request_offset - ); - } - } - PullStatus::NoNewMsg | PullStatus::NoMatchedMsg => { - pull_request.next_offset = - pull_result_ext.pull_result.next_begin_offset as i64; - this.correct_tags_offset(&pull_request).await; - this.execute_pull_request_immediately(pull_request).await; - } - - PullStatus::OffsetIllegal => { - warn!( - "the pull request offset illegal, {},{}", - pull_result_ext.pull_result, - pull_result_ext.pull_result.pull_status - ); - pull_request.next_offset = - pull_result_ext.pull_result.next_begin_offset as i64; - pull_request.process_queue.set_dropped(true); - tokio::spawn(async move { - let offset_store = this.offset_store.as_mut().unwrap(); - offset_store - .update_and_freeze_offset( - pull_request.get_message_queue(), - pull_request.next_offset, - ) - .await; - offset_store.persist(pull_request.get_message_queue()).await; - this.rebalance_impl - .remove_process_queue(pull_request.get_message_queue()) - .await; - this.rebalance_impl - .rebalance_impl_inner - .client_instance - .as_ref() - .unwrap() - .re_balance_immediately() - }); - } - }; - return; - } - - if let Some(err) = err { - if !topic.starts_with(mix_all::RETRY_GROUP_TOPIC_PREFIX) { - if let Some(er) = err.downcast_ref::() { - match er { - MQClientError::MQBrokerError(code, msg, addr) => { - if ResponseCode::from(*code) - == ResponseCode::SubscriptionNotLatest - { - warn!( - "the subscription is not latest, group={}", - this.consumer_config.consumer_group, - ); - } else { - warn!( - "execute the pull request exception, group={}", - this.consumer_config.consumer_group - ); - } - } - _ => { - warn!( - "execute the pull request exception, group={}", - this.consumer_config.consumer_group - ); - } - } - } else { - warn!( - "execute the pull request exception, group={}", - this.consumer_config.consumer_group - ); - } - } - if let Some(er) = err.downcast_ref::() { - match er { - MQClientError::MQBrokerError(code, msg, addr) => { - if ResponseCode::from(*code) == ResponseCode::FlowControl { - this.execute_pull_request_later( - pull_request, - PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL, - ); - } else { - this.execute_pull_request_later( - pull_request, - this.pull_time_delay_mills_when_exception, - ); - } - } - _ => { - this.execute_pull_request_later( - pull_request, - this.pull_time_delay_mills_when_exception, - ); - } - } - } else { - this.execute_pull_request_later( - pull_request, - this.pull_time_delay_mills_when_exception, - ); - } - } - }); - }; - }*/ -} - impl PullCallback for DefaultPullCallback { async fn on_success(&mut self, mut pull_result_ext: PullResultExt) { let push_consumer_impl = self.push_consumer_impl.upgrade(); @@ -276,26 +89,13 @@ impl PullCallback for DefaultPullCallback { .unwrap() .message_ext_inner .queue_offset; - // DefaultMQPushConsumerImpl.self.push_consumer_impl.getConsumerStatsManager(). - // incPullTPS(pullRequest.getConsumerGroup(), - // - // pullRequest.getMessageQueue().getTopic(), - // pullResult.getMsgFoundList().size()); let vec = pull_result_ext.pull_result.msg_found_list.clone(); let dispatch_to_consume = pull_request.process_queue.put_message(vec).await; - let consume_message_concurrently_service_inner = push_consumer_impl - .consume_message_concurrently_service - .as_mut() - .unwrap() - .consume_message_concurrently_service - .clone(); push_consumer_impl - .consume_message_concurrently_service + .consume_message_service .as_mut() .unwrap() - .consume_message_concurrently_service .submit_consume_request( - consume_message_concurrently_service_inner, pull_result_ext.pull_result.msg_found_list, pull_request.get_process_queue().clone(), pull_request.get_message_queue().clone(), @@ -407,32 +207,14 @@ impl PullCallback for DefaultPullCallback { match er { MQClientError::MQBrokerError(code, _, _) => { if ResponseCode::from(*code) == ResponseCode::FlowControl { - /*self.push_consumer_impl.execute_pull_request_later( - pull_request, - PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL, - );*/ PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL } else { - /*self.push_consumer_impl.execute_pull_request_later( - pull_request, - self.push_consumer_impl.pull_time_delay_mills_when_exception, - );*/ push_consumer_impl.pull_time_delay_mills_when_exception } } - _ => { - /*self.push_consumer_impl.execute_pull_request_later( - pull_request, - self.push_consumer_impl.pull_time_delay_mills_when_exception, - );*/ - push_consumer_impl.pull_time_delay_mills_when_exception - } + _ => push_consumer_impl.pull_time_delay_mills_when_exception, } } else { - /*self.push_consumer_impl.execute_pull_request_later( - pull_request, - self.push_consumer_impl.pull_time_delay_mills_when_exception, - );*/ push_consumer_impl.pull_time_delay_mills_when_exception }; diff --git a/rocketmq-client/src/implementation/client_remoting_processor.rs b/rocketmq-client/src/implementation/client_remoting_processor.rs index 728e666e..18c9c9a6 100644 --- a/rocketmq-client/src/implementation/client_remoting_processor.rs +++ b/rocketmq-client/src/implementation/client_remoting_processor.rs @@ -179,12 +179,6 @@ impl ClientRemotingProcessor { let request_header = request .decode_command_custom_header::() .unwrap(); - println!( - "receive broker's notification[{}], the consumer group: {} changed, rebalance \ - immediately", - channel.remote_address(), - request_header.consumer_group - ); info!( "receive broker's notification[{}], the consumer group: {} changed, rebalance \ immediately", diff --git a/rocketmq-client/src/implementation/mq_client_api_impl.rs b/rocketmq-client/src/implementation/mq_client_api_impl.rs index 26b61f5d..47d41cf7 100644 --- a/rocketmq-client/src/implementation/mq_client_api_impl.rs +++ b/rocketmq-client/src/implementation/mq_client_api_impl.rs @@ -908,7 +908,6 @@ impl MQClientAPIImpl { { Ok(response) => { let result = self.process_pull_response(response, addr).await; - match result { Ok(pull_result) => { pull_callback.on_success(pull_result).await;