Skip to content

Commit

Permalink
chore: resolve todo in dynamic sample (#16629)
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 authored Oct 21, 2024
1 parent ac03117 commit fcf8e09
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -25,8 +26,13 @@ use crate::optimizer::RelExpr;
use crate::optimizer::SExpr;
use crate::optimizer::StatInfo;
use crate::planner::query_executor::QueryExecutor;
use crate::plans::Aggregate;
use crate::plans::AggregateMode;
use crate::plans::Limit;
use crate::plans::Operator;
use crate::plans::ProjectSet;
use crate::plans::RelOperator;
use crate::plans::UnionAll;
use crate::MetadataRef;

#[async_recursion::async_recursion(#[recursive::recursive])]
Expand Down Expand Up @@ -78,11 +84,73 @@ pub async fn dynamic_sample(
RelOperator::Join(_) => {
join_selectivity_sample(ctx, metadata, s_expr, sample_executor).await
}
RelOperator::Scan(_) => s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr)),
// Todo: add more operators here, and support more query patterns.
_ => {
let rel_expr = RelExpr::with_s_expr(s_expr);
rel_expr.derive_cardinality()
RelOperator::Scan(_)
| RelOperator::DummyTableScan(_)
| RelOperator::CteScan(_)
| RelOperator::ConstantTableScan(_)
| RelOperator::CacheScan(_)
| RelOperator::ExpressionScan(_)
| RelOperator::RecursiveCteScan(_)
| RelOperator::Mutation(_)
| RelOperator::Recluster(_)
| RelOperator::CompactBlock(_)
| RelOperator::MutationSource(_) => {
s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr))
}

RelOperator::Aggregate(agg) => {
let child_stat_info =
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await?;
if agg.mode == AggregateMode::Final {
return Ok(child_stat_info);
}
let agg = Aggregate::try_from(s_expr.plan().clone())?;
agg.derive_agg_stats(child_stat_info)
}
RelOperator::Limit(_) => {
let child_stat_info =
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await?;
let limit = Limit::try_from(s_expr.plan().clone())?;
limit.derive_limit_stats(child_stat_info)
}
RelOperator::UnionAll(_) => {
let left_stat_info = dynamic_sample(
ctx.clone(),
metadata.clone(),
s_expr.child(0)?,
sample_executor.clone(),
)
.await?;
let right_stat_info =
dynamic_sample(ctx, metadata, s_expr.child(1)?, sample_executor).await?;
let union = UnionAll::try_from(s_expr.plan().clone())?;
union.derive_union_stats(left_stat_info, right_stat_info)
}
RelOperator::ProjectSet(_) => {
let mut child_stat_info =
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor)
.await?
.deref()
.clone();
let project_set = ProjectSet::try_from(s_expr.plan().clone())?;
project_set.derive_project_set_stats(&mut child_stat_info)
}
RelOperator::MaterializedCte(_) => {
let right_stat_info =
dynamic_sample(ctx, metadata, s_expr.child(1)?, sample_executor).await?;
Ok(Arc::new(StatInfo {
cardinality: right_stat_info.cardinality,
statistics: right_stat_info.statistics.clone(),
}))
}

RelOperator::EvalScalar(_)
| RelOperator::Sort(_)
| RelOperator::Exchange(_)
| RelOperator::Window(_)
| RelOperator::Udf(_)
| RelOperator::AsyncFunction(_) => {
dynamic_sample(ctx, metadata, s_expr.child(0)?, sample_executor).await
}
}
}
104 changes: 54 additions & 50 deletions src/query/sql/src/planner/plans/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,59 @@ impl Aggregate {
}
Ok(col_set)
}

