Skip to content

Commit

Permalink
chore: add http metrics server in datanode node when greptime start i…
Browse files Browse the repository at this point in the history
…n distributed mode (GreptimeTeam#1256)

* chore: add http metrics server in datanode node when greptime start in distributed mode

* chore: add some docs and license

* chore: change metrics_addr to resolve address already in use error

* chore add metrics for meta service

* chore: replace metrics exporter http server from hyper to axum

* chore: format

* fix: datanode mode branching error

* fix: sqlness test address already in use and start metrics in defualt config

* chore: change metrics location

* chore: use builder pattern to builder httpserver

* chore: remove useless debug_assert macro in httpserver builder

* chore: resolve conflicting build error

* chore: format code
  • Loading branch information
paomian committed Oct 19, 2023
1 parent 4b9a979 commit 86c564d
Show file tree
Hide file tree
Showing 34 changed files with 342 additions and 151 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{
Expand Down Expand Up @@ -86,6 +88,10 @@ struct StartCommand {
wal_dir: Option<String>,
#[clap(long)]
procedure_dir: Option<String>,
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
http_timeout: Option<u64>,
}

impl StartCommand {
Expand Down Expand Up @@ -155,6 +161,12 @@ impl TryFrom<StartCommand> for DatanodeOptions {
if let Some(procedure_dir) = cmd.procedure_dir {
opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
}
if let Some(http_addr) = cmd.http_addr {
opts.http_opts.addr = http_addr
}
if let Some(http_timeout) = cmd.http_timeout {
opts.http_opts.timeout = Duration::from_secs(http_timeout)
}

Ok(opts)
}
Expand Down
17 changes: 17 additions & 0 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use clap::Parser;
use common_telemetry::{info, logging, warn};
use meta_srv::bootstrap::MetaSrvInstance;
Expand Down Expand Up @@ -80,6 +82,10 @@ struct StartCommand {
selector: Option<String>,
#[clap(long)]
use_memory_store: bool,
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
http_timeout: Option<u64>,
}

impl StartCommand {
Expand Down Expand Up @@ -128,6 +134,13 @@ impl TryFrom<StartCommand> for MetaSrvOptions {
opts.use_memory_store = true;
}

if let Some(http_addr) = cmd.http_addr {
opts.http_opts.addr = http_addr;
}
if let Some(http_timeout) = cmd.http_timeout {
opts.http_opts.timeout = Duration::from_secs(http_timeout);
}

Ok(opts)
}
}
Expand All @@ -150,6 +163,8 @@ mod tests {
config_file: None,
selector: Some("LoadBased".to_string()),
use_memory_store: false,
http_addr: None,
http_timeout: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
Expand Down Expand Up @@ -178,6 +193,8 @@ mod tests {
selector: None,
config_file: Some(file.path().to_str().unwrap().to_string()),
use_memory_store: false,
http_addr: None,
http_timeout: None,
};
let options: MetaSrvOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
Expand Down
2 changes: 1 addition & 1 deletion src/common/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

pub mod error;
mod global;
pub mod metric;
mod metrics;
mod repeated_task;
pub mod runtime;

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion src/common/runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::sync::oneshot;
pub use tokio::task::{JoinError, JoinHandle};

use crate::error::*;
use crate::metric::*;
use crate::metrics::*;

/// A runtime to run future tasks
#[derive(Clone, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/common/telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ deadlock_detection = ["parking_lot"]
backtrace = "0.3"
common-error = { path = "../error" }
console-subscriber = { version = "0.1", optional = true }
metrics = "0.20"
metrics = "0.20.1"
metrics-exporter-prometheus = { version = "0.11", default-features = false }
once_cell = "1.10"
opentelemetry = { version = "0.17", default-features = false, features = [
Expand Down
22 changes: 18 additions & 4 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::Mode;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::scheduler::SchedulerConfig;
Expand Down Expand Up @@ -224,6 +225,7 @@ pub struct DatanodeOptions {
pub rpc_runtime_size: usize,
pub mysql_addr: String,
pub mysql_runtime_size: usize,
pub http_opts: HttpOptions,
pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
Expand All @@ -241,6 +243,7 @@ impl Default for DatanodeOptions {
rpc_runtime_size: 8,
mysql_addr: "127.0.0.1:4406".to_string(),
mysql_runtime_size: 2,
http_opts: HttpOptions::default(),
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
Expand All @@ -252,14 +255,17 @@ impl Default for DatanodeOptions {
/// Datanode service.
pub struct Datanode {
opts: DatanodeOptions,
services: Services,
services: Option<Services>,
instance: InstanceRef,
}

impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::new(&opts).await?);
let services = Services::try_new(instance.clone(), &opts).await?;
let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
Mode::Standalone => None,
};
Ok(Self {
opts,
services,
Expand All @@ -280,7 +286,11 @@ impl Datanode {

/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
self.services.start(&self.opts).await
if let Some(service) = self.services.as_mut() {
service.start(&self.opts).await
} else {
Ok(())
}
}

pub fn get_instance(&self) -> InstanceRef {
Expand All @@ -292,7 +302,11 @@ impl Datanode {
}

async fn shutdown_services(&self) -> Result<()> {
self.services.shutdown().await
if let Some(service) = self.services.as_ref() {
service.shutdown().await
} else {
Ok(())
}
}

pub async fn shutdown(&self) -> Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions src/datanode/src/instance/script.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_telemetry::timer;
use servers::query_handler::ScriptHandler;

use crate::instance::Instance;
use crate::metric;
use crate::metrics;

#[async_trait]
impl ScriptHandler for Instance {
Expand All @@ -30,7 +30,7 @@ impl ScriptHandler for Instance {
name: &str,
script: &str,
) -> servers::error::Result<()> {
let _timer = timer!(metric::METRIC_HANDLE_SCRIPTS_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_SCRIPTS_ELAPSED);
self.script_executor
.insert_script(schema, name, script)
.await
Expand All @@ -42,7 +42,7 @@ impl ScriptHandler for Instance {
name: &str,
params: HashMap<String, String>,
) -> servers::error::Result<Output> {
let _timer = timer!(metric::METRIC_RUN_SCRIPT_ELAPSED);
let _timer = timer!(metrics::METRIC_RUN_SCRIPT_ELAPSED);
self.script_executor
.execute_script(schema, name, params)
.await
Expand Down
6 changes: 3 additions & 3 deletions src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::error::{
TableIdProviderNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metric;
use crate::metrics;
use crate::sql::{SqlHandler, SqlRequest};

impl Instance {
Expand Down Expand Up @@ -190,7 +190,7 @@ impl Instance {
promql: &PromQuery,
query_ctx: QueryContextRef,
) -> Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_PROMQL_ELAPSED);

let stmt = QueryLanguageParser::parse_promql(promql).context(ExecuteSqlSnafu)?;

Expand Down Expand Up @@ -294,7 +294,7 @@ impl StatementHandler for Instance {
#[async_trait]
impl PromHandler for Instance {
async fn do_query(&self, query: &PromQuery) -> server_error::Result<Output> {
let _timer = timer!(metric::METRIC_HANDLE_PROMQL_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_PROMQL_ELAPSED);

self.execute_promql(query, QueryContext::arc())
.await
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod datanode;
pub mod error;
mod heartbeat;
pub mod instance;
pub mod metric;
pub mod metrics;
mod mock;
mod script;
pub mod server;
Expand Down
File renamed without changes.
27 changes: 22 additions & 5 deletions src/datanode/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ use std::sync::Arc;

use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::server::Server;
use snafu::ResultExt;
use tokio::select;

use crate::datanode::DatanodeOptions;
use crate::error::{
Expand All @@ -33,6 +36,7 @@ pub mod grpc;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,
http_server: HttpServer,
}

impl Services {
Expand All @@ -51,24 +55,37 @@ impl Services {
None,
grpc_runtime,
),
http_server: HttpServerBuilder::new(opts.http_opts.clone())
.with_metrics_handler(MetricsHandler)
.build(),
})
}

pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> {
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;
self.grpc_server
.start(grpc_addr)
.await
.context(StartServerSnafu)?;
let http_addr = opts.http_opts.addr.parse().context(ParseAddrSnafu {
addr: &opts.http_opts.addr,
})?;
let grpc = self.grpc_server.start(grpc_addr);
let http = self.http_server.start(http_addr);
select!(
v = grpc => v.context(StartServerSnafu)?,
v = http => v.context(StartServerSnafu)?,
);
Ok(())
}

pub async fn shutdown(&self) -> Result<()> {
self.grpc_server
.shutdown()
.await
.context(ShutdownServerSnafu)
.context(ShutdownServerSnafu)?;
self.http_server
.shutdown()
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use common_telemetry::timer;
use datafusion::sql::sqlparser::ast::ObjectName;
use datanode::instance::sql::table_idents_to_full_name;
use datanode::instance::InstanceRef as DnInstanceRef;
use datanode::metric;
use datanode::metrics;
use datatypes::schema::Schema;
use distributed::DistInstance;
use meta_client::client::{MetaClient, MetaClientBuilder};
Expand Down Expand Up @@ -532,7 +532,7 @@ impl SqlQueryHandler for Instance {
type Error = Error;

async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let _timer = timer!(metric::METRIC_HANDLE_SQL_ELAPSED);
let _timer = timer!(metrics::METRIC_HANDLE_SQL_ELAPSED);

let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
Expand Down
24 changes: 12 additions & 12 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_telemetry::info;
use servers::auth::UserProviderRef;
use servers::error::Error::InternalIo;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::http::HttpServerBuilder;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
Expand Down Expand Up @@ -150,33 +150,33 @@ impl Services {
if let Some(http_options) = &opts.http_options {
let http_addr = parse_addr(&http_options.addr)?;

let mut http_server = HttpServer::new(
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
ServerGrpcQueryHandlerAdaptor::arc(instance.clone()),
http_options.clone(),
);
let mut http_server_builder = HttpServerBuilder::new(http_options.clone());
http_server_builder
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()));

if let Some(user_provider) = user_provider.clone() {
http_server.set_user_provider(user_provider);
http_server_builder.with_user_provider(user_provider);
}

if set_opentsdb_handler {
http_server.set_opentsdb_handler(instance.clone());
http_server_builder.with_opentsdb_handler(instance.clone());
}
if matches!(
opts.influxdb_options,
Some(InfluxdbOptions { enable: true })
) {
http_server.set_influxdb_handler(instance.clone());
http_server_builder.with_influxdb_handler(instance.clone());
}

if matches!(
opts.prometheus_options,
Some(PrometheusOptions { enable: true })
) {
http_server.set_prom_handler(instance.clone());
http_server_builder.with_prom_handler(instance.clone());
}
http_server.set_script_handler(instance.clone());

http_server_builder.with_script_handler(instance.clone());
let http_server = http_server_builder.build();
result.push((Box::new(http_server), http_addr));
}

Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tokio-stream = { version = "0.1", features = ["net"] }
tonic.workspace = true
tower = "0.4"
url = "2.3"
servers = { path = "../servers" }

[dev-dependencies]
tracing = "0.1"
Expand Down
Loading

0 comments on commit 86c564d

Please sign in to comment.