Skip to content

Commit

Permalink
perf(cubestore): Update DataFusion pointer to new GroupsAccumulator changes (#8985)
Browse files Browse the repository at this point in the history

* Replaces Accumulator usage in grouped hash aggregation with
  GroupsAccumulator API

* Makes all accumulators in DF grouped hash aggregation use
  GroupsAccumulatorFlatAdapter

* Uses PrimitiveGroupsAccumulator and CountGroupsAccumulator for sum,
  min, max, and count to improve performance

* Improves memory layout and traversal order of group by keys saved
  during grouped aggregation.
  • Loading branch information
srh authored Nov 27, 2024
1 parent f61afc3 commit 4613628
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
6 changes: 3 additions & 3 deletions rust/cubestore/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions rust/cubestore/cubestore-sql-tests/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3144,14 +3144,14 @@ async fn planning_inplace_aggregate2(service: Box<dyn SqlClient>) {
pp_phys_plan_ext(p.router.as_ref(), &verbose),
"Projection, [url, SUM(Data.hits)@1:hits]\
\n AggregateTopK, limit: 10, sortBy: [2 desc null last]\
\n ClusterSend, partitions: [[1, 2]]"
\n ClusterSend, partitions: [[1, 2]], sort_order: [1]"
);
assert_eq!(
pp_phys_plan_ext(p.worker.as_ref(), &verbose),
"Projection, [url, SUM(Data.hits)@1:hits]\
\n AggregateTopK, limit: 10, sortBy: [2 desc null last]\
\n Worker\
\n Sort, by: [SUM(hits)@1 desc nulls last]\
\n Worker, sort_order: [1]\
\n Sort, by: [SUM(hits)@1 desc nulls last], sort_order: [1]\
\n FullInplaceAggregate, sort_order: [0]\
\n MergeSort, single_vals: [0, 1], sort_order: [0, 1, 2]\
\n Union, single_vals: [0, 1], sort_order: [0, 1, 2]\
Expand Down
12 changes: 10 additions & 2 deletions rust/cubestore/cubestore/src/metastore/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use byteorder::{BigEndian, WriteBytesExt};
use chrono::DateTime;
use chrono::Utc;
use datafusion::arrow::datatypes::Schema as ArrowSchema;
use datafusion::physical_plan::expressions::{Column as FusionColumn, Max, Min, Sum};
use datafusion::physical_plan::expressions::{
sum_return_type, Column as FusionColumn, Max, Min, Sum,
};
use datafusion::physical_plan::{udaf, AggregateExpr, PhysicalExpr};
use itertools::Itertools;

Expand Down Expand Up @@ -76,7 +78,13 @@ impl AggregateColumn {
)?);
let res: Arc<dyn AggregateExpr> = match self.function {
AggregateFunction::SUM => {
Arc::new(Sum::new(col.clone(), col.name(), col.data_type(schema)?))
let input_data_type = col.data_type(schema)?;
Arc::new(Sum::new(
col.clone(),
col.name(),
sum_return_type(&input_data_type)?,
&input_data_type,
))
}
AggregateFunction::MAX => {
Arc::new(Max::new(col.clone(), col.name(), col.data_type(schema)?))
Expand Down

0 comments on commit 4613628

Please sign in to comment.