diff --git a/common/planners/src/plan_display_indent.rs b/common/planners/src/plan_display_indent.rs
index 99e866a7f23f1..bf5a7d1c24267 100644
--- a/common/planners/src/plan_display_indent.rs
+++ b/common/planners/src/plan_display_indent.rs
@@ -254,7 +254,8 @@ impl<'a> PlanNodeIndentFormatDisplay<'a> {
         // need engine to impl Display
         write!(f, " engine: {},", plan.engine())?;
         write!(f, " if_not_exists:{:},", plan.if_not_exists)?;
-        write!(f, " option: {:?}", plan.options())
+        write!(f, " option: {:?},", plan.options())?;
+        write!(f, " as_select: {:?}", plan.as_select())
     }
 
     fn format_drop_table(f: &mut Formatter, plan: &DropTablePlan) -> fmt::Result {
diff --git a/common/planners/src/plan_table_create.rs b/common/planners/src/plan_table_create.rs
index eeeab9a860b61..039a259096a3d 100644
--- a/common/planners/src/plan_table_create.rs
+++ b/common/planners/src/plan_table_create.rs
@@ -18,6 +18,8 @@ use common_datavalues::DataSchemaRef;
 use common_meta_types::CreateTableReq;
 use common_meta_types::TableMeta;
 
+use crate::PlanNode;
+
 pub type TableOptions = HashMap<String, String>;
 
 #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
@@ -28,6 +30,8 @@ pub struct CreateTablePlan {
     pub table: String,
 
     pub table_meta: TableMeta,
+
+    pub as_select: Option<Box<PlanNode>>,
 }
 
 impl From<CreateTablePlan> for CreateTableReq {
@@ -53,4 +57,8 @@ impl CreateTablePlan {
     pub fn engine(&self) -> &str {
         &self.table_meta.engine
     }
+
+    pub fn as_select(&self) -> &Option<Box<PlanNode>> {
+        &self.as_select
+    }
 }
diff --git a/common/planners/tests/it/plan_display.rs b/common/planners/tests/it/plan_display.rs
index 5768b3278bd8c..adce82ccb5b7a 100644
--- a/common/planners/tests/it/plan_display.rs
+++ b/common/planners/tests/it/plan_display.rs
@@ -40,10 +40,11 @@ fn test_plan_display_indent() -> Result<()> {
             engine: "JSON".to_string(),
             options,
         },
+        as_select: None,
     });
 
     assert_eq!(
-        "Create table foo.bar DataField { name: \"a\", data_type: Int64, nullable: false }, engine: JSON, if_not_exists:true, option: {\"opt_foo\": \"opt_bar\"}",
+        "Create table foo.bar DataField { name: \"a\", data_type: Int64, nullable: false }, engine: JSON, if_not_exists:true, option: {\"opt_foo\": \"opt_bar\"}, as_select: None",
        format!("{:?}", plan_create)
     );
 
diff --git a/query/src/interpreters/interpreter_table_create.rs b/query/src/interpreters/interpreter_table_create.rs
index 608051739fd1e..7256cb93627c6 100644
--- a/query/src/interpreters/interpreter_table_create.rs
+++ b/query/src/interpreters/interpreter_table_create.rs
@@ -16,9 +16,13 @@ use std::sync::Arc;
 
 use common_exception::Result;
 use common_planners::CreateTablePlan;
+use common_planners::InsertInputSource;
+use common_planners::InsertPlan;
+use common_planners::PlanNode;
 use common_streams::DataBlockStream;
 use common_streams::SendableDataBlockStream;
 
+use super::InsertInterpreter;
 use crate::catalogs::Catalog;
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterPtr;
@@ -43,8 +47,58 @@ impl Interpreter for CreateTableInterpreter {
 
     async fn execute(
         &self,
-        _input_stream: Option<SendableDataBlockStream>,
+        input_stream: Option<SendableDataBlockStream>,
     ) -> Result<SendableDataBlockStream> {
+        match &self.plan.as_select {
+            Some(select_plan_node) => {
+                self.create_table_as_select(input_stream, select_plan_node.clone())
+                    .await
+            }
+            None => self.create_table().await,
+        }
+    }
+}
+
+impl CreateTableInterpreter {
+    async fn create_table_as_select(
+        &self,
+        input_stream: Option<SendableDataBlockStream>,
+        select_plan_node: Box<PlanNode>,
+    ) -> Result<SendableDataBlockStream> {
+        let catalog = self.ctx.get_catalog();
+
+        // TODO: maybe the table creation and the insertion should be a single transaction, but that would require create_table to support 2PC.
+        catalog.create_table(self.plan.clone().into()).await?;
+        let table = catalog.get_table(&self.plan.db, &self.plan.table).await?;
+
+        // If the table creation query contains column definitions, like 'CREATE TABLE t1(a int) AS SELECT * from t2',
+        // we use the definitions to create the table schema. It may happen that the "AS SELECT" query's schema doesn't
+        // match the table's schema. For example,
+        //
+        // mysql> create table t2(a int, b int);
+        // mysql> create table t1(x string, y string) as select * from t2;
+        //
+        // For the situation above, we implicitly cast the data type when inserting data.
+        // The casting and schema checking are done in interpreter_insert.rs, function check_schema_cast.
+        let insert_plan = InsertPlan {
+            database_name: self.plan.db.clone(),
+            table_name: self.plan.table.clone(),
+            table_id: table.get_id(),
+            schema: self.plan.schema(),
+            overwrite: false,
+            source: InsertInputSource::SelectPlan(select_plan_node),
+        };
+        let insert_interpreter = InsertInterpreter::try_create(self.ctx.clone(), insert_plan)?;
+        insert_interpreter.execute(input_stream).await?;
+
+        Ok(Box::pin(DataBlockStream::create(
+            self.plan.schema(),
+            None,
+            vec![],
+        )))
+    }
+
+    async fn create_table(&self) -> Result<SendableDataBlockStream> {
         let catalog = self.ctx.get_catalog();
 
         catalog.create_table(self.plan.clone().into()).await?;
diff --git a/query/src/sql/statements/statement_create_table.rs b/query/src/sql/statements/statement_create_table.rs
index b75db3fb90c00..66191385d4dcc 100644
--- a/query/src/sql/statements/statement_create_table.rs
+++ b/query/src/sql/statements/statement_create_table.rs
@@ -33,6 +33,8 @@ use crate::sessions::QueryContext;
 use crate::sql::statements::AnalyzableStatement;
 use crate::sql::statements::AnalyzedResult;
 use crate::sql::statements::DfQueryStatement;
+use crate::sql::DfStatement;
+use crate::sql::PlanParser;
 use crate::sql::SQLCommon;
 
 #[derive(Debug, Clone, PartialEq)]
@@ -46,6 +48,8 @@ pub struct DfCreateTable {
 
     // The table name after "create .. like" statement.
     pub like: Option<ObjectName>,
+
+    // The query of "create table .. as select" statement.
     pub query: Option<Box<DfQueryStatement>>,
 }
 
@@ -53,9 +57,28 @@ impl AnalyzableStatement for DfCreateTable {
     #[tracing::instrument(level = "info", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))]
     async fn analyze(&self, ctx: Arc<QueryContext>) -> Result<AnalyzedResult> {
-        let table_meta = self.table_meta(ctx.clone()).await?;
+        let mut table_meta = self.table_meta(ctx.clone()).await?;
         let if_not_exists = self.if_not_exists;
-        let (db, table) = Self::resolve_table(ctx, &self.name)?;
+        let (db, table) = Self::resolve_table(ctx.clone(), &self.name)?;
+
+        let as_select_plan_node = match &self.query {
+            // CTAS
+            Some(query_statement) => {
+                let statements = vec![DfStatement::Query(query_statement.clone())];
+                let select_plan = PlanParser::build_plan(statements, ctx).await?;
+
+                // If the current table schema is empty, for example 'CREATE TABLE t1 AS SELECT * FROM t2',
+                // we use the schema from the 'AS SELECT' query.
+                // TODO: there is a bug where nullable information is missing from select_plan.schema().
+                // See the TODO item in plan_expression.rs, function nullable.
+                if table_meta.schema.fields().is_empty() {
+                    table_meta.schema = select_plan.schema();
+                }
+                Some(Box::new(select_plan))
+            }
+            // Query doesn't contain 'As Select' statement
+            None => None,
+        };
 
         Ok(AnalyzedResult::SimpleQuery(Box::new(
             PlanNode::CreateTable(CreateTablePlan {
@@ -63,6 +86,7 @@ impl AnalyzableStatement for DfCreateTable {
                 db,
                 table,
                 table_meta,
+                as_select: as_select_plan_node,
             }),
         )))
     }
diff --git a/query/tests/it/sql/plan_parser.rs b/query/tests/it/sql/plan_parser.rs
index 912be582a4943..b36b2a4464b2a 100644
--- a/query/tests/it/sql/plan_parser.rs
+++ b/query/tests/it/sql/plan_parser.rs
@@ -54,13 +54,13 @@ fn test_plan_parser() -> Result<()> {
         Test {
             name: "create-table-passed",
             sql: "CREATE TABLE t(c1 int, c2 bigint, c3 varchar(255) ) ENGINE = Parquet location = 'foo.parquet' ",
-            expect: "Create table default.t DataField { name: \"c1\", data_type: Int32, nullable: true }, DataField { name: \"c2\", data_type: Int64, nullable: true }, DataField { name: \"c3\", data_type: String, nullable: true }, engine: Parquet, if_not_exists:false, option: {\"location\": \"foo.parquet\"}",
+            expect: "Create table default.t DataField { name: \"c1\", data_type: Int32, nullable: true }, DataField { name: \"c2\", data_type: Int64, nullable: true }, DataField { name: \"c3\", data_type: String, nullable: true }, engine: Parquet, if_not_exists:false, option: {\"location\": \"foo.parquet\"}, as_select: None",
             error: "",
         },
         Test {
             name: "create-table-if-not-exists-passed",
             sql: "CREATE TABLE IF NOT EXISTS t(c1 int, c2 bigint, c3 varchar(255) ) ENGINE = Parquet location = 'foo.parquet' ",
-            expect: "Create table default.t DataField { name: \"c1\", data_type: Int32, nullable: true }, DataField { name: \"c2\", data_type: Int64, nullable: true }, DataField { name: \"c3\", data_type: String, nullable: true }, engine: Parquet, if_not_exists:true, option: {\"location\": \"foo.parquet\"}",
+            expect: "Create table default.t DataField { name: \"c1\", data_type: Int32, nullable: true }, DataField { name: \"c2\", data_type: Int64, nullable: true }, DataField { name: \"c3\", data_type: String, nullable: true }, engine: Parquet, if_not_exists:true, option: {\"location\": \"foo.parquet\"}, as_select: None",
             error: "",
         },
         Test {
diff --git a/query/tests/it/sql/sql_parser.rs b/query/tests/it/sql/sql_parser.rs
index c7ef517a27651..9ca02d8ef1d90 100644
--- a/query/tests/it/sql/sql_parser.rs
+++ b/query/tests/it/sql/sql_parser.rs
@@ -210,6 +210,40 @@ fn create_table() -> Result<()> {
     });
     expect_parse_ok(sql, expected)?;
 
+    // create table as select statement
+    let sql = "CREATE TABLE db1.test1(c1 int, c2 varchar(255)) ENGINE = Parquet location = 'batcave' AS SELECT * FROM t2";
+    let expected = DfStatement::CreateTable(DfCreateTable {
+        if_not_exists: false,
+        name: ObjectName(vec![Ident::new("db1"), Ident::new("test1")]),
+        columns: vec![
+            make_column_def("c1", DataType::Int(None)),
+            make_column_def("c2", DataType::Varchar(Some(255))),
+        ],
+        engine: "Parquet".to_string(),
+
+        options: maplit::hashmap! {"location".into() => "batcave".into()},
{"location".into() => "batcave".into()}, + like: None, + query: Some(Box::new(DfQueryStatement { + from: vec![TableWithJoins { + relation: TableFactor::Table { + name: ObjectName(vec![Ident::new("t2")]), + alias: None, + args: vec![], + with_hints: vec![], + }, + joins: vec![], + }], + projection: vec![SelectItem::Wildcard], + selection: None, + group_by: vec![], + having: None, + order_by: vec![], + limit: None, + offset: None, + })), + }); + expect_parse_ok(sql, expected)?; + Ok(()) } diff --git a/query/tests/it/storages/fuse/table_test_fixture.rs b/query/tests/it/storages/fuse/table_test_fixture.rs index f425a2ef52b32..7c8c96215c5a2 100644 --- a/query/tests/it/storages/fuse/table_test_fixture.rs +++ b/query/tests/it/storages/fuse/table_test_fixture.rs @@ -100,6 +100,7 @@ impl TestFixture { // make sure blocks will not be merged options: [(TBL_OPT_KEY_CHUNK_BLOCK_NUM.to_owned(), "1".to_owned())].into(), }, + as_select: None, } } diff --git a/tests/suites/0_stateless/05_0000_ddl_create_tables.result b/tests/suites/0_stateless/05_0000_ddl_create_tables.result index 69c8e54e887ba..572465c8cc820 100644 --- a/tests/suites/0_stateless/05_0000_ddl_create_tables.result +++ b/tests/suites/0_stateless/05_0000_ddl_create_tables.result @@ -2,3 +2,6 @@ 2 4 8 +1 +2 +3 diff --git a/tests/suites/0_stateless/05_0000_ddl_create_tables.sql b/tests/suites/0_stateless/05_0000_ddl_create_tables.sql index bb04e5ee66acf..89c4f7df10324 100644 --- a/tests/suites/0_stateless/05_0000_ddl_create_tables.sql +++ b/tests/suites/0_stateless/05_0000_ddl_create_tables.sql @@ -16,11 +16,18 @@ create table t2(a int,b int) engine=NotExists; -- {ErrorCode 4003} DROP TABLE IF EXISTS t; DROP TABLE IF EXISTS t2; +-- prepare test databases for testing 'create table like' and 'as select' statements. CREATE DATABASE db1; CREATE DATABASE db2; CREATE TABLE db1.test1(a INT, b INT); +INSERT INTO db1.test1 VALUES (1, 1), (2, 2), (3, 3); +-- test 'create table like' statement, expect db2.test2 has the same schema with db1.test1 CREATE TABLE db2.test2 LIKE db1.test1; INSERT INTO db2.test2 VALUES (3, 5); SELECT a+b FROM db2.test2; +-- test 'create table as select' statement, expect db2.test3 has the data from db1.test1 with casting +CREATE TABLE db2.test3(x Varchar, y Varchar) AS SELECT * FROM db1.test1; +SELECT x FROM db2.test3; +-- clean up test databases DROP DATABASE db1; -DROP DATABASE db2; +DROP DATABASE db2; \ No newline at end of file