diff --git a/src/query/ast/src/ast/expr.rs b/src/query/ast/src/ast/expr.rs index 57a3d874f1e7d..a845806671fa0 100644 --- a/src/query/ast/src/ast/expr.rs +++ b/src/query/ast/src/ast/expr.rs @@ -789,6 +789,23 @@ pub enum Literal { Null, } +impl Literal { + pub fn as_double(&self) -> Result { + match self { + Literal::UInt64(val) => Ok(*val as f64), + Literal::Float64(val) => Ok(*val), + Literal::Decimal256 { value, scale, .. } => { + let div = 10_f64.powi(*scale as i32); + Ok(value.as_f64() / div) + } + _ => Err(ParseError( + None, + format!("Cannot convert {:?} to double", self), + )), + } + } +} + impl Display for Literal { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { match self { diff --git a/src/query/ast/src/ast/format/ast_format.rs b/src/query/ast/src/ast/format/ast_format.rs index a6350fff2540b..d73089791ae83 100644 --- a/src/query/ast/src/ast/format/ast_format.rs +++ b/src/query/ast/src/ast/format/ast_format.rs @@ -3327,6 +3327,7 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor { consume, pivot, unpivot, + sample, } => { let mut name = String::new(); name.push_str("TableIdentifier "); @@ -3354,6 +3355,11 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor { name.push_str(&unpivot.to_string()); } + if let Some(sample) = sample { + name.push(' '); + name.push_str(&sample.to_string()); + } + let mut children = Vec::new(); if let Some(temporal) = temporal { diff --git a/src/query/ast/src/ast/format/syntax/query.rs b/src/query/ast/src/ast/format/syntax/query.rs index 57de3ae1ff65f..9e264477ad3a5 100644 --- a/src/query/ast/src/ast/format/syntax/query.rs +++ b/src/query/ast/src/ast/format/syntax/query.rs @@ -322,6 +322,7 @@ pub(crate) fn pretty_table(table: TableReference) -> RcDoc<'static> { consume, pivot, unpivot, + sample, } => if let Some(catalog) = catalog { RcDoc::text(catalog.to_string()).append(RcDoc::text(".")) } else { @@ -353,6 +354,11 @@ pub(crate) fn pretty_table(table: TableReference) -> RcDoc<'static> { } else { RcDoc::nil() }) + .append(if let Some(sample) = sample { + RcDoc::text(format!(" {sample}")) + } else { + RcDoc::nil() + }) .append(if let Some(alias) = alias { RcDoc::text(format!(" AS {alias}")) } else { diff --git a/src/query/ast/src/ast/query.rs b/src/query/ast/src/ast/query.rs index 01c51711d0f61..e30b7949479a9 100644 --- a/src/query/ast/src/ast/query.rs +++ b/src/query/ast/src/ast/query.rs @@ -19,6 +19,7 @@ use derive_visitor::Drive; use derive_visitor::DriveMut; use super::Lambda; +use super::Literal; use crate::ast::write_comma_separated_list; use crate::ast::write_dot_separated_list; use crate::ast::Expr; @@ -608,6 +609,39 @@ impl Display for TemporalClause { } } +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub enum SampleLevel { + ROW, + BLOCK, +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub enum SampleConfig { + Probability(Literal), + RowsNum(Literal), +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct Sample { + pub sample_level: SampleLevel, + pub sample_conf: SampleConfig, +} + +impl Display for Sample { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "SAMPLE ")?; + match self.sample_level { + SampleLevel::ROW => write!(f, "ROW ")?, + SampleLevel::BLOCK => write!(f, "BLOCK ")?, + } + match &self.sample_conf { + SampleConfig::Probability(prob) => write!(f, "({})", prob)?, + SampleConfig::RowsNum(rows) => write!(f, "({} ROWS)", rows)?, + } + Ok(()) + } +} + /// A table name or a parenthesized subquery with an optional alias #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] pub enum TableReference { @@ -623,6 +657,7 @@ pub enum TableReference { consume: bool, pivot: Option>, unpivot: Option>, + sample: Option, }, // `TABLE(expr)[ AS alias ]` TableFunction { @@ -697,6 +732,7 @@ impl Display for TableReference { consume, pivot, unpivot, + sample, } => { write_dot_separated_list( f, @@ -721,6 +757,10 @@ impl Display for TableReference { if let Some(unpivot) = unpivot { write!(f, " {unpivot}")?; } + + if let Some(sample) = sample { + write!(f, " {sample}")?; + } } TableReference::TableFunction { span: _, diff --git a/src/query/ast/src/ast/statements/merge_into.rs b/src/query/ast/src/ast/statements/merge_into.rs index 554d4ae1220b2..8755a02cafbf2 100644 --- a/src/query/ast/src/ast/statements/merge_into.rs +++ b/src/query/ast/src/ast/statements/merge_into.rs @@ -218,6 +218,7 @@ impl MergeSource { consume: false, pivot: None, unpivot: None, + sample: None, }, } } diff --git a/src/query/ast/src/parser/query.rs b/src/query/ast/src/parser/query.rs index 2b5bfa5f4c886..2a863c9327170 100644 --- a/src/query/ast/src/parser/query.rs +++ b/src/query/ast/src/parser/query.rs @@ -685,6 +685,7 @@ pub enum TableReferenceElement { consume: bool, pivot: Option>, unpivot: Option>, + sample: Option, }, // `TABLE(expr)[ AS alias ]` TableFunction { @@ -741,9 +742,48 @@ pub fn table_reference_element(i: Input) -> IResult match level.kind { + ROW => SampleLevel::ROW, + BLOCK => SampleLevel::BLOCK, + _ => unreachable!(), + }, + None => SampleLevel::ROW, + }; + let mut default_sample_conf = SampleConfig::Probability(Literal::Float64(100.0)); + if let Some((_, expr, rows, _)) = sample_conf { + if let Expr::Literal { value, .. } = expr { + default_sample_conf = if rows.is_some() { + SampleConfig::RowsNum(value) + } else { + SampleConfig::Probability(value) + }; + } + } + table_sample = Some(Sample { + sample_level, + sample_conf: default_sample_conf, + }) + }; TableReferenceElement::Table { catalog, database, @@ -753,6 +793,7 @@ pub fn table_reference_element(i: Input) -> IResult>> PrattParser consume, pivot, unpivot, + sample, } => TableReference::Table { span: transform_span(input.span.tokens), catalog, @@ -874,6 +916,7 @@ impl<'a, I: Iterator>> PrattParser consume, pivot, unpivot, + sample, }, TableReferenceElement::TableFunction { lateral, diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 2b41ab833cc9f..63c7d6e75a32e 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -4140,6 +4140,7 @@ pub fn table_reference_with_alias(i: Input) -> IResult { consume: false, pivot: None, unpivot: None, + sample: None, }, )(i) } @@ -4159,6 +4160,7 @@ pub fn table_reference_only(i: Input) -> IResult { consume: false, pivot: None, unpivot: None, + sample: None, }, )(i) } diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index 5d154f98a6d49..0c3632aa20dfa 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -405,6 +405,8 @@ pub enum TokenKind { BROTLI, #[token("BZ2", ignore(ascii_case))] BZ2, + #[token("BLOCK", ignore(ascii_case))] + BLOCK, #[token("CALL", ignore(ascii_case))] CALL, #[token("CASE", ignore(ascii_case))] @@ -924,6 +926,8 @@ pub enum TokenKind { RETURN_FAILED_ONLY, #[token("REVERSE", ignore(ascii_case))] REVERSE, + #[token("SAMPLE", ignore(ascii_case))] + SAMPLE, #[token("MERGE", ignore(ascii_case))] MERGE, #[token("MATCHED", ignore(ascii_case))] @@ -1567,6 +1571,7 @@ impl TokenKind { // | TokenKind::AUTHORIZATION // | TokenKind::BINARY | TokenKind::BOTH + | TokenKind::BLOCK | TokenKind::CASE | TokenKind::CAST // | TokenKind::CHECK @@ -1620,10 +1625,13 @@ impl TokenKind { | TokenKind::SELECT | TokenKind::PIVOT | TokenKind::UNPIVOT + | TokenKind::ROW + | TokenKind::ROWS // | TokenKind::SESSION_USER // | TokenKind::SIMILAR | TokenKind::SOME | TokenKind::SEMI + | TokenKind::SAMPLE // | TokenKind::SYMMETRIC // | TokenKind::TABLESAMPLE | TokenKind::THEN @@ -1660,7 +1668,6 @@ impl TokenKind { | TokenKind::OVER | TokenKind::PARTITION | TokenKind::QUALIFY - | TokenKind::ROWS | TokenKind::RANGE // | TokenKind::OVERLAPS // | TokenKind::RETURNING diff --git a/src/query/sql/src/planner/binder/bind_mutation/merge.rs b/src/query/sql/src/planner/binder/bind_mutation/merge.rs index a1b7a3179d230..78c29fea4aa47 100644 --- a/src/query/sql/src/planner/binder/bind_mutation/merge.rs +++ b/src/query/sql/src/planner/binder/bind_mutation/merge.rs @@ -59,6 +59,7 @@ impl Binder { consume: false, pivot: None, unpivot: None, + sample: None, }; let source_reference = stmt.source.transform_table_reference(); diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind.rs index 15a108fc39ff3..56adad136a974 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind.rs @@ -36,6 +36,7 @@ impl Binder { pivot: _, unpivot: _, consume, + sample, } => self.bind_table( bind_context, span, @@ -45,6 +46,7 @@ impl Binder { alias, temporal, *consume, + sample, ), TableReference::TableFunction { span, diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs index fc5bc5be428e4..78e70e4b20f86 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table.rs @@ -13,6 +13,7 @@ // limitations under the License. use databend_common_ast::ast::Identifier; +use databend_common_ast::ast::Sample; use databend_common_ast::ast::Statement; use databend_common_ast::ast::TableAlias; use databend_common_ast::ast::TemporalClause; @@ -44,6 +45,7 @@ impl Binder { alias: &Option, temporal: &Option, consume: bool, + sample: &Option, ) -> Result<(SExpr, BindContext)> { let table_identifier = TableIdentifier::new(self, catalog, database, table, alias); let (catalog, database, table_name, table_name_alias) = ( @@ -142,6 +144,7 @@ impl Binder { database.as_str(), table_index, change_type, + sample, )?; if let Some(alias) = alias { @@ -247,8 +250,13 @@ impl Binder { false, ); - let (s_expr, mut bind_context) = - self.bind_base_table(bind_context, database.as_str(), table_index, None)?; + let (s_expr, mut bind_context) = self.bind_base_table( + bind_context, + database.as_str(), + table_index, + None, + sample, + )?; if let Some(alias) = alias { bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?; } diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs index 9e506f14e60c1..ebc422b5d656e 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_table_function.rs @@ -152,7 +152,7 @@ impl Binder { ); let (s_expr, mut bind_context) = - self.bind_base_table(bind_context, "system", table_index, None)?; + self.bind_base_table(bind_context, "system", table_index, None, &None)?; if let Some(alias) = alias { bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?; } @@ -220,7 +220,7 @@ impl Binder { ); let (s_expr, mut bind_context) = - self.bind_base_table(bind_context, "system", table_index, None)?; + self.bind_base_table(bind_context, "system", table_index, None, &None)?; if let Some(alias) = alias { bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?; } diff --git a/src/query/sql/src/planner/binder/table.rs b/src/query/sql/src/planner/binder/table.rs index 7e0b8d0b7292d..927db3781b612 100644 --- a/src/query/sql/src/planner/binder/table.rs +++ b/src/query/sql/src/planner/binder/table.rs @@ -21,6 +21,9 @@ use chrono::Utc; use dashmap::DashMap; use databend_common_ast::ast::Identifier; use databend_common_ast::ast::Indirection; +use databend_common_ast::ast::Sample; +use databend_common_ast::ast::SampleConfig; +use databend_common_ast::ast::SampleLevel; use databend_common_ast::ast::SelectTarget; use databend_common_ast::ast::SetExpr; use databend_common_ast::ast::SetOperator; @@ -138,7 +141,7 @@ impl Binder { ); let (s_expr, mut bind_context) = - self.bind_base_table(bind_context, "system", table_index, None)?; + self.bind_base_table(bind_context, "system", table_index, None, &None)?; if let Some(alias) = alias { bind_context.apply_table_alias(alias, &self.name_resolution_ctx)?; } @@ -415,7 +418,9 @@ impl Binder { database_name: &str, table_index: IndexType, change_type: Option, + sample: &Option, ) -> Result<(SExpr, BindContext)> { + dbg!(sample); let mut bind_context = BindContext::with_parent(Box::new(bind_context.clone())); let table = self.metadata.read().table(table_index).clone(); @@ -468,6 +473,7 @@ impl Binder { columns: columns.into_iter().map(|col| col.index()).collect(), statistics: Arc::new(Statistics::default()), change_type, + sample_conf: table_sample(sample)?, ..Default::default() } .into(), @@ -657,3 +663,15 @@ impl Binder { Ok(index_metas) } } + +fn table_sample(sample: &Option) -> Result> { + if let Some(sample) = sample { + if sample.sample_level == SampleLevel::BLOCK { + return Err(ErrorCode::SyntaxException( + "BLOCK sampling is not supported.".to_string(), + )); + } + return Ok(Some(sample.sample_conf.clone())); + } + Ok(None) +} diff --git a/src/query/sql/src/planner/dataframe.rs b/src/query/sql/src/planner/dataframe.rs index 8f263dc674cf1..aab71c714470b 100644 --- a/src/query/sql/src/planner/dataframe.rs +++ b/src/query/sql/src/planner/dataframe.rs @@ -65,6 +65,7 @@ impl Dataframe { consume: false, pivot: None, unpivot: None, + sample: None, }; let settings = query_ctx.get_settings(); @@ -104,7 +105,7 @@ impl Dataframe { false, ); - binder.bind_base_table(&bind_context, database, table_index, None) + binder.bind_base_table(&bind_context, database, table_index, None, &None) } else { binder.bind_table_reference(&mut bind_context, &table) }?; @@ -469,6 +470,7 @@ impl Dataframe { consume: false, pivot: None, unpivot: None, + sample: None, }; table_ref.push(table); } diff --git a/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs b/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs index 33c67d10aee8b..70dd803b152fb 100644 --- a/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs +++ b/src/query/sql/src/planner/optimizer/statistics/collect_statistics.rs @@ -23,6 +23,7 @@ use log::info; use crate::optimizer::RelExpr; use crate::optimizer::SExpr; use crate::optimizer::StatInfo; +use crate::plans::Filter; use crate::plans::RelOperator; use crate::plans::Statistics; use crate::BaseTableColumn; @@ -68,6 +69,9 @@ impl CollectStatisticsOptimizer { .table_statistics(self.table_ctx.clone(), true, scan.change_type.clone()) .await?; + let sample_filter = scan.sample_filter(&table_stats)?; + dbg!(&sample_filter); + let mut column_stats = HashMap::new(); let mut histograms = HashMap::new(); for column in columns.iter() { @@ -104,8 +108,14 @@ impl CollectStatisticsOptimizer { column_stats, histograms, }); - - Ok(s_expr.replace_plan(Arc::new(RelOperator::Scan(scan)))) + let mut s_expr = s_expr.replace_plan(Arc::new(RelOperator::Scan(scan))); + if let Some(sample_filter) = sample_filter { + let filter = Filter { + predicates: vec![sample_filter], + }; + s_expr = SExpr::create_unary(Arc::new(filter.into()), Arc::new(s_expr)) + } + Ok(s_expr) } RelOperator::MaterializedCte(materialized_cte) => { // Collect the common table expression statistics first. diff --git a/src/query/sql/src/planner/plans/scan.rs b/src/query/sql/src/planner/plans/scan.rs index 1c9606a1b40dd..7c55fd59b4c02 100644 --- a/src/query/sql/src/planner/plans/scan.rs +++ b/src/query/sql/src/planner/plans/scan.rs @@ -16,12 +16,17 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use databend_common_ast::ast::Literal; +use databend_common_ast::ast::SampleConfig; use databend_common_catalog::plan::InvertedIndexInfo; use databend_common_catalog::statistics::BasicColumnStatistics; use databend_common_catalog::table::TableStatistics; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::types::NumberScalar; +use databend_common_expression::types::F64; +use databend_common_expression::Scalar; use databend_common_expression::TableSchemaRef; use databend_common_storage::Histogram; use databend_common_storage::DEFAULT_HISTOGRAM_BUCKETS; @@ -42,6 +47,8 @@ use crate::optimizer::SelectivityEstimator; use crate::optimizer::StatInfo; use crate::optimizer::Statistics as OpStatistics; use crate::optimizer::MAX_SELECTIVITY; +use crate::plans::ConstantExpr; +use crate::plans::FunctionCall; use crate::plans::Operator; use crate::plans::RelOp; use crate::plans::ScalarExpr; @@ -105,6 +112,7 @@ pub struct Scan { pub inverted_index: Option, // Lazy row fetch. pub is_lazy_table: bool, + pub sample_conf: Option, pub statistics: Arc, } @@ -144,6 +152,7 @@ impl Scan { update_stream_columns: self.update_stream_columns, inverted_index: self.inverted_index.clone(), is_lazy_table: self.is_lazy_table, + sample_conf: self.sample_conf.clone(), } } @@ -165,6 +174,57 @@ impl Scan { used_columns.extend(self.columns.iter()); used_columns } + + pub fn sample_filter(&self, stats: &Option) -> Result> { + if let Some(sample_conf) = &self.sample_conf { + let rand = match sample_conf { + SampleConfig::Probability(probability) => probability.as_double()? / 100.0, + SampleConfig::RowsNum(rows) => { + let rows = if let Literal::UInt64(rows) = rows { + *rows + } else { + return Err(ErrorCode::SyntaxException( + "Sample rows should be a positive integer".to_string(), + )); + }; + if let Some(stats) = stats { + if let Some(row_num) = stats.num_rows + && row_num > 0 + { + rows as f64 / row_num as f64 + } else { + return Err(ErrorCode::Internal( + "Number of rows in stats is invalid".to_string(), + )); + } + } else { + return Err(ErrorCode::Internal( + "Table statistics is not available".to_string(), + )); + } + } + }; + let rand_expr = ScalarExpr::FunctionCall(FunctionCall { + span: None, + func_name: "rand".to_string(), + params: vec![], + arguments: vec![], + }); + return Ok(Some(ScalarExpr::FunctionCall(FunctionCall { + span: None, + func_name: "lte".to_string(), + params: vec![], + arguments: vec![ + rand_expr, + ScalarExpr::ConstantExpr(ConstantExpr { + span: None, + value: Scalar::Number(NumberScalar::Float64(F64::from(rand))), + }), + ], + }))); + } + Ok(None) + } } impl PartialEq for Scan { diff --git a/src/query/sql/src/planner/semantic/view_rewriter.rs b/src/query/sql/src/planner/semantic/view_rewriter.rs index cf9d06363b6c3..0cf47078fe01c 100644 --- a/src/query/sql/src/planner/semantic/view_rewriter.rs +++ b/src/query/sql/src/planner/semantic/view_rewriter.rs @@ -34,6 +34,7 @@ impl ViewRewriter { consume, pivot, unpivot, + sample, } = table_ref { // Must rewrite view query when table_ref::database is none. If not: @@ -53,6 +54,7 @@ impl ViewRewriter { consume: *consume, pivot: pivot.clone(), unpivot: unpivot.clone(), + sample: sample.clone(), } } } diff --git a/src/tests/sqlsmith/src/sql_gen/dml.rs b/src/tests/sqlsmith/src/sql_gen/dml.rs index 4d8003db44492..487b34ebd6777 100644 --- a/src/tests/sqlsmith/src/sql_gen/dml.rs +++ b/src/tests/sqlsmith/src/sql_gen/dml.rs @@ -302,6 +302,7 @@ impl<'a, R: Rng + 'a> SqlGenerator<'a, R> { consume: false, pivot: None, unpivot: None, + sample: None, }; (table, table_reference) } @@ -505,6 +506,7 @@ impl<'a, R: Rng + 'a> SqlGenerator<'a, R> { consume: false, pivot: None, unpivot: None, + sample: None, }; Some(( AlterTableStmt { diff --git a/src/tests/sqlsmith/src/sql_gen/query.rs b/src/tests/sqlsmith/src/sql_gen/query.rs index d98825f76e3c9..b6e40c106a17d 100644 --- a/src/tests/sqlsmith/src/sql_gen/query.rs +++ b/src/tests/sqlsmith/src/sql_gen/query.rs @@ -483,6 +483,7 @@ impl<'a, R: Rng> SqlGenerator<'a, R> { pivot: None, // TODO unpivot: None, + sample: None, }; (table_ref, schema) }