From 46136282d02573d420afc40fdb3767c9d037c067 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Wed, 27 Nov 2024 15:25:40 -0800 Subject: [PATCH] perf(cubestore): Update DataFusion pointer to new GroupsAccumulator changes (#8985) * Replaces Accumulator usage in grouped hash aggregation with GroupsAccumulator API * Makes all accumulators in DF grouped hash aggregation use GroupsAccumulatorFlatAdapter * Uses PrimitiveGroupsAccumulator and CountGroupsAccumulator for sum, min, max, and count for faster performance * Improves memory layout and traversal order of group by keys saved during grouped aggregation. --- rust/cubestore/Cargo.lock | 6 +++--- rust/cubestore/cubestore-sql-tests/src/tests.rs | 6 +++--- rust/cubestore/cubestore/src/metastore/table.rs | 12 ++++++++++-- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/rust/cubestore/Cargo.lock b/rust/cubestore/Cargo.lock index 167c3d933b4bb..89352f63787dc 100644 --- a/rust/cubestore/Cargo.lock +++ b/rust/cubestore/Cargo.lock @@ -160,7 +160,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "5.0.0" -source = "git+https://github.com/cube-js/arrow-rs.git?branch=cube#1963799bd9fd1de3948eba3fce11ddb9c1392d7c" +source = "git+https://github.com/cube-js/arrow-rs.git?branch=cube#b6c25a93744951fb2c73019e57084132788b0a09" dependencies = [ "bitflags 1.3.2", "chrono", @@ -1329,7 +1329,7 @@ checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" [[package]] name = "datafusion" version = "4.0.0-SNAPSHOT" -source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube#c29478ed9678ad1dcae7508dc0195fb94a689e6c" +source = "git+https://github.com/cube-js/arrow-datafusion?branch=cube#8d4663ba60e4370a953b62a302221c46eca39e5c" dependencies = [ "ahash", "arrow", @@ -3429,7 +3429,7 @@ dependencies = [ [[package]] name = "parquet" version = "5.0.0" -source = "git+https://github.com/cube-js/arrow-rs.git?branch=cube#1963799bd9fd1de3948eba3fce11ddb9c1392d7c" +source = "git+https://github.com/cube-js/arrow-rs.git?branch=cube#b6c25a93744951fb2c73019e57084132788b0a09" dependencies = [ "aes-gcm", "arrow", diff --git a/rust/cubestore/cubestore-sql-tests/src/tests.rs b/rust/cubestore/cubestore-sql-tests/src/tests.rs index 2366ea4d83c6d..048157c2172d9 100644 --- a/rust/cubestore/cubestore-sql-tests/src/tests.rs +++ b/rust/cubestore/cubestore-sql-tests/src/tests.rs @@ -3144,14 +3144,14 @@ async fn planning_inplace_aggregate2(service: Box) { pp_phys_plan_ext(p.router.as_ref(), &verbose), "Projection, [url, SUM(Data.hits)@1:hits]\ \n AggregateTopK, limit: 10, sortBy: [2 desc null last]\ - \n ClusterSend, partitions: [[1, 2]]" + \n ClusterSend, partitions: [[1, 2]], sort_order: [1]" ); assert_eq!( pp_phys_plan_ext(p.worker.as_ref(), &verbose), "Projection, [url, SUM(Data.hits)@1:hits]\ \n AggregateTopK, limit: 10, sortBy: [2 desc null last]\ - \n Worker\ - \n Sort, by: [SUM(hits)@1 desc nulls last]\ + \n Worker, sort_order: [1]\ + \n Sort, by: [SUM(hits)@1 desc nulls last], sort_order: [1]\ \n FullInplaceAggregate, sort_order: [0]\ \n MergeSort, single_vals: [0, 1], sort_order: [0, 1, 2]\ \n Union, single_vals: [0, 1], sort_order: [0, 1, 2]\ diff --git a/rust/cubestore/cubestore/src/metastore/table.rs b/rust/cubestore/cubestore/src/metastore/table.rs index c0e464fadca87..4aec0a159d564 100644 --- a/rust/cubestore/cubestore/src/metastore/table.rs +++ b/rust/cubestore/cubestore/src/metastore/table.rs @@ -11,7 +11,9 @@ use byteorder::{BigEndian, WriteBytesExt}; use chrono::DateTime; use chrono::Utc; use datafusion::arrow::datatypes::Schema as ArrowSchema; -use datafusion::physical_plan::expressions::{Column as FusionColumn, Max, Min, Sum}; +use datafusion::physical_plan::expressions::{ + sum_return_type, Column as FusionColumn, Max, Min, Sum, +}; use datafusion::physical_plan::{udaf, AggregateExpr, PhysicalExpr}; use itertools::Itertools; @@ -76,7 +78,13 @@ impl AggregateColumn { )?); let res: Arc = match self.function { AggregateFunction::SUM => { - Arc::new(Sum::new(col.clone(), col.name(), col.data_type(schema)?)) + let input_data_type = col.data_type(schema)?; + Arc::new(Sum::new( + col.clone(), + col.name(), + sum_return_type(&input_data_type)?, + &input_data_type, + )) } AggregateFunction::MAX => { Arc::new(Max::new(col.clone(), col.name(), col.data_type(schema)?))