Skip to content

Commit

Permalink
[ISSUE #722]🚀Support pull message result handle-4 (#728)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jul 2, 2024
1 parent fae0049 commit dae0faa
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 178 deletions.
28 changes: 26 additions & 2 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use crate::client::manager::producer_manager::ProducerManager;
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
use crate::hook::check_before_put_message::CheckBeforePutMessageHook;
use crate::long_polling::long_polling_service::pull_request_hold_service::PullRequestHoldService;
use crate::long_polling::notify_message_arriving_listener::NotifyMessageArrivingListener;
use crate::offset::manager::broadcast_offset_manager::BroadcastOffsetManager;
use crate::offset::manager::consumer_offset_manager::ConsumerOffsetManager;
use crate::offset::manager::consumer_order_info_manager::ConsumerOrderInfoManager;
Expand Down Expand Up @@ -103,6 +105,8 @@ pub(crate) struct BrokerRuntime {
update_master_haserver_addr_periodically: bool,
should_start_time: Arc<AtomicU64>,
is_isolated: Arc<AtomicBool>,
#[cfg(feature = "local_file_store")]
pull_request_hold_service: Option<PullRequestHoldService<DefaultMessageStore>>,
}

impl Clone for BrokerRuntime {
Expand Down Expand Up @@ -134,6 +138,7 @@ impl Clone for BrokerRuntime {
update_master_haserver_addr_periodically: self.update_master_haserver_addr_periodically,
should_start_time: self.should_start_time.clone(),
is_isolated: self.is_isolated.clone(),
pull_request_hold_service: self.pull_request_hold_service.clone(),
}
}
}
Expand Down Expand Up @@ -207,6 +212,7 @@ impl BrokerRuntime {
update_master_haserver_addr_periodically: false,
should_start_time: Arc::new(AtomicU64::new(0)),
is_isolated: Arc::new(AtomicBool::new(false)),
pull_request_hold_service: None,
}
}

Expand Down Expand Up @@ -355,7 +361,7 @@ impl BrokerRuntime {
self.topic_queue_mapping_clean_service = Some(Arc::new(TopicQueueMappingCleanService));
}

