From 65bab0087eafe5996fd6e37a486e4b4c412a4c81 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Sun, 20 Nov 2022 16:10:39 +0800 Subject: [PATCH 1/2] feat: disable mysql server on datande when running standalone mode --- src/datanode/src/server.rs | 54 ++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 9abd08ae3cce..d56c85af921d 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -17,11 +17,12 @@ use std::net::SocketAddr; use std::sync::Arc; use common_runtime::Builder as RuntimeBuilder; +use common_telemetry::tracing::log::info; +use frontend::frontend::Mode; use servers::grpc::GrpcServer; use servers::mysql::server::MysqlServer; use servers::server::Server; use snafu::ResultExt; -use tokio::try_join; use crate::datanode::DatanodeOptions; use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu}; @@ -32,7 +33,7 @@ pub mod grpc; /// All rpc services. pub struct Services { grpc_server: GrpcServer, - mysql_server: Box, + mysql_server: Option>, } impl Services { @@ -45,17 +46,26 @@ impl Services { .context(RuntimeResourceSnafu)?, ); - let mysql_io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(opts.mysql_runtime_size as usize) - .thread_name("mysql-io-handlers") - .build() - .context(RuntimeResourceSnafu)?, - ); + let mysql_server = if let Mode::Distributed = opts.mode { + let mysql_io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.mysql_runtime_size as usize) + .thread_name("mysql-io-handlers") + .build() + .context(RuntimeResourceSnafu)?, + ); + Some(MysqlServer::create_server( + instance.clone(), + mysql_io_runtime, + )) + } else { + info!("Disable MySQL server on datanode when running in standalone mode"); + None + }; Ok(Self { - grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime), - mysql_server: MysqlServer::create_server(instance, mysql_io_runtime), + grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime), + mysql_server, }) } @@ -63,15 +73,19 @@ impl Services { let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu { addr: &opts.rpc_addr, })?; - let mysql_addr = &opts.mysql_addr; - let mysql_addr: SocketAddr = mysql_addr - .parse() - .context(ParseAddrSnafu { addr: mysql_addr })?; - try_join!( - self.grpc_server.start(grpc_addr), - self.mysql_server.start(mysql_addr), - ) - .context(StartServerSnafu)?; + + let mut res = vec![self.grpc_server.start(grpc_addr)]; + if let Some(mysql_server) = &self.mysql_server { + let mysql_addr = &opts.mysql_addr; + let mysql_addr: SocketAddr = mysql_addr + .parse() + .context(ParseAddrSnafu { addr: mysql_addr })?; + res.push(mysql_server.start(mysql_addr)); + }; + + futures::future::try_join_all(res) + .await + .context(StartServerSnafu)?; Ok(()) } } From f7f6495f60613e7ea8e88bd20a1a61360f7ebf8f Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 21 Nov 2022 11:14:41 +0800 Subject: [PATCH 2/2] refactor: move Mode to servers crate --- Cargo.lock | 1 + src/cmd/Cargo.toml | 1 + src/cmd/src/datanode.rs | 4 +-- src/cmd/src/frontend.rs | 3 ++- src/cmd/src/standalone.rs | 3 ++- src/datanode/src/datanode.rs | 2 +- src/datanode/src/instance.rs | 2 +- src/datanode/src/server.rs | 35 ++++++++++++++----------- src/datanode/src/tests/grpc_test.rs | 4 +-- src/datanode/src/tests/test_util.rs | 2 +- src/frontend/src/frontend.rs | 8 +----- src/frontend/src/instance.rs | 4 +-- src/frontend/src/instance/influxdb.rs | 3 +-- src/frontend/src/instance/opentsdb.rs | 3 +-- src/frontend/src/instance/prometheus.rs | 2 +- src/servers/src/lib.rs | 9 +++++++ 16 files changed, 47 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 84bcfa1de3b7..a26a0a365f34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1098,6 +1098,7 @@ dependencies = [ "meta-client", "meta-srv", "serde", + "servers", "snafu", "tempdir", "tokio", diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 49c05bf4afaf..c446180738c2 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -21,6 +21,7 @@ frontend = { path = "../frontend" } futures = "0.3" meta-srv = { path = "../meta-srv" } serde = "1.0" +servers = {path = "../servers"} snafu = { version = "0.7", features = ["backtraces"] } tokio = { version = "1.18", features = ["full"] } toml = "0.5" diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 10136101c933..44d4d4c4f595 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -15,8 +15,8 @@ use clap::Parser; use common_telemetry::logging; use datanode::datanode::{Datanode, DatanodeOptions}; -use frontend::frontend::Mode; use meta_client::MetaClientOpts; +use servers::Mode; use snafu::ResultExt; use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu}; @@ -124,7 +124,7 @@ mod tests { use std::assert_matches::assert_matches; use datanode::datanode::ObjectStoreConfig; - use frontend::frontend::Mode; + use servers::Mode; use super::*; diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 94334e830d10..b6b7b8bfad59 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -13,7 +13,7 @@ // limitations under the License. use clap::Parser; -use frontend::frontend::{Frontend, FrontendOptions, Mode}; +use frontend::frontend::{Frontend, FrontendOptions}; use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; use frontend::instance::Instance; @@ -21,6 +21,7 @@ use frontend::mysql::MysqlOptions; use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; use meta_client::MetaClientOpts; +use servers::Mode; use snafu::ResultExt; use crate::error::{self, Result}; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 1bb5fc5ed0ed..a61038979c4a 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -16,7 +16,7 @@ use clap::Parser; use common_telemetry::info; use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig}; use datanode::instance::InstanceRef; -use frontend::frontend::{Frontend, FrontendOptions, Mode}; +use frontend::frontend::{Frontend, FrontendOptions}; use frontend::grpc::GrpcOptions; use frontend::influxdb::InfluxdbOptions; use frontend::instance::Instance as FeInstance; @@ -25,6 +25,7 @@ use frontend::opentsdb::OpentsdbOptions; use frontend::postgres::PostgresOptions; use frontend::prometheus::PrometheusOptions; use serde::{Deserialize, Serialize}; +use servers::Mode; use snafu::ResultExt; use tokio::try_join; diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index bcab4856563f..89cb34eda28b 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -15,9 +15,9 @@ use std::sync::Arc; use common_telemetry::info; -use frontend::frontend::Mode; use meta_client::MetaClientOpts; use serde::{Deserialize, Serialize}; +use servers::Mode; use crate::error::Result; use crate::instance::{Instance, InstanceRef}; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 4fd910479e80..b5c0b028e245 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -20,7 +20,6 @@ use catalog::remote::MetaKvBackend; use catalog::CatalogManagerRef; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_telemetry::logging::info; -use frontend::frontend::Mode; use log_store::fs::config::LogConfig; use log_store::fs::log::LocalFileLogStore; use meta_client::client::{MetaClient, MetaClientBuilder}; @@ -31,6 +30,7 @@ use object_store::layers::LoggingLayer; use object_store::services::fs::Builder; use object_store::{util, ObjectStore}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; +use servers::Mode; use snafu::prelude::*; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index d56c85af921d..f2cef8ca744a 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -18,10 +18,10 @@ use std::sync::Arc; use common_runtime::Builder as RuntimeBuilder; use common_telemetry::tracing::log::info; -use frontend::frontend::Mode; use servers::grpc::GrpcServer; use servers::mysql::server::MysqlServer; use servers::server::Server; +use servers::Mode; use snafu::ResultExt; use crate::datanode::DatanodeOptions; @@ -46,21 +46,24 @@ impl Services { .context(RuntimeResourceSnafu)?, ); - let mysql_server = if let Mode::Distributed = opts.mode { - let mysql_io_runtime = Arc::new( - RuntimeBuilder::default() - .worker_threads(opts.mysql_runtime_size as usize) - .thread_name("mysql-io-handlers") - .build() - .context(RuntimeResourceSnafu)?, - ); - Some(MysqlServer::create_server( - instance.clone(), - mysql_io_runtime, - )) - } else { - info!("Disable MySQL server on datanode when running in standalone mode"); - None + let mysql_server = match opts.mode { + Mode::Standalone => { + info!("Disable MySQL server on datanode when running in standalone mode"); + None + } + Mode::Distributed => { + let mysql_io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.mysql_runtime_size as usize) + .thread_name("mysql-io-handlers") + .build() + .context(RuntimeResourceSnafu)?, + ); + Some(MysqlServer::create_server( + instance.clone(), + mysql_io_runtime, + )) + } }; Ok(Self { diff --git a/src/datanode/src/tests/grpc_test.rs b/src/datanode/src/tests/grpc_test.rs index 2ae06ff8dd60..3116543c1bbe 100644 --- a/src/datanode/src/tests/grpc_test.rs +++ b/src/datanode/src/tests/grpc_test.rs @@ -30,10 +30,10 @@ use client::{Client, Database, ObjectResult}; use common_catalog::consts::MIN_USER_TABLE_ID; use common_runtime::Builder as RuntimeBuilder; use frontend::frontend::FrontendOptions; -use frontend::frontend::Mode::Standalone; use frontend::grpc::GrpcOptions; use servers::grpc::GrpcServer; use servers::server::Server; +use servers::Mode; use crate::instance::Instance; use crate::tests::test_util::{self, TestGuard}; @@ -62,7 +62,7 @@ async fn setup_grpc_server( let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port); let fe_opts = FrontendOptions { - mode: Standalone, + mode: Mode::Standalone, datanode_rpc_addr: datanode_grpc_addr.clone(), grpc_options: Some(GrpcOptions { addr: fe_grpc_addr.clone(), diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index eeb7eaf30a38..4c9a390d5890 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -19,9 +19,9 @@ use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; -use frontend::frontend::Mode; use mito::config::EngineConfig; use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; +use servers::Mode; use snafu::ResultExt; use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 78fc23968c5b..8a5a53844922 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use meta_client::MetaClientOpts; use serde::{Deserialize, Serialize}; +use servers::Mode; use snafu::prelude::*; use crate::error::{self, Result}; @@ -97,10 +98,3 @@ where Services::start(&self.opts, instance).await } } - -#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] -pub enum Mode { - Standalone, - Distributed, -} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 552debc0ef70..57dd9a634f34 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -43,11 +43,11 @@ use common_telemetry::{debug, error, info}; use distributed::DistInstance; use meta_client::client::MetaClientBuilder; use meta_client::MetaClientOpts; -use servers::error as server_error; use servers::query_handler::{ GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler, }; +use servers::{error as server_error, Mode}; use snafu::prelude::*; use sql::dialect::GenericDialect; use sql::parser::ParserContext; @@ -64,7 +64,7 @@ use crate::error::{ SchemaNotFoundSnafu, SelectSnafu, }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; -use crate::frontend::{FrontendOptions, Mode}; +use crate::frontend::FrontendOptions; use crate::sql::insert_to_request; use crate::table::route::TableRoutes; diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index e81639fa3a59..846d4f45272f 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -21,15 +21,14 @@ use async_trait::async_trait; use common_catalog::consts::DEFAULT_CATALOG_NAME; use common_error::prelude::BoxedError; use common_insert::column_to_vector; -use servers::error as server_error; use servers::influxdb::InfluxdbRequest; use servers::query_handler::InfluxdbLineProtocolHandler; +use servers::{error as server_error, Mode}; use snafu::{OptionExt, ResultExt}; use table::requests::InsertRequest; use crate::error; use crate::error::{DeserializeInsertBatchSnafu, InsertBatchToRequestSnafu, Result}; -use crate::frontend::Mode; use crate::instance::Instance; #[async_trait] diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 7ad21fd59571..1a7db0901469 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -14,13 +14,12 @@ use async_trait::async_trait; use common_error::prelude::BoxedError; -use servers::error as server_error; use servers::opentsdb::codec::DataPoint; use servers::query_handler::OpentsdbProtocolHandler; +use servers::{error as server_error, Mode}; use snafu::prelude::*; use crate::error::Result; -use crate::frontend::Mode; use crate::instance::Instance; #[async_trait] diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index e0b81c008c24..246542893d84 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -24,9 +24,9 @@ use prost::Message; use servers::error::{self, Result as ServerResult}; use servers::prometheus::{self, Metrics}; use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; +use servers::Mode; use snafu::{OptionExt, ResultExt}; -use crate::frontend::Mode; use crate::instance::{parse_stmt, Instance}; const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32; diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 88579e2dafbc..34b4f367a889 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -14,6 +14,8 @@ #![feature(assert_matches)] +use serde::{Deserialize, Serialize}; + pub mod context; pub mod error; pub mod grpc; @@ -27,3 +29,10 @@ pub mod prometheus; pub mod query_handler; pub mod server; mod shutdown; + +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum Mode { + Standalone, + Distributed, +}