From 26ba92091e6cf0e0cce2ff65747218271b56f9ac Mon Sep 17 00:00:00 2001 From: devillve084 <786537003@qq.com> Date: Mon, 6 Jun 2022 15:40:44 +0800 Subject: [PATCH 1/2] Make the stop function reentrant --- metasrv/src/meta_service/raftmeta.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/metasrv/src/meta_service/raftmeta.rs b/metasrv/src/meta_service/raftmeta.rs index 4f61652295d5..625d26a4c56e 100644 --- a/metasrv/src/meta_service/raftmeta.rs +++ b/metasrv/src/meta_service/raftmeta.rs @@ -15,6 +15,7 @@ use std::collections::BTreeSet; use std::fmt::Debug; use std::net::Ipv4Addr; +use std::sync::atomic::AtomicI32; use std::sync::Arc; use common_base::base::tokio; @@ -80,6 +81,7 @@ pub struct MetaNode { pub running_tx: watch::Sender<()>, pub running_rx: watch::Receiver<()>, pub join_handles: Mutex>>>, + pub joined_nodes: AtomicI32, } impl Opened for MetaNode { @@ -132,6 +134,7 @@ impl MetaNodeBuilder { running_tx: tx, running_rx: rx, join_handles: Mutex::new(Vec::new()), + joined_nodes: AtomicI32::new(1), }); if self.monitor_metrics { @@ -317,8 +320,6 @@ impl MetaNode { #[tracing::instrument(level = "debug", skip(self))] pub async fn stop(&self) -> MetaResult { - // TODO(xp): need to be reentrant. - let mut rx = self.raft.metrics(); self.raft @@ -338,16 +339,17 @@ impl MetaNode { } tracing::info!("shutdown raft"); - // raft counts 1 - let mut joined = 1; for j in self.join_handles.lock().await.iter_mut() { let _rst = j .await .map_error_to_meta_error(MetaError::MetaServiceError, || "fail to join")?; - joined += 1; + // TODO(luhuanbing): Add joined node information to enrich debugging information + self.joined_nodes + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } tracing::info!("shutdown: id={}", self.sto.id); + let joined = self.joined_nodes.load(std::sync::atomic::Ordering::Relaxed); Ok(joined) } From 49a9bd2387556a9eb7d9fc3dbb35b5e977dc9608 Mon Sep 17 00:00:00 2001 From: devillve084 <786537003@qq.com> Date: Mon, 6 Jun 2022 18:05:58 +0800 Subject: [PATCH 2/2] Remove todos in subscribe_metrics function --- metasrv/src/meta_service/raftmeta.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/metasrv/src/meta_service/raftmeta.rs b/metasrv/src/meta_service/raftmeta.rs index 625d26a4c56e..a22016af361c 100644 --- a/metasrv/src/meta_service/raftmeta.rs +++ b/metasrv/src/meta_service/raftmeta.rs @@ -81,7 +81,7 @@ pub struct MetaNode { pub running_tx: watch::Sender<()>, pub running_rx: watch::Receiver<()>, pub join_handles: Mutex>>>, - pub joined_nodes: AtomicI32, + pub joined_tasks: AtomicI32, } impl Opened for MetaNode { @@ -134,7 +134,7 @@ impl MetaNodeBuilder { running_tx: tx, running_rx: rx, join_handles: Mutex::new(Vec::new()), - joined_nodes: AtomicI32::new(1), + joined_tasks: AtomicI32::new(1), }); if self.monitor_metrics { @@ -344,25 +344,23 @@ impl MetaNode { .await .map_error_to_meta_error(MetaError::MetaServiceError, || "fail to join")?; // TODO(luhuanbing): Add joined node information to enrich debugging information - self.joined_nodes + self.joined_tasks .fetch_add(1, std::sync::atomic::Ordering::Relaxed); } tracing::info!("shutdown: id={}", self.sto.id); - let joined = self.joined_nodes.load(std::sync::atomic::Ordering::Relaxed); + let joined = self.joined_tasks.load(std::sync::atomic::Ordering::Relaxed); Ok(joined) } // spawn a monitor to watch raft state changes such as leader changes, // and manually add non-voter to cluster so that non-voter receives raft logs. pub async fn subscribe_metrics(mn: Arc, mut metrics_rx: watch::Receiver) { - //TODO: return a handle for join - // TODO: every state change triggers add_non_voter!!! + // TODO(luhuanbing): every state change triggers add_non_voter is not very reasonable let mut running_rx = mn.running_rx.clone(); let mut jh = mn.join_handles.lock().await; let mut current_leader: Option = None; - // TODO: reduce dependency: it does not need all of the fields in MetaNode let mn = mn.clone(); let span = tracing::span!(tracing::Level::INFO, "watch-metrics"); @@ -394,7 +392,6 @@ impl MetaNode { } if cur == mn.sto.id { - // TODO: check result let _rst = mn.add_configured_non_voters().await; if _rst.is_err() {