fn init_processor(&self) -> BrokerRequestProcessor<DefaultMessageStore> {
fn init_processor(&mut self) -> BrokerRequestProcessor<DefaultMessageStore> {
let send_message_processor = SendMessageProcessor::<DefaultMessageStore>::new(
self.topic_queue_mapping_manager.clone(),
self.topic_config_manager.clone(),
Expand All @@ -371,6 +377,7 @@ impl BrokerRuntime {
self.broker_config.clone(),
Arc::new(Default::default()),
);
let message_store = Arc::new(self.message_store.as_ref().unwrap().clone());
let pull_message_processor = PullMessageProcessor::new(
Arc::new(pull_message_result_handler),
self.broker_config.clone(),
Expand All @@ -381,7 +388,7 @@ impl BrokerRuntime {
self.consumer_filter_manager.clone(),
Arc::new(self.consumer_offset_manager.clone()),
Arc::new(BroadcastOffsetManager::default()),
self.message_store.as_ref().unwrap().clone(),
message_store.clone(),
);

let consumer_manage_processor = ConsumerManageProcessor::new(
Expand All @@ -393,6 +400,19 @@ impl BrokerRuntime {
Arc::new(self.topic_config_manager.clone()),
self.message_store.clone().unwrap(),
);
self.pull_request_hold_service = Some(PullRequestHoldService::new(
message_store,
Arc::new(pull_message_processor.clone()),
self.broker_config.clone(),
));

self.message_store
.as_mut()
.unwrap()
.set_message_arriving_listener(Some(Arc::new(Box::new(
NotifyMessageArrivingListener::new(self.pull_request_hold_service.clone().unwrap()),
))));

BrokerRequestProcessor {
send_message_processor,
pull_message_processor,
Expand Down Expand Up @@ -574,6 +594,10 @@ impl BrokerRuntime {
fast_server_config.listen_port = self.server_config.listen_port - 2;
let fast_server = RocketMQServer::new(Arc::new(fast_server_config));
tokio::spawn(async move { fast_server.run(fast_request_processor).await });

if let Some(pull_request_hold_service) = self.pull_request_hold_service.as_mut() {
pull_request_hold_service.start();
}
}

fn update_namesrv_addr(&mut self) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

pub const NO_SUSPEND_KEY: &str = "_noSuspend_";
#[derive(Default)]
pub struct ColdDataPullRequestHoldService {}
1 change: 0 additions & 1 deletion rocketmq-broker/src/long_polling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,5 @@

pub(crate) mod long_polling_service;
pub(crate) mod many_pull_request;
pub(crate) mod message_arriving_listener;
pub(crate) mod notify_message_arriving_listener;
pub(crate) mod pull_request;
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
use std::collections::HashMap;
use std::sync::Arc;

use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::TimeUtils::get_current_millis;
use rocketmq_store::consume_queue::consume_queue_ext::CqExtUnit;
use rocketmq_store::log_file::MessageStore;
use tokio::time::Instant;
use tracing::info;
use tracing::warn;

use crate::long_polling::many_pull_request::ManyPullRequest;
use crate::long_polling::pull_request::PullRequest;
Expand All @@ -34,6 +37,7 @@ pub struct PullRequestHoldService<MS> {
pull_request_table: Arc<parking_lot::RwLock<HashMap<String, ManyPullRequest>>>,
pull_message_processor: Arc<PullMessageProcessor<MS>>,
message_store: Arc<MS>,
broker_config: Arc<BrokerConfig>,
}

impl<MS> PullRequestHoldService<MS>
Expand All @@ -43,11 +47,13 @@ where
pub fn new(
message_store: Arc<MS>,
pull_message_processor: Arc<PullMessageProcessor<MS>>,
broker_config: Arc<BrokerConfig>,
) -> Self {
PullRequestHoldService {
pull_request_table: Arc::new(parking_lot::RwLock::new(HashMap::new())),
pull_message_processor,
message_store,
broker_config,
}
}
}
Expand All @@ -57,6 +63,31 @@ impl<MS> PullRequestHoldService<MS>
where
MS: MessageStore + Send + Sync,
{
pub fn start(&mut self) {
let self_clone = self.clone();
tokio::spawn(async move {
loop {
if self_clone.broker_config.long_polling_enable {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
} else {
tokio::time::sleep(tokio::time::Duration::from_millis(
self_clone.broker_config.short_polling_time_mills,
))
.await;
}
let instant = Instant::now();
self_clone.check_hold_request();
let elapsed = instant.elapsed().as_millis();
if elapsed > 5000 {
warn!(
"PullRequestHoldService: check hold pull request cost {}ms",
elapsed
);
}
}
});
}

pub fn suspend_pull_request(&self, topic: &str, queue_id: i32, mut pull_request: PullRequest) {
let key = build_key(topic, queue_id);
let mut table = self.pull_request_table.write();
Expand All @@ -74,6 +105,7 @@ where
let topic = key_parts[0];
let queue_id = key_parts[1].parse::<i32>().unwrap();
let max_offset = self.message_store.get_max_offset_in_queue(topic, queue_id);
self.notify_message_arriving(topic, queue_id, max_offset);
}
}

Expand Down Expand Up @@ -127,6 +159,7 @@ where
if match_by_commit_log {
self.pull_message_processor.execute_request_when_wakeup(
request.client_channel().clone(),
request.connection_handler_context().clone(),
request.request_command().clone(),
);
continue;
Expand All @@ -138,6 +171,7 @@ where
{
self.pull_message_processor.execute_request_when_wakeup(
request.client_channel().clone(),
request.connection_handler_context().clone(),
request.request_command().clone(),
);
continue;
Expand All @@ -164,6 +198,7 @@ where
);
self.pull_message_processor.execute_request_when_wakeup(
request.client_channel().clone(),
request.connection_handler_context().clone(),
request.request_command().clone(),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
use std::collections::HashMap;

use rocketmq_store::base::message_arriving_listener::MessageArrivingListener;
use rocketmq_store::log_file::MessageStore;

use crate::long_polling::long_polling_service::pull_request_hold_service::PullRequestHoldService;
use crate::long_polling::message_arriving_listener::MessageArrivingListener;

pub struct NotifyMessageArrivingListener<MS> {
pull_request_hold_service: PullRequestHoldService<MS>,
Expand Down
4 changes: 4 additions & 0 deletions rocketmq-broker/src/long_polling/pull_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,8 @@ impl PullRequest {
pub fn suspend_timestamp(&self) -> u64 {
self.suspend_timestamp
}

pub fn connection_handler_context(&self) -> &ConnectionHandlerContext {
&self.ctx
}
}
40 changes: 35 additions & 5 deletions rocketmq-broker/src/processor/pull_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use std::sync::Arc;

use rocketmq_common::common::broker::broker_config::BrokerConfig;
Expand Down Expand Up @@ -51,6 +50,7 @@ use tracing::warn;

use crate::client::consumer_group_info::ConsumerGroupInfo;
use crate::client::manager::consumer_manager::ConsumerManager;
use crate::coldctr::cold_data_pull_request_hold_service::NO_SUSPEND_KEY;
use crate::filter::expression_for_retry_message_filter::ExpressionForRetryMessageFilter;
use crate::filter::expression_message_filter::ExpressionMessageFilter;
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
Expand All @@ -72,7 +72,7 @@ pub struct PullMessageProcessor<MS> {
consumer_filter_manager: Arc<ConsumerFilterManager>,
consumer_offset_manager: Arc<ConsumerOffsetManager>,
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
message_store: MS,
message_store: Arc<MS>,
}

impl<MS> PullMessageProcessor<MS> {
Expand All @@ -86,7 +86,7 @@ impl<MS> PullMessageProcessor<MS> {
consumer_filter_manager: Arc<ConsumerFilterManager>,
consumer_offset_manager: Arc<ConsumerOffsetManager>,
broadcast_offset_manager: Arc<BroadcastOffsetManager>,
message_store: MS,
message_store: Arc<MS>,
) -> Self {
Self {
pull_message_result_handler,
Expand Down Expand Up @@ -321,7 +321,7 @@ pub fn rewrite_response_for_static_topic(
#[allow(unused_variables)]
impl<MS> PullMessageProcessor<MS>
where
MS: MessageStore,
MS: MessageStore + Send + Sync + 'static,
{
pub async fn process_request(
&mut self,
Expand Down Expand Up @@ -785,7 +785,37 @@ where
-1
}

pub fn execute_request_when_wakeup(&self, channel: Channel, request: RemotingCommand) {}
pub fn execute_request_when_wakeup(
&self,
channel: Channel,
ctx: ConnectionHandlerContext,
request: RemotingCommand,
) {
let mut self_inner = self.clone();
tokio::spawn(async move {
let broker_allow_flow_ctr_suspend = !(request.ext_fields().is_some()
&& request.ext_fields().unwrap().contains_key(NO_SUSPEND_KEY));
let opaque = request.opaque();
let response = self_inner
.process_request(
channel,
ctx.clone(),
RequestCode::from(request.code()),
request,
)
.await;
if let Some(response) = response {
let command = response.set_opaque(opaque).mark_response_type();
match ctx.upgrade() {
None => {}
Some(ctx) => {
let ctx_ref = unsafe { &mut *ctx.get() };
ctx_ref.write(command).await;
}
}
}
});
}
}
pub(crate) fn is_broadcast(
proxy_pull_broadcast: bool,
Expand Down
Loading

0 comments on commit dae0faa

Please sign in to comment.