Skip to content

Commit

Permalink
Merge pull request #3400 from junli1026/jun/dev
Browse files Browse the repository at this point in the history
[ISSUE-2457] Add support for CTAS(Create Table As Select) statement
  • Loading branch information
databend-bot authored Dec 13, 2021
2 parents 76a3431 + 148faec commit 1f29cfc
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 11 deletions.
3 changes: 2 additions & 1 deletion common/planners/src/plan_display_indent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ impl<'a> PlanNodeIndentFormatDisplay<'a> {
// need engine to impl Display
write!(f, " engine: {},", plan.engine())?;
write!(f, " if_not_exists:{:},", plan.if_not_exists)?;
write!(f, " option: {:?}", plan.options())
write!(f, " option: {:?},", plan.options())?;
write!(f, " as_select: {:?}", plan.as_select())
}

fn format_drop_table(f: &mut Formatter, plan: &DropTablePlan) -> fmt::Result {
Expand Down
8 changes: 8 additions & 0 deletions common/planners/src/plan_table_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use common_datavalues::DataSchemaRef;
use common_meta_types::CreateTableReq;
use common_meta_types::TableMeta;

use crate::PlanNode;

pub type TableOptions = HashMap<String, String>;

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
Expand All @@ -28,6 +30,8 @@ pub struct CreateTablePlan {
pub table: String,

pub table_meta: TableMeta,

pub as_select: Option<Box<PlanNode>>,
}

impl From<CreateTablePlan> for CreateTableReq {
Expand All @@ -53,4 +57,8 @@ impl CreateTablePlan {
pub fn engine(&self) -> &str {
&self.table_meta.engine
}

pub fn as_select(&self) -> &Option<Box<PlanNode>> {
&self.as_select
}
}
3 changes: 2 additions & 1 deletion common/planners/tests/it/plan_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ fn test_plan_display_indent() -> Result<()> {
engine: "JSON".to_string(),
options,
},
as_select: None,
});

assert_eq!(
"Create table foo.bar DataField { name: \"a\", data_type: Int64, nullable: false }, engine: JSON, if_not_exists:true, option: {\"opt_foo\": \"opt_bar\"}",
"Create table foo.bar DataField { name: \"a\", data_type: Int64, nullable: false }, engine: JSON, if_not_exists:true, option: {\"opt_foo\": \"opt_bar\"}, as_select: None",
format!("{:?}", plan_create)
);

Expand Down
56 changes: 55 additions & 1 deletion query/src/interpreters/interpreter_table_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@ use std::sync::Arc;

use common_exception::Result;
use common_planners::CreateTablePlan;
use common_planners::InsertInputSource;
use common_planners::InsertPlan;
use common_planners::PlanNode;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;

use super::InsertInterpreter;
use crate::catalogs::Catalog;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
Expand All @@ -43,8 +47,58 @@ impl Interpreter for CreateTableInterpreter {

async fn execute(
&self,
_input_stream: Option<SendableDataBlockStream>,
input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
match &self.plan.as_select {
Some(select_plan_node) => {
self.create_table_as_select(input_stream, select_plan_node.clone())
.await
}
None => self.create_table().await,
}
}
}

