Skip to content

Commit

Permalink
chore: use builder pattern to builder httpserver
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Mar 30, 2023
1 parent ab834ae commit 9b96ee6
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 147 deletions.
18 changes: 9 additions & 9 deletions src/common/runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::sync::oneshot;
pub use tokio::task::{JoinError, JoinHandle};

use crate::error::*;
use crate::metrics as constant_metrics;
use crate::metrics::*;

/// A runtime to run future tasks
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -152,29 +152,29 @@ impl Builder {

fn on_thread_start(thread_name: String) -> impl Fn() + 'static {
move || {
let labels = [(constant_metrics::THREAD_NAME_LABEL, thread_name.clone())];
increment_gauge!(constant_metrics::METRIC_RUNTIME_THREADS_ALIVE, 1.0, &labels);
let labels = [(THREAD_NAME_LABEL, thread_name.clone())];
increment_gauge!(METRIC_RUNTIME_THREADS_ALIVE, 1.0, &labels);
}
}

fn on_thread_stop(thread_name: String) -> impl Fn() + 'static {
move || {
let labels = [(constant_metrics::THREAD_NAME_LABEL, thread_name.clone())];
decrement_gauge!(constant_metrics::METRIC_RUNTIME_THREADS_ALIVE, 1.0, &labels);
let labels = [(THREAD_NAME_LABEL, thread_name.clone())];
decrement_gauge!(METRIC_RUNTIME_THREADS_ALIVE, 1.0, &labels);
}
}

fn on_thread_park(thread_name: String) -> impl Fn() + 'static {
move || {
let labels = [(constant_metrics::THREAD_NAME_LABEL, thread_name.clone())];
increment_gauge!(constant_metrics::METRIC_RUNTIME_THREADS_IDLE, 1.0, &labels);
let labels = [(THREAD_NAME_LABEL, thread_name.clone())];
increment_gauge!(METRIC_RUNTIME_THREADS_IDLE, 1.0, &labels);
}
}

fn on_thread_unpark(thread_name: String) -> impl Fn() + 'static {
move || {
let labels = [(constant_metrics::THREAD_NAME_LABEL, thread_name.clone())];
decrement_gauge!(constant_metrics::METRIC_RUNTIME_THREADS_IDLE, 1.0, &labels);
let labels = [(THREAD_NAME_LABEL, thread_name.clone())];
decrement_gauge!(METRIC_RUNTIME_THREADS_IDLE, 1.0, &labels);
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,17 @@ impl Default for DatanodeOptions {
/// Datanode service.
pub struct Datanode {
opts: DatanodeOptions,
services: Services,
services: Option<Services>,
instance: InstanceRef,
}

impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::new(&opts).await?);
let services = Services::try_new(instance.clone(), &opts).await?;
let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
Mode::Standalone => None,
};
Ok(Self {
opts,
services,
Expand All @@ -251,7 +254,11 @@ impl Datanode {

/// Start services of datanode. This method call will block until services are shutdown.
pub async fn start_services(&mut self) -> Result<()> {
self.services.start(&self.opts).await
if let Some(service) = self.services.as_mut() {
service.start(&self.opts).await
} else {
Ok(())
}
}

pub fn get_instance(&self) -> InstanceRef {
Expand All @@ -263,7 +270,11 @@ impl Datanode {
}

async fn shutdown_services(&self) -> Result<()> {
self.services.shutdown().await
if let Some(service) = self.services.as_ref() {
service.shutdown().await
} else {
Ok(())
}
}

pub async fn shutdown(&self) -> Result<()> {
Expand Down
36 changes: 14 additions & 22 deletions src/datanode/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ use std::sync::Arc;

use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::GrpcServer;
use servers::http::{HttpOptions, HttpServer};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::server::Server;
use servers::Mode;
use snafu::ResultExt;
use tokio::select;

Expand All @@ -37,7 +36,7 @@ pub mod grpc;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,
http_server: Option<HttpServer>,
http_server: HttpServer,
}

impl Services {
Expand All @@ -56,13 +55,9 @@ impl Services {
None,
grpc_runtime,
),
http_server: match opts.mode {
Mode::Distributed => Some(HttpServer::new_with_metrics_handler(
MetricsHandler,
HttpOptions::default(),
)),
Mode::Standalone => None,
},
http_server: HttpServerBuilder::new(opts.http_opts.clone())
.with_metrics_handler(MetricsHandler)
.build(),
})
}

Expand All @@ -74,15 +69,11 @@ impl Services {
addr: &opts.http_opts.addr,
})?;
let grpc = self.grpc_server.start(grpc_addr);
if let Some(ref http_server) = self.http_server {
let http = http_server.start(http_addr);
select!(
v = grpc => v.context(StartServerSnafu)?,
v= http => v.context(StartServerSnafu)?,);
} else {
grpc.await.context(StartServerSnafu)?;
}

let http = self.http_server.start(http_addr);
select!(
v = grpc => v.context(StartServerSnafu)?,
v = http => v.context(StartServerSnafu)?,
);
Ok(())
}

