Skip to content

Commit

Permalink
chore add metrics for meta service
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Mar 27, 2023
1 parent 65827c6 commit b1beccb
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ struct StartCommand {
selector: Option<String>,
#[clap(long)]
use_memory_store: bool,
#[clap(long)]
metrics_addr: Option<String>,
}

impl StartCommand {
Expand Down Expand Up @@ -128,6 +130,10 @@ impl TryFrom<StartCommand> for MetaSrvOptions {
opts.use_memory_store = true;
}

if let Some(metrics_addr) = cmd.metrics_addr {
opts.metrics_addr = metrics_addr;
}

Ok(opts)
}
}
Expand All @@ -150,6 +156,7 @@ mod tests {
config_file: None,
selector: Some("LoadBased".to_string()),
use_memory_store: false,
metrics_addr: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
Expand Down Expand Up @@ -178,6 +185,7 @@ mod tests {
selector: None,
config_file: Some(file.path().to_str().unwrap().to_string()),
use_memory_store: false,
metrics_addr: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
Expand Down
10 changes: 7 additions & 3 deletions src/datanode/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::net::SocketAddr;
use std::sync::Arc;

use common_runtime::Builder as RuntimeBuilder;
use servers::datanode_metrics_server::DatanodeMetricsServer;
use servers::grpc::GrpcServer;
use servers::metrics_server::MetricsServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::server::Server;
use servers::Mode;
Expand All @@ -36,7 +36,7 @@ pub mod grpc;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,
metrics_server: DatanodeMetricsServer,
metrics_server: MetricsServer,
}

impl Services {
Expand All @@ -54,7 +54,7 @@ impl Services {
None,
grpc_runtime,
),
metrics_server: DatanodeMetricsServer::new(),
metrics_server: MetricsServer::new(),
})
}

Expand All @@ -80,6 +80,10 @@ impl Services {

pub async fn shutdown(&self) -> Result<()> {
self.grpc_server
.shutdown()
.await
.context(ShutdownServerSnafu)?;
self.metrics_server
.shutdown()
.await
.context(ShutdownServerSnafu)
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
tonic.workspace = true
tower = "0.4"
url = "2.3"
servers = { path = "../servers" }

[dev-dependencies]
tracing = "0.1"
Expand Down
23 changes: 21 additions & 2 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use api::v1::meta::lock_server::LockServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use etcd_client::Client;
use servers::metrics_server::MetricsServer;
use servers::server::Server;
use snafu::ResultExt;
use tokio::net::TcpListener;
use tokio::sync::mpsc::{self, Receiver, Sender};
Expand All @@ -44,6 +46,8 @@ use crate::{error, Result};
pub struct MetaSrvInstance {
meta_srv: MetaSrv,

metrics_srv: Arc<MetricsServer>,

opts: MetaSrvOptions,

signal_sender: Option<Sender<()>>,
Expand All @@ -52,9 +56,10 @@ pub struct MetaSrvInstance {
impl MetaSrvInstance {
pub async fn new(opts: MetaSrvOptions) -> Result<MetaSrvInstance> {
let meta_srv = build_meta_srv(&opts).await?;

let metrics_srv = Arc::new(MetricsServer::new());
Ok(MetaSrvInstance {
meta_srv,
metrics_srv,
opts,
signal_sender: None,
})
Expand All @@ -72,6 +77,17 @@ impl MetaSrvInstance {
&mut rx,
)
.await?;
let addr = self
.opts
.metrics_addr
.parse()
.context(error::ParseAddrSnafu {
addr: &self.opts.metrics_addr,
})?;
self.metrics_srv
.start(addr)
.await
.context(error::StartMetricsExportSnafu)?;

Ok(())
}
Expand All @@ -85,7 +101,10 @@ impl MetaSrvInstance {
}

self.meta_srv.shutdown();

self.metrics_srv
.shutdown()
.await
.context(error::ShutdownServerSnafu)?;
Ok(())
}
}
Expand Down
20 changes: 19 additions & 1 deletion src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ pub enum Error {
#[snafu(display("Failed to send shutdown signal"))]
SendShutdownSignal { source: SendError<()> },

#[snafu(display("Failed to shutdown server, source: {}", source))]
ShutdownServer {
#[snafu(backtrace)]
source: servers::error::Error,
},

#[snafu(display("Error stream request next is None"))]
StreamNone { backtrace: Backtrace },

Expand Down Expand Up @@ -55,7 +61,16 @@ pub enum Error {
source: tonic::transport::Error,
backtrace: Backtrace,
},

#[snafu(display("Failed to start gRPC server, source: {}", source))]
StartMetricsExport {
#[snafu(backtrace)]
source: servers::error::Error,
},
#[snafu(display("Failed to parse address {}, source: {}", addr, source))]
ParseAddr {
addr: String,
source: std::net::AddrParseError,
},
#[snafu(display("Empty table name"))]
EmptyTableName { backtrace: Backtrace },

Expand Down Expand Up @@ -317,6 +332,9 @@ impl ErrorExt for Error {
| Error::LockNotConfig { .. }
| Error::ExceededRetryLimit { .. }
| Error::SendShutdownSignal { .. }
| Error::ShutdownServer { .. }
| Error::StartMetricsExport { .. }
| Error::ParseAddr { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct MetaSrvOptions {
pub datanode_lease_secs: i64,
pub selector: SelectorType,
pub use_memory_store: bool,
pub metrics_addr: String,
}

impl Default for MetaSrvOptions {
Expand All @@ -51,6 +52,7 @@ impl Default for MetaSrvOptions {
datanode_lease_secs: 15,
selector: SelectorType::default(),
use_memory_store: false,
metrics_addr: "127.0.0.1:5000".to_string(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ use common_catalog::consts::DEFAULT_CATALOG_NAME;
use serde::{Deserialize, Serialize};

pub mod auth;
pub mod datanode_metrics_server;
pub mod error;
pub mod grpc;
pub mod http;
pub mod influxdb;
pub mod interceptor;
pub mod line_writer;
pub mod metrics_server;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ pub const METRIC_SERVER: &str = "METRIC_SERVER";
/// a server that serves metrics
/// only start when datanode starts in distributed mode
#[derive(Default)]
pub struct DatanodeMetricsServer {
pub struct MetricsServer {
shutdown_tx: Mutex<Option<Sender<()>>>,
}

#[async_trait]
impl ServerTrait for DatanodeMetricsServer {
impl ServerTrait for MetricsServer {
async fn start(&self, listening: SocketAddr) -> Result<SocketAddr> {
self.start(listening).await
}
Expand All @@ -59,7 +59,7 @@ impl ServerTrait for DatanodeMetricsServer {
}
}

impl DatanodeMetricsServer {
impl MetricsServer {
pub fn new() -> Self {
Self {
shutdown_tx: Mutex::new(None),
Expand Down

0 comments on commit b1beccb

Please sign in to comment.