pub fn derive_agg_stats(&self, stat_info: Arc<StatInfo>) -> Result<Arc<StatInfo>> {
let (cardinality, mut statistics) = (stat_info.cardinality, stat_info.statistics.clone());
let cardinality = if self.group_items.is_empty() {
// Scalar aggregation
1.0
} else if self
.group_items
.iter()
.any(|item| !statistics.column_stats.contains_key(&item.index))
{
cardinality
} else {
// A upper bound
let res = self.group_items.iter().fold(1.0, |acc, item| {
let item_stat = statistics.column_stats.get(&item.index).unwrap();
acc * item_stat.ndv
});
for item in self.group_items.iter() {
let item_stat = statistics.column_stats.get_mut(&item.index).unwrap();
if let Some(histogram) = &mut item_stat.histogram {
let mut num_values = 0.0;
let mut num_distinct = 0.0;
for bucket in histogram.buckets.iter() {
num_distinct += bucket.num_distinct();
num_values += bucket.num_values();
}
// When there is a high probability that eager aggregation
// is better, we will update the histogram.
if num_values / num_distinct >= 10.0 {
for bucket in histogram.buckets.iter_mut() {
bucket.aggregate_values();
}
}
}
}
// To avoid res is very large
f64::min(res, cardinality)
};

let precise_cardinality = if self.group_items.is_empty() {
Some(1)
} else {
None
};
Ok(Arc::new(StatInfo {
cardinality,
statistics: Statistics {
precise_cardinality,
column_stats: statistics.column_stats,
},
}))
}
}

impl Operator for Aggregate {
Expand Down Expand Up @@ -242,56 +295,7 @@ impl Operator for Aggregate {
return rel_expr.derive_cardinality_child(0);
}
let stat_info = rel_expr.derive_cardinality_child(0)?;
let (cardinality, mut statistics) = (stat_info.cardinality, stat_info.statistics.clone());
let cardinality = if self.group_items.is_empty() {
// Scalar aggregation
1.0
} else if self
.group_items
.iter()
.any(|item| !statistics.column_stats.contains_key(&item.index))
{
cardinality
} else {
// A upper bound
let res = self.group_items.iter().fold(1.0, |acc, item| {
let item_stat = statistics.column_stats.get(&item.index).unwrap();
acc * item_stat.ndv
});
for item in self.group_items.iter() {
let item_stat = statistics.column_stats.get_mut(&item.index).unwrap();
if let Some(histogram) = &mut item_stat.histogram {
let mut num_values = 0.0;
let mut num_distinct = 0.0;
for bucket in histogram.buckets.iter() {
num_distinct += bucket.num_distinct();
num_values += bucket.num_values();
}
// When there is a high probability that eager aggregation
// is better, we will update the histogram.
if num_values / num_distinct >= 10.0 {
for bucket in histogram.buckets.iter_mut() {
bucket.aggregate_values();
}
}
}
}
// To avoid res is very large
f64::min(res, cardinality)
};

let precise_cardinality = if self.group_items.is_empty() {
Some(1)
} else {
None
};
Ok(Arc::new(StatInfo {
cardinality,
statistics: Statistics {
precise_cardinality,
column_stats: statistics.column_stats,
},
}))
self.derive_agg_stats(stat_info)
}

