Skip to content

Commit

Permalink
Merge pull request #6074 from Xuanwo/migrate-copy-into-to-new-planner
Browse files Browse the repository at this point in the history
refactor(query/planner): Migrate COPY to new planner
  • Loading branch information
mergify[bot] authored Jun 21, 2022
2 parents 9515045 + 336917a commit eee788a
Show file tree
Hide file tree
Showing 37 changed files with 2,153 additions and 43 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/ast/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ nom-rule = "0.3.0"
pratt = "0.3.0"
serde = { version = "1.0.136", features = ["derive"] }
thiserror = "1.0.30"
url = "2.2.2"

[dev-dependencies]
common-base = { path = "../base" }
Expand Down
31 changes: 30 additions & 1 deletion common/ast/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ fn write_period_separated_list(
Ok(())
}

/// Write input items into `a, b, c`
fn write_comma_separated_list(
f: &mut Formatter<'_>,
items: impl IntoIterator<Item = impl Display>,
Expand All @@ -66,7 +67,21 @@ fn write_comma_separated_list(
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{}", item)?;
write!(f, "{item}")?;
}
Ok(())
}

/// Write input items into `'a', 'b', 'c'`
fn write_quoted_comma_separated_list(
f: &mut Formatter<'_>,
items: impl IntoIterator<Item = impl Display>,
) -> std::fmt::Result {
for (i, item) in items.into_iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "'{item}'")?;
}
Ok(())
}
Expand All @@ -83,3 +98,17 @@ fn write_space_seperated_list(
}
Ok(())
}

/// Write input map items into `field_a=x field_b=y`
fn write_space_seperated_map(
f: &mut Formatter<'_>,
items: impl IntoIterator<Item = (impl Display, impl Display)>,
) -> std::fmt::Result {
for (i, (k, v)) in items.into_iter().enumerate() {
if i > 0 {
write!(f, " ")?;
}
write!(f, "{k}='{v}'")?;
}
Ok(())
}
158 changes: 158 additions & 0 deletions common/ast/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use crate::ast::expr::Literal;
use crate::ast::expr::TypeName;
use crate::ast::write_comma_separated_list;
use crate::ast::write_period_separated_list;
use crate::ast::write_quoted_comma_separated_list;
use crate::ast::write_space_seperated_map;
use crate::ast::Identifier;
use crate::ast::Query;
use crate::parser::token::Token;
Expand All @@ -53,6 +55,8 @@ pub enum Statement<'a> {
overwrite: bool,
},

Copy(CopyStmt<'a>),

ShowSettings,
ShowProcessList,
ShowMetrics,
Expand Down Expand Up @@ -167,6 +171,25 @@ pub enum ExplainKind {
Pipeline,
}

/// CopyStmt is the parsed statement of `COPY`.
///
/// ## Examples
///
/// ```sql
/// COPY INTO table from s3://bucket/path/to/x.csv
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct CopyStmt<'a> {
pub src: CopyUnit<'a>,
pub dst: CopyUnit<'a>,
pub files: Vec<String>,
pub pattern: String,
pub file_format: BTreeMap<String, String>,
/// TODO(xuanwo): parse into validation_mode directly.
pub validation_mode: String,
pub size_limit: usize,
}

#[derive(Debug, Clone, PartialEq)] // Databases
pub struct ShowDatabasesStmt<'a> {
pub limit: Option<ShowLimit<'a>>,
Expand Down Expand Up @@ -351,6 +374,60 @@ pub enum DatabaseEngine {
Github(String),
}

