Skip to content

Commit

Permalink
[ISSUE #2217]💫Refactor Broker crate with BrokerRuntimeInner♻️ (#2218)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 12, 2025
1 parent 309d455 commit f8080ff
Show file tree
Hide file tree
Showing 37 changed files with 3,685 additions and 2,705 deletions.
2,996 changes: 1,550 additions & 1,446 deletions rocketmq-broker/src/broker_runtime.rs

Large diffs are not rendered by default.

192 changes: 132 additions & 60 deletions rocketmq-broker/src/failover/escape_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;

use bytes::Bytes;
use cheetah_string::CheetahString;
use rocketmq_client_rust::consumer::pull_status::PullStatus;
use rocketmq_client_rust::producer::send_result::SendResult;
use rocketmq_client_rust::producer::send_status::SendStatus;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::hasher::string_hasher::JavaStringHasher;
use rocketmq_common::common::message::message_decoder;
use rocketmq_common::common::message::message_ext::MessageExt;
Expand All @@ -43,8 +41,7 @@ use rocketmq_store::log_file::MessageStore;
use tracing::error;
use tracing::warn;

use crate::out_api::broker_outer_api::BrokerOuterAPI;
use crate::topic::manager::topic_route_info_manager::TopicRouteInfoManager;
use crate::broker_runtime::BrokerRuntimeInner;
use crate::transaction::queue::transactional_message_util::TransactionalMessageUtil;

const SEND_TIMEOUT: u64 = 3_000;
Expand Down Expand Up @@ -95,45 +92,52 @@ pub(crate) struct EscapeBridge<MS> {
inner_consumer_group_name: CheetahString,
escape_bridge_runtime: Option<RocketMQRuntime>,
message_store: Option<ArcMut<MS>>,
broker_config: Arc<BrokerConfig>,
topic_route_info_manager: Arc<TopicRouteInfoManager>,
broker_outer_api: Arc<BrokerOuterAPI>,
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
}

impl<MS> EscapeBridge<MS> {
pub fn new(
broker_config: Arc<BrokerConfig>,
topic_route_info_manager: Arc<TopicRouteInfoManager>,
broker_outer_api: Arc<BrokerOuterAPI>,
) -> Self {
impl<MS: MessageStore> EscapeBridge<MS> {
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {
let inner_producer_group_name = CheetahString::from_string(format!(
"InnerProducerGroup_{}_{}",
broker_config.broker_name, broker_config.broker_identity.broker_id
broker_runtime_inner.broker_config().broker_name,
broker_runtime_inner
.broker_config()
.broker_identity
.broker_id
));
let inner_consumer_group_name = CheetahString::from_string(format!(
"InnerConsumerGroup_{}_{}",
broker_config.broker_name, broker_config.broker_identity.broker_id
broker_runtime_inner.broker_config().broker_name,
broker_runtime_inner
.broker_config()
.broker_identity
.broker_id
));

Self {
inner_producer_group_name,
inner_consumer_group_name,
escape_bridge_runtime: None,
message_store: None,
broker_config,
topic_route_info_manager,
broker_outer_api,
broker_runtime_inner,
}
}

pub fn start(&mut self, message_store: Option<ArcMut<MS>>) {
if self.broker_config.enable_slave_acting_master && self.broker_config.enable_remote_escape
pub fn start(&mut self /* message_store: Option<ArcMut<MS>> */) {
if self
.broker_runtime_inner
.broker_config()
.enable_slave_acting_master
&& self
.broker_runtime_inner
.broker_config()
.enable_remote_escape
{
self.escape_bridge_runtime = Some(RocketMQRuntime::new_multi(
num_cpus::get(),
"AsyncEscapeBridgeExecutor",
));
self.message_store = message_store;
//self.message_store = message_store;
}
}
}
Expand All @@ -146,14 +150,26 @@ where
&mut self,
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_id
== mix_all::MASTER_ID
{
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
} else if self
.broker_runtime_inner
.broker_config()
.enable_slave_acting_master
&& self
.broker_runtime_inner
.broker_config()
.enable_remote_escape
{
message_ext.set_wait_store_msg_ok(false);
match self.put_message_to_remote_broker(message_ext, None).await {
Expand All @@ -166,8 +182,12 @@ where
} else {
warn!(
"Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
self.broker_config.enable_slave_acting_master,
self.broker_config.enable_remote_escape
self.broker_runtime_inner
.broker_config()
.enable_slave_acting_master,
self.broker_runtime_inner
.broker_config()
.enable_remote_escape
);
PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable)
}
Expand All @@ -178,7 +198,12 @@ where
message_ext: MessageExtBrokerInner,
mut broker_name_to_send: Option<CheetahString>,
) -> crate::Result<Option<SendResult>> {
let broker_name = self.broker_config.broker_identity.broker_name.as_str();
let broker_name = self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_name
.as_str();
if broker_name.is_empty()
|| broker_name
== broker_name_to_send
Expand All @@ -198,7 +223,8 @@ where
message_ext
};
let topic_publish_info = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()
.try_to_find_topic_publish_info(message_to_put.get_topic())
.await;
if !topic_publish_info.as_ref().is_some_and(|value| value.ok()) {
Expand All @@ -220,7 +246,12 @@ where
.unwrap();
message_to_put.message_ext_inner.queue_id = mq.get_queue_id();
broker_name_to_send = Some(mq.get_broker_name().clone());
if self.broker_config.broker_identity.broker_name.as_str()
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_name
.as_str()
== mq.get_broker_name().as_str()
{
warn!(
Expand All @@ -241,7 +272,8 @@ where
)
};
let broker_addr_to_send = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()
.find_broker_address_in_publish(broker_name_to_send.as_ref());
if broker_addr_to_send.is_none() {
warn!(
Expand All @@ -255,7 +287,8 @@ where
}
let producer_group = self.get_producer_group(&message_to_put);
let result = self
.broker_outer_api
.broker_runtime_inner
.broker_outer_api()
.send_message_to_specific_broker(
broker_addr_to_send.as_ref().unwrap(),
broker_name_to_send.as_ref().unwrap(),
Expand Down Expand Up @@ -284,18 +317,31 @@ where
&mut self,
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_id
== mix_all::MASTER_ID
{
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
} else if self
.broker_runtime_inner
.broker_config()
.enable_slave_acting_master
&& self
.broker_runtime_inner
.broker_config()
.enable_remote_escape
{
message_ext.set_wait_store_msg_ok(false);
let topic_publish_info = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()
.try_to_find_topic_publish_info(message_ext.get_topic())
.await;
if topic_publish_info.is_none() {
Expand All @@ -310,11 +356,13 @@ where
message_ext.message_ext_inner.queue_id = message_queue.get_queue_id();
let broker_name_to_send = message_queue.get_broker_name();
let broker_addr_to_send = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()
.find_broker_address_in_publish(Some(broker_name_to_send));
let producer_group = self.get_producer_group(&message_ext);
let result = self
.broker_outer_api
.broker_runtime_inner
.broker_outer_api()
.send_message_to_specific_broker(
broker_addr_to_send.as_ref().unwrap(),
broker_name_to_send,
Expand All @@ -333,18 +381,31 @@ where
&mut self,
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_id
== mix_all::MASTER_ID
{
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
} else if self
.broker_runtime_inner
.broker_config()
.enable_slave_acting_master
&& self
.broker_runtime_inner
.broker_config()
.enable_remote_escape
{
message_ext.set_wait_store_msg_ok(false);
let topic_publish_info = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()
.try_to_find_topic_publish_info(message_ext.get_topic())
.await;
if topic_publish_info.is_none() {
Expand All @@ -367,11 +428,13 @@ where
message_ext.message_ext_inner.queue_id = message_queue.get_queue_id();
let broker_name_to_send = message_queue.get_broker_name();
let broker_addr_to_send = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()
.find_broker_address_in_publish(Some(broker_name_to_send));
let producer_group = self.get_producer_group(&message_ext);
match self
.broker_outer_api
.broker_runtime_inner
.broker_outer_api()
.send_message_to_specific_broker(
broker_addr_to_send.as_ref().unwrap(),
broker_name_to_send,
Expand All @@ -390,8 +453,12 @@ where
} else {
warn!(
"Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
self.broker_config.enable_slave_acting_master,
self.broker_config.enable_remote_escape
self.broker_runtime_inner
.broker_config()
.enable_slave_acting_master,
self.broker_runtime_inner
.broker_config()
.enable_remote_escape
);
PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable)
}
Expand All @@ -410,7 +477,13 @@ where
let topic = topic.clone();
let broker_name = broker_name.clone();

if self.broker_config.broker_identity.broker_name == broker_name {
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_name
== broker_name
{
Box::pin(async move {
let result = message_store
.get_message(
Expand Down Expand Up @@ -464,28 +537,26 @@ where
queue_id: i32,
broker_name: &CheetahString,
) -> FutureResult {
let topic_route_info_manager = self.topic_route_info_manager.clone();
let broker_outer_api = self.broker_outer_api.clone();
/* let topic_route_info_manager = self.topic_route_info_manager.clone();
let broker_outer_api = self.broker_outer_api.clone();*/
let broker_runtime_inner_ = self.broker_runtime_inner.clone();
let inner_consumer_group_name = self.inner_consumer_group_name.clone();
let topic = topic.clone();
let broker_name = broker_name.clone();

Box::pin(async move {
let mut broker_addr = topic_route_info_manager.find_broker_address_in_subscribe(
Some(&broker_name),
0,
false,
);
let mut broker_addr = broker_runtime_inner_
.topic_route_info_manager()
.find_broker_address_in_subscribe(Some(&broker_name), 0, false);

if broker_addr.is_none() {
topic_route_info_manager
broker_runtime_inner_
.topic_route_info_manager()
.update_topic_route_info_from_name_server_ext(&topic, true, false)
.await;
broker_addr = topic_route_info_manager.find_broker_address_in_subscribe(
Some(&broker_name),
0,
false,
);
broker_addr = broker_runtime_inner_
.topic_route_info_manager()
.find_broker_address_in_subscribe(Some(&broker_name), 0, false);

if broker_addr.is_none() {
warn!(
Expand All @@ -497,7 +568,8 @@ where
}

let broker_addr = broker_addr.unwrap();
match broker_outer_api
match broker_runtime_inner_
.broker_outer_api()
.pull_message_from_specific_broker_async(
&broker_name,
&broker_addr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::filter::manager::consumer_filter_wrapper::ConsumerFilterWrapper;

const MS_24_HOUR: u64 = Duration::from_hours(24).as_millis() as u64;

#[derive(Default)]
#[derive(Default, Clone)]
pub(crate) struct ConsumerFilterManager {
broker_config: Arc<BrokerConfig>,
consumer_filter_wrapper: Arc<parking_lot::RwLock<ConsumerFilterWrapper>>,
Expand Down
Loading

0 comments on commit f8080ff

Please sign in to comment.