diff --git a/crates/deltalake-sql/Cargo.toml b/crates/deltalake-sql/Cargo.toml
new file mode 100644
index 0000000000..f6fb345a18
--- /dev/null
+++ b/crates/deltalake-sql/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "deltalake-sql"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+datafusion-common = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-sql = { workspace = true }
+
+[dev-dependencies]
+arrow-schema = { workspace = true }
diff --git a/crates/deltalake-sql/README.md b/crates/deltalake-sql/README.md
new file mode 100644
index 0000000000..42b745d04c
--- /dev/null
+++ b/crates/deltalake-sql/README.md
@@ -0,0 +1,3 @@
+# `deltalake-sql`
+
+The `deltalake-sql` crate extends the DataFusion SQL parser to handle Delta Lake specific commands.
diff --git a/crates/deltalake-sql/src/lib.rs b/crates/deltalake-sql/src/lib.rs
new file mode 100644
index 0000000000..8d12b5b042
--- /dev/null
+++ b/crates/deltalake-sql/src/lib.rs
@@ -0,0 +1,18 @@
+pub mod logical_plan;
+pub mod parser;
+pub mod planner;
+
+#[cfg(test)]
+mod tests {
+    use datafusion_expr::logical_plan::LogicalPlan;
+
+    pub fn assert_plan_eq(plan: &LogicalPlan, expected_lines: &[&str]) {
+        let formatted = plan.display_indent().to_string();
+        let actual_lines: Vec<_> = formatted.trim().lines().collect();
+        assert_eq!(
+            &actual_lines, expected_lines,
+            "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+            expected_lines, actual_lines
+        );
+    }
+}
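To orient the reader before the individual modules, here is a minimal sketch — not part of the patch — of how the parser introduced below is meant to be consumed. It assumes the crate is used under the name `deltalake_sql` and relies only on the API defined in this diff:

```rust
use deltalake_sql::parser::{DeltaParser, Statement};

fn main() {
    // Parse a Delta-specific VACUUM statement; ordinary SQL would come
    // back as Statement::Datafusion instead.
    let mut stmts = DeltaParser::parse_sql("VACUUM my_table RETAIN 168 HOURS DRY RUN").unwrap();
    assert_eq!(stmts.len(), 1);

    match stmts.pop_front().unwrap() {
        Statement::Vacuum(v) => {
            assert_eq!(v.retention_hours, Some(168));
            assert!(v.dry_run);
        }
        other => panic!("expected a VACUUM statement, got {other}"),
    }
}
```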
diff --git a/crates/deltalake-sql/src/logical_plan.rs b/crates/deltalake-sql/src/logical_plan.rs
new file mode 100644
index 0000000000..164462a90c
--- /dev/null
+++ b/crates/deltalake-sql/src/logical_plan.rs
@@ -0,0 +1,205 @@
+use std::fmt::{self, Debug, Display};
+use std::sync::Arc;
+
+use datafusion_common::{DFSchema, DFSchemaRef, OwnedTableReference};
+use datafusion_expr::logical_plan::LogicalPlan;
+use datafusion_expr::{Expr, UserDefinedLogicalNodeCore};
+
+/// Delta Lake specific operations
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub enum DeltaStatement {
+    /// Get provenance information, including the operation,
+    /// user, and so on, for each write to a table.
+    DescribeHistory(DescribeHistory),
+    /// Get detailed metadata about a table.
+    DescribeDetails(DescribeDetails),
+    /// List the files that make up a table.
+    DescribeFiles(DescribeFiles),
+    /// Remove unused files from a table directory.
+    Vacuum(Vacuum),
+}
+
+impl Debug for DeltaStatement {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.display())
+    }
+}
+
+impl DeltaStatement {
+    /// Return a `format`able structure with a human-readable
+    /// description of this LogicalPlan node per node, not including
+    /// children.
+    pub fn display(&self) -> impl fmt::Display + '_ {
+        struct Wrapper<'a>(&'a DeltaStatement);
+        impl<'a> Display for Wrapper<'a> {
+            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+                match self.0 {
+                    DeltaStatement::Vacuum(Vacuum {
+                        ref table,
+                        ref dry_run,
+                        ref retention_hours,
+                        ..
+                    }) => {
+                        if let Some(ret) = retention_hours {
+                            write!(f, "Vacuum: {table} retention_hours={ret} dry_run={dry_run}")
+                        } else {
+                            write!(f, "Vacuum: {table} dry_run={dry_run}")
+                        }
+                    }
+                    DeltaStatement::DescribeHistory(DescribeHistory { table, .. }) => {
+                        write!(f, "DescribeHistory: {table:?}")
+                    }
+                    DeltaStatement::DescribeDetails(DescribeDetails { table, .. }) => {
+                        write!(f, "DescribeDetails: {table:?}")
+                    }
+                    DeltaStatement::DescribeFiles(DescribeFiles { table, .. }) => {
+                        write!(f, "DescribeFiles: {table:?}")
+                    }
+                }
+            }
+        }
+        Wrapper(self)
+    }
+}
+
+impl UserDefinedLogicalNodeCore for DeltaStatement {
+    fn name(&self) -> &str {
+        match self {
+            Self::DescribeDetails(_) => "DescribeDetails",
+            Self::DescribeHistory(_) => "DescribeHistory",
+            Self::DescribeFiles(_) => "DescribeFiles",
+            Self::Vacuum(_) => "Vacuum",
+        }
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        match self {
+            Self::Vacuum(Vacuum { schema, .. }) => schema,
+            _ => todo!(),
+        }
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![]
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.display())
+    }
+
+    fn from_template(&self, exprs: &[Expr], inputs: &[LogicalPlan]) -> Self {
+        match self {
+            Self::Vacuum(_) | Self::DescribeHistory(_) => {
+                assert_eq!(inputs.len(), 0, "input size inconsistent");
+                assert_eq!(exprs.len(), 0, "expression size inconsistent");
+                self.clone()
+            }
+            _ => todo!(),
+        }
+    }
+}
+
+/// Logical Plan for [Vacuum] operation.
+///
+/// [Vacuum]: https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-vacuum
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct Vacuum {
+    /// A reference to the table being vacuumed
+    pub table: OwnedTableReference,
+    /// The retention threshold.
+    pub retention_hours: Option<i32>,
+    /// Return a list of up to 1000 files to be deleted.
+    pub dry_run: bool,
+    /// Schema for Vacuum's empty return table
+    pub schema: DFSchemaRef,
+}
+
+impl Vacuum {
+    pub fn new(table: OwnedTableReference, retention_hours: Option<i32>, dry_run: bool) -> Self {
+        Self {
+            table,
+            retention_hours,
+            dry_run,
+            schema: Arc::new(DFSchema::empty()),
+        }
+    }
+}
+
+/// Logical Plan for [DescribeHistory] operation.
+///
+/// [DescribeHistory]: https://learn.microsoft.com/en-us/azure/databricks/sql/language-manual/delta-describe-history
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DescribeHistory {
+    /// A reference to the table
+    pub table: OwnedTableReference,
+    /// Schema for commit provenance information
+    pub schema: DFSchemaRef,
+}
+
+impl DescribeHistory {
+    pub fn new(table: OwnedTableReference) -> Self {
+        Self {
+            table,
+            // TODO: add proper schema
+            // https://learn.microsoft.com/en-us/azure/databricks/delta/history#history-schema
+            schema: Arc::new(DFSchema::empty()),
+        }
+    }
+}
+
+/// Logical Plan for DescribeDetails operation.
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DescribeDetails {
+    /// A reference to the table
+    pub table: OwnedTableReference,
+    /// Schema for table detail information
+    pub schema: DFSchemaRef,
+}
+
+impl DescribeDetails {
+    pub fn new(table: OwnedTableReference) -> Self {
+        Self {
+            table,
+            // TODO: add proper schema
+            schema: Arc::new(DFSchema::empty()),
+        }
+    }
+}
+
+/// Logical Plan for DescribeFiles operation.
+#[derive(Clone, PartialEq, Eq, Hash)]
+pub struct DescribeFiles {
+    /// A reference to the table
+    pub table: OwnedTableReference,
+    /// Schema for the file listing
+    pub schema: DFSchemaRef,
+}
+
+impl DescribeFiles {
+    pub fn new(table: OwnedTableReference) -> Self {
+        Self {
+            table,
+            // TODO: add proper schema
+            schema: Arc::new(DFSchema::empty()),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_display() {
+        let stmt = DeltaStatement::Vacuum(Vacuum::new("table".into(), Some(1234), true));
+        assert_eq!(
+            format!("{}", stmt.display()),
+            "Vacuum: table retention_hours=1234 dry_run=true"
+        );
+
+        let stmt = DeltaStatement::Vacuum(Vacuum::new("table".into(), None, true));
+        assert_eq!(format!("{}", stmt.display()), "Vacuum: table dry_run=true")
+    }
+}
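These nodes only become part of a DataFusion plan once they are wrapped in an `Extension` node, which is exactly what the planner later in this patch does in `vacuum_to_plan`. A minimal sketch (not part of the patch) of that wrapping, relying on DataFusion's blanket `UserDefinedLogicalNode` implementation for `UserDefinedLogicalNodeCore` types:

```rust
use std::sync::Arc;

use datafusion_expr::logical_plan::{Extension, LogicalPlan};
use deltalake_sql::logical_plan::{DeltaStatement, Vacuum};

fn vacuum_plan() -> LogicalPlan {
    // DeltaStatement implements UserDefinedLogicalNodeCore, so it can be
    // wrapped in DataFusion's generic Extension plan node.
    let node = DeltaStatement::Vacuum(Vacuum::new("my_table".into(), Some(168), false));
    LogicalPlan::Extension(Extension {
        node: Arc::new(node),
    })
}

fn main() {
    // Extension nodes render through fmt_for_explain / display().
    let plan = vacuum_plan();
    assert_eq!(
        plan.display_indent().to_string().trim(),
        "Vacuum: my_table retention_hours=168 dry_run=false"
    );
}
```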
diff --git a/crates/deltalake-sql/src/parser.rs b/crates/deltalake-sql/src/parser.rs
new file mode 100644
index 0000000000..c76cced9bd
--- /dev/null
+++ b/crates/deltalake-sql/src/parser.rs
@@ -0,0 +1,380 @@
+use std::collections::VecDeque;
+use std::fmt;
+
+use datafusion_sql::parser::{DFParser, DescribeTableStmt, Statement as DFStatement};
+use datafusion_sql::sqlparser::ast::{ObjectName, Value};
+use datafusion_sql::sqlparser::dialect::{keywords::Keyword, Dialect, GenericDialect};
+use datafusion_sql::sqlparser::parser::{Parser, ParserError};
+use datafusion_sql::sqlparser::tokenizer::{Token, TokenWithLocation, Tokenizer};
+
+// Use `Parser::expected` instead, if possible
+macro_rules! parser_err {
+    ($MSG:expr) => {
+        Err(ParserError::ParserError($MSG.to_string()))
+    };
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum DescribeOperation {
+    Detail,
+    History,
+    Files,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DescribeStatement {
+    pub table: ObjectName,
+    pub operation: DescribeOperation,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct VacuumStatement {
+    pub table: ObjectName,
+    pub retention_hours: Option<i32>,
+    pub dry_run: bool,
+}
+
+/// Delta Lake Statement representations.
+///
+/// Tokens parsed by [`DeltaParser`] are converted into these values.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Statement {
+    /// Datafusion AST node (from datafusion-sql)
+    Datafusion(DFStatement),
+    /// Extension: `DESCRIBE [DETAIL | HISTORY | FILES] table_name`
+    Describe(DescribeStatement),
+    /// Extension: `VACUUM table_name [RETAIN num HOURS] [DRY RUN]`
+    Vacuum(VacuumStatement),
+}
+
+impl From<DFStatement> for Statement {
+    fn from(value: DFStatement) -> Self {
+        Self::Datafusion(value)
+    }
+}
+
+impl fmt::Display for Statement {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Statement::Datafusion(stmt) => write!(f, "{stmt}"),
+            Statement::Describe(_) => write!(f, "DESCRIBE TABLE ..."),
+            Statement::Vacuum(_) => write!(f, "VACUUM TABLE ..."),
+        }
+    }
+}
+
+/// Delta Lake SQL Parser based on [`sqlparser`]
+///
+/// This parser handles Delta Lake specific statements, delegating to
+/// [`DFParser`](datafusion_sql::parser::DFParser) for other SQL statements.
+pub struct DeltaParser<'a> {
+    sql: &'a str,
+    parser: Parser<'a>,
+}
+
+impl<'a> DeltaParser<'a> {
+    /// Create a new parser for the specified tokens using the [`GenericDialect`].
+    pub fn new(sql: &'a str) -> Result<Self, ParserError> {
+        let dialect = &GenericDialect {};
+        DeltaParser::new_with_dialect(sql, dialect)
+    }
+
+    /// Create a new parser for the specified tokens with the
+    /// specified dialect.
+    pub fn new_with_dialect(sql: &'a str, dialect: &'a dyn Dialect) -> Result<Self, ParserError> {
+        let mut tokenizer = Tokenizer::new(dialect, sql);
+        let tokens = tokenizer.tokenize()?;
+
+        Ok(Self {
+            sql,
+            parser: Parser::new(dialect).with_tokens(tokens),
+        })
+    }
+
+    /// Parse a SQL string into one or more [`Statement`]s using the
+    /// [`GenericDialect`].
+    pub fn parse_sql(sql: impl AsRef<str>) -> Result<VecDeque<Statement>, ParserError> {
+        let dialect: &GenericDialect = &GenericDialect {};
+        DeltaParser::parse_sql_with_dialect(sql.as_ref(), dialect)
+    }
+
+    /// Parse a SQL string and produce one or more [`Statement`]s with
+    /// the specified dialect.
+    pub fn parse_sql_with_dialect(
+        sql: &str,
+        dialect: &dyn Dialect,
+    ) -> Result<VecDeque<Statement>, ParserError> {
+        let mut parser = DeltaParser::new_with_dialect(sql, dialect)?;
+        let mut stmts = VecDeque::new();
+        let mut expecting_statement_delimiter = false;
+        loop {
+            // ignore empty statements (between successive statement delimiters)
+            while parser.parser.consume_token(&Token::SemiColon) {
+                expecting_statement_delimiter = false;
+            }
+
+            if parser.parser.peek_token() == Token::EOF {
+                break;
+            }
+            if expecting_statement_delimiter {
+                return parser.expected("end of statement", parser.parser.peek_token());
+            }
+
+            let statement = parser.parse_statement()?;
+            stmts.push_back(statement);
+            expecting_statement_delimiter = true;
+        }
+
+        Ok(stmts)
+    }
+
+    /// Report an unexpected token
+    fn expected<T>(&self, expected: &str, found: TokenWithLocation) -> Result<T, ParserError> {
+        parser_err!(format!("Expected {expected}, found: {found}"))
+    }
+
+    /// Parse a new statement
+    pub fn parse_statement(&mut self) -> Result<Statement, ParserError> {
+        match self.parser.peek_token().token {
+            Token::Word(w) => {
+                match w.keyword {
+                    Keyword::DESCRIBE => {
+                        self.parser.next_token();
+                        self.parse_describe()
+                    }
+                    Keyword::VACUUM => {
+                        self.parser.next_token();
+                        self.parse_vacuum()
+                    }
+                    _ => {
+                        // use the native parser
+                        // TODO fix for multiple statements and keeping parsers in sync
+                        let mut df = DFParser::new(self.sql)?;
+                        let stmt = df.parse_statement()?;
+                        self.parser.parse_statement()?;
+                        Ok(Statement::Datafusion(stmt))
+                    }
+                }
+            }
+            _ => {
+                // use the native parser
+                // TODO fix for multiple statements and keeping parsers in sync
+                let mut df = DFParser::new(self.sql)?;
+                let stmt = df.parse_statement()?;
+                self.parser.parse_statement()?;
+                Ok(Statement::Datafusion(stmt))
+            }
+        }
+    }
+
+    /// Parse a SQL `DESCRIBE` statement
+    pub fn parse_describe(&mut self) -> Result<Statement, ParserError> {
+        match self.parser.peek_token().token {
+            Token::Word(w) => match w.keyword {
+                Keyword::DETAIL => {
+                    self.parser.next_token();
+                    let table = self.parser.parse_object_name()?;
+                    Ok(Statement::Describe(DescribeStatement {
+                        table,
+                        operation: DescribeOperation::Detail,
+                    }))
+                }
+                Keyword::HISTORY => {
+                    self.parser.next_token();
+                    let table = self.parser.parse_object_name()?;
+                    Ok(Statement::Describe(DescribeStatement {
+                        table,
+                        operation: DescribeOperation::History,
+                    }))
+                }
+                Keyword::FILES => {
+                    self.parser.next_token();
+                    let table = self.parser.parse_object_name()?;
+                    Ok(Statement::Describe(DescribeStatement {
+                        table,
+                        operation: DescribeOperation::Files,
+                    }))
+                }
+                _ => {
+                    let table = self.parser.parse_object_name()?;
+                    Ok(Statement::Datafusion(DFStatement::DescribeTableStmt(
+                        DescribeTableStmt { table_name: table },
+                    )))
+                }
+            },
+            _ => {
+                let table_name = self.parser.parse_object_name()?;
+                Ok(Statement::Datafusion(DFStatement::DescribeTableStmt(
+                    DescribeTableStmt { table_name },
+                )))
+            }
+        }
+    }
+
+    /// Parse a SQL `VACUUM` statement
+    pub fn parse_vacuum(&mut self) -> Result<Statement, ParserError> {
+        let table_name = self.parser.parse_object_name()?;
+        match self.parser.peek_token().token {
+            Token::Word(w) => match w.keyword {
+                Keyword::RETAIN => {
+                    self.parser.next_token();
+                    let retention_hours = match self.parser.parse_number_value()? {
+                        Value::Number(value_str, _) => value_str.parse().map_err(|_| {
+                            ParserError::ParserError(format!(
+                                "Unable to parse retention hours: {value_str}"
+                            ))
+                        }),
+                        _ => Err(ParserError::ParserError(
+                            "Expected numeric value for retention hours".to_string(),
+                        )),
+                    }?;
+                    if !self.parser.parse_keyword(Keyword::HOURS) {
+                        return Err(ParserError::ParserError(
+                            "Expected keyword 'HOURS'".to_string(),
+                        ));
+                    };
+                    Ok(Statement::Vacuum(VacuumStatement {
+                        table: table_name,
+                        retention_hours: Some(retention_hours),
+                        dry_run: self.parser.parse_keywords(&[Keyword::DRY, Keyword::RUN]),
+                    }))
+                }
+                Keyword::DRY => {
+                    self.parser.next_token();
+                    if self.parser.parse_keyword(Keyword::RUN) {
+                        Ok(Statement::Vacuum(VacuumStatement {
+                            table: table_name,
+                            retention_hours: None,
+                            dry_run: true,
+                        }))
+                    } else {
+                        Err(ParserError::ParserError(
+                            "Expected keyword 'RUN'".to_string(),
+                        ))
+                    }
+                }
+                _ => Err(ParserError::ParserError(format!("Unexpected token {w}"))),
+            },
+            _ => {
+                let token = self.parser.next_token();
+                if token == Token::EOF || token == Token::SemiColon {
+                    Ok(Statement::Vacuum(VacuumStatement {
+                        table: table_name,
+                        retention_hours: None,
+                        dry_run: false,
+                    }))
+                } else {
+                    Err(ParserError::ParserError(format!(
+                        "Unexpected token {token}"
+                    )))
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use datafusion_sql::sqlparser::ast::Ident;
+
+    use super::*;
+
+    fn expect_parse_ok(sql: &str, expected: Statement) -> Result<(), ParserError> {
+        let statements = DeltaParser::parse_sql(sql)?;
+        assert_eq!(
+            statements.len(),
+            1,
+            "Expected to parse exactly one statement"
+        );
+        assert_eq!(statements[0], expected, "actual:\n{:#?}", statements[0]);
+        Ok(())
+    }
+
+    #[test]
+    fn test_parse_describe() {
+        let stmt = Statement::Describe(DescribeStatement {
+            table: ObjectName(vec![Ident {
+                value: "data_table".to_string(),
+                quote_style: None,
+            }]),
+            operation: DescribeOperation::History,
+        });
+        assert!(expect_parse_ok("DESCRIBE HISTORY data_table", stmt).is_ok());
+
+        let stmt = Statement::Describe(DescribeStatement {
+            table: ObjectName(vec![Ident {
+                value: "data_table".to_string(),
+                quote_style: None,
+            }]),
+            operation: DescribeOperation::Detail,
+        });
+        assert!(expect_parse_ok("DESCRIBE DETAIL data_table", stmt).is_ok());
+
+        let stmt = Statement::Describe(DescribeStatement {
+            table: ObjectName(vec![Ident {
+                value: "data_table".to_string(),
+                quote_style: None,
+            }]),
+            operation: DescribeOperation::Files,
+        });
+        assert!(expect_parse_ok("DESCRIBE FILES data_table", stmt).is_ok());
+
+        let stmt = Statement::Datafusion(DFStatement::DescribeTableStmt(DescribeTableStmt {
+            table_name: ObjectName(vec![Ident {
+                value: "data_table".to_string(),
+                quote_style: None,
+            }]),
+        }));
+        assert!(expect_parse_ok("DESCRIBE data_table", stmt).is_ok())
+    }
+
+    #[test]
+    fn test_parse_vacuum() {
+        let stmt = Statement::Vacuum(VacuumStatement {
+            table: ObjectName(vec![Ident {
+                value: "data_table".to_string(),
+                quote_style: None,
+            }]),
+            retention_hours: None,
+            dry_run: false,
+        });
+        assert!(expect_parse_ok("VACUUM data_table", stmt).is_ok());
+
+        let stmt = Statement::Vacuum(VacuumStatement {
+            table: ObjectName(vec![Ident {
+                value: "data_table".to_string(),
+                quote_style: None,
+            }]),
+            retention_hours: Some(10),
+            dry_run: false,
+        });
+        assert!(expect_parse_ok("VACUUM data_table RETAIN 10 HOURS", stmt).is_ok());
+
+        let stmt = Statement::Vacuum(VacuumStatement {
+            table: ObjectName(vec![Ident {
+                value: "data_table".to_string(),
+                quote_style: None,
+            }]),
+            retention_hours: Some(10),
+            dry_run: true,
+        });
+        assert!(expect_parse_ok("VACUUM data_table RETAIN 10 HOURS DRY RUN", stmt).is_ok());
+
+        let stmt = Statement::Vacuum(VacuumStatement {
+            table: ObjectName(vec![Ident {
+                value: "data_table".to_string(),
+                quote_style: None,
+            }]),
+            retention_hours: None,
+            dry_run: true,
+        });
+        assert!(expect_parse_ok("VACUUM data_table DRY RUN", stmt).is_ok());
+
+        // Error cases
+
+        let res = DeltaParser::parse_sql("VACUUM data_table DRY").unwrap_err();
+        match res {
+            ParserError::ParserError(msg) => {
+                assert_eq!("Expected keyword 'RUN'", msg);
+            }
+            _ => unreachable!(),
+        }
+    }
+}
diff --git a/crates/deltalake-sql/src/planner.rs b/crates/deltalake-sql/src/planner.rs
new file mode 100644
index 0000000000..3243ed9c7e
--- /dev/null
+++ b/crates/deltalake-sql/src/planner.rs
@@ -0,0 +1,187 @@
+use std::sync::Arc;
+
+use datafusion_common::{OwnedTableReference, Result as DFResult};
+use datafusion_expr::logical_plan::{Extension, LogicalPlan};
+use datafusion_sql::planner::{
+    object_name_to_table_reference, ContextProvider, IdentNormalizer, ParserOptions, SqlToRel,
+};
+use datafusion_sql::sqlparser::ast::ObjectName;
+
+use crate::logical_plan::{DeltaStatement, DescribeFiles, Vacuum};
+use crate::parser::{DescribeStatement, Statement, VacuumStatement};
+
+/// Delta SQL query planner
+pub struct DeltaSqlToRel<'a, S: ContextProvider> {
+    pub(crate) context_provider: &'a S,
+    pub(crate) options: ParserOptions,
+    pub(crate) _normalizer: IdentNormalizer,
+}
+
+impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> {
+    /// Create a new query planner with default parser options
+    pub fn new(schema_provider: &'a S) -> Self {
+        Self::new_with_options(schema_provider, ParserOptions::default())
+    }
+
+    /// Create a new query planner with the given parser options
+    pub fn new_with_options(schema_provider: &'a S, options: ParserOptions) -> Self {
+        let normalize = options.enable_ident_normalization;
+        DeltaSqlToRel {
+            context_provider: schema_provider,
+            options,
+            _normalizer: IdentNormalizer::new(normalize),
+        }
+    }
+
+    /// Generate a logical plan from a Delta SQL statement
+    pub fn statement_to_plan(&self, statement: Statement) -> DFResult<LogicalPlan> {
+        match statement {
+            Statement::Datafusion(s) => {
+                let planner = SqlToRel::new_with_options(
+                    self.context_provider,
+                    ParserOptions {
+                        parse_float_as_decimal: self.options.parse_float_as_decimal,
+                        enable_ident_normalization: self.options.enable_ident_normalization,
+                    },
+                );
+                planner.statement_to_plan(s)
+            }
+            Statement::Describe(describe) => self.describe_to_plan(describe),
+            Statement::Vacuum(vacuum) => self.vacuum_to_plan(vacuum),
+        }
+    }
+
+    fn vacuum_to_plan(&self, vacuum: VacuumStatement) -> DFResult<LogicalPlan> {
+        let table_ref = self.object_name_to_table_reference(vacuum.table)?;
+        let plan = DeltaStatement::Vacuum(Vacuum::new(
+            table_ref.to_owned_reference(),
+            vacuum.retention_hours,
+            vacuum.dry_run,
+        ));
+        Ok(LogicalPlan::Extension(Extension {
+            node: Arc::new(plan),
+        }))
+    }
+
+    fn describe_to_plan(&self, describe: DescribeStatement) -> DFResult<LogicalPlan> {
+        let table_ref = self.object_name_to_table_reference(describe.table)?;
+        // TODO: take describe.operation into account; currently every
+        // DESCRIBE variant is planned as DescribeFiles.
+        let plan =
+            DeltaStatement::DescribeFiles(DescribeFiles::new(table_ref.to_owned_reference()));
+        Ok(LogicalPlan::Extension(Extension {
+            node: Arc::new(plan),
+        }))
+    }
+
+    pub(crate) fn object_name_to_table_reference(
+        &self,
+        object_name: ObjectName,
+    ) -> DFResult<OwnedTableReference> {
+        object_name_to_table_reference(object_name, self.options.enable_ident_normalization)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::tests::assert_plan_eq;
+
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use arrow_schema::{DataType, Field, Schema};
+    use datafusion_common::config::ConfigOptions;
+    use datafusion_common::DataFusionError;
+    use datafusion_expr::logical_plan::builder::LogicalTableSource;
+    use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
+    use datafusion_sql::TableReference;
+
+    use crate::parser::DeltaParser;
+
+    struct TestSchemaProvider {
+        options: ConfigOptions,
+        tables: HashMap<String, Arc<dyn TableSource>>,
+    }
+
+    impl TestSchemaProvider {
+        pub fn new() -> Self {
+            let mut tables = HashMap::new();
+            tables.insert(
+                "table1".to_string(),
+                create_table_source(vec![Field::new(
+                    "column1".to_string(),
+                    DataType::Utf8,
+                    false,
+                )]),
+            );
+
+            Self {
+                options: Default::default(),
+                tables,
+            }
+        }
+    }
+
+    impl ContextProvider for TestSchemaProvider {
+        fn get_table_provider(&self, name: TableReference) -> DFResult<Arc<dyn TableSource>> {
+            match self.tables.get(name.table()) {
+                Some(table) => Ok(table.clone()),
+                _ => Err(DataFusionError::Plan(format!(
+                    "Table not found: {}",
+                    name.table()
+                ))),
+            }
+        }
+
+        fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
+            None
+        }
+
+        fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
+            None
+        }
+
+        fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
+            None
+        }
+
+        fn options(&self) -> &ConfigOptions {
+            &self.options
+        }
+
+        fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
+            None
+        }
+    }
+
+    fn create_table_source(fields: Vec<Field>) -> Arc<dyn TableSource> {
+        Arc::new(LogicalTableSource::new(Arc::new(
+            Schema::new_with_metadata(fields, HashMap::new()),
+        )))
+    }
+
+    fn test_statement(sql: &str, expected_lines: &[&str]) {
+        let cp = TestSchemaProvider::new();
+        let planner = DeltaSqlToRel::new(&cp);
+        let mut stmts = DeltaParser::parse_sql(sql).unwrap();
+        let plan = planner
+            .statement_to_plan(stmts.pop_front().unwrap())
+            .unwrap();
+        assert_plan_eq(&plan, expected_lines)
+    }
+
+    #[test]
+    fn test_planner() {
+        test_statement(
+            "SELECT * FROM table1",
+            &["Projection: table1.column1", "  TableScan: table1"],
+        );
+
+        test_statement("VACUUM table1", &["Vacuum: table1 dry_run=false"]);
+        test_statement("VACUUM table1 DRY RUN", &["Vacuum: table1 dry_run=true"]);
+        test_statement(
+            "VACUUM table1 RETAIN 1234 HOURS",
+            &["Vacuum: table1 retention_hours=1234 dry_run=false"],
+        );
+    }
+}
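Putting parser and planner together mirrors what `test_statement` does in the tests above. A sketch (not part of the patch); the `plan_vacuum` helper is illustrative only, and any `ContextProvider` implementation works here since planning a VACUUM never consults the schema:

```rust
use datafusion_sql::planner::ContextProvider;
use deltalake_sql::parser::DeltaParser;
use deltalake_sql::planner::DeltaSqlToRel;

// `provider` can be any ContextProvider implementation, e.g. the
// TestSchemaProvider defined in the tests above.
fn plan_vacuum<S: ContextProvider>(provider: &S) {
    let mut stmts = DeltaParser::parse_sql("VACUUM table1 DRY RUN").unwrap();
    let planner = DeltaSqlToRel::new(provider);
    let plan = planner
        .statement_to_plan(stmts.pop_front().unwrap())
        .unwrap();
    assert_eq!(
        plan.display_indent().to_string().trim(),
        "Vacuum: table1 dry_run=true"
    );
}
```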