Skip to content

Commit

Permalink
[ISSUE #2217]💫Refactor Broker crate with BrokerRuntimeInner♻️
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 12, 2025
1 parent 309d455 commit ae7b806
Show file tree
Hide file tree
Showing 37 changed files with 3,685 additions and 2,705 deletions.
2,996 changes: 1,550 additions & 1,446 deletions rocketmq-broker/src/broker_runtime.rs

Large diffs are not rendered by default.

192 changes: 132 additions & 60 deletions rocketmq-broker/src/failover/escape_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;

use bytes::Bytes;
use cheetah_string::CheetahString;
use rocketmq_client_rust::consumer::pull_status::PullStatus;
use rocketmq_client_rust::producer::send_result::SendResult;
use rocketmq_client_rust::producer::send_status::SendStatus;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::hasher::string_hasher::JavaStringHasher;
use rocketmq_common::common::message::message_decoder;
use rocketmq_common::common::message::message_ext::MessageExt;
Expand All @@ -43,8 +41,7 @@ use rocketmq_store::log_file::MessageStore;
use tracing::error;
use tracing::warn;

use crate::out_api::broker_outer_api::BrokerOuterAPI;
use crate::topic::manager::topic_route_info_manager::TopicRouteInfoManager;
use crate::broker_runtime::BrokerRuntimeInner;
use crate::transaction::queue::transactional_message_util::TransactionalMessageUtil;

const SEND_TIMEOUT: u64 = 3_000;
Expand Down Expand Up @@ -95,45 +92,52 @@ pub(crate) struct EscapeBridge<MS> {
inner_consumer_group_name: CheetahString,
escape_bridge_runtime: Option<RocketMQRuntime>,
message_store: Option<ArcMut<MS>>,
broker_config: Arc<BrokerConfig>,
topic_route_info_manager: Arc<TopicRouteInfoManager>,
broker_outer_api: Arc<BrokerOuterAPI>,
broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>,
}

impl<MS> EscapeBridge<MS> {
pub fn new(
broker_config: Arc<BrokerConfig>,
topic_route_info_manager: Arc<TopicRouteInfoManager>,
broker_outer_api: Arc<BrokerOuterAPI>,
) -> Self {
impl<MS: MessageStore> EscapeBridge<MS> {
pub fn new(broker_runtime_inner: ArcMut<BrokerRuntimeInner<MS>>) -> Self {

Check warning on line 99 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L99

Added line #L99 was not covered by tests
let inner_producer_group_name = CheetahString::from_string(format!(
"InnerProducerGroup_{}_{}",
broker_config.broker_name, broker_config.broker_identity.broker_id
broker_runtime_inner.broker_config().broker_name,
broker_runtime_inner
.broker_config()
.broker_identity
.broker_id

Check warning on line 106 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L102-L106

Added lines #L102 - L106 were not covered by tests
));
let inner_consumer_group_name = CheetahString::from_string(format!(
"InnerConsumerGroup_{}_{}",
broker_config.broker_name, broker_config.broker_identity.broker_id
broker_runtime_inner.broker_config().broker_name,
broker_runtime_inner
.broker_config()
.broker_identity
.broker_id

Check warning on line 114 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L110-L114

Added lines #L110 - L114 were not covered by tests
));

Self {
inner_producer_group_name,
inner_consumer_group_name,
escape_bridge_runtime: None,
message_store: None,
broker_config,
topic_route_info_manager,
broker_outer_api,
broker_runtime_inner,

Check warning on line 122 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L122

Added line #L122 was not covered by tests
}
}

pub fn start(&mut self, message_store: Option<ArcMut<MS>>) {
if self.broker_config.enable_slave_acting_master && self.broker_config.enable_remote_escape
pub fn start(&mut self /* message_store: Option<ArcMut<MS>> */) {
if self
.broker_runtime_inner
.broker_config()
.enable_slave_acting_master
&& self
.broker_runtime_inner
.broker_config()
.enable_remote_escape

Check warning on line 134 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L126-L134

Added lines #L126 - L134 were not covered by tests
{
self.escape_bridge_runtime = Some(RocketMQRuntime::new_multi(
num_cpus::get(),
"AsyncEscapeBridgeExecutor",
));
self.message_store = message_store;
//self.message_store = message_store;

Check warning on line 140 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L140

Added line #L140 was not covered by tests
}
}
}
Expand All @@ -146,14 +150,26 @@ where
&mut self,
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_id
== mix_all::MASTER_ID

Check warning on line 158 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L153-L158

Added lines #L153 - L158 were not covered by tests
{
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
} else if self
.broker_runtime_inner
.broker_config()
.enable_slave_acting_master
&& self
.broker_runtime_inner
.broker_config()
.enable_remote_escape

Check warning on line 172 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L165-L172

Added lines #L165 - L172 were not covered by tests
{
message_ext.set_wait_store_msg_ok(false);
match self.put_message_to_remote_broker(message_ext, None).await {
Expand All @@ -166,8 +182,12 @@ where
} else {
warn!(
"Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
self.broker_config.enable_slave_acting_master,
self.broker_config.enable_remote_escape
self.broker_runtime_inner
.broker_config()
.enable_slave_acting_master,
self.broker_runtime_inner
.broker_config()

Check warning on line 189 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L185-L189

Added lines #L185 - L189 were not covered by tests
.enable_remote_escape
);
PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable)
}
Expand All @@ -178,7 +198,12 @@ where
message_ext: MessageExtBrokerInner,
mut broker_name_to_send: Option<CheetahString>,
) -> crate::Result<Option<SendResult>> {
let broker_name = self.broker_config.broker_identity.broker_name.as_str();
let broker_name = self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_name
.as_str();

Check warning on line 206 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L201-L206

Added lines #L201 - L206 were not covered by tests
if broker_name.is_empty()
|| broker_name
== broker_name_to_send
Expand All @@ -198,7 +223,8 @@ where
message_ext
};
let topic_publish_info = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()

Check warning on line 227 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L226-L227

Added lines #L226 - L227 were not covered by tests
.try_to_find_topic_publish_info(message_to_put.get_topic())
.await;
if !topic_publish_info.as_ref().is_some_and(|value| value.ok()) {
Expand All @@ -220,7 +246,12 @@ where
.unwrap();
message_to_put.message_ext_inner.queue_id = mq.get_queue_id();
broker_name_to_send = Some(mq.get_broker_name().clone());
if self.broker_config.broker_identity.broker_name.as_str()
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_name
.as_str()

Check warning on line 254 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L249-L254

Added lines #L249 - L254 were not covered by tests
== mq.get_broker_name().as_str()
{
warn!(
Expand All @@ -241,7 +272,8 @@ where
)
};
let broker_addr_to_send = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()

Check warning on line 276 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L275-L276

Added lines #L275 - L276 were not covered by tests
.find_broker_address_in_publish(broker_name_to_send.as_ref());
if broker_addr_to_send.is_none() {
warn!(
Expand All @@ -255,7 +287,8 @@ where
}
let producer_group = self.get_producer_group(&message_to_put);
let result = self
.broker_outer_api
.broker_runtime_inner
.broker_outer_api()

Check warning on line 291 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L290-L291

Added lines #L290 - L291 were not covered by tests
.send_message_to_specific_broker(
broker_addr_to_send.as_ref().unwrap(),
broker_name_to_send.as_ref().unwrap(),
Expand Down Expand Up @@ -284,18 +317,31 @@ where
&mut self,
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_id
== mix_all::MASTER_ID

Check warning on line 325 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L320-L325

Added lines #L320 - L325 were not covered by tests
{
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
} else if self
.broker_runtime_inner
.broker_config()
.enable_slave_acting_master
&& self
.broker_runtime_inner
.broker_config()
.enable_remote_escape

Check warning on line 339 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L332-L339

Added lines #L332 - L339 were not covered by tests
{
message_ext.set_wait_store_msg_ok(false);
let topic_publish_info = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()

Check warning on line 344 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L343-L344

Added lines #L343 - L344 were not covered by tests
.try_to_find_topic_publish_info(message_ext.get_topic())
.await;
if topic_publish_info.is_none() {
Expand All @@ -310,11 +356,13 @@ where
message_ext.message_ext_inner.queue_id = message_queue.get_queue_id();
let broker_name_to_send = message_queue.get_broker_name();
let broker_addr_to_send = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()

Check warning on line 360 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L359-L360

Added lines #L359 - L360 were not covered by tests
.find_broker_address_in_publish(Some(broker_name_to_send));
let producer_group = self.get_producer_group(&message_ext);
let result = self
.broker_outer_api
.broker_runtime_inner
.broker_outer_api()

Check warning on line 365 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L364-L365

Added lines #L364 - L365 were not covered by tests
.send_message_to_specific_broker(
broker_addr_to_send.as_ref().unwrap(),
broker_name_to_send,
Expand All @@ -333,18 +381,31 @@ where
&mut self,
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_id
== mix_all::MASTER_ID

Check warning on line 389 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L384-L389

Added lines #L384 - L389 were not covered by tests
{
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
} else if self
.broker_runtime_inner
.broker_config()
.enable_slave_acting_master
&& self
.broker_runtime_inner
.broker_config()
.enable_remote_escape

Check warning on line 403 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L396-L403

Added lines #L396 - L403 were not covered by tests
{
message_ext.set_wait_store_msg_ok(false);
let topic_publish_info = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()

Check warning on line 408 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L407-L408

Added lines #L407 - L408 were not covered by tests
.try_to_find_topic_publish_info(message_ext.get_topic())
.await;
if topic_publish_info.is_none() {
Expand All @@ -367,11 +428,13 @@ where
message_ext.message_ext_inner.queue_id = message_queue.get_queue_id();
let broker_name_to_send = message_queue.get_broker_name();
let broker_addr_to_send = self
.topic_route_info_manager
.broker_runtime_inner
.topic_route_info_manager()

Check warning on line 432 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L431-L432

Added lines #L431 - L432 were not covered by tests
.find_broker_address_in_publish(Some(broker_name_to_send));
let producer_group = self.get_producer_group(&message_ext);
match self
.broker_outer_api
.broker_runtime_inner
.broker_outer_api()

Check warning on line 437 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L436-L437

Added lines #L436 - L437 were not covered by tests
.send_message_to_specific_broker(
broker_addr_to_send.as_ref().unwrap(),
broker_name_to_send,
Expand All @@ -390,8 +453,12 @@ where
} else {
warn!(
"Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
self.broker_config.enable_slave_acting_master,
self.broker_config.enable_remote_escape
self.broker_runtime_inner
.broker_config()
.enable_slave_acting_master,
self.broker_runtime_inner
.broker_config()

Check warning on line 460 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L456-L460

Added lines #L456 - L460 were not covered by tests
.enable_remote_escape
);
PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable)
}
Expand All @@ -410,7 +477,13 @@ where
let topic = topic.clone();
let broker_name = broker_name.clone();

if self.broker_config.broker_identity.broker_name == broker_name {
if self
.broker_runtime_inner
.broker_config()
.broker_identity
.broker_name
== broker_name

Check warning on line 485 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L480-L485

Added lines #L480 - L485 were not covered by tests
{
Box::pin(async move {
let result = message_store
.get_message(
Expand Down Expand Up @@ -464,28 +537,26 @@ where
queue_id: i32,
broker_name: &CheetahString,
) -> FutureResult {
let topic_route_info_manager = self.topic_route_info_manager.clone();
let broker_outer_api = self.broker_outer_api.clone();
/* let topic_route_info_manager = self.topic_route_info_manager.clone();
let broker_outer_api = self.broker_outer_api.clone();*/
let broker_runtime_inner_ = self.broker_runtime_inner.clone();

Check warning on line 542 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L540-L542

Added lines #L540 - L542 were not covered by tests
let inner_consumer_group_name = self.inner_consumer_group_name.clone();
let topic = topic.clone();
let broker_name = broker_name.clone();

Box::pin(async move {
let mut broker_addr = topic_route_info_manager.find_broker_address_in_subscribe(
Some(&broker_name),
0,
false,
);
let mut broker_addr = broker_runtime_inner_
.topic_route_info_manager()
.find_broker_address_in_subscribe(Some(&broker_name), 0, false);

Check warning on line 550 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L548-L550

Added lines #L548 - L550 were not covered by tests

if broker_addr.is_none() {
topic_route_info_manager
broker_runtime_inner_
.topic_route_info_manager()

Check warning on line 554 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L553-L554

Added lines #L553 - L554 were not covered by tests
.update_topic_route_info_from_name_server_ext(&topic, true, false)
.await;
broker_addr = topic_route_info_manager.find_broker_address_in_subscribe(
Some(&broker_name),
0,
false,
);
broker_addr = broker_runtime_inner_
.topic_route_info_manager()
.find_broker_address_in_subscribe(Some(&broker_name), 0, false);

Check warning on line 559 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L557-L559

Added lines #L557 - L559 were not covered by tests

if broker_addr.is_none() {
warn!(
Expand All @@ -497,7 +568,8 @@ where
}

let broker_addr = broker_addr.unwrap();
match broker_outer_api
match broker_runtime_inner_
.broker_outer_api()

Check warning on line 572 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L571-L572

Added lines #L571 - L572 were not covered by tests
.pull_message_from_specific_broker_async(
&broker_name,
&broker_addr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::filter::manager::consumer_filter_wrapper::ConsumerFilterWrapper;

const MS_24_HOUR: u64 = Duration::from_hours(24).as_millis() as u64;

#[derive(Default)]
#[derive(Default, Clone)]
pub(crate) struct ConsumerFilterManager {
broker_config: Arc<BrokerConfig>,
consumer_filter_wrapper: Arc<parking_lot::RwLock<ConsumerFilterWrapper>>,
Expand Down
Loading

0 comments on commit ae7b806

Please sign in to comment.