Skip to content

Commit

Permalink
[ISSUE #2236]💫BrokerRuntime implement unregister broker🚀
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Jan 14, 2025
1 parent 9896960 commit b7ab871
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ impl BrokerRuntime {
message_store_config: MessageStoreConfig,
server_config: ServerConfig,
) -> Self {
let store_host = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port)
let broker_address = format!("{}:{}", broker_config.broker_ip1, broker_config.listen_port);
let store_host = broker_address

Check warning on line 164 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L163-L164

Added lines #L163 - L164 were not covered by tests
.parse::<SocketAddr>()
.expect("parse store_host failed");
let runtime = RocketMQRuntime::new_multi(10, "broker-thread");
Expand Down Expand Up @@ -189,6 +190,7 @@ impl BrokerRuntime {
let mut inner = ArcMut::new(BrokerRuntimeInner::<DefaultMessageStore> {
shutdown: Arc::new(AtomicBool::new(false)),
store_host,
broker_addr: CheetahString::from(broker_address),

Check warning on line 193 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L193

Added line #L193 was not covered by tests
broker_config,
message_store_config,
server_config,
Expand Down Expand Up @@ -253,7 +255,9 @@ impl BrokerRuntime {
self.inner.message_store_config()
}

pub fn shutdown(&mut self) {
pub async fn shutdown(&mut self) {
self.shutdown_basic_service().await;

Check warning on line 259 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L258-L259

Added lines #L258 - L259 were not covered by tests

self.inner.broker_outer_api.shutdown();
if let Some(message_store) = &mut self.inner.message_store {
message_store.shutdown()
Expand All @@ -272,9 +276,23 @@ impl BrokerRuntime {
}
}

pub(crate) fn shutdown_basic_service(&mut self) {
async fn unregister_broker(&mut self) {
self.inner
.broker_outer_api
.unregister_broker_all(
&self.inner.broker_config.broker_identity.broker_cluster_name,
&self.inner.broker_config.broker_identity.broker_name,
self.inner.get_broker_addr(),
self.inner.broker_config.broker_identity.broker_id,
)
.await;
}

Check warning on line 289 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L279-L289

Added lines #L279 - L289 were not covered by tests

pub(crate) async fn shutdown_basic_service(&mut self) {

Check warning on line 291 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L291

Added line #L291 was not covered by tests
self.inner.shutdown.store(true, Ordering::SeqCst);

self.unregister_broker().await;

Check warning on line 294 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L294

Added line #L294 was not covered by tests

if let Some(hook) = self.shutdown_hook.as_ref() {
hook.before_shutdown();
}
Expand Down Expand Up @@ -918,7 +936,7 @@ impl BrokerRuntime {
tokio::select! {
_ = self.shutdown_rx.as_mut().unwrap().recv() => {
info!("Broker Shutdown received, initiating graceful shutdown...");
self.shutdown();
self.shutdown().await;

Check warning on line 939 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L939

Added line #L939 was not covered by tests
info!("Broker Shutdown complete");
}
}
Expand Down Expand Up @@ -1182,6 +1200,7 @@ impl<MS: MessageStore> StateGetter for ConsumerStateGetter<MS> {
pub(crate) struct BrokerRuntimeInner<MS> {
shutdown: Arc<AtomicBool>,
store_host: SocketAddr,
broker_addr: CheetahString,
broker_config: BrokerConfig,
message_store_config: MessageStoreConfig,
server_config: ServerConfig,
Expand Down Expand Up @@ -1917,6 +1936,10 @@ impl<MS: MessageStore> BrokerRuntimeInner<MS> {
)
.await;
}

pub fn get_broker_addr(&self) -> &CheetahString {
&self.broker_addr
}

Check warning on line 1942 in rocketmq-broker/src/broker_runtime.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/broker_runtime.rs#L1940-L1942

Added lines #L1940 - L1942 were not covered by tests
}

fn need_register(
Expand Down

0 comments on commit b7ab871

Please sign in to comment.