fn compute_required_prop_children(
Expand Down
42 changes: 24 additions & 18 deletions src/query/sql/src/planner/plans/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,29 @@ pub struct Limit {
pub offset: usize,
}

impl Limit {
pub fn derive_limit_stats(&self, stat_info: Arc<StatInfo>) -> Result<Arc<StatInfo>> {
let cardinality = match self.limit {
Some(limit) if (limit as f64) < stat_info.cardinality => limit as f64,
_ => stat_info.cardinality,
};
let precise_cardinality = match (self.limit, stat_info.statistics.precise_cardinality) {
(Some(limit), Some(pc)) => {
Some((pc.saturating_sub(self.offset as u64)).min(limit as u64))
}
_ => None,
};

Ok(Arc::new(StatInfo {
cardinality,
statistics: Statistics {
precise_cardinality,
column_stats: Default::default(),
},
}))
}
}

impl Operator for Limit {
fn rel_op(&self) -> RelOp {
RelOp::Limit
Expand Down Expand Up @@ -67,23 +90,6 @@ impl Operator for Limit {

fn derive_stats(&self, rel_expr: &RelExpr) -> Result<Arc<StatInfo>> {
let stat_info = rel_expr.derive_cardinality_child(0)?;
let cardinality = match self.limit {
Some(limit) if (limit as f64) < stat_info.cardinality => limit as f64,
_ => stat_info.cardinality,
};
let precise_cardinality = match (self.limit, stat_info.statistics.precise_cardinality) {
(Some(limit), Some(pc)) => {
Some((pc.saturating_sub(self.offset as u64)).min(limit as u64))
}
_ => None,
};

Ok(Arc::new(StatInfo {
cardinality,
statistics: Statistics {
precise_cardinality,
column_stats: Default::default(),
},
}))
self.derive_limit_stats(stat_info)
}
}
14 changes: 11 additions & 3 deletions src/query/sql/src/planner/plans/project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::ops::Deref;
use std::sync::Arc;

use databend_common_exception::Result;

use crate::optimizer::RelExpr;
use crate::optimizer::RelationalProperty;
use crate::optimizer::StatInfo;
Expand All @@ -30,6 +32,14 @@ pub struct ProjectSet {
pub srfs: Vec<ScalarItem>,
}

impl ProjectSet {
pub fn derive_project_set_stats(&self, input_stat: &mut StatInfo) -> Result<Arc<StatInfo>> {
// ProjectSet is set-returning functions, precise_cardinality set None
input_stat.statistics.precise_cardinality = None;
Ok(Arc::new(input_stat.clone()))
}
}

impl Operator for ProjectSet {
fn rel_op(&self) -> RelOp {
RelOp::ProjectSet
Expand Down Expand Up @@ -75,8 +85,6 @@ impl Operator for ProjectSet {

fn derive_stats(&self, rel_expr: &RelExpr) -> databend_common_exception::Result<Arc<StatInfo>> {
let mut input_stat = rel_expr.derive_cardinality_child(0)?.deref().clone();
// ProjectSet is set-returning functions, precise_cardinality set None
input_stat.statistics.precise_cardinality = None;
Ok(Arc::new(input_stat))
self.derive_project_set_stats(&mut input_stat)
}
}
48 changes: 28 additions & 20 deletions src/query/sql/src/planner/plans/union_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,33 @@ impl UnionAll {
}
Ok(used_columns)
}

pub fn derive_union_stats(
&self,
left_stat_info: Arc<StatInfo>,
right_stat_info: Arc<StatInfo>,
) -> Result<Arc<StatInfo>> {
let cardinality = left_stat_info.cardinality + right_stat_info.cardinality;

let precise_cardinality =
left_stat_info
.statistics
.precise_cardinality
.and_then(|left_cardinality| {
right_stat_info
.statistics
.precise_cardinality
.map(|right_cardinality| left_cardinality + right_cardinality)
});

Ok(Arc::new(StatInfo {
cardinality,
statistics: Statistics {
precise_cardinality,
column_stats: Default::default(),
},
}))
}
}

impl Operator for UnionAll {
Expand Down Expand Up @@ -117,26 +144,7 @@ impl Operator for UnionAll {
fn derive_stats(&self, rel_expr: &RelExpr) -> Result<Arc<StatInfo>> {
let left_stat_info = rel_expr.derive_cardinality_child(0)?;
let right_stat_info = rel_expr.derive_cardinality_child(1)?;
let cardinality = left_stat_info.cardinality + right_stat_info.cardinality;

let precise_cardinality =
left_stat_info
.statistics
.precise_cardinality
.and_then(|left_cardinality| {
right_stat_info
.statistics
.precise_cardinality
.map(|right_cardinality| left_cardinality + right_cardinality)
});

Ok(Arc::new(StatInfo {
cardinality,
statistics: Statistics {
precise_cardinality,
column_stats: Default::default(),
},
}))
self.derive_union_stats(left_stat_info, right_stat_info)
}

fn compute_required_prop_child(
Expand Down
Loading

0 comments on commit fcf8e09

Please sign in to comment.