/// CopyUnit is the unit that can be used in `COPY`.
#[derive(Debug, Clone, PartialEq)]
pub enum CopyUnit<'a> {
/// Table can be used in `INTO` or `FROM`.
///
/// While table used as `FROM`, it will be rewrite as `(SELECT * FROM table)`
Table {
catalog: Option<Identifier<'a>>,
database: Option<Identifier<'a>>,
table: Identifier<'a>,
},
/// StageLocation (a.k.a internal and external stage) can be used
/// in `INTO` or `FROM`.
///
/// For examples:
///
/// - internal stage: `@internal_stage/path/to/dir/`
/// - external stage: `@s3_external_stage/path/to/dir/`
StageLocation {
/// The name of the stage.
name: String,
path: String,
},
/// UriLocation (a.k.a external location) can be used in `INTO` or `FROM`.
///
/// For examples: `'s3://example/path/to/dir' CREDENTIALS = (AWS_ACCESS_ID="admin" AWS_SECRET_KEY="admin")`
///
/// TODO(xuanwo): Add endpoint_url support.
/// TODO(xuanwo): We can check if we support this protocol during parsing.
/// TODO(xuanwo): Maybe we can introduce more strict (friendly) report for credentials and encryption, like parsed into StorageConfig?
UriLocation {
protocol: String,
name: String,
path: String,
credentials: BTreeMap<String, String>,
encryption: BTreeMap<String, String>,
},
/// Query can only be used as `FROM`.
///
/// For example:`(SELECT field_a,field_b FROM table)`
Query(Box<Query<'a>>),
}

impl CopyUnit<'_> {
pub fn target(&self) -> &'static str {
match self {
CopyUnit::Table { .. } => "Table",
CopyUnit::StageLocation { .. } => "StageLocation",
CopyUnit::UriLocation { .. } => "UriLocation",
CopyUnit::Query(_) => "Query",
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub struct CreateViewStmt<'a> {
pub if_not_exists: bool,
Expand Down Expand Up @@ -652,6 +729,56 @@ impl Display for KillTarget {
}
}

impl Display for CopyUnit<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
CopyUnit::Table {
catalog,
database,
table,
} => {
if let Some(catalog) = catalog {
write!(
f,
"{catalog}.{}.{table}",
database.as_ref().expect("database must be valid")
)
} else if let Some(database) = database {
write!(f, "{database}.{table}")
} else {
write!(f, "{table}")
}
}
CopyUnit::StageLocation { name, path } => {
write!(f, "@{name}{path}")
}
CopyUnit::UriLocation {
protocol,
name,
path,
credentials,
encryption,
} => {
write!(f, "'{protocol}://{name}{path}'")?;
if !credentials.is_empty() {
write!(f, " CREDENTIALS = ( ")?;
write_space_seperated_map(f, credentials)?;
write!(f, " )")?;
}
if !encryption.is_empty() {
write!(f, " ENCRYPTION = ( ")?;
write_space_seperated_map(f, encryption)?;
write!(f, " )")?;
}
Ok(())
}
CopyUnit::Query(query) => {
write!(f, "({query})")
}
}
}
}

impl Display for RoleOption {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -717,6 +844,37 @@ impl<'a> Display for Statement<'a> {
InsertSource::Select { query } => write!(f, " {query}")?,
}
}
Statement::Copy(stmt) => {
write!(f, "COPY")?;
write!(f, " INTO {}", stmt.dst)?;
write!(f, " FROM {}", stmt.src)?;

if !stmt.file_format.is_empty() {
write!(f, " FILE_FORMAT = (")?;
for (k, v) in stmt.file_format.iter() {
write!(f, " {} = '{}'", k, v)?;
}
write!(f, " )")?;
}

if !stmt.files.is_empty() {
write!(f, " FILES = (")?;
write_quoted_comma_separated_list(f, &stmt.files)?;
write!(f, " )")?;
}

if !stmt.pattern.is_empty() {
write!(f, " PATTERN = '{}'", stmt.pattern)?;
}

if stmt.size_limit != 0 {
write!(f, " SIZE_LIMIT = {}", stmt.size_limit)?;
}

if !stmt.validation_mode.is_empty() {
write!(f, "VALIDATION_MODE = {}", stmt.validation_mode)?;
}
}
Statement::ShowSettings => {
write!(f, "SHOW SETTINGS")?;
}
Expand Down
Loading

0 comments on commit eee788a

Please sign in to comment.