Expand All @@ -91,9 +82,10 @@ impl Services {
.shutdown()
.await
.context(ShutdownServerSnafu)?;
if let Some(ref http_server) = self.http_server {
http_server.shutdown().await.context(ShutdownServerSnafu)?;
}
self.http_server
.shutdown()
.await
.context(ShutdownServerSnafu)?;
Ok(())
}
}
24 changes: 12 additions & 12 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_telemetry::info;
use servers::auth::UserProviderRef;
use servers::error::Error::InternalIo;
use servers::grpc::GrpcServer;
use servers::http::HttpServer;
use servers::http::HttpServerBuilder;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
Expand Down Expand Up @@ -150,33 +150,33 @@ impl Services {
if let Some(http_options) = &opts.http_options {
let http_addr = parse_addr(&http_options.addr)?;

let mut http_server = HttpServer::new(
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
ServerGrpcQueryHandlerAdaptor::arc(instance.clone()),
http_options.clone(),
);
let mut http_server_builder = HttpServerBuilder::new(http_options.clone());
http_server_builder
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()));

if let Some(user_provider) = user_provider.clone() {
http_server.set_user_provider(user_provider);
http_server_builder.with_user_provider(user_provider);
}

if set_opentsdb_handler {
http_server.set_opentsdb_handler(instance.clone());
http_server_builder.with_opentsdb_handler(instance.clone());
}
if matches!(
opts.influxdb_options,
Some(InfluxdbOptions { enable: true })
) {
http_server.set_influxdb_handler(instance.clone());
http_server_builder.with_influxdb_handler(instance.clone());
}

if matches!(
opts.prometheus_options,
Some(PrometheusOptions { enable: true })
) {
http_server.set_prom_handler(instance.clone());
http_server_builder.with_prom_handler(instance.clone());
}
http_server.set_script_handler(instance.clone());

http_server_builder.with_script_handler(instance.clone());
let http_server = http_server_builder.build();
result.push((Box::new(http_server), http_addr));
}

Expand Down
17 changes: 10 additions & 7 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use api::v1::meta::lock_server::LockServer;
use api::v1::meta::router_server::RouterServer;
use api::v1::meta::store_server::StoreServer;
use etcd_client::Client;
use servers::http::HttpServer;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
use snafu::ResultExt;
Expand Down Expand Up @@ -58,10 +58,11 @@ pub struct MetaSrvInstance {
impl MetaSrvInstance {
pub async fn new(opts: MetaSrvOptions) -> Result<MetaSrvInstance> {
let meta_srv = build_meta_srv(&opts).await?;
let http_srv = Arc::new(HttpServer::new_with_metrics_handler(
MetricsHandler,
opts.http_opts.clone(),
));
let http_srv = Arc::new(
HttpServerBuilder::new(opts.http_opts.clone())
.with_metrics_handler(MetricsHandler)
.build(),
);
Ok(MetaSrvInstance {
meta_srv,
http_srv,
Expand All @@ -87,7 +88,7 @@ impl MetaSrvInstance {
.addr
.parse()
.context(error::ParseAddrSnafu {
addr: &self.opts.metrics_addr,
addr: &self.opts.http_opts.addr,
})?;
let http_srv = self.http_srv.start(addr);
select! {
Expand All @@ -110,7 +111,9 @@ impl MetaSrvInstance {
self.http_srv
.shutdown()
.await
.context(error::ShutdownServerSnafu)?;
.context(error::ShutdownServerSnafu {
service: self.http_srv.name(),
})?;
Ok(())
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ pub enum Error {
#[snafu(display("Failed to send shutdown signal"))]
SendShutdownSignal { source: SendError<()> },

#[snafu(display("Failed to shutdown server, source: {}", source))]
#[snafu(display("Failed to shutdown {} server, source: {}", service, source))]
ShutdownServer {
#[snafu(backtrace)]
source: servers::error::Error,
service: String,
},

#[snafu(display("Error stream request next is None"))]
Expand Down Expand Up @@ -332,8 +333,6 @@ impl ErrorExt for Error {
| Error::LockNotConfig { .. }
| Error::ExceededRetryLimit { .. }
| Error::SendShutdownSignal { .. }
| Error::ShutdownServer { .. }
| Error::StartMetricsExport { .. }
| Error::ParseAddr { .. }
| Error::StartGrpc { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
Expand All @@ -359,6 +358,8 @@ impl ErrorExt for Error {
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::InvalidCatalogValue { source, .. } => source.status_code(),
Error::MetaInternal { source } => source.status_code(),
Error::ShutdownServer { source, .. } => source.status_code(),
Error::StartMetricsExport { source } => source.status_code(),
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ pub struct MetaSrvOptions {
pub datanode_lease_secs: i64,
pub selector: SelectorType,
pub use_memory_store: bool,
pub metrics_addr: String,
pub http_opts: HttpOptions,
}

Expand All @@ -54,7 +53,6 @@ impl Default for MetaSrvOptions {
datanode_lease_secs: 15,
selector: SelectorType::default(),
use_memory_store: false,
metrics_addr: "127.0.0.1:5000".to_string(),
http_opts: HttpOptions::default(),
}
}
Expand Down
Loading

0 comments on commit 9b96ee6

Please sign in to comment.