Skip to content

Commit

Permalink
Merge pull request #5978 from lichuang/add_rtt_metrics
Browse files Browse the repository at this point in the history
Feature: add meta grpc client network metrics
  • Loading branch information
mergify[bot] authored Jun 15, 2022
2 parents 4f5cddc + fdf0040 commit 8c8892d
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/meta/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ common-grpc = { path = "../../grpc" }
common-meta-api = { path = "../api" }
common-meta-app = { path = "../app" }
common-meta-types = { path = "../types" }
common-metrics = { path = "../../metrics" }
common-proto-conv = { path = "../../proto-conv" }
common-protos = { path = "../../protos" }
common-tracing = { path = "../../tracing" }
Expand Down
99 changes: 96 additions & 3 deletions common/meta/grpc/src/grpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use std::fmt::Debug;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;

use common_arrow::arrow_format::flight::data::BasicAuth;
use common_base::base::tokio::sync::mpsc;
Expand Down Expand Up @@ -49,6 +51,10 @@ use common_meta_types::MetaNetworkError;
use common_meta_types::MetaResultError;
use common_meta_types::TxnReply;
use common_meta_types::TxnRequest;
use common_metrics::label_counter_with_val_and_labels;
use common_metrics::label_decrement_gauge_with_val_and_labels;
use common_metrics::label_histogram_with_val;
use common_metrics::label_increment_gauge_with_val_and_labels;
use common_tracing::tracing;
use futures::stream::StreamExt;
use prost::Message;
Expand All @@ -75,6 +81,13 @@ use crate::METACLI_COMMIT_SEMVER;
use crate::MIN_METASRV_SEMVER;

// Key under which the auth token is carried.
// NOTE(review): presumably a gRPC metadata key (the `-bin` suffix suggests a binary value) — confirm against the auth interceptor.
const AUTH_TOKEN_KEY: &str = "auth-token-bin";
// Histogram name: elapsed milliseconds recorded by the worker loop after it hands a response back to the handle.
// NOTE(review): the worker appears to take its start timestamp before awaiting the next request,
// so the recorded value may include idle wait time — confirm and move the timestamp if unintended.
const META_GRPC_CLIENT_REQUEST_DURATION_MS: &str = "meta_grpc_client_request_duration_ms";
// Gauge name: incremented by 1.0 before a request is sent to the worker and decremented by 1.0
// on every exit path of that request (send failure, receive failure, or response received).
const META_GRPC_CLIENT_REQUEST_INFLIGHT: &str = "meta_grpc_client_request_inflight";
// Counter name: incremented by 1 when the worker delivers a response to the handle without error.
const META_GRPC_CLIENT_REQUEST_SUCCESS: &str = "meta_grpc_client_request_success";
// Counter name: incremented by 1 when the worker fails to deliver a response to the handle.
// Note the emitted metric name ends in `_fail`, not `_failed`.
const META_GRPC_CLIENT_REQUEST_FAILED: &str = "meta_grpc_client_request_fail";
// Counter name: incremented by 1 each time getting a channel for an endpoint from the connection pool fails.
const META_GRPC_MAKE_CLIENT_FAILED: &str = "meta_grpc_make_client_fail";
// Label key carrying the endpoint address the metric refers to.
const LABEL_ENDPOINT: &str = "endpoint";
// Label key carrying the stringified error (or a fixed "recv-end closed" message) on failure counters.
const LABEL_ERROR: &str = "error";

