Skip to content

Commit

Permalink
Merge branch 'main' into decorrelation
Browse files Browse the repository at this point in the history
  • Loading branch information
leiysky authored Jun 18, 2022
2 parents c05b8c8 + 3caf035 commit 5c6e3f9
Show file tree
Hide file tree
Showing 23 changed files with 513 additions and 226 deletions.
6 changes: 3 additions & 3 deletions common/ast/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use self::error::DisplayError;
use crate::ast::Expr;
use crate::ast::Statement;
use crate::parser::error::Backtrace;
use crate::parser::statement::statements;
use crate::parser::statement::statement;
use crate::parser::token::Token;
use crate::parser::token::TokenKind;
use crate::parser::token::Tokenizer;
Expand All @@ -41,8 +41,8 @@ pub fn tokenize_sql(sql: &str) -> Result<Vec<Token>> {
pub fn parse_sql<'a>(
sql_tokens: &'a [Token<'a>],
backtrace: &'a Backtrace<'a>,
) -> Result<Vec<Statement<'a>>> {
match statements(Input(sql_tokens, backtrace)) {
) -> Result<Statement<'a>> {
match statement(Input(sql_tokens, backtrace)) {
Ok((rest, stmts)) if rest[0].kind == TokenKind::EOI => Ok(stmts),
Ok((rest, _)) => Err(ErrorCode::SyntaxException(
rest[0].display_error("unable to parse rest of the sql".to_string()),
Expand Down
76 changes: 30 additions & 46 deletions common/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,6 @@ use crate::parser::token::*;
use crate::parser::util::*;
use crate::rule;

/// Parses the token input into a list of statements.
///
/// Two alternatives are combined with `alt`:
///
/// 1. A `separated_list1` of entries separated by one or more `;` tokens and
///    followed by `&EOI`. Each entry is either a `statement` (wrapped in
///    `Some`) or the `eoi` placeholder (mapped to `None`); the placeholders
///    are dropped when the list is flattened into the resulting vector.
/// 2. A single `insert_statement`, returned as a one-element vector.
///
/// NOTE(review): `&EOI` looks like a non-consuming end-of-input check provided
/// by the `rule!` macro — confirm against the macro's documentation.
pub fn statements(i: Input) -> IResult<Vec<Statement>> {
// A real statement, wrapped in `Some` so it can share a list with `eoi`.
let stmt = map(statement, Some);
// Placeholder entry that yields `None`; presumably this lets an entry with
// no statement (e.g. after a trailing `;`) be accepted — TODO confirm.
let eoi = map(rule! { &EOI }, |_| None);

alt((
// Separated list of statements; `flatten` discards the `None` placeholders.
map(
rule!(
#separated_list1(rule! { ";"+ }, rule! { #stmt | #eoi }) ~ &EOI
),
|(stmts, _)| stmts.into_iter().flatten().collect(),
),
// `INSERT INTO ... FORMAT ...` and `INSERT INTO ... VALUES` statements
// stop the parser immediately and return the remaining tokens via
// `InsertSource`.
//
// This is a hack that makes it possible to parse a large streaming insert
// statement.
map(insert_statement, |insert_stmt| vec![insert_stmt]),
))(i)
}

pub fn statement(i: Input) -> IResult<Statement> {
let explain = map(
rule! {
Expand All @@ -63,6 +44,24 @@ pub fn statement(i: Input) -> IResult<Statement> {
query: Box::new(statement),
},
);
let insert = map(
rule! {
INSERT ~ ( INTO | OVERWRITE ) ~ TABLE?
~ #peroid_separated_idents_1_to_3
~ ( "(" ~ #comma_separated_list1(ident) ~ ")" )?
~ #insert_source
},
|(_, overwrite, _, (catalog, database, table), opt_columns, source)| Statement::Insert {
catalog,
database,
table,
columns: opt_columns
.map(|(_, columns, _)| columns)
.unwrap_or_default(),
source,
overwrite: overwrite.kind == OVERWRITE,
},
);
let show_settings = value(Statement::ShowSettings, rule! { SHOW ~ SETTINGS });
let show_stages = value(Statement::ShowStages, rule! { SHOW ~ STAGES });
let show_process_list = value(Statement::ShowProcessList, rule! { SHOW ~ PROCESSLIST });
Expand Down Expand Up @@ -585,10 +584,11 @@ pub fn statement(i: Input) -> IResult<Statement> {
},
);

alt((
let statement_body = alt((
rule!(
#map(query, |query| Statement::Query(Box::new(query)))
| #explain : "`EXPLAIN [PIPELINE | GRAPH] <statement>`"
| #insert : "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
| #show_settings : "`SHOW SETTINGS`"
| #show_stages : "`SHOW STAGES`"
| #show_process_list : "`SHOW PROCESSLIST`"
Expand Down Expand Up @@ -641,31 +641,20 @@ pub fn statement(i: Input) -> IResult<Statement> {
| #remove_stage: "`REMOVE @<stage_name> [pattern = '<pattern>']`"
| #drop_stage: "`DROP STAGE <stage_name>`"
),
))(i)
}
));

pub fn insert_statement(i: Input) -> IResult<Statement> {
map(
rule! {
INSERT ~ ( INTO | OVERWRITE ) ~ TABLE?
~ #peroid_separated_idents_1_to_3
~ ( "(" ~ #comma_separated_list1(ident) ~ ")" )?
~ #insert_source
: "`INSERT INTO [TABLE] <table> [(<column>, ...)] (FORMAT <format> | VALUES <values> | <query>)`"
},
|(_, overwrite, _, (catalog, database, table), opt_columns, source)| Statement::Insert {
catalog,
database,
table,
columns: opt_columns
.map(|(_, columns, _)| columns)
.unwrap_or_default(),
source,
overwrite: overwrite.kind == OVERWRITE,
#statement_body ~ ";"? ~ &EOI
},
|(stmt, _, _)| stmt,
)(i)
}

// `INSERT INTO ... FORMAT ...` and `INSERT INTO ... VALUES` statements
// stop the parser immediately and return the remaining tokens via `InsertSource`.
//
// This is a hack that makes it possible to parse a large streaming insert statement.
pub fn insert_source(i: Input) -> IResult<InsertSource> {
let streaming = map(
rule! {
Expand All @@ -682,14 +671,9 @@ pub fn insert_source(i: Input) -> IResult<InsertSource> {
},
|(_, rest_tokens)| InsertSource::Values { rest_tokens },
);
let query = map(
rule! {
#query ~ ( ";" | &EOI )
},
|(query, _)| InsertSource::Select {
query: Box::new(query),
},
);
let query = map(query, |query| InsertSource::Select {
query: Box::new(query),
});

rule!(
#streaming
Expand Down
51 changes: 6 additions & 45 deletions common/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ fn test_statement() {
r#"show create table a.b;"#,
r#"explain pipeline select a from b;"#,
r#"describe a;"#,
r#"describe a; describe b"#,
r#"create table if not exists a.b (c integer not null default 1, b varchar);"#,
r#"create table if not exists a.b (c integer default 1 not null, b varchar) as select * from t;"#,
r#"create table a.b like c.d;"#,
Expand Down Expand Up @@ -131,52 +130,14 @@ fn test_statement() {
for case in cases {
let tokens = tokenize_sql(case).unwrap();
let backtrace = Backtrace::new();
let stmts = parse_sql(&tokens, &backtrace).unwrap();
let stmt = parse_sql(&tokens, &backtrace).unwrap();
writeln!(file, "---------- Input ----------").unwrap();
writeln!(file, "{}", case).unwrap();
for stmt in stmts {
writeln!(file, "---------- Output ---------").unwrap();
writeln!(file, "{}", stmt).unwrap();
writeln!(file, "---------- AST ------------").unwrap();
writeln!(file, "{:#?}", stmt).unwrap();
writeln!(file, "\n").unwrap();
}
}
}

// TODO(andylokandy): remove this test once the new optimizer is being tested on the suites
#[test]
fn test_statements_in_legacy_suites() {
for entry in glob::glob("../../tests/suites/**/*.sql").unwrap() {
let file_content = std::fs::read(entry.unwrap()).unwrap();
let file_str = String::from_utf8_lossy(&file_content).into_owned();

// Remove error cases
let file_str = regex::Regex::new(".+ErrorCode.+\n")
.unwrap()
.replace_all(&file_str, "")
.into_owned();

// TODO(andylokandy): support all cases eventually
// Remove currently unimplemented cases
let file_str = regex::Regex::new(
"(?i).*(SLAVE|MASTER|COMMIT|START|ROLLBACK|FIELDS|GRANT|COPY|ROLE|STAGE|ENGINES|UNDROP|OVER|CHARSET|COLLATION).*\n",
)
.unwrap()
.replace_all(&file_str, "")
.into_owned();
// Remove insert statements
let file_str = regex::Regex::new("(?i).*INSERT INTO[^;]*.*\n")
.unwrap()
.replace_all(&file_str, "")
.into_owned();

let tokens = tokenize_sql(&file_str).unwrap();
let backtrace = Backtrace::new();
parse_sql(&tokens, &backtrace).expect(
"Parser error should not exist in integration suites. \
Please add parser error cases to `common/ast/tests/it/parser.rs`",
);
writeln!(file, "---------- Output ---------").unwrap();
writeln!(file, "{}", stmt).unwrap();
writeln!(file, "---------- AST ------------").unwrap();
writeln!(file, "{:#?}", stmt).unwrap();
writeln!(file, "\n").unwrap();
}
}

Expand Down
34 changes: 0 additions & 34 deletions common/ast/tests/it/testdata/statement.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,40 +131,6 @@ DescribeTable(
)


---------- Input ----------
describe a; describe b
---------- Output ---------
DESCRIBE a
---------- AST ------------
DescribeTable(
DescribeTableStmt {
catalog: None,
database: None,
table: Identifier {
name: "a",
quote: None,
span: Ident(9..10),
},
},
)


---------- Output ---------
DESCRIBE b
---------- AST ------------
DescribeTable(
DescribeTableStmt {
catalog: None,
database: None,
table: Identifier {
name: "b",
quote: None,
span: Ident(21..22),
},
},
)


---------- Input ----------
create table if not exists a.b (c integer not null default 1, b varchar);
---------- Output ---------
Expand Down
2 changes: 2 additions & 0 deletions common/functions/src/scalars/conditionals/conditional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::multi_if::MultiIfFunction;
use super::InFunction;
use super::IsNotNullFunction;
use super::IsNullFunction;
Expand All @@ -24,6 +25,7 @@ pub struct ConditionalFunction;
impl ConditionalFunction {
pub fn register(factory: &mut FunctionFactory) {
factory.register("if", IfFunction::desc());
factory.register("multi_if", MultiIfFunction::desc());
factory.register("is_null", IsNullFunction::desc());
factory.register("is_not_null", IsNotNullFunction::desc());
factory.register("in", InFunction::<false>::desc());
Expand Down
1 change: 1 addition & 0 deletions common/functions/src/scalars/conditionals/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod r#if;
mod in_basic;
mod is_not_null;
mod is_null;
mod multi_if;

pub use conditional::ConditionalFunction;
pub use in_basic::InFunction;
Expand Down
Loading

0 comments on commit 5c6e3f9

Please sign in to comment.