Skip to content

Commit

Permalink
[ISSUE #2225]📌Remove BrokerRuntime #[derive(Clone)] ⚡️
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 13, 2025
1 parent 6d5fe77 commit f2bde38
Showing 1 changed file with 180 additions and 117 deletions.
297 changes: 180 additions & 117 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ use crate::transaction::queue::transactional_message_bridge::TransactionalMessag
use crate::transaction::transaction_metrics_flush_service::TransactionMetricsFlushService;
use crate::transaction::transactional_message_check_service::TransactionalMessageCheckService;

#[derive(Clone)]
pub(crate) struct BrokerRuntime {
/* store_host: SocketAddr,
broker_config: Arc<BrokerConfig>,
Expand Down Expand Up @@ -149,7 +148,7 @@ pub(crate) struct BrokerRuntime {
#[cfg(feature = "local_file_store")]
transactional_message_service:
Option<ArcMut<DefaultTransactionalMessageService<DefaultMessageStore>>>,
broker_runtime: Option<Arc<RocketMQRuntime>>,
broker_runtime: Option<RocketMQRuntime>,
shutdown: Arc<AtomicBool>,
shutdown_hook: Option<BrokerShutdownHook>,
}
Expand Down Expand Up @@ -238,7 +237,7 @@ impl BrokerRuntime {
Self {
inner,
transactional_message_service: None,
broker_runtime: Some(Arc::new(runtime)),
broker_runtime: Some(runtime),

Check warning on line 240 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L240

Added line #L240 was not covered by tests
shutdown: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
}
Expand Down Expand Up @@ -266,7 +265,7 @@ impl BrokerRuntime {
pull_request_hold_service.shutdown();
}

/*if let Some(runtime) = self.broker_runtime.take() {
/* if let Some(runtime) = self.broker_runtime.take() {
runtime.shutdown();
}*/
}
Expand Down Expand Up @@ -674,7 +673,7 @@ impl BrokerRuntime {
}
});

let mut runtime = self.clone();
let mut runtime = self.inner.clone();

Check warning on line 676 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L676

Added line #L676 was not covered by tests
self.broker_runtime
.as_ref()
.unwrap()
Expand Down Expand Up @@ -724,7 +723,7 @@ impl BrokerRuntime {
"Set user specified name remoting_server address: {}",
namesrv_address
);
let mut broker_runtime = self.clone();
let mut broker_runtime = self.inner.clone();

Check warning on line 726 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L726

Added line #L726 was not covered by tests
self.broker_runtime
.as_ref()
.unwrap()
Expand All @@ -733,7 +732,7 @@ impl BrokerRuntime {
tokio::time::sleep(Duration::from_secs(10)).await;
loop {
let current_execution_time = tokio::time::Instant::now();
broker_runtime.update_namesrv_addr().await;
broker_runtime.update_namesrv_addr_inner().await;

Check warning on line 735 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L735

Added line #L735 was not covered by tests
let next_execution_time = current_execution_time + Duration::from_secs(60);
let delay = next_execution_time
.saturating_duration_since(tokio::time::Instant::now());
Expand Down Expand Up @@ -778,8 +777,6 @@ impl BrokerRuntime {

fn initial_request_pipeline(&mut self) {}

fn protect_broker(&mut self) {}

fn start_basic_service(&mut self) {
let request_processor = self.init_processor();
let fast_request_processor = request_processor.clone();
Expand Down Expand Up @@ -811,19 +808,7 @@ impl BrokerRuntime {
}

async fn update_namesrv_addr(&mut self) {
if self.inner.broker_config.fetch_name_srv_addr_by_dns_lookup {
if let Some(namesrv_addr) = &self.inner.broker_config.namesrv_addr {
self.inner
.broker_outer_api
.update_name_server_address_list_by_dns_lookup(namesrv_addr.clone())
.await;
}
} else if let Some(namesrv_addr) = &self.inner.broker_config.namesrv_addr {
self.inner
.broker_outer_api
.update_name_server_address_list(namesrv_addr.clone())
.await;
}
self.inner.update_namesrv_addr_inner().await;

Check warning on line 811 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L811

Added line #L811 was not covered by tests
}

pub async fn start(&mut self) {
Expand All @@ -848,7 +833,7 @@ impl BrokerRuntime {
self.register_broker_all(true, false, true).await;
}

let mut cloned_broker_runtime = self.clone();
let cloned_broker_runtime = self.inner.clone();

Check warning on line 836 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L836

Added line #L836 was not covered by tests
let should_start_time = self.inner.should_start_time.clone();
let is_isolated = self.inner.is_isolated.clone();
let broker_config = self.inner.broker_config.clone();
Expand All @@ -875,8 +860,9 @@ impl BrokerRuntime {
// record current execution time
let current_execution_time = tokio::time::Instant::now();
// execute task
let this = cloned_broker_runtime.clone();

Check warning on line 863 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L863

Added line #L863 was not covered by tests
cloned_broker_runtime
.register_broker_all(true, false, broker_config.force_register)
.register_broker_all_inner(this, true, false, broker_config.force_register)

Check warning on line 865 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L865

Added line #L865 was not covered by tests
.await;
// Calculate the time of the next execution
let next_execution_time = current_execution_time + period;
Expand Down Expand Up @@ -940,100 +926,14 @@ impl BrokerRuntime {
oneway: bool,
force_register: bool,
) {
let mut topic_config_table = HashMap::new();
let table = self.inner.topic_config_manager().topic_config_table();
for topic_config in table.lock().values() {
let new_topic_config =
if !PermName::is_writeable(self.inner.broker_config.broker_permission)
|| !PermName::is_readable(self.inner.broker_config.broker_permission)
{
TopicConfig {
topic_name: topic_config.topic_name.clone(),
read_queue_nums: topic_config.read_queue_nums,
write_queue_nums: topic_config.write_queue_nums,
perm: topic_config.perm & self.inner.broker_config.broker_permission,
..TopicConfig::default()
}
} else {
topic_config.clone()
};
topic_config_table.insert(
new_topic_config.topic_name.as_ref().unwrap().clone(),
new_topic_config,
);
}

// Handle split registration logic
if self.inner.broker_config.enable_split_registration
&& topic_config_table.len() as i32 >= self.inner.broker_config.split_registration_size
{
let topic_config_wrapper = self
.inner
.topic_config_manager()
.build_serialize_wrapper(topic_config_table.clone());
self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper)
.await;
topic_config_table.clear();
}

// Collect topicQueueMappingInfoMap
let topic_queue_mapping_info_map = self
.inner
.topic_queue_mapping_manager
.topic_queue_mapping_table
.lock()
.iter()
.map(|(key, value)| {
(
key.clone(),
TopicQueueMappingDetail::clone_as_mapping_info(value),
)
})
.collect();

let topic_config_wrapper = self
.inner
.topic_config_manager()
.build_serialize_wrapper_with_topic_queue_map(
topic_config_table,
topic_queue_mapping_info_map,
);

if self.inner.broker_config.enable_split_registration
|| force_register
|| Self::need_register(
self.inner
.broker_config
.broker_identity
.broker_cluster_name
.clone()
.as_str(),
self.inner.broker_config.broker_ip1.clone().as_str(),
self.inner
.broker_config
.broker_identity
.broker_name
.clone()
.as_str(),
self.inner.broker_config.broker_identity.broker_id,
self.inner.broker_config.register_broker_timeout_mills,
self.inner.broker_config.is_in_broker_container,
self.inner
.register_broker_all_inner(
self.inner.clone(),
check_order_config,
oneway,
force_register,

Check warning on line 934 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L929-L934

Added lines #L929 - L934 were not covered by tests
)
{
self.do_register_broker_all(check_order_config, oneway, topic_config_wrapper)
.await;
}
}

fn need_register(
_cluster_name: &str,
_broker_addr: &str,
_broker_name: &str,
_broker_id: u64,
_register_timeout_mills: i32,
_in_broker_container: bool,
) -> bool {
unimplemented!()
.await;

Check warning on line 936 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L936

Added line #L936 was not covered by tests
}

async fn do_register_broker_all(
Expand Down Expand Up @@ -1854,4 +1754,167 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
) {
self.pop_inflight_message_counter = pop_inflight_message_counter;
}

fn protect_broker(&mut self) {}

Check warning on line 1758 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1758

Added line #L1758 was not covered by tests

async fn update_namesrv_addr_inner(&mut self) {
if self.broker_config.fetch_name_srv_addr_by_dns_lookup {
if let Some(namesrv_addr) = &self.broker_config.namesrv_addr {
self.broker_outer_api
.update_name_server_address_list_by_dns_lookup(namesrv_addr.clone())
.await;
}
} else if let Some(namesrv_addr) = &self.broker_config.namesrv_addr {
self.broker_outer_api
.update_name_server_address_list(namesrv_addr.clone())
.await;
}
}

Check warning on line 1772 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1760-L1772

Added lines #L1760 - L1772 were not covered by tests

/// Register broker to name remoting_server
pub(crate) async fn register_broker_all_inner(
&self,
this: ArcMut<BrokerRuntimeInner<MS>>,
check_order_config: bool,
oneway: bool,
force_register: bool,
) {
let mut topic_config_table = HashMap::new();
let table = self.topic_config_manager().topic_config_table();
for topic_config in table.lock().values() {
let new_topic_config = if !PermName::is_writeable(self.broker_config.broker_permission)
|| !PermName::is_readable(self.broker_config.broker_permission)

Check warning on line 1786 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1775-L1786

Added lines #L1775 - L1786 were not covered by tests
{
TopicConfig {
topic_name: topic_config.topic_name.clone(),
read_queue_nums: topic_config.read_queue_nums,
write_queue_nums: topic_config.write_queue_nums,
perm: topic_config.perm & self.broker_config.broker_permission,
..TopicConfig::default()
}

Check warning on line 1794 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1788-L1794

Added lines #L1788 - L1794 were not covered by tests
} else {
topic_config.clone()

Check warning on line 1796 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1796

Added line #L1796 was not covered by tests
};
topic_config_table.insert(
new_topic_config.topic_name.as_ref().unwrap().clone(),
new_topic_config,
);

Check warning on line 1801 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1798-L1801

Added lines #L1798 - L1801 were not covered by tests
}

// Handle split registration logic
if self.broker_config.enable_split_registration
&& topic_config_table.len() as i32 >= self.broker_config.split_registration_size

Check warning on line 1806 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1805-L1806

Added lines #L1805 - L1806 were not covered by tests
{
let topic_config_wrapper = this
.topic_config_manager()
.build_serialize_wrapper(topic_config_table.clone());
BrokerRuntimeInner::<MS>::do_register_broker_all(
this.clone(),
check_order_config,
oneway,
topic_config_wrapper,
)
.await;
topic_config_table.clear();
}

Check warning on line 1819 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1808-L1819

Added lines #L1808 - L1819 were not covered by tests

// Collect topicQueueMappingInfoMap
let topic_queue_mapping_info_map = self
.topic_queue_mapping_manager
.topic_queue_mapping_table
.lock()
.iter()
.map(|(key, value)| {
(
key.clone(),
TopicQueueMappingDetail::clone_as_mapping_info(value),
)
})
.collect();

let topic_config_wrapper = this
.topic_config_manager()
.build_serialize_wrapper_with_topic_queue_map(
topic_config_table,
topic_queue_mapping_info_map,
);

if self.broker_config.enable_split_registration
|| force_register
|| need_register(
self.broker_config
.broker_identity
.broker_cluster_name
.clone()
.as_str(),
self.broker_config.broker_ip1.clone().as_str(),
self.broker_config
.broker_identity
.broker_name
.clone()
.as_str(),
self.broker_config.broker_identity.broker_id,
self.broker_config.register_broker_timeout_mills,
self.broker_config.is_in_broker_container,
)

Check warning on line 1859 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1822-L1859

Added lines #L1822 - L1859 were not covered by tests
{
BrokerRuntimeInner::<MS>::do_register_broker_all(
this,
check_order_config,
oneway,
topic_config_wrapper,
)
.await;
}
}

Check warning on line 1869 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1861-L1869

Added lines #L1861 - L1869 were not covered by tests

async fn do_register_broker_all_inner(
this: ArcMut<BrokerRuntimeInner<MS>>,
_check_order_config: bool,
oneway: bool,
topic_config_wrapper: TopicConfigAndMappingSerializeWrapper,
) {
let cluster_name = this
.broker_config
.broker_identity
.broker_cluster_name
.clone();
let broker_name = this.broker_config.broker_identity.broker_name.clone();
let broker_addr = CheetahString::from_string(format!(
"{}:{}",
this.broker_config.broker_ip1, this.server_config.listen_port
));
let broker_id = this.broker_config.broker_identity.broker_id;
// let weak = Arc::downgrade(&self.inner.broker_outer_api);
let this_ = this.clone();
this.broker_outer_api
.register_broker_all(
cluster_name,
broker_addr.clone(),
broker_name,
broker_id,
broker_addr,
topic_config_wrapper,
vec![],
oneway,
10000,
false,
false,
None,
Default::default(),
this_,
)
.await;
}

Check warning on line 1908 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1871-L1908

Added lines #L1871 - L1908 were not covered by tests
}

fn need_register(
_cluster_name: &str,
_broker_addr: &str,
_broker_name: &str,
_broker_id: u64,
_register_timeout_mills: i32,
_in_broker_container: bool,
) -> bool {
unimplemented!()

Check warning on line 1919 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1911-L1919

Added lines #L1911 - L1919 were not covered by tests
}

0 comments on commit f2bde38

Please sign in to comment.