#[derive(Debug)]
struct MetaChannelManager {
Expand Down Expand Up @@ -127,6 +140,7 @@ pub struct MetaGrpcClient {
username: String,
password: String,
token: RwLock<Option<Vec<u8>>>,
current_endpoint: Arc<Mutex<Option<String>>>,

/// Dedicated runtime to support meta client background tasks.
///
Expand Down Expand Up @@ -161,18 +175,37 @@ impl ClientHandle {
req: req.into(),
};

self.req_tx.send(req).await.map_err(|e| {
label_increment_gauge_with_val_and_labels(META_GRPC_CLIENT_REQUEST_INFLIGHT, vec![], 1.0);

let res = self.req_tx.send(req).await.map_err(|e| {
MetaError::Fatal(
AnyError::new(&e).add_context(|| "when sending req to MetaGrpcClient worker"),
)
})?;
});

if let Err(err) = res {
label_decrement_gauge_with_val_and_labels(
META_GRPC_CLIENT_REQUEST_INFLIGHT,
vec![],
1.0,
);

return Err(err);
}

let res = rx.await.map_err(|e| {
label_decrement_gauge_with_val_and_labels(
META_GRPC_CLIENT_REQUEST_INFLIGHT,
vec![],
1.0,
);

MetaError::Fatal(
AnyError::new(&e).add_context(|| "when recv resp from MetaGrpcClient worker"),
)
})?;

label_decrement_gauge_with_val_and_labels(META_GRPC_CLIENT_REQUEST_INFLIGHT, vec![], 1.0);
let resp = res?;

let r = Resp::try_from(resp).map_err(|e| {
Expand Down Expand Up @@ -241,6 +274,7 @@ impl MetaGrpcClient {
let worker = Arc::new(Self {
conn_pool: Pool::new(mgr, Duration::from_millis(50)),
endpoints: RwLock::new(endpoints),
current_endpoint: Arc::new(Mutex::new(None)),
username: username.to_string(),
password: password.to_string(),
token: RwLock::new(None),
Expand All @@ -258,6 +292,7 @@ impl MetaGrpcClient {
tracing::info!("MetaGrpcClient::worker spawned");

loop {
let start = Instant::now();
let t = req_rx.recv().await;
let req = match t {
None => {
Expand Down Expand Up @@ -321,7 +356,52 @@ impl MetaGrpcClient {
);

let res = resp_tx.send(resp);
if let Err(err) = res {
let current_endpoint = self.current_endpoint.lock().unwrap();
if let Some(current_endpoint) = &*current_endpoint {
label_histogram_with_val(
META_GRPC_CLIENT_REQUEST_DURATION_MS,
vec![(LABEL_ENDPOINT, current_endpoint.to_string())],
start.elapsed().as_millis() as f64,
);

if let Err(err) = res {
match err {
Err(err) => {
label_counter_with_val_and_labels(
META_GRPC_CLIENT_REQUEST_FAILED,
vec![
(LABEL_ENDPOINT, current_endpoint.to_string()),
(LABEL_ERROR, err.to_string()),
],
1,
);
tracing::warn!(
"MetaGrpcClient failed to send response to the handle:{:?}",
err
);
}
Ok(_) => {
label_counter_with_val_and_labels(
META_GRPC_CLIENT_REQUEST_FAILED,
vec![
(LABEL_ENDPOINT, current_endpoint.to_string()),
(LABEL_ERROR, "MetaGrpcClient recv-end closed".to_string()),
],
1,
);
tracing::warn!(
"MetaGrpcClient failed to send response to the handle. recv-end closed"
);
}
}
} else {
label_counter_with_val_and_labels(
META_GRPC_CLIENT_REQUEST_SUCCESS,
vec![(LABEL_ENDPOINT, current_endpoint.to_string())],
1,
);
}
} else if let Err(err) = res {
tracing::warn!(
err = debug(err),
"MetaGrpcClient failed to send response to the handle. recv-end closed"
Expand All @@ -337,6 +417,10 @@ impl MetaGrpcClient {
MetaServiceClient<InterceptedService<Channel, AuthInterceptor>>,
MetaError,
> {
{
let mut current_endpoint = self.current_endpoint.lock().unwrap();
*current_endpoint = None;
}
let channel = {
let eps = self.endpoints.read().await;
debug_assert!(!eps.is_empty());
Expand All @@ -352,6 +436,10 @@ impl MetaGrpcClient {
let ch = self.conn_pool.get(addr).await;
match ch {
Ok(c) => {
{
let mut current_endpoint = self.current_endpoint.lock().unwrap();
*current_endpoint = Some(addr.clone());
}
break Ok(c);
}
Err(e) => {
Expand All @@ -360,6 +448,11 @@ impl MetaGrpcClient {
addr,
e
);
label_counter_with_val_and_labels(
META_GRPC_MAKE_CLIENT_FAILED,
vec![(LABEL_ENDPOINT, addr.to_string())],
1,
);
if start == end - 1 {
// reach to last addr
break Err(e);
Expand Down
4 changes: 4 additions & 0 deletions common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ pub use metrics_exporter_prometheus::PrometheusHandle;
pub use recorder::init_default_metrics_recorder;
pub use recorder::label_counter;
pub use recorder::label_counter_with_val;
pub use recorder::label_counter_with_val_and_labels;
pub use recorder::label_decrement_gauge_with_val_and_labels;
pub use recorder::label_histogram_with_val;
pub use recorder::label_increment_gauge_with_val_and_labels;
pub use recorder::try_handle;
35 changes: 35 additions & 0 deletions common/metrics/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ use std::sync::Once;
use common_base::infallible::RwLock;
use common_tracing::tracing;
use metrics::counter;
use metrics::decrement_gauge;
use metrics::histogram;
use metrics::increment_gauge;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_exporter_prometheus::PrometheusHandle;
use once_cell::sync::Lazy;
Expand All @@ -28,6 +31,38 @@ static PROMETHEUS_HANDLE: Lazy<Arc<RwLock<Option<PrometheusHandle>>>> =
pub const LABEL_KEY_TENANT: &str = "tenant";
pub const LABEL_KEY_CLUSTER: &str = "cluster_name";

/// Records `val` on the histogram `name`, tagged with `labels`.
///
/// Thin wrapper that forwards its arguments to the `metrics::histogram!` macro.
///
/// * `name` - metric name; must be `'static`.
/// * `labels` - `(key, value)` pairs attached to the sample; passed to the macro by reference.
/// * `val` - the observation to record.
#[inline]
pub fn label_histogram_with_val(name: &'static str, labels: Vec<(&'static str, String)>, val: f64) {
histogram!(name, val, &labels);
}

/// Adds `val` to the counter `name`, tagged with `labels`.
///
/// Thin wrapper that forwards its arguments to the `metrics::counter!` macro.
///
/// * `name` - metric name; must be `'static`.
/// * `labels` - `(key, value)` pairs attached to the counter; passed to the macro by reference.
/// * `val` - the amount handed to the counter macro.
#[inline]
pub fn label_counter_with_val_and_labels(
name: &'static str,
labels: Vec<(&'static str, String)>,
val: u64,
) {
counter!(name, val, &labels);
}

/// Increments the gauge `name` by `val`, tagged with `labels`.
///
/// Thin wrapper that forwards its arguments to the `metrics::increment_gauge!` macro.
/// An empty `labels` vector is accepted.
///
/// * `name` - metric name; must be `'static`.
/// * `labels` - `(key, value)` pairs attached to the gauge; passed to the macro by reference.
/// * `val` - the amount handed to the increment macro.
#[inline]
pub fn label_increment_gauge_with_val_and_labels(
name: &'static str,
labels: Vec<(&'static str, String)>,
val: f64,
) {
increment_gauge!(name, val, &labels);
}

/// Decrements the gauge `name` by `val`, tagged with `labels`.
///
/// Thin wrapper that forwards its arguments to the `metrics::decrement_gauge!` macro.
/// Callers should pass the same `name` and `labels` used for the matching increment
/// so both updates land on the same gauge series.
///
/// * `name` - metric name; must be `'static`.
/// * `labels` - `(key, value)` pairs attached to the gauge; passed to the macro by reference.
/// * `val` - the amount handed to the decrement macro.
#[inline]
pub fn label_decrement_gauge_with_val_and_labels(
name: &'static str,
labels: Vec<(&'static str, String)>,
val: f64,
) {
decrement_gauge!(name, val, &labels);
}

#[inline]
pub fn label_counter(name: &'static str, tenant_id: &str, cluster_id: &str) {
label_counter_with_val(name, 1, tenant_id, cluster_id)
Expand Down

0 comments on commit 8c8892d

Please sign in to comment.