impl CreateTableInterpreter {
async fn create_table_as_select(
&self,
input_stream: Option<SendableDataBlockStream>,
select_plan_node: Box<PlanNode>,
) -> Result<SendableDataBlockStream> {
let catalog = self.ctx.get_catalog();

// TODO: maybe the table creation and insertion should be a transaction, but it may require create_table support 2pc.
catalog.create_table(self.plan.clone().into()).await?;
let table = catalog.get_table(&self.plan.db, &self.plan.table).await?;

// If the table creation query contains column definitions, like 'CREATE TABLE t1(a int) AS SELECT * from t2',
// we use the definitions to create the table schema. It may happen that the "AS SELECT" query's schema doesn't
// match the table's schema. For example,
//
// mysql> create table t2(a int, b int);
// mysql> create table t1(x string, y string) as select * from t2;
//
// For the situation above, we implicitly cast the data type when inserting data.
// The casting and schema checking is in interpreter_insert.rs, function check_schema_cast.
let insert_plan = InsertPlan {
database_name: self.plan.db.clone(),
table_name: self.plan.table.clone(),
table_id: table.get_id(),
schema: self.plan.schema(),
overwrite: false,
source: InsertInputSource::SelectPlan(select_plan_node),
};
let insert_interpreter = InsertInterpreter::try_create(self.ctx.clone(), insert_plan)?;
insert_interpreter.execute(input_stream).await?;

Ok(Box::pin(DataBlockStream::create(
self.plan.schema(),
None,
vec![],
)))
}

async fn create_table(&self) -> Result<SendableDataBlockStream> {
let catalog = self.ctx.get_catalog();
catalog.create_table(self.plan.clone().into()).await?;

Expand Down
28 changes: 26 additions & 2 deletions query/src/sql/statements/statement_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::sessions::QueryContext;
use crate::sql::statements::AnalyzableStatement;
use crate::sql::statements::AnalyzedResult;
use crate::sql::statements::DfQueryStatement;
use crate::sql::DfStatement;
use crate::sql::PlanParser;
use crate::sql::SQLCommon;

#[derive(Debug, Clone, PartialEq)]
Expand All @@ -46,23 +48,45 @@ pub struct DfCreateTable {

// The table name after "create .. like" statement.
pub like: Option<ObjectName>,

// The query of "create table .. as select" statement.
pub query: Option<Box<DfQueryStatement>>,
}

#[async_trait::async_trait]
impl AnalyzableStatement for DfCreateTable {
#[tracing::instrument(level = "info", skip(self, ctx), fields(ctx.id = ctx.get_id().as_str()))]
async fn analyze(&self, ctx: Arc<QueryContext>) -> Result<AnalyzedResult> {
let table_meta = self.table_meta(ctx.clone()).await?;
let mut table_meta = self.table_meta(ctx.clone()).await?;
let if_not_exists = self.if_not_exists;
let (db, table) = Self::resolve_table(ctx, &self.name)?;
let (db, table) = Self::resolve_table(ctx.clone(), &self.name)?;

let as_select_plan_node = match &self.query {
// CTAS
Some(query_statement) => {
let statements = vec![DfStatement::Query(query_statement.clone())];
let select_plan = PlanParser::build_plan(statements, ctx).await?;

// If the current table schema is empty, for example 'CREATE TABLE t1 AS SELECT * FROM t2',
// we use the schema from 'AS SELECT' query.
// TODO: there is a bug that nullable information is missed in select_plan.schema().
// See the TODO item in plan_expression.rs, function nullable.
if table_meta.schema.fields().is_empty() {
table_meta.schema = select_plan.schema();
}
Some(Box::new(select_plan))
}
// Query doesn't contain 'As Select' statement
None => None,
};

Ok(AnalyzedResult::SimpleQuery(Box::new(
PlanNode::CreateTable(CreateTablePlan {
if_not_exists,
db,
table,
table_meta,
as_select: as_select_plan_node,
}),
)))
}
Expand Down
4 changes: 2 additions & 2 deletions query/tests/it/sql/plan_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ fn test_plan_parser() -> Result<()> {
Test {
name: "create-table-passed",
sql: "CREATE TABLE t(c1 int, c2 bigint, c3 varchar(255) ) ENGINE = Parquet location = 'foo.parquet' ",
expect: "Create table default.t DataField { name: \"c1\", data_type: Int32, nullable: true }, DataField { name: \"c2\", data_type: Int64, nullable: true }, DataField { name: \"c3\", data_type: String, nullable: true }, engine: Parquet, if_not_exists:false, option: {\"location\": \"foo.parquet\"}",
expect: "Create table default.t DataField { name: \"c1\", data_type: Int32, nullable: true }, DataField { name: \"c2\", data_type: Int64, nullable: true }, DataField { name: \"c3\", data_type: String, nullable: true }, engine: Parquet, if_not_exists:false, option: {\"location\": \"foo.parquet\"}, as_select: None",
error: "",
},
Test {
name: "create-table-if-not-exists-passed",
sql: "CREATE TABLE IF NOT EXISTS t(c1 int, c2 bigint, c3 varchar(255) ) ENGINE = Parquet location = 'foo.parquet' ",
expect: "Create table default.t DataField { name: \"c1\", data_type: Int32, nullable: true }, DataField { name: \"c2\", data_type: Int64, nullable: true }, DataField { name: \"c3\", data_type: String, nullable: true }, engine: Parquet, if_not_exists:true, option: {\"location\": \"foo.parquet\"}",
expect: "Create table default.t DataField { name: \"c1\", data_type: Int32, nullable: true }, DataField { name: \"c2\", data_type: Int64, nullable: true }, DataField { name: \"c3\", data_type: String, nullable: true }, engine: Parquet, if_not_exists:true, option: {\"location\": \"foo.parquet\"}, as_select: None",
error: "",
},
Test {
Expand Down
34 changes: 34 additions & 0 deletions query/tests/it/sql/sql_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,40 @@ fn create_table() -> Result<()> {
});
expect_parse_ok(sql, expected)?;

// create table as select statement
let sql = "CREATE TABLE db1.test1(c1 int, c2 varchar(255)) ENGINE = Parquet location = 'batcave' AS SELECT * FROM t2";
let expected = DfStatement::CreateTable(DfCreateTable {
if_not_exists: false,
name: ObjectName(vec![Ident::new("db1"), Ident::new("test1")]),
columns: vec![
make_column_def("c1", DataType::Int(None)),
make_column_def("c2", DataType::Varchar(Some(255))),
],
engine: "Parquet".to_string(),

options: maplit::hashmap! {"location".into() => "batcave".into()},
like: None,
query: Some(Box::new(DfQueryStatement {
from: vec![TableWithJoins {
relation: TableFactor::Table {
name: ObjectName(vec![Ident::new("t2")]),
alias: None,
args: vec![],
with_hints: vec![],
},
joins: vec![],
}],
projection: vec![SelectItem::Wildcard],
selection: None,
group_by: vec![],
having: None,
order_by: vec![],
limit: None,
offset: None,
})),
});
expect_parse_ok(sql, expected)?;

Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions query/tests/it/storages/fuse/table_test_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl TestFixture {
// make sure blocks will not be merged
options: [(TBL_OPT_KEY_CHUNK_BLOCK_NUM.to_owned(), "1".to_owned())].into(),
},
as_select: None,
}
}

Expand Down
3 changes: 3 additions & 0 deletions tests/suites/0_stateless/05_0000_ddl_create_tables.result
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
2
4
8
1
2
3
13 changes: 10 additions & 3 deletions tests/suites/0_stateless/05_0000_ddl_create_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ create table t2(a int,b int) engine=NotExists; -- {ErrorCode 4003}
DROP TABLE IF EXISTS t;
DROP TABLE IF EXISTS t2;

-- prepare test databases for testing 'create table like' and 'as select' statements.
CREATE DATABASE db1;
CREATE DATABASE db2;
CREATE TABLE db1.test1(a INT, b INT);
CREATE TABLE db2.test2 LIKE db1.test1;
CREATE TABLE db1.test1(a INT, b INT) ENGINE=memory;
INSERT INTO db1.test1 VALUES (1, 1), (2, 2), (3, 3);
-- test 'create table like' statement, expect db2.test2 has the same schema with db1.test1
CREATE TABLE db2.test2 LIKE db1.test1 ENGINE=fuse;
INSERT INTO db2.test2 VALUES (3, 5);
SELECT a+b FROM db2.test2;
-- test 'create table as select' statement, expect db2.test3 has the data from db1.test1 with casting
CREATE TABLE db2.test3(x Varchar, y Varchar) ENGINE=fuse AS SELECT * FROM db1.test1;
SELECT x FROM db2.test3;
-- clean up test databases
DROP DATABASE db1;
DROP DATABASE db2;
DROP DATABASE db2;
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
name2 type2,
...
) ENGINE = engine
[OPTIONS]
```
```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
LIKE [db.]origin_table_name ENGINE = engine
LIKE [db.]origin_table_name
ENGINE = engine
[OPTIONS]
```
```sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name
LIKE [db.]origin_table_name
ENGINE = engine
[OPTIONS]
AS SELECT query
```

:::note
Expand All @@ -35,6 +45,19 @@ mysql> CREATE TABLE test(a UInt64, b Varchar) Engine = Memory;

mysql> INSERT INTO test(a,b) values(888, 'stars');

mysql> SELECT * FROM test;
+------+---------+
| a | b |
+------+---------+
| 888 | stars |
+------+---------+
```
### Create Table Like statement
```sql
mysql> CREATE TABLE test(a UInt64, b Varchar) Engine = Memory;

mysql> INSERT INTO test(a,b) values(888, 'stars');

mysql> SELECT * FROM test;
+------+---------+
| a | b |
Expand All @@ -53,3 +76,36 @@ mysql> SELECT * FROM test2;
| 0 | sun |
+------+------+
```

### Create Table As Select (CTAS) statement

```sql
mysql> CREATE TABLE source(a UInt64, b Varchar) Engine = Memory;

mysql> INSERT INTO source(a,b) values(888, 'stars');

mysql> SELECT * FROM source;
+------+---------+
| a | b |
+------+---------+
| 888 | stars |
+------+---------+

mysql> CREATE TABLE copy1 AS SELECT * FROM source;

mysql> SELECT * FROM copy1;
+------+-------+
| a | b |
+------+-------+
| 888 | stars |
+------+-------+

mysql> CREATE TABLE copy2(x VARCHAR, y VARCHAR) AS SELECT * FROM source;

mysql> SELECT * FROM copy2;
+------+-------+
| x | y |
+------+-------+
| 888 | stars |
+------+-------+
```

1 comment on commit 1f29cfc

@vercel
Copy link

@vercel vercel bot commented on 1f29cfc Dec 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.