diff --git a/.gitignore b/.gitignore index 7f220c5e8a..83ba82536a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ target .DS_Store .idea/ -.vscode +.vscode \ No newline at end of file diff --git a/server/src/config.rs b/server/src/config.rs index ba68152898..c5ff23ed94 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -106,6 +106,10 @@ pub struct ServerConfig { /// Config for forwarding pub forward: forward::Config, + + /// Whether to create table automatically when data is first written, only + /// used in gRPC + pub auto_create_table: bool, } impl Default for ServerConfig { @@ -120,6 +124,7 @@ impl Default for ServerConfig { grpc_server_cq_count: 20, resp_compress_min_length: ReadableSize::mb(4), forward: forward::Config::default(), + auto_create_table: true, } } } diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 83dc4cfe22..a6144bf7a6 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -212,6 +212,7 @@ pub struct Builder { opened_wals: Option, schema_config_provider: Option, forward_config: Option, + auto_create_table: bool, } impl Builder { @@ -228,6 +229,7 @@ impl Builder { opened_wals: None, schema_config_provider: None, forward_config: None, + auto_create_table: true, } } @@ -287,6 +289,11 @@ impl Builder { self.timeout = timeout; self } + + pub fn auto_create_table(mut self, auto_create_table: bool) -> Self { + self.auto_create_table = auto_create_table; + self + } } impl Builder { @@ -339,6 +346,7 @@ impl Builder { forwarder, timeout: self.timeout, resp_compress_min_length: self.resp_compress_min_length, + auto_create_table: self.auto_create_table, }; let rpc_server = StorageServiceServer::new(storage_service); diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 0b229a9415..3af6e3f370 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -98,6 +98,7 @@ pub struct HandlerContext<'a, Q> { forwarder: Option, timeout: Option, resp_compress_min_length: usize, + auto_create_table: bool, } impl<'a, Q> HandlerContext<'a, Q> { @@ -110,6 +111,7 @@ impl<'a, Q> HandlerContext<'a, Q> { forwarder: Option, timeout: Option, resp_compress_min_length: usize, + auto_create_table: bool, ) -> Self { // catalog is not exposed to protocol layer let catalog = instance.catalog_manager.default_catalog_name().to_string(); @@ -123,6 +125,7 @@ impl<'a, Q> HandlerContext<'a, Q> { forwarder, timeout, resp_compress_min_length, + auto_create_table, } } @@ -141,6 +144,7 @@ pub struct StorageServiceImpl { pub forwarder: Option, pub timeout: Option, pub resp_compress_min_length: usize, + pub auto_create_table: bool, } macro_rules! handle_request { @@ -158,6 +162,7 @@ macro_rules! handle_request { let forwarder = self.forwarder.clone(); let timeout = self.timeout; let resp_compress_min_length = self.resp_compress_min_length; + let auto_create_table = self.auto_create_table; // The future spawned by tokio cannot be executed by other executor/runtime, so @@ -179,7 +184,7 @@ macro_rules! handle_request { .fail()? } let handler_ctx = - HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length); + HandlerContext::new(header, router, instance, &schema_config_provider, forwarder, timeout, resp_compress_min_length, auto_create_table); $mod_name::$handle_fn(&handler_ctx, req) .await .map_err(|e| { @@ -254,6 +259,7 @@ impl StorageServiceImpl { self.forwarder.clone(), self.timeout, self.resp_compress_min_length, + self.auto_create_table, ); let mut total_success = 0; @@ -310,6 +316,7 @@ impl StorageServiceImpl { let forwarder = self.forwarder.clone(); let timeout = self.timeout; let resp_compress_min_length = self.resp_compress_min_length; + let auto_create_table = self.auto_create_table; let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); self.runtimes.read_runtime.spawn(async move { @@ -321,6 +328,7 @@ impl StorageServiceImpl { forwarder, timeout, resp_compress_min_length, + auto_create_table ); let query_req = request.into_inner(); let output = sql_query::fetch_query_output(&handler_ctx, &query_req) diff --git a/server/src/grpc/storage_service/write.rs b/server/src/grpc/storage_service/write.rs index 1dfa15419b..e9728f6d49 100644 --- a/server/src/grpc/storage_service/write.rs +++ b/server/src/grpc/storage_service/write.rs @@ -40,6 +40,32 @@ use crate::{ instance::InstanceRef, }; +#[derive(Debug)] +pub struct WriteContext { + pub request_id: RequestId, + pub deadline: Option, + pub catalog: String, + pub schema: String, + pub auto_create_table: bool, +} + +impl WriteContext { + pub fn new( + request_id: RequestId, + deadline: Option, + catalog: String, + schema: String, + ) -> Self { + let auto_create_table = true; + Self { + request_id, + deadline, + catalog, + schema, + auto_create_table, + } + } +} pub(crate) async fn handle_write( ctx: &HandlerContext<'_, Q>, req: WriteRequest, @@ -70,14 +96,19 @@ pub(crate) async fn handle_write( req.table_requests.len(), ); - let plan_vec = write_request_to_insert_plan( + let write_context = WriteContext { request_id, - catalog, - &schema, + deadline, + catalog: catalog.to_string(), + schema: schema.to_string(), + auto_create_table: ctx.auto_create_table, + }; + + let plan_vec = write_request_to_insert_plan( ctx.instance.clone(), req.table_requests, schema_config, - deadline, + write_context, ) .await?; @@ -169,27 +200,32 @@ pub async fn execute_plan( } pub async fn write_request_to_insert_plan( - request_id: RequestId, - catalog: &str, - schema: &str, instance: InstanceRef, table_requests: Vec, schema_config: Option<&SchemaConfig>, - deadline: Option, + write_context: WriteContext, ) -> Result> { + let WriteContext { + request_id, + catalog, + schema, + deadline, + auto_create_table, + } = write_context; + let mut plan_vec = Vec::with_capacity(table_requests.len()); for write_table_req in table_requests { let table_name = &write_table_req.table; - let mut table = try_get_table(catalog, schema, instance.clone(), table_name)?; + let mut table = try_get_table(&catalog, &schema, instance.clone(), table_name)?; - if table.is_none() { + if table.is_none() && auto_create_table { // TODO: remove this clone? let schema_config = schema_config.cloned().unwrap_or_default(); create_table( request_id, - catalog, - schema, + &catalog, + &schema, instance.clone(), &write_table_req, &schema_config, @@ -197,7 +233,7 @@ pub async fn write_request_to_insert_plan( ) .await?; // try to get table again - table = try_get_table(catalog, schema, instance.clone(), table_name)?; + table = try_get_table(&catalog, &schema, instance.clone(), table_name)?; } match table { diff --git a/server/src/handlers/prom.rs b/server/src/handlers/prom.rs index b6e859abff..3650d69a48 100644 --- a/server/src/handlers/prom.rs +++ b/server/src/handlers/prom.rs @@ -26,8 +26,8 @@ use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; use warp::reject; use crate::{ - context::RequestContext, handlers, instance::InstanceRef, - schema_config_provider::SchemaConfigProviderRef, + context::RequestContext, grpc::storage_service::write::WriteContext, handlers, + instance::InstanceRef, schema_config_provider::SchemaConfigProviderRef, }; #[derive(Debug, Snafu)] @@ -242,14 +242,15 @@ impl RemoteStorage for CeresDBStorage { .schema_config_provider .schema_config(schema) .context(SchemaError)?; + + let write_context = + WriteContext::new(request_id, deadline, catalog.clone(), schema.clone()); + let plans = crate::grpc::storage_service::write::write_request_to_insert_plan( - request_id, - catalog, - schema, self.instance.clone(), Self::convert_write_request(req)?, schema_config, - deadline, + write_context, ) .await .context(GRPCWriteError)?; diff --git a/server/src/server.rs b/server/src/server.rs index 42e313b8a1..f38441371d 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -351,6 +351,7 @@ impl Builder { .schema_config_provider(provider) .forward_config(self.config.forward) .timeout(self.config.timeout.map(|v| v.0)) + .auto_create_table(self.config.auto_create_table) .build() .context(BuildGrpcService)?;