diff --git a/docs/doc/50-manage/00-metasrv/50-metasrv-metrics.md b/docs/doc/50-manage/00-metasrv/50-metasrv-metrics.md index 053dad2b4633..993fac15674a 100644 --- a/docs/doc/50-manage/00-metasrv/50-metasrv-metrics.md +++ b/docs/doc/50-manage/00-metasrv/50-metasrv-metrics.md @@ -21,6 +21,7 @@ These metrics describe the status of the `metasrv`. All these metrics are prefix | ----------------- | ------------------------------------------------- | ------- | | current_leader_id | Current leader id of cluster, 0 means no leader. | IntGauge | | is_leader | Whether or not this node is current leader. | Gauge | +| node_is_health | Whether or not this node is health. | IntGauge | | leader_changes | Number of leader changes seen. | Counter | | applying_snapshot | Whether or not statemachine is applying snapshot. | Gauge | | proposals_applied | Total number of consensus proposals applied. | Gauge | @@ -32,6 +33,8 @@ These metrics describe the status of the `metasrv`. All these metrics are prefix `is_leader` indicate if this `metasrv` currently is the leader of cluster, and `leader_changes` show the total number of leader changes since start.If change leader too frequently, it will impact the performance of `metasrv`, also it signal that the cluster is unstable. +If and only if the node state is `Follower` or `Leader` , `node_is_health` is 1, otherwise is 0. + `proposals_applied` records the total number of applied write requests. `proposals_pending` indicates how many proposals are queued to commit currently.Rising pending proposals suggests there is a high client load or the member cannot commit proposals. @@ -40,9 +43,9 @@ These metrics describe the status of the `metasrv`. All these metrics are prefix `watchers` show the total number of active watchers currently. -### Network +### Raft Network -These metrics describe the network status of the `metasrv`. All these metrics are prefixed with `metasrv_network_`. +These metrics describe the network status of raft nodes in the `metasrv`. All these metrics are prefixed with `metasrv_raft_network_`. | Name | Description | Labels | Type | | ----------------------- | ------------------------------------------------- | --------------------------------- | ------------- | @@ -71,3 +74,15 @@ These metrics describe the network status of the `metasrv`. All these metrics ar `snapshot_recv_success` and `snapshot_recv_failures` indicates the success and fail number of receive snapshot.`snapshot_recv_inflights` indicate the inflight receiving snapshot, each time receive a snapshot, this field will increment by one, after receiving snapshot is done, this field will decrement by one. `snapshot_recv_seconds` indicate the total latency distributions of snapshot receives. + +### Meta Network + +These metrics describe the network status of meta service in the `metasrv`. All these metrics are prefixed with `metasrv_meta_network_`. + +| Name | Description | Type | +| ---------------- | ------------------------------------------------------ | ---------- | +| meta_sent_bytes | Total number of sent bytes to meta grpc client. | IntCounter | +| meta_recv_bytes | Total number of recv bytes from meta grpc client. | IntCounter | +| meta_inflights | Total number of inflight meta grpc requests. | IntGauge | +| meta_req_success | Total number of success request from meta grpc client. | IntCounter | +| meta_req_failed | Total number of fail request from meta grpc client. | IntCounter | diff --git a/metasrv/src/api/grpc/grpc_service.rs b/metasrv/src/api/grpc/grpc_service.rs index 5eedd0f04544..edb5781ef7f6 100644 --- a/metasrv/src/api/grpc/grpc_service.rs +++ b/metasrv/src/api/grpc/grpc_service.rs @@ -49,6 +49,9 @@ use tonic::Streaming; use crate::executor::ActionHandler; use crate::meta_service::meta_service_impl::GrpcStream; use crate::meta_service::MetaNode; +use crate::metrics::add_meta_metrics_meta_request_inflights; +use crate::metrics::incr_meta_metrics_meta_recv_bytes; +use crate::metrics::incr_meta_metrics_meta_sent_bytes; use crate::version::from_digit_ver; use crate::version::to_digit_ver; use crate::version::METASRV_SEMVER; @@ -147,10 +150,20 @@ impl MetaService for MetaServiceImpl { self.check_token(request.metadata())?; common_tracing::extract_remote_span_as_parent(&request); + incr_meta_metrics_meta_recv_bytes(request.get_ref().encoded_len() as u64); + let action: MetaGrpcWriteReq = request.try_into()?; + + add_meta_metrics_meta_request_inflights(1); + tracing::info!("Receive write_action: {:?}", action); let body = self.action_handler.execute_write(action).await; + + add_meta_metrics_meta_request_inflights(-1); + + incr_meta_metrics_meta_sent_bytes(body.encoded_len() as u64); + Ok(Response::new(body)) } @@ -158,11 +171,20 @@ impl MetaService for MetaServiceImpl { self.check_token(request.metadata())?; common_tracing::extract_remote_span_as_parent(&request); + incr_meta_metrics_meta_recv_bytes(request.get_ref().encoded_len() as u64); + let action: MetaGrpcReadReq = request.try_into()?; + + add_meta_metrics_meta_request_inflights(1); + tracing::info!("Receive read_action: {:?}", action); let res = self.action_handler.execute_read(action).await; + add_meta_metrics_meta_request_inflights(-1); + + incr_meta_metrics_meta_sent_bytes(res.encoded_len() as u64); + Ok(Response::new(res)) } @@ -210,6 +232,9 @@ impl MetaService for MetaServiceImpl { request: Request, ) -> Result, Status> { self.check_token(request.metadata())?; + incr_meta_metrics_meta_recv_bytes(request.get_ref().encoded_len() as u64); + add_meta_metrics_meta_request_inflights(1); + common_tracing::extract_remote_span_as_parent(&request); let request = request.into_inner(); @@ -217,6 +242,10 @@ impl MetaService for MetaServiceImpl { tracing::info!("Receive txn_request: {:?}", request); let body = self.action_handler.execute_txn(request).await; + add_meta_metrics_meta_request_inflights(-1); + + incr_meta_metrics_meta_sent_bytes(body.encoded_len() as u64); + Ok(Response::new(body)) } @@ -228,7 +257,11 @@ impl MetaService for MetaServiceImpl { let members = meta_node.get_meta_addrs().await.map_err(|e| { Status::internal(format!("Cannot get metasrv member list, error: {:?}", e)) })?; - Ok(Response::new(MemberListReply { data: members })) + + let resp = MemberListReply { data: members }; + incr_meta_metrics_meta_sent_bytes(resp.encoded_len() as u64); + + Ok(Response::new(resp)) } } diff --git a/metasrv/src/api/http_service.rs b/metasrv/src/api/http_service.rs index 85d708dd35e2..0735fc5d6aba 100644 --- a/metasrv/src/api/http_service.rs +++ b/metasrv/src/api/http_service.rs @@ -20,13 +20,18 @@ use common_base::base::Stoppable; use common_exception::Result; use common_tracing::tracing; use poem::get; +use poem::http::StatusCode; use poem::listener::RustlsConfig; +use poem::web::Json; use poem::Endpoint; use poem::EndpointExt; +use poem::IntoResponse; +use poem::Response; use poem::Route; use crate::configs::Config; use crate::meta_service::MetaNode; +use crate::metrics::get_meta_metrics_node_is_health; pub struct HttpService { cfg: Config, @@ -46,7 +51,7 @@ impl HttpService { fn build_router(&self) -> impl Endpoint { #[cfg_attr(not(feature = "memory-profiling"), allow(unused_mut))] let mut route = Route::new() - .at("/v1/health", get(super::http::v1::health::health_handler)) + .at("/v1/health", get(health_handler)) .at("/v1/config", get(super::http::v1::config::config_handler)) .at( "/v1/cluster/nodes", @@ -129,3 +134,14 @@ impl Stoppable for HttpService { self.shutdown_handler.stop(force).await } } + +#[poem::handler] +pub async fn health_handler() -> Response { + if !get_meta_metrics_node_is_health() { + return StatusCode::SERVICE_UNAVAILABLE.into_response(); + } + Json(super::http::v1::health::HealthCheckResponse { + status: super::http::v1::health::HealthCheckStatus::Pass, + }) + .into_response() +} diff --git a/metasrv/src/executor/action_handler.rs b/metasrv/src/executor/action_handler.rs index c620b3fdb7b4..24f7e20352b6 100644 --- a/metasrv/src/executor/action_handler.rs +++ b/metasrv/src/executor/action_handler.rs @@ -24,6 +24,7 @@ use common_meta_types::TxnReply; use common_meta_types::TxnRequest; use crate::meta_service::MetaNode; +use crate::metrics::incr_meta_metrics_meta_request_result; pub struct ActionHandler { /// The raft-based meta data entry. @@ -48,6 +49,7 @@ impl ActionHandler { match action { MetaGrpcWriteReq::UpsertKV(a) => { let r = self.meta_node.upsert_kv(a).await; + incr_meta_metrics_meta_request_result(r.is_ok()); RaftReply::from(r) } } @@ -59,25 +61,32 @@ impl ActionHandler { match action { MetaGrpcReadReq::GetKV(a) => { let r = self.meta_node.get_kv(&a.key).await; + incr_meta_metrics_meta_request_result(r.is_ok()); RaftReply::from(r) } MetaGrpcReadReq::MGetKV(a) => { let r = self.meta_node.mget_kv(&a.keys).await; + incr_meta_metrics_meta_request_result(r.is_ok()); RaftReply::from(r) } MetaGrpcReadReq::ListKV(a) => { let r = self.meta_node.prefix_list_kv(&a.prefix).await; + incr_meta_metrics_meta_request_result(r.is_ok()); RaftReply::from(r) } MetaGrpcReadReq::PrefixListKV(a) => { let r = self.meta_node.prefix_list_kv(&a.0).await; + incr_meta_metrics_meta_request_result(r.is_ok()); RaftReply::from(r) } } } pub async fn execute_txn(&self, req: TxnRequest) -> TxnReply { - match self.meta_node.transaction(req).await { + let ret = self.meta_node.transaction(req).await; + incr_meta_metrics_meta_request_result(ret.is_ok()); + + match ret { Ok(resp) => resp, Err(err) => TxnReply { success: false, diff --git a/metasrv/src/meta_service/raftmeta.rs b/metasrv/src/meta_service/raftmeta.rs index 6a03af33d004..cfd6f63c0e94 100644 --- a/metasrv/src/meta_service/raftmeta.rs +++ b/metasrv/src/meta_service/raftmeta.rs @@ -53,6 +53,7 @@ use openraft::Config; use openraft::Raft; use openraft::RaftMetrics; use openraft::SnapshotPolicy; +use openraft::State; use tonic::Status; use crate::meta_service::meta_leader::MetaLeader; @@ -63,6 +64,7 @@ use crate::metrics::incr_meta_metrics_leader_change; use crate::metrics::incr_meta_metrics_read_failed; use crate::metrics::set_meta_metrics_current_leader; use crate::metrics::set_meta_metrics_is_leader; +use crate::metrics::set_meta_metrics_node_is_health; use crate::metrics::set_meta_metrics_proposals_applied; use crate::network::Network; use crate::store::MetaRaftStore; @@ -379,6 +381,11 @@ impl MetaNode { }; if changed.is_ok() { let mm = metrics_rx.borrow().clone(); + + set_meta_metrics_node_is_health( + mm.state == State::Follower || mm.state == State::Leader, + ); + if let Some(cur) = mm.current_leader { // if current leader has changed? if let Some(leader) = current_leader { diff --git a/metasrv/src/metrics/meta_metrics.rs b/metasrv/src/metrics/meta_metrics.rs index f36affed4217..de6c315a62e8 100644 --- a/metasrv/src/metrics/meta_metrics.rs +++ b/metasrv/src/metrics/meta_metrics.rs @@ -31,7 +31,8 @@ use prometheus::Registry; pub const META_NAMESPACE: &str = "metasrv"; pub const SERVER_SUBSYSTEM: &str = "server"; -pub const NETWORK_SUBSYSTEM: &str = "network"; +pub const RAFT_NETWORK_SUBSYSTEM: &str = "raft_network"; +pub const META_NETWORK_SUBSYSTEM: &str = "meta_network"; lazy_static! { pub static ref REGISTRY: Registry = Registry::new(); @@ -51,6 +52,13 @@ lazy_static! { ) .expect("meta metric cannot be created"); + pub static ref NODE_IS_HEALTH: IntGauge = IntGauge::with_opts( + Opts::new("node_is_health", "Whether or not this node is health.") + .namespace(META_NAMESPACE) + .subsystem(SERVER_SUBSYSTEM) + ) + .expect("meta metric cannot be created"); + pub static ref LEADER_CHANGES: IntCounter = IntCounter::with_opts( Opts::new("leader_changes", "Number of leader changes seen.") .namespace(META_NAMESPACE) @@ -110,7 +118,7 @@ lazy_static! { pub static ref ACTIVE_PEERS: GaugeVec = GaugeVec::new( Opts::new("active_peers", "Current number of active connections to peers.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["id", "address"], ) .expect("meta metric cannot be created"); @@ -118,7 +126,7 @@ lazy_static! { pub static ref CONNECT_TO_PEER_FAIL: CounterVec = CounterVec::new( Opts::new("fail_connect_to_peer", "Total number of fail connections to peers.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["id", "address"], ) .expect("meta metric cannot be created"); @@ -126,7 +134,7 @@ lazy_static! { pub static ref SENT_BYTES: CounterVec = CounterVec::new( Opts::new("sent_bytes", "Total number of sent bytes to peers.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["to"], ) .expect("meta metric cannot be created"); @@ -134,7 +142,7 @@ lazy_static! { pub static ref RECV_BYTES: CounterVec = CounterVec::new( Opts::new("recv_bytes", "Total number of received bytes from peers.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["from"], ) .expect("meta metric cannot be created"); @@ -142,7 +150,7 @@ lazy_static! { pub static ref SENT_FAILURES: CounterVec = CounterVec::new( Opts::new("sent_failures", "Total number of send failures to peers.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["to"], ) .expect("meta metric cannot be created"); @@ -150,7 +158,7 @@ lazy_static! { pub static ref SNAPSHOT_SEND_SUCCESS: IntCounterVec = IntCounterVec::new( Opts::new("snapshot_send_success", "Total number of successful snapshot sends.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["to"], ) .expect("meta metric cannot be created"); @@ -158,7 +166,7 @@ lazy_static! { pub static ref SNAPSHOT_SEND_FAILURES: IntCounterVec = IntCounterVec::new( Opts::new("snapshot_send_failures", "Total number of snapshot send failures.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["to"], ) .expect("meta metric cannot be created"); @@ -166,7 +174,7 @@ lazy_static! { pub static ref SNAPSHOT_SEND_INFLIGHTS: IntGaugeVec = IntGaugeVec::new( Opts::new("snapshot_send_inflights", "Total number of inflight snapshot sends.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["to"], ) .expect("meta metric cannot be created"); @@ -174,7 +182,7 @@ lazy_static! { pub static ref SNAPSHOT_SENT_SECONDS: HistogramVec = HistogramVec::new( HistogramOpts::new("snapshot_sent_seconds", "Total latency distributions of snapshot sends.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM) + .subsystem(RAFT_NETWORK_SUBSYSTEM) // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2 // highest bucket start of 0.1 sec * 2^9 == 51.2 sec .buckets(exponential_buckets(0.1, 2.0, 10).unwrap()), @@ -185,7 +193,7 @@ lazy_static! { pub static ref SNAPSHOT_RECV_INFLIGHTS: IntGaugeVec = IntGaugeVec::new( Opts::new("snapshot_recv_inflights", "Total number of inflight snapshot receives.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["from"], ) .expect("meta metric cannot be created"); @@ -193,7 +201,7 @@ lazy_static! { pub static ref SNAPSHOT_RECV_FAILURES: IntCounterVec = IntCounterVec::new( Opts::new("snapshot_recv_failures", "Total number of snapshot receive failures.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["from"], ) .expect("meta metric cannot be created"); @@ -201,7 +209,7 @@ lazy_static! { pub static ref SNAPSHOT_RECV_SUCCESS: IntCounterVec = IntCounterVec::new( Opts::new("snapshot_recv_success", "Total number of successful receive snapshot.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM), + .subsystem(RAFT_NETWORK_SUBSYSTEM), &["from"], ) .expect("meta metric cannot be created"); @@ -209,13 +217,48 @@ lazy_static! { pub static ref SNAPSHOT_RECV_SECONDS: HistogramVec = HistogramVec::new( HistogramOpts::new("snapshot_recv_seconds", "Total latency distributions of snapshot receives.") .namespace(META_NAMESPACE) - .subsystem(NETWORK_SUBSYSTEM) + .subsystem(RAFT_NETWORK_SUBSYSTEM) // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2 // highest bucket start of 0.1 sec * 2^9 == 51.2 sec .buckets(exponential_buckets(0.1, 2.0, 10).unwrap()), &["from"], ) .expect("meta metric cannot be created"); + + pub static ref META_SERVICE_SENT_BYTES: IntCounter = IntCounter::with_opts( + Opts::new("sent_bytes", "Total number of sent bytes to meta grpc client.") + .namespace(META_NAMESPACE) + .subsystem(META_NETWORK_SUBSYSTEM) + ) + .expect("meta metric cannot be created"); + + pub static ref META_SERVICE_RECV_BYTES: IntCounter = IntCounter::with_opts( + Opts::new("recv_bytes", "Total number of recv bytes from meta grpc client.") + .namespace(META_NAMESPACE) + .subsystem(META_NETWORK_SUBSYSTEM) + ) + .expect("meta metric cannot be created"); + + pub static ref META_REQUEST_INFLIGHTS: IntGauge = IntGauge::with_opts( + Opts::new("req_inflights", "Total number of inflight meta grpc requests.") + .namespace(META_NAMESPACE) + .subsystem(META_NETWORK_SUBSYSTEM) + ) + .expect("meta metric cannot be created"); + + pub static ref META_SERVICE_SUCCESS: IntCounter = IntCounter::with_opts( + Opts::new("req_success", "Total number of success request from meta grpc client.") + .namespace(META_NAMESPACE) + .subsystem(META_NETWORK_SUBSYSTEM) + ) + .expect("meta metric cannot be created"); + + pub static ref META_SERVICE_FAILED: IntCounter = IntCounter::with_opts( + Opts::new("req_failed", "Total number of fail request from meta grpc client.") + .namespace(META_NAMESPACE) + .subsystem(META_NETWORK_SUBSYSTEM) + ) + .expect("meta metric cannot be created"); } pub fn init_meta_metrics_recorder() { @@ -233,6 +276,10 @@ fn init_meta_recorder() { .register(Box::new(IS_LEADER.clone())) .expect("collector can be registered"); + REGISTRY + .register(Box::new(NODE_IS_HEALTH.clone())) + .expect("collector can be registered"); + REGISTRY .register(Box::new(LEADER_CHANGES.clone())) .expect("collector can be registered"); @@ -312,6 +359,26 @@ fn init_meta_recorder() { REGISTRY .register(Box::new(SNAPSHOT_RECV_SECONDS.clone())) .expect("collector can be registered"); + + REGISTRY + .register(Box::new(META_SERVICE_SENT_BYTES.clone())) + .expect("collector can be registered"); + + REGISTRY + .register(Box::new(META_SERVICE_RECV_BYTES.clone())) + .expect("collector can be registered"); + + REGISTRY + .register(Box::new(META_REQUEST_INFLIGHTS.clone())) + .expect("collector can be registered"); + + REGISTRY + .register(Box::new(META_SERVICE_SUCCESS.clone())) + .expect("collector can be registered"); + + REGISTRY + .register(Box::new(META_SERVICE_FAILED.clone())) + .expect("collector can be registered"); } pub fn set_meta_metrics_current_leader(current_leader: NodeId) { @@ -322,6 +389,14 @@ pub fn set_meta_metrics_is_leader(is_leader: bool) { IS_LEADER.set(if is_leader { 1 } else { 0 }); } +pub fn set_meta_metrics_node_is_health(is_health: bool) { + NODE_IS_HEALTH.set(if is_health { 1 } else { 0 }); +} + +pub fn get_meta_metrics_node_is_health() -> bool { + NODE_IS_HEALTH.get() > 0 +} + pub fn incr_meta_metrics_leader_change() { LEADER_CHANGES.inc(); } @@ -372,6 +447,14 @@ pub fn incr_meta_metrics_recv_bytes_from_peer(addr: String, bytes: u64) { RECV_BYTES.with_label_values(&[&addr]).inc_by(bytes as f64); } +pub fn incr_meta_metrics_meta_sent_bytes(bytes: u64) { + META_SERVICE_SENT_BYTES.inc_by(bytes); +} + +pub fn incr_meta_metrics_meta_recv_bytes(bytes: u64) { + META_SERVICE_RECV_BYTES.inc_by(bytes); +} + pub fn incr_meta_metrics_sent_failure_to_peer(id: &NodeId) { SENT_FAILURES.with_label_values(&[&id.to_string()]).inc(); } @@ -416,6 +499,18 @@ pub fn sample_meta_metrics_snapshot_recv(addr: String, v: f64) { SNAPSHOT_SENT_SECONDS.with_label_values(&[&addr]).observe(v); } +pub fn add_meta_metrics_meta_request_inflights(cnt: i64) { + META_REQUEST_INFLIGHTS.add(cnt); +} + +pub fn incr_meta_metrics_meta_request_result(success: bool) { + if success { + META_SERVICE_SUCCESS.inc(); + } else { + META_SERVICE_FAILED.inc(); + } +} + /// Encode metrics as prometheus format string pub fn meta_metrics_to_prometheus_string() -> String { use prometheus::Encoder; diff --git a/metasrv/src/metrics/mod.rs b/metasrv/src/metrics/mod.rs index cbcec9f8e615..f9cda11f42e4 100644 --- a/metasrv/src/metrics/mod.rs +++ b/metasrv/src/metrics/mod.rs @@ -14,10 +14,15 @@ mod meta_metrics; +pub use meta_metrics::add_meta_metrics_meta_request_inflights; +pub use meta_metrics::get_meta_metrics_node_is_health; pub use meta_metrics::incr_meta_metrics_active_peers; pub use meta_metrics::incr_meta_metrics_applying_snapshot; pub use meta_metrics::incr_meta_metrics_fail_connections_to_peer; pub use meta_metrics::incr_meta_metrics_leader_change; +pub use meta_metrics::incr_meta_metrics_meta_recv_bytes; +pub use meta_metrics::incr_meta_metrics_meta_request_result; +pub use meta_metrics::incr_meta_metrics_meta_sent_bytes; pub use meta_metrics::incr_meta_metrics_proposals_failed; pub use meta_metrics::incr_meta_metrics_proposals_pending; pub use meta_metrics::incr_meta_metrics_read_failed; @@ -37,4 +42,5 @@ pub use meta_metrics::sample_meta_metrics_snapshot_recv; pub use meta_metrics::sample_meta_metrics_snapshot_sent; pub use meta_metrics::set_meta_metrics_current_leader; pub use meta_metrics::set_meta_metrics_is_leader; +pub use meta_metrics::set_meta_metrics_node_is_health; pub use meta_metrics::set_meta_metrics_proposals_applied; diff --git a/metasrv/src/watcher/watcher_manager.rs b/metasrv/src/watcher/watcher_manager.rs index 716ef84265c0..675543418baf 100644 --- a/metasrv/src/watcher/watcher_manager.rs +++ b/metasrv/src/watcher/watcher_manager.rs @@ -27,9 +27,11 @@ use common_meta_types::protobuf::WatchResponse; use common_meta_types::PbSeqV; use common_meta_types::SeqV; use common_tracing::tracing; +use prost::Message; use tonic::Status; use super::WatcherStream; +use crate::metrics::incr_meta_metrics_meta_sent_bytes; use crate::metrics::incr_meta_metrics_watchers; pub type WatcherId = i64; @@ -155,6 +157,8 @@ impl WatcherManagerCore { }), }; + incr_meta_metrics_meta_sent_bytes(resp.encoded_len() as u64); + if let Err(err) = stream.send(resp).await { tracing::info!( "close watcher stream {:?} cause send err: {:?}",