Skip to content

Commit

Permalink
Merge pull request #5821 from sundy-li/stage-new-parser
Browse files Browse the repository at this point in the history
  • Loading branch information
sundy-li authored Jun 9, 2022
2 parents b180407 + 89fc2db commit 89f563c
Show file tree
Hide file tree
Showing 28 changed files with 516 additions and 46 deletions.
113 changes: 113 additions & 0 deletions common/ast/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::fmt::Display;
use std::fmt::Formatter;

Expand Down Expand Up @@ -182,6 +183,23 @@ pub enum Statement<'a> {
definition: Box<Expr<'a>>,
description: Option<String>,
},
// stages
CreateStage(CreateStageStmt),
ShowStages,
DropStage {
if_exists: bool,
stage_name: String,
},
DescStage {
stage_name: String,
},
RemoveStage {
stage_name: String,
},
ListStage {
stage_name: String,
pattern: String,
},
}

#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -320,6 +338,22 @@ pub enum OptimizeTableAction {
Compact,
}

#[derive(Debug, Clone, PartialEq)]
pub struct CreateStageStmt {
pub if_not_exists: bool,
pub stage_name: String,

pub location: String,
pub credential_options: BTreeMap<String, String>,
pub encryption_options: BTreeMap<String, String>,

pub file_format_options: BTreeMap<String, String>,
pub on_error: String,
pub size_limit: usize,
pub validation_mode: String,
pub comments: String,
}

#[derive(Debug, Clone, PartialEq)]
pub enum KillTarget {
Query,
Expand Down Expand Up @@ -869,6 +903,85 @@ impl<'a> Display for Statement<'a> {
write!(f, " DESC = '{description}'")?;
}
}
Statement::ListStage {
stage_name,
pattern,
} => {
write!(f, "LIST @{stage_name}")?;
if !pattern.is_empty() {
write!(f, " PATTERN = '{pattern}'")?;
}
}
Statement::ShowStages => {
write!(f, "SHOW STAGES")?;
}
Statement::DropStage {
if_exists,
stage_name,
} => {
write!(f, "DROP STAGES")?;
if *if_exists {
write!(f, " IF EXISTS")?;
}
write!(f, " {stage_name}")?;
}
Statement::CreateStage(stmt) => {
write!(f, "CREATE STAGE")?;
if stmt.if_not_exists {
write!(f, " IF NOT EXISTS")?;
}
write!(f, " {}", stmt.stage_name)?;

if !stmt.location.is_empty() {
write!(f, " URL = '{}'", stmt.location)?;

if !stmt.credential_options.is_empty() {
write!(f, " CREDENTIALS = (")?;
for (k, v) in stmt.credential_options.iter() {
write!(f, " {} = '{}'", k, v)?;
}
write!(f, " )")?;
}

if !stmt.encryption_options.is_empty() {
write!(f, " ENCRYPTION = (")?;
for (k, v) in stmt.encryption_options.iter() {
write!(f, " {} = '{}'", k, v)?;
}
write!(f, " )")?;
}
}

if !stmt.file_format_options.is_empty() {
write!(f, " FILE_FORMAT = (")?;
for (k, v) in stmt.file_format_options.iter() {
write!(f, " {} = '{}'", k, v)?;
}
write!(f, " )")?;
}

if !stmt.on_error.is_empty() {
write!(f, " ON_ERROR = {}", stmt.on_error)?;
}

if stmt.size_limit != 0 {
write!(f, " SIZE_LIMIT = {}", stmt.size_limit)?;
}

if !stmt.validation_mode.is_empty() {
write!(f, " VALIDATION_MODE = {}", stmt.validation_mode)?;
}

if !stmt.comments.is_empty() {
write!(f, " COMMENTS = '{}'", stmt.comments)?;
}
}
Statement::RemoveStage { stage_name } => {
write!(f, "REMOVE STAGE @{stage_name}")?;
}
Statement::DescStage { stage_name } => {
write!(f, "DESC STAGE {stage_name}")?;
}
}
Ok(())
}
Expand Down
5 changes: 5 additions & 0 deletions common/ast/src/parser/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,11 @@ pub fn literal_string(i: Input) -> IResult<String> {
)(i)
}

pub fn at_string(i: Input) -> IResult<String> {
match_token(AtString)(i)
.map(|(i2, token)| (i2, token.text()[1..token.text().len()].to_string()))
}

pub fn type_name(i: Input) -> IResult<TypeName> {
let ty_boolean = value(TypeName::Boolean, rule! { BOOLEAN | BOOL });
let ty_uint8 = value(
Expand Down
132 changes: 132 additions & 0 deletions common/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use common_meta_types::AuthType;
use common_meta_types::UserIdentity;
use nom::branch::alt;
use nom::combinator::map;
use nom::combinator::value;

use super::error::ErrorKind;
use crate::ast::*;
use crate::parser::expr::*;
use crate::parser::query::*;
Expand Down Expand Up @@ -280,6 +283,7 @@ pub fn statement(i: Input) -> IResult<Statement> {
},
);
let show_settings = value(Statement::ShowSettings, rule! { SHOW ~ SETTINGS });
let show_stages = value(Statement::ShowStages, rule! { SHOW ~ STAGES });
let show_process_list = value(Statement::ShowProcessList, rule! { SHOW ~ PROCESSLIST });
let show_metrics = value(Statement::ShowMetrics, rule! { SHOW ~ METRICS });
let show_functions = map(
Expand Down Expand Up @@ -424,6 +428,98 @@ pub fn statement(i: Input) -> IResult<Statement> {
},
);

// stages
let create_stage = map(
rule! {
CREATE ~ STAGE ~ ( IF ~ NOT ~ EXISTS )?
~ #ident
~ ( URL ~ "=" ~ #literal_string
~ (CREDENTIALS ~ "=" ~ #options)?
~ (ENCRYPTION ~ "=" ~ #options)?
)?
~ ( FILE_FORMAT ~ "=" ~ #options)?
~ ( ON_ERROR ~ "=" ~ #ident)?
~ ( SIZE_LIMIT ~ "=" ~ #literal_u64)?
~ ( VALIDATION_MODE ~ "=" ~ #ident)?
~ ( (COMMENT | COMMENTS) ~ "=" ~ #literal_string)?
},
|(
_,
_,
opt_if_not_exists,
stage,
url_opt,
file_format_opt,
on_error_opt,
size_limit_opt,
validation_mode_opt,
comment_opt,
)| {
let (location, credential_options, encryption_options) = url_opt
.map(|(_, _, url, c, e)| {
(
url,
c.map(|v| v.2).unwrap_or_default(),
e.map(|v| v.2).unwrap_or_default(),
)
})
.unwrap_or_default();

Statement::CreateStage(CreateStageStmt {
if_not_exists: opt_if_not_exists.is_some(),
stage_name: stage.to_string(),
location,
credential_options,
encryption_options,
file_format_options: file_format_opt
.map(|(_, _, file_format_opt)| file_format_opt)
.unwrap_or_default(),
on_error: on_error_opt.map(|v| v.2.to_string()).unwrap_or_default(),
size_limit: size_limit_opt.map(|v| v.2 as usize).unwrap_or_default(),
validation_mode: validation_mode_opt
.map(|v| v.2.to_string())
.unwrap_or_default(),
comments: comment_opt.map(|v| v.2).unwrap_or_default(),
})
},
);

let list_stage = map(
rule! {
LIST ~ #at_string ~ (PATTERN ~ "=" ~ #literal_string)?
},
|(_, stage_name, pattern_opt)| Statement::ListStage {
stage_name,
pattern: pattern_opt.map(|v| v.2).unwrap_or_default(),
},
);

let _remove_stage = map(
rule! {
REMOVE ~ #at_string
},
|(_, stage_name)| Statement::RemoveStage { stage_name },
);

let drop_stage = map(
rule! {
DROP ~ STAGE ~ ( IF ~ EXISTS )? ~ #ident
},
|(_, _, opt_if_exists, stage_name)| Statement::DropStage {
if_exists: opt_if_exists.is_some(),
stage_name: stage_name.to_string(),
},
);

let desc_stage = map(
rule! {
DESC ~ STAGE ~ #ident
},
|(_, _, stage_name)| Statement::DescStage {
stage_name: stage_name.to_string(),
},
);

alt((
rule!(
#explain : "`EXPLAIN [PIPELINE | GRAPH] <statement>`"
Expand Down Expand Up @@ -451,6 +547,7 @@ pub fn statement(i: Input) -> IResult<Statement> {
| #drop_view : "`DROP VIEW [IF EXISTS] [<database>.]<view>`"
| #alter_view : "`ALTER VIEW [<database>.]<view> AS SELECT ...`"
| #show_settings : "`SHOW SETTINGS`"
| #show_stages : "`SHOW STAGES`"
| #show_process_list : "`SHOW PROCESSLIST`"
| #show_metrics : "`SHOW METRICS`"
| #show_functions : "`SHOW FUNCTIONS [<show_limit>]`"
Expand All @@ -463,6 +560,14 @@ pub fn statement(i: Input) -> IResult<Statement> {
| #create_udf : "`CREATE FUNCTION [IF NOT EXISTS] <udf_name> (<parameter>, ...) -> <definition expr> [DESC = <description>]`"
| #drop_udf : "`DROP FUNCTION [IF EXISTS] <udf_name>`"
| #alter_udf : "`ALTER FUNCTION <udf_name> (<parameter>, ...) -> <definition_expr> [DESC = <description>]`"
| #create_stage: "`CREATE STAGE [ IF NOT EXISTS ] <internal_stage_name>
[ FILE_FORMAT = ( { TYPE = { CSV | PARQUET } [ formatTypeOptions ] ) } ]
[ COPY_OPTIONS = ( copyOptions ) ]
[ COMMENT = '<string_literal>' ]`"
| #desc_stage: "`DESC STAGE <stage_name>`"
// | #remove_stage: "`REMOVE @<stage_name>`"
| #list_stage: "`LIST @<stage_name> [pattern = '<pattern>']`"
| #drop_stage: "`DROP STAGE <stage_name>`"
),
))(i)
}
Expand Down Expand Up @@ -728,3 +833,30 @@ pub fn auth_type(i: Input) -> IResult<AuthType> {
value(AuthType::JWT, rule! { JWT }),
))(i)
}

// parse: (k = v ...)* into a map
pub fn options(i: Input) -> IResult<BTreeMap<String, String>> {
let ident_to_string = |i| {
map_res(ident, |ident| {
if ident.quote.is_none() {
Ok(ident.to_string())
} else {
Err(ErrorKind::Other(
"unexpected quoted identifier, try to remove the quote",
))
}
})(i)
};

let ident_with_format = alt((
ident_to_string,
map(rule! { FORMAT }, |_| "FORMAT".to_string()),
));

map(
rule! {
"(" ~ ( #ident_with_format ~ "=" ~ (#ident_to_string | #literal_string) )* ~ ")"
},
|(_, opts, _)| BTreeMap::from_iter(opts.iter().map(|(k, _, v)| (k.clone(), v.clone()))),
)(i)
}
Loading

0 comments on commit 89f563c

Please sign in to comment.