Skip to content

Commit

Permalink
[ISSUE #2227]💫Implement broker graceful shutdown🧑‍💻 (#2228)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 13, 2025
1 parent 9c34ea8 commit 1c3b543
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
18 changes: 16 additions & 2 deletions rocketmq-broker/src/broker_bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use log::info;
use rocketmq_common::common::broker::broker_config::BrokerConfig;
use rocketmq_common::common::server::config::ServerConfig;
use rocketmq_rust::wait_for_signal;
use rocketmq_store::config::message_store_config::MessageStoreConfig;
use tracing::error;

Expand All @@ -32,7 +33,10 @@ impl BrokerBootstrap {
error!("initialize fail");
return;
}
let (_start_result, _ctrl_c) = tokio::join!(self.start(), tokio::signal::ctrl_c());
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
self.broker_runtime.shutdown_rx = Some(shutdown_rx);

tokio::join!(self.start(), wait_for_signal_inner(shutdown_tx));
}

async fn initialize(&mut self) -> bool {
Expand All @@ -44,6 +48,16 @@ impl BrokerBootstrap {
}
}

async fn wait_for_signal_inner(shutdown_tx: tokio::sync::broadcast::Sender<()>) {
tokio::select! {
_ = wait_for_signal() => {
info!("Broker Received signal, initiating shutdown...");
}
}
// Send shutdown signal to all tasks
let _ = shutdown_tx.send(());
}

pub struct Builder {
broker_config: BrokerConfig,
message_store_config: MessageStoreConfig,
Expand Down
14 changes: 12 additions & 2 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ pub(crate) struct BrokerRuntime {
broker_runtime: Option<RocketMQRuntime>,
shutdown: Arc<AtomicBool>,
shutdown_hook: Option<BrokerShutdownHook>,
// receiver for shutdown signal
pub(crate) shutdown_rx: Option<tokio::sync::broadcast::Receiver<()>>,
}

impl BrokerRuntime {
Expand Down Expand Up @@ -240,6 +242,7 @@ impl BrokerRuntime {
broker_runtime: Some(runtime),
shutdown: Arc::new(AtomicBool::new(false)),
shutdown_hook: None,
shutdown_rx: None,
}
}

Expand All @@ -265,9 +268,9 @@ impl BrokerRuntime {
pull_request_hold_service.shutdown();
}

/* if let Some(runtime) = self.broker_runtime.take() {
if let Some(runtime) = self.broker_runtime.take() {
runtime.shutdown();
}*/
}
}

pub(crate) fn shutdown_basic_service(&mut self) {
Expand Down Expand Up @@ -913,6 +916,13 @@ impl BrokerRuntime {
"Rocketmq Broker({} ----Rust) start success",
self.inner.broker_config.broker_identity.broker_name
);
tokio::select! {
_ = self.shutdown_rx.as_mut().unwrap().recv() => {
info!("Broker Shutdown received, initiating graceful shutdown...");
self.shutdown();
info!("Broker Shutdown complete");
}
}
}

pub(crate) fn schedule_send_heartbeat(&mut self) {}
Expand Down

0 comments on commit 1c3b543

Please sign in to comment.