Skip to content

Commit

Permalink
[ISSUE #1870]⚡️Replace other structs attribute store_host with Broker…
Browse files Browse the repository at this point in the history
…Runtime store_host (#1871)
  • Loading branch information
mxsm authored Dec 19, 2024
1 parent 4f6ec9c commit 559e382
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 13 deletions.
6 changes: 5 additions & 1 deletion rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ impl BrokerRuntime {
self.transactional_message_service.as_ref().unwrap().clone(),
self.rebalance_lock_manager.clone(),
self.broker_stats_manager.clone(),
self.store_host,
);
let reply_message_processor = ReplyMessageProcessor::new(
self.topic_queue_mapping_manager.clone(),
Expand All @@ -484,6 +485,7 @@ impl BrokerRuntime {
self.broker_stats_manager.clone(),
Some(self.producer_manager.clone()),
self.transactional_message_service.as_ref().unwrap().clone(),
self.store_host,
);
let mut pull_message_result_handler =
ArcMut::new(Box::new(DefaultPullMessageResultHandler::new(
Expand Down Expand Up @@ -566,6 +568,7 @@ impl BrokerRuntime {
self.escape_bridge.clone(),
self.broker_config.clone(),
self.pop_inflight_message_counter.clone(),
self.store_host,
));
BrokerRequestProcessor {
send_message_processor: ArcMut::new(send_message_processor),
Expand Down Expand Up @@ -749,7 +752,8 @@ impl BrokerRuntime {
self.broker_stats_manager.clone(),
self.consumer_offset_manager.clone(),
self.broker_config.clone(),
self.topic_config_manager.clone()
self.topic_config_manager.clone(),
self.store_host
);
let service = DefaultTransactionalMessageService::new(bridge);
self.transactional_message_service = Some(ArcMut::new(service));
Expand Down
4 changes: 1 addition & 3 deletions rocketmq-broker/src/processor/ack_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ where
escape_bridge: ArcMut<EscapeBridge<MS>>,
broker_config: Arc<BrokerConfig>,
pop_inflight_message_counter: Arc<PopInflightMessageCounter>,
store_host: SocketAddr,
) -> AckMessageProcessor<MS> {
let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
.parse::<SocketAddr>()
.unwrap();
AckMessageProcessor {
topic_config_manager,
message_store,
Expand Down
4 changes: 1 addition & 3 deletions rocketmq-broker/src/processor/reply_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ where
broker_stats_manager: Arc<BrokerStatsManager>,
producer_manager: Option<Arc<ProducerManager>>,
transactional_message_service: ArcMut<TS>,
store_host: SocketAddr,
) -> Self {
let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
.parse::<SocketAddr>()
.unwrap();
Self {
inner: Inner {
broker_config,
Expand Down
4 changes: 1 addition & 3 deletions rocketmq-broker/src/processor/send_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,8 @@ where
transactional_message_service: ArcMut<TS>,
rebalance_lock_manager: Arc<RebalanceLockManager>,
broker_stats_manager: Arc<BrokerStatsManager>,
store_host: SocketAddr,
) -> Self {
let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
.parse::<SocketAddr>()
.unwrap();
Self {
inner: ArcMut::new(Inner {
broker_config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ where
consumer_offset_manager: ConsumerOffsetManager,
broker_config: Arc<BrokerConfig>,
topic_config_manager: TopicConfigManager,
store_host: SocketAddr,
) -> Self {
let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
.parse::<SocketAddr>()
.expect("parse store host failed");
Self {
op_queue_map: Arc::new(Mutex::new(HashMap::new())),
message_store,
Expand Down

0 comments on commit 559e382

Please sign in to comment.