Skip to content

Commit

Permalink
[ISSUE #1694]🚀Rocketmq-broker supports EscapeBridge functions🔥 (#1716)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Dec 11, 2024
1 parent ec07c07 commit 79a15e9
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 22 deletions.
45 changes: 29 additions & 16 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::client::manager::consumer_manager::ConsumerManager;
use crate::client::manager::producer_manager::ProducerManager;
use crate::client::net::broker_to_client::Broker2Client;
use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager;
use crate::failover::escape_bridge::EscapeBridge;
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
use crate::hook::check_before_put_message::CheckBeforePutMessageHook;
Expand Down Expand Up @@ -109,7 +110,7 @@ pub(crate) struct BrokerRuntime {
schedule_message_service: ScheduleMessageService,
timer_message_store: Option<TimerMessageStore>,

broker_out_api: Arc<BrokerOuterAPI>,
broker_outer_api: Arc<BrokerOuterAPI>,

broker_runtime: Option<RocketMQRuntime>,
producer_manager: Arc<ProducerManager>,
Expand All @@ -136,6 +137,8 @@ pub(crate) struct BrokerRuntime {
transactional_message_check_service: Option<Arc<TransactionalMessageCheckService>>,
transaction_metrics_flush_service: Option<Arc<TransactionMetricsFlushService>>,
topic_route_info_manager: Arc<TopicRouteInfoManager>,
#[cfg(feature = "local_file_store")]
escape_bridge: ArcMut<EscapeBridge<DefaultMessageStore>>,
}

impl Clone for BrokerRuntime {
Expand All @@ -154,7 +157,7 @@ impl Clone for BrokerRuntime {
broker_stats: self.broker_stats.clone(),
schedule_message_service: self.schedule_message_service.clone(),
timer_message_store: self.timer_message_store.clone(),
broker_out_api: self.broker_out_api.clone(),
broker_outer_api: self.broker_outer_api.clone(),
broker_runtime: None,
producer_manager: self.producer_manager.clone(),
consumer_manager: self.consumer_manager.clone(),
Expand All @@ -175,6 +178,7 @@ impl Clone for BrokerRuntime {
transactional_message_check_service: None,
transaction_metrics_flush_service: None,
topic_route_info_manager: self.topic_route_info_manager.clone(),
escape_bridge: self.escape_bridge.clone(),
}
}
}
Expand Down Expand Up @@ -226,6 +230,15 @@ impl BrokerRuntime {
broker_config.broker_identity.broker_id,
broker_config.get_broker_addr().into(),
);
let topic_route_info_manager = Arc::new(TopicRouteInfoManager::new(
broker_outer_api.clone(),
broker_config.clone(),
));
let escape_bridge = ArcMut::new(EscapeBridge::new(
broker_config.clone(),
topic_route_info_manager.clone(),
broker_outer_api.clone(),
));
Self {
broker_config: broker_config.clone(),
message_store_config,
Expand All @@ -243,7 +256,7 @@ impl BrokerRuntime {
broker_stats: None,
schedule_message_service: Default::default(),
timer_message_store: None,
broker_out_api: broker_outer_api.clone(),
broker_outer_api,
broker_runtime: Some(runtime),
producer_manager,
consumer_manager,
Expand All @@ -263,10 +276,8 @@ impl BrokerRuntime {
transactional_message_check_listener: None,
transactional_message_check_service: None,
transaction_metrics_flush_service: None,
topic_route_info_manager: Arc::new(TopicRouteInfoManager::new(
broker_outer_api,
broker_config,
)),
topic_route_info_manager,
escape_bridge,
}
}

Expand All @@ -279,7 +290,7 @@ impl BrokerRuntime {
}

pub fn shutdown(&mut self) {
self.broker_out_api.shutdown();
self.broker_outer_api.shutdown();
if let Some(message_store) = &mut self.message_store {
message_store.shutdown()
}
Expand Down Expand Up @@ -476,7 +487,7 @@ impl BrokerRuntime {
Arc::new(self.consumer_offset_manager.clone()),
Arc::new(BroadcastOffsetManager::default()),
message_store.clone(),
self.broker_out_api.clone(),
self.broker_outer_api.clone(),
));

let consumer_manage_processor = ConsumerManageProcessor::new(
Expand Down Expand Up @@ -521,7 +532,7 @@ impl BrokerRuntime {
self.schedule_message_service.clone(),
self.broker_stats.clone(),
self.consumer_manager.clone(),
self.broker_out_api.clone(),
self.broker_outer_api.clone(),
self.broker_stats_manager.clone(),
self.rebalance_lock_manager.clone(),
self.broker_member_group.clone(),
Expand Down Expand Up @@ -756,17 +767,19 @@ impl BrokerRuntime {
}

self.topic_route_info_manager.start();

self.escape_bridge.start(self.message_store.clone());
}

async fn update_namesrv_addr(&mut self) {
if self.broker_config.fetch_name_srv_addr_by_dns_lookup {
if let Some(namesrv_addr) = &self.broker_config.namesrv_addr {
self.broker_out_api
self.broker_outer_api
.update_name_server_address_list_by_dns_lookup(namesrv_addr.clone())
.await;
}
} else if let Some(namesrv_addr) = &self.broker_config.namesrv_addr {
self.broker_out_api
self.broker_outer_api
.update_name_server_address_list(namesrv_addr.clone())
.await;
}
Expand All @@ -784,7 +797,7 @@ impl BrokerRuntime {
self.is_isolated.store(true, Ordering::Release);
}

self.broker_out_api.start().await;
self.broker_outer_api.start().await;
self.start_basic_service();

if !self.is_isolated.load(Ordering::Acquire)
Expand Down Expand Up @@ -846,7 +859,7 @@ impl BrokerRuntime {
self.start_service_without_condition();
}

let broker_out_api = self.broker_out_api.clone();
let broker_out_api = self.broker_outer_api.clone();
self.broker_runtime
.as_ref()
.unwrap()
Expand Down Expand Up @@ -993,8 +1006,8 @@ impl BrokerRuntime {
self.broker_config.broker_ip1, self.server_config.listen_port
));
let broker_id = self.broker_config.broker_identity.broker_id;
let weak = Arc::downgrade(&self.broker_out_api);
self.broker_out_api
let weak = Arc::downgrade(&self.broker_outer_api);
self.broker_outer_api
.register_broker_all(
cluster_name,
broker_addr.clone(),
Expand Down
62 changes: 56 additions & 6 deletions rocketmq-broker/src/failover/escape_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,51 @@ const DEFAULT_PULL_TIMEOUT_MILLIS: u64 = 10_000;
pub(crate) struct EscapeBridge<MS> {
inner_producer_group_name: CheetahString,
inner_consumer_group_name: CheetahString,
escape_bridge_runtime: RocketMQRuntime,
message_store: ArcMut<MS>,
escape_bridge_runtime: Option<RocketMQRuntime>,
message_store: Option<ArcMut<MS>>,
broker_config: Arc<BrokerConfig>,
topic_route_info_manager: Arc<TopicRouteInfoManager>,
broker_outer_api: Arc<BrokerOuterAPI>,
}

impl<MS> EscapeBridge<MS> {
pub fn new(
broker_config: Arc<BrokerConfig>,
topic_route_info_manager: Arc<TopicRouteInfoManager>,
broker_outer_api: Arc<BrokerOuterAPI>,
) -> Self {
let inner_producer_group_name = CheetahString::from_string(format!(
"InnerProducerGroup_{}_{}",
broker_config.broker_name, broker_config.broker_identity.broker_id
));
let inner_consumer_group_name = CheetahString::from_string(format!(
"InnerConsumerGroup_{}_{}",
broker_config.broker_name, broker_config.broker_identity.broker_id
));

Self {
inner_producer_group_name,
inner_consumer_group_name,
escape_bridge_runtime: None,
message_store: None,
broker_config,
topic_route_info_manager,
broker_outer_api,
}
}

pub fn start(&mut self, message_store: Option<ArcMut<MS>>) {
if self.broker_config.enable_slave_acting_master && self.broker_config.enable_remote_escape
{
self.escape_bridge_runtime = Some(RocketMQRuntime::new_multi(
num_cpus::get(),
"AsyncEscapeBridgeExecutor",
));
self.message_store = message_store;
}
}
}

impl<MS> EscapeBridge<MS>
where
MS: MessageStore,
Expand All @@ -97,7 +135,11 @@ where
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
self.message_store.put_message(message_ext).await
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
{
Expand Down Expand Up @@ -177,7 +219,7 @@ where
.find_broker_address_in_publish(broker_name_to_send.as_ref());
if broker_addr_to_send.is_none() {
warn!(
"putMessageToRemoteBroker failed, remote broker not found. Topic: {}, MsgId: {}, \
"putMessageToRemoteBroker failed, remote broker not found. Topic: {}, MsgId: {}, \
Broker: {}",
message_to_put.get_topic(),
message_to_put.message_ext_inner.msg_id,
Expand Down Expand Up @@ -223,7 +265,11 @@ where
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
self.message_store.put_message(message_ext).await
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
{
Expand Down Expand Up @@ -268,7 +314,11 @@ where
mut message_ext: MessageExtBrokerInner,
) -> PutMessageResult {
if self.broker_config.broker_identity.broker_id == mix_all::MASTER_ID {
self.message_store.put_message(message_ext).await
self.message_store
.as_mut()
.unwrap()
.put_message(message_ext)
.await
} else if self.broker_config.enable_slave_acting_master
&& self.broker_config.enable_remote_escape
{
Expand Down

0 comments on commit 79a15e9

Please sign in to comment.