Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: disable mysql server on datande when running standalone mode #593

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ frontend = { path = "../frontend" }
futures = "0.3"
meta-srv = { path = "../meta-srv" }
serde = "1.0"
servers = {path = "../servers"}
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
toml = "0.5"
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{Datanode, DatanodeOptions};
use frontend::frontend::Mode;
use meta_client::MetaClientOpts;
use servers::Mode;
use snafu::ResultExt;

use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu};
Expand Down Expand Up @@ -124,7 +124,7 @@ mod tests {
use std::assert_matches::assert_matches;

use datanode::datanode::ObjectStoreConfig;
use frontend::frontend::Mode;
use servers::Mode;

use super::*;

Expand Down
3 changes: 2 additions & 1 deletion src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
// limitations under the License.

use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance;
use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use meta_client::MetaClientOpts;
use servers::Mode;
use snafu::ResultExt;

use crate::error::{self, Result};
Expand Down
3 changes: 2 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use clap::Parser;
use common_telemetry::info;
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance as FeInstance;
Expand All @@ -25,6 +25,7 @@ use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use serde::{Deserialize, Serialize};
use servers::Mode;
use snafu::ResultExt;
use tokio::try_join;

Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
use std::sync::Arc;

use common_telemetry::info;
use frontend::frontend::Mode;
use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use servers::Mode;

use crate::error::Result;
use crate::instance::{Instance, InstanceRef};
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
use frontend::frontend::Mode;
use log_store::fs::config::LogConfig;
use log_store::fs::log::LocalFileLogStore;
use meta_client::client::{MetaClient, MetaClientBuilder};
Expand All @@ -31,6 +30,7 @@ use object_store::layers::LoggingLayer;
use object_store::services::fs::Builder;
use object_store::{util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use snafu::prelude::*;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
Expand Down
57 changes: 37 additions & 20 deletions src/datanode/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ use std::net::SocketAddr;
use std::sync::Arc;

use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::tracing::log::info;
use servers::grpc::GrpcServer;
use servers::mysql::server::MysqlServer;
use servers::server::Server;
use servers::Mode;
use snafu::ResultExt;
use tokio::try_join;

use crate::datanode::DatanodeOptions;
use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu};
Expand All @@ -32,7 +33,7 @@ pub mod grpc;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,
mysql_server: Box<dyn Server>,
mysql_server: Option<Box<dyn Server>>,
}

impl Services {
Expand All @@ -45,33 +46,49 @@ impl Services {
.context(RuntimeResourceSnafu)?,
);

let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.thread_name("mysql-io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
);
let mysql_server = match opts.mode {
Mode::Standalone => {
info!("Disable MySQL server on datanode when running in standalone mode");
None
}
Mode::Distributed => {
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.thread_name("mysql-io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
);
Some(MysqlServer::create_server(
instance.clone(),
mysql_io_runtime,
))
}
};

Ok(Self {
grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime),
mysql_server: MysqlServer::create_server(instance, mysql_io_runtime),
grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime),
mysql_server,
})
}

pub async fn start(&mut self, opts: &DatanodeOptions) -> Result<()> {
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;
let mysql_addr = &opts.mysql_addr;
let mysql_addr: SocketAddr = mysql_addr
.parse()
.context(ParseAddrSnafu { addr: mysql_addr })?;
try_join!(
self.grpc_server.start(grpc_addr),
self.mysql_server.start(mysql_addr),
)
.context(StartServerSnafu)?;

let mut res = vec![self.grpc_server.start(grpc_addr)];
if let Some(mysql_server) = &self.mysql_server {
let mysql_addr = &opts.mysql_addr;
let mysql_addr: SocketAddr = mysql_addr
.parse()
.context(ParseAddrSnafu { addr: mysql_addr })?;
res.push(mysql_server.start(mysql_addr));
};

futures::future::try_join_all(res)
.await
.context(StartServerSnafu)?;
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/datanode/src/tests/grpc_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ use client::{Client, Database, ObjectResult};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_runtime::Builder as RuntimeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::frontend::Mode::Standalone;
use frontend::grpc::GrpcOptions;
use servers::grpc::GrpcServer;
use servers::server::Server;
use servers::Mode;

use crate::instance::Instance;
use crate::tests::test_util::{self, TestGuard};
Expand Down Expand Up @@ -62,7 +62,7 @@ async fn setup_grpc_server(

let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port);
let fe_opts = FrontendOptions {
mode: Standalone,
mode: Mode::Standalone,
datanode_rpc_addr: datanode_grpc_addr.clone(),
grpc_options: Some(GrpcOptions {
addr: fe_grpc_addr.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use frontend::frontend::Mode;
use mito::config::EngineConfig;
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
Expand Down
8 changes: 1 addition & 7 deletions src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use servers::Mode;
use snafu::prelude::*;

use crate::error::{self, Result};
Expand Down Expand Up @@ -97,10 +98,3 @@ where
Services::start(&self.opts, instance).await
}
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
Distributed,
}
4 changes: 2 additions & 2 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ use common_telemetry::{debug, error, info};
use distributed::DistInstance;
use meta_client::client::MetaClientBuilder;
use meta_client::MetaClientOpts;
use servers::error as server_error;
use servers::query_handler::{
GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler,
PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler,
};
use servers::{error as server_error, Mode};
use snafu::prelude::*;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
Expand All @@ -64,7 +64,7 @@ use crate::error::{
SchemaNotFoundSnafu, SelectSnafu,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::{FrontendOptions, Mode};
use crate::frontend::FrontendOptions;
use crate::sql::insert_to_request;
use crate::table::route::TableRoutes;

Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/instance/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_insert::column_to_vector;
use servers::error as server_error;
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::InfluxdbLineProtocolHandler;
use servers::{error as server_error, Mode};
use snafu::{OptionExt, ResultExt};
use table::requests::InsertRequest;

use crate::error;
use crate::error::{DeserializeInsertBatchSnafu, InsertBatchToRequestSnafu, Result};
use crate::frontend::Mode;
use crate::instance::Instance;

#[async_trait]
Expand Down
3 changes: 1 addition & 2 deletions src/frontend/src/instance/opentsdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@

use async_trait::async_trait;
use common_error::prelude::BoxedError;
use servers::error as server_error;
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::OpentsdbProtocolHandler;
use servers::{error as server_error, Mode};
use snafu::prelude::*;

use crate::error::Result;
use crate::frontend::Mode;
use crate::instance::Instance;

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/src/instance/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use servers::Mode;
use snafu::{OptionExt, ResultExt};

use crate::frontend::Mode;
use crate::instance::{parse_stmt, Instance};

const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
Expand Down
9 changes: 9 additions & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#![feature(assert_matches)]

use serde::{Deserialize, Serialize};

pub mod context;
pub mod error;
pub mod grpc;
Expand All @@ -27,3 +29,10 @@ pub mod prometheus;
pub mod query_handler;
pub mod server;
mod shutdown;

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
Distributed,
}