Skip to content

Commit

Permalink
[ISSUE #2223]💫Remove BrokerRuntime unused code and enhancement code🧑‍💻
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 12, 2025
1 parent 50a3b55 commit 788bd38
Showing 1 changed file with 15 additions and 105 deletions.
120 changes: 15 additions & 105 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,50 +154,6 @@ pub(crate) struct BrokerRuntime {
shutdown_hook: Option<BrokerShutdownHook>,
}

/*impl Clone for BrokerRuntime {
fn clone(&self) -> Self {
Self {
store_host: self.store_host,
broker_config: self.broker_config.clone(),
message_store_config: self.message_store_config.clone(),
server_config: self.server_config.clone(),
topic_config_manager: self.topic_config_manager.clone(),
topic_queue_mapping_manager: self.topic_queue_mapping_manager.clone(),
consumer_offset_manager: self.consumer_offset_manager.clone(),
subscription_group_manager: self.subscription_group_manager.clone(),
consumer_filter_manager: Arc::new(Default::default()),
consumer_order_info_manager: self.consumer_order_info_manager.clone(),
message_store: self.message_store.clone(),
broker_stats: self.broker_stats.clone(),
schedule_message_service: self.schedule_message_service.clone(),
timer_message_store: self.timer_message_store.clone(),
broker_outer_api: self.broker_outer_api.clone(),
broker_runtime: None,
producer_manager: self.producer_manager.clone(),
consumer_manager: self.consumer_manager.clone(),
broadcast_offset_manager: self.broadcast_offset_manager.clone(),
drop: self.drop.clone(),
shutdown: self.shutdown.clone(),
shutdown_hook: self.shutdown_hook.clone(),
broker_stats_manager: self.broker_stats_manager.clone(),
topic_queue_mapping_clean_service: self.topic_queue_mapping_clean_service.clone(),
update_master_haserver_addr_periodically: self.update_master_haserver_addr_periodically,
should_start_time: self.should_start_time.clone(),
is_isolated: self.is_isolated.clone(),
pull_request_hold_service: self.pull_request_hold_service.clone(),
rebalance_lock_manager: self.rebalance_lock_manager.clone(),
broker_member_group: self.broker_member_group.clone(),
transactional_message_service: self.transactional_message_service.clone(),
transactional_message_check_listener: self.transactional_message_check_listener.clone(),
transactional_message_check_service: None,
transaction_metrics_flush_service: None,
topic_route_info_manager: self.topic_route_info_manager.clone(),
escape_bridge: self.escape_bridge.clone(),
pop_inflight_message_counter: self.pop_inflight_message_counter.clone(),
}
}
}*/

impl BrokerRuntime {
pub(crate) fn new(
broker_config: BrokerConfig,
Expand Down Expand Up @@ -225,18 +181,10 @@ impl BrokerRuntime {
Box::new(DefaultConsumerIdsChangeListener {}),
Arc::new(broker_config.clone()),
);
let stats_manager = Arc::new(BrokerStatsManager::new(Arc::new(broker_config.clone())));

Check warning on line 184 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L184

Added line #L184 was not covered by tests
let should_start_time = Arc::new(AtomicU64::new(0));
let pop_inflight_message_counter =
PopInflightMessageCounter::new(should_start_time.clone());
/*stats_manager.set_producer_state_getter(Arc::new(ProducerStateGetter {
topic_config_manager: topic_config_manager.clone(),
producer_manager: producer_manager.clone(),
}));
stats_manager.set_consumer_state_getter(Arc::new(ConsumerStateGetter {
topic_config_manager: topic_config_manager.clone(),
consumer_manager: consumer_manager.clone(),
}));*/

let mut inner = ArcMut::new(BrokerRuntimeInner::<DefaultMessageStore> {
store_host,
Expand All @@ -257,7 +205,7 @@ impl BrokerRuntime {
producer_manager,
consumer_manager,
broadcast_offset_manager: Default::default(),
broker_stats_manager: stats_manager,
broker_stats_manager: None,

Check warning on line 208 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L208

Added line #L208 was not covered by tests
topic_queue_mapping_clean_service: None,
update_master_haserver_addr_periodically: false,
should_start_time: Default::default(),
Expand All @@ -272,65 +220,27 @@ impl BrokerRuntime {
escape_bridge: None,
pop_inflight_message_counter,
});

/* let broker_runtime_inner = Arc::new(BrokerRuntimeInner {
broker_out_api: broker_outer_api.clone(),
broker_config: broker_config.clone(),
message_store_config: message_store_config.clone(),
server_config: server_config.clone(),
topic_queue_mapping_manager: topic_queue_mapping_manager.clone(),
});*/
let mut stats_manager = BrokerStatsManager::new(Arc::new(inner.broker_config.clone()));
stats_manager.set_producer_state_getter(Arc::new(ProducerStateGetter {
broker_runtime_inner: inner.clone(),
}));
stats_manager.set_consumer_state_getter(Arc::new(ConsumerStateGetter {
broker_runtime_inner: inner.clone(),
}));
let stats_manager = Arc::new(stats_manager);

Check warning on line 230 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L223-L230

Added lines #L223 - L230 were not covered by tests
inner.topic_config_manager = Some(TopicConfigManager::new(inner.clone()));
inner.topic_route_info_manager = Some(TopicRouteInfoManager::new(inner.clone()));
inner.escape_bridge = Some(EscapeBridge::new(inner.clone()));
inner.subscription_group_manager = Some(SubscriptionGroupManager::new(inner.clone()));
inner.consumer_order_info_manager = Some(ConsumerOrderInfoManager::new(inner.clone()));

/* let broker_stats_manager = Arc::new(stats_manager);
consumer_manager.set_broker_stats_manager(Some(Arc::downgrade(&broker_stats_manager)));*/
inner.broker_stats_manager = Some(stats_manager);

Check warning on line 236 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L236

Added line #L236 was not covered by tests

Self {
/* store_host,
broker_config: broker_config.clone(),
message_store_config,
server_config,
topic_config_manager,
topic_queue_mapping_manager,
consumer_offset_manager: ConsumerOffsetManager::new(broker_config.clone(), None),
subscription_group_manager,
consumer_filter_manager: Arc::new(Default::default()),
consumer_order_info_manager,
message_store: None,
broker_stats: None,
schedule_message_service: Default::default(),
timer_message_store: None,
broker_outer_api,*/
inner,
transactional_message_service: None,
broker_runtime: Some(Arc::new(runtime)),
shutdown: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
/* producer_manager,
consumer_manager,
broadcast_offset_manager: Arc::new(Default::default()),
drop: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
broker_stats_manager,
topic_queue_mapping_clean_service: None,
update_master_haserver_addr_periodically: false,
should_start_time,
is_isolated: Arc::new(AtomicBool::new(false)),
pull_request_hold_service: None,
rebalance_lock_manager: Arc::new(Default::default()),
broker_member_group: Arc::new(broker_member_group),
transactional_message_service: None,
transactional_message_check_listener: None,
transactional_message_check_service: None,
transaction_metrics_flush_service: None,
topic_route_info_manager,
escape_bridge,
pop_inflight_message_counter,*/
}
}

Expand Down Expand Up @@ -416,7 +326,7 @@ impl BrokerRuntime {
Arc::new(self.inner.message_store_config.clone()),
Arc::new(self.inner.broker_config.clone()),
self.inner.topic_config_manager().topic_config_table(),
Some(self.inner.broker_stats_manager.clone()),
self.inner.broker_stats_manager.clone(),

Check warning on line 329 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L329

Added line #L329 was not covered by tests
false,
));
let message_store_clone = message_store.clone();
Expand Down Expand Up @@ -1378,7 +1288,7 @@ pub(crate) struct BrokerRuntimeInner<MS> {
producer_manager: ProducerManager,
consumer_manager: ConsumerManager,
broadcast_offset_manager: BroadcastOffsetManager,
broker_stats_manager: Arc<BrokerStatsManager>,
broker_stats_manager: Option<Arc<BrokerStatsManager>>,
topic_queue_mapping_clean_service: Option<TopicQueueMappingCleanService>,
update_master_haserver_addr_periodically: bool,
should_start_time: Arc<AtomicU64>,
Expand Down Expand Up @@ -1663,7 +1573,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {

#[inline]
pub fn broker_stats_manager(&self) -> &BrokerStatsManager {
&self.broker_stats_manager
self.broker_stats_manager.as_ref().unwrap()

Check warning on line 1576 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1576

Added line #L1576 was not covered by tests
}

#[inline]
Expand Down Expand Up @@ -1845,7 +1755,7 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {

#[inline]
pub fn set_broker_stats_manager(&mut self, broker_stats_manager: Arc<BrokerStatsManager>) {
self.broker_stats_manager = broker_stats_manager;
self.broker_stats_manager = Some(broker_stats_manager);

Check warning on line 1758 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1758

Added line #L1758 was not covered by tests
}

#[inline]
Expand Down

0 comments on commit 788bd38

Please sign in to comment.