Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve aggregatation documentation #172

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ config_namespace! {

/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false

/// Aggregation ratio (number of distinct groups / number of input rows)
/// threshold for skipping partial aggregation. If the value is greater
/// then partial aggregation will skip aggregation for further input
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
}
}

Expand Down
105 changes: 103 additions & 2 deletions datafusion/expr/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub trait Accumulator: Send + Sync + Debug {
///
/// Intermediate state is used for "multi-phase" grouping in
/// DataFusion, where an aggregate is computed in parallel with
/// multiple `Accumulator` instances, as illustrated below:
/// multiple `Accumulator` instances, as described below:
///
/// # MultiPhase Grouping
///
Expand Down Expand Up @@ -130,7 +130,7 @@ pub trait Accumulator: Send + Sync + Debug {
/// `───────' `───────'
/// ```
///
/// The partial state is serialied as `Arrays` and then combined
/// The partial state is serialized as `Arrays` and then combined
/// with other partial states from different instances of this
/// Accumulator (that ran on different partitions, for example).
///
Expand All @@ -147,6 +147,107 @@ pub trait Accumulator: Send + Sync + Debug {
/// Note that [`ScalarValue::List`] can be used to pass multiple
/// values if the number of intermediate values is not known at
/// planning time (e.g. for `MEDIAN`)
///
/// # Multi-phase repartitioned Grouping
///
/// Many multi-phase grouping plans contain a Repartition operation
/// as well as shown below:
///
/// ```text
/// ▲ ▲
/// │ │
/// │ │
/// │ │
/// │ │
/// │ │
/// ┌───────────────────────┐ ┌───────────────────────┐ 4. Each AggregateMode::Final
/// │GroupBy │ │GroupBy │ GroupBy has an entry for its
/// │(AggregateMode::Final) │ │(AggregateMode::Final) │ subset of groups (in this case
/// │ │ │ │ that means half the entries)
/// └───────────────────────┘ └───────────────────────┘
/// ▲ ▲
/// │ │
/// └─────────────┬────────────┘
/// │
/// │
/// │
/// ┌─────────────────────────┐ 3. Repartitioning by hash(group
/// │ Repartition │ keys) ensures that each distinct
/// │ HASH(x) │ group key now appears in exactly
/// └─────────────────────────┘ one partition
/// ▲
/// │
/// ┌───────────────┴─────────────┐
/// │ │
/// │ │
/// ┌─────────────────────────┐ ┌──────────────────────────┐ 2. Each AggregateMode::Partial
/// │ GroubyBy │ │ GroubyBy │ GroupBy has an entry for *all*
/// │(AggregateMode::Partial) │ │ (AggregateMode::Partial) │ the groups
/// └─────────────────────────┘ └──────────────────────────┘
/// ▲ ▲
/// │ ┌┘
/// │ │
/// .─────────. .─────────.
/// ,─' '─. ,─' '─.
/// ; Input : ; Input : 1. Since input data is
/// : Partition 0 ; : Partition 1 ; arbitrarily or RoundRobin
/// ╲ ╱ ╲ ╱ distributed, each partition
/// '─. ,─' '─. ,─' likely has all distinct
/// `───────' `───────'
/// ```
///
/// This structure is used so that the `AggregateMode::Partial` accumulators
/// reduces the cardinality of the input as soon as possible. Typically,
/// each partial accumulator sees all groups in the input as the group keys
/// are evenly distributed across the input.
///
/// The final output is computed by repartitioning the result of
/// [`Self::state`] from each Partial aggregate and `hash(group keys)` so
/// that each distinct group key appears in exactly one of the
/// `AggregateMode::Final` GroupBy nodes. The output of the final nodes are
/// then unioned together to produce the overall final output.
///
/// Here is an example that shows the distribution of groups in the
/// different phases
///
/// ```text
/// ┌─────┐ ┌─────┐
/// │ 1 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │ After repartitioning by
/// └─────┘ └─────┘ hash(group keys), each distinct
/// ┌─────┐ ┌─────┐ group key now appears in exactly
/// │ 1 │ │ 3 │ one partition
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │
/// └─────┘ └─────┘
///
///
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///
/// ┌─────┐ ┌─────┐
/// │ 2 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 3 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 1 │
/// └─────┘ └─────┘ Input data is arbitrarily or
/// ... ... RoundRobin distributed, each
/// ┌─────┐ ┌─────┐ partition likely has all
/// │ 1 │ │ 4 │ distinct group keys
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 1 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// └─────┘ └─────┘
///
/// group values group values
/// in partition 0 in partition 1
/// ```
fn state(&mut self) -> Result<Vec<ScalarValue>>;

/// Updates the accumulator's state from an `Array` containing one
Expand Down
66 changes: 61 additions & 5 deletions datafusion/expr/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Vectorized [`GroupsAccumulator`]

use arrow_array::{ArrayRef, BooleanArray};
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Result};

/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -128,18 +128,23 @@ pub trait GroupsAccumulator: Send {
/// Returns the intermediate aggregate state for this accumulator,
/// used for multi-phase grouping, resetting its internal state.
///
/// See [`Accumulator::state`] for more information on multi-phase
/// aggregation.
///
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
/// but the `MIN` aggregate would just return a single array.
///
/// Note more sophisticated internal state can be passed as
/// single `StructArray` rather than multiple arrays.
///
/// See [`Self::evaluate`] for details on the required output
/// order and `emit_to`.
/// order and `emit_to`.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Merges intermediate state (the output from [`Self::state`])
/// into this accumulator's values.
/// into this accumulator's current state.
///
/// For some aggregates (such as `SUM`), `merge_batch` is the same
/// as `update_batch`, but for some aggregates (such as `COUNT`,
Expand All @@ -158,8 +163,59 @@ pub trait GroupsAccumulator: Send {
total_num_groups: usize,
) -> Result<()>;

/// Converts an input batch directly the intermediate aggregate state.
///
/// This is the equivalent of treating each input row as its own group. It
/// is invoked when the Partial phase of a multi-phase aggregation is not
/// reducing the cardinality enough to warrant spending more effort on
/// pre-aggregation (see `Background` section below), and switches to
/// passing intermediate state directly on to the next aggregation phase.
///
/// Examples:
/// * `COUNT`: an array of 1s for each row in the input batch.
/// * `SUM/MIN/MAX`: the input values themselves.
///
/// # Arguments
/// * `values`: the input arguments to the accumulator
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
///
/// # Background
///
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
/// Partial phase reduces the cardinality of the input data as soon as
/// possible in the plan.
///
/// This strategy is very effective for queries with a small number of
/// groups, as most of the data is aggregated immediately and only a small
/// amount of data must be repartitioned (see [`Accumulator::state`] for
/// background)
///
/// However, for queries with a large number of groups, the Partial phase
/// often does not reduce the cardinality enough to warrant the memory and
/// CPU cost of actually performing the aggregation. For such cases, the
/// HashAggregate operator will dynamically switch to passing intermediate
/// state directly to the next aggregation phase with minimal processing
/// using this method.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn convert_to_state(
&self,
_values: &[ArrayRef],
_opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
not_impl_err!("Input batch conversion to state not implemented")
}

/// Returns `true` if [`Self::convert_to_state`] is implemented to support
/// intermediate aggregate state conversion.
fn convert_to_state_supported(&self) -> bool {
false
}

/// Amount of memory used to store the state of this accumulator,
/// in bytes. This function is called once per batch, so it should
/// be `O(n)` to compute, not `O(num_groups)`
/// in bytes.
///
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
}
5 changes: 4 additions & 1 deletion datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,16 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

/// Return the fields used to store the intermediate state of this accumulator.
///
/// See [`Accumulator::state`] for background information.
///
/// # Arguments:
/// 1. `name`: the name of the expression (e.g. AVG, SUM, etc)
/// 2. `value_type`: Aggregate function output returned by [`Self::return_type`] if defined, otherwise
/// it is equivalent to the data type of the first arguments
/// 3. `ordering_fields`: the fields used to order the input arguments, if any.
/// Empty if no ordering expression is provided.
///
///
/// # Notes:
///
/// The default implementation returns a single state field named `name`
Expand Down Expand Up @@ -384,7 +387,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// # Notes
///
/// Even if this function returns true, DataFusion will still use
/// `Self::accumulator` for certain queries, such as when this aggregate is
/// [`Self::accumulator`] for certain queries, such as when this aggregate is
/// used as a window function or when there no GROUP BY columns in the
/// query.
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
Expand Down
50 changes: 49 additions & 1 deletion datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use arrow::{
};

use arrow::{
array::{Array, BooleanArray, Int64Array, PrimitiveArray},
array::{Array, BooleanArray, Int64Array, Int64Builder, PrimitiveArray},
buffer::BooleanBuffer,
};
use datafusion_common::{
Expand Down Expand Up @@ -433,6 +433,54 @@ impl GroupsAccumulator for CountGroupsAccumulator {
Ok(vec![Arc::new(counts) as ArrayRef])
}

/// Converts an input batch directly to a state batch
///
/// The state of `COUNT` is always a single Int64Array:
/// * `1` (for non-null, non filtered values)
/// * `0` (for null values)
fn convert_to_state(
&self,
values: &[ArrayRef],
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
let values = &values[0];

let state_array = match (values.logical_nulls(), opt_filter) {
(Some(nulls), None) => {
let mut builder = Int64Builder::with_capacity(values.len());
nulls
.into_iter()
.for_each(|is_valid| builder.append_value(is_valid as i64));
builder.finish()
}
(Some(nulls), Some(filter)) => {
let mut builder = Int64Builder::with_capacity(values.len());
nulls.into_iter().zip(filter.iter()).for_each(
|(is_valid, filter_value)| {
builder.append_value(
(is_valid && filter_value.is_some_and(|val| val)) as i64,
)
},
);
builder.finish()
}
(None, Some(filter)) => {
let mut builder = Int64Builder::with_capacity(values.len());
filter.into_iter().for_each(|filter_value| {
builder.append_value(filter_value.is_some_and(|val| val) as i64)
});
builder.finish()
}
(None, None) => Int64Array::from_value(1, values.len()),
};

Ok(vec![Arc::new(state_array)])
}

fn convert_to_state_supported(&self) -> bool {
true
}

fn size(&self) -> usize {
self.counts.capacity() * std::mem::size_of::<usize>()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray};
use arrow::array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, PrimitiveBuilder};
use arrow::datatypes::ArrowPrimitiveType;
use arrow::datatypes::DataType;
use datafusion_common::Result;
Expand Down Expand Up @@ -134,6 +134,52 @@ where
self.update_batch(values, group_indices, opt_filter, total_num_groups)
}

/// Converts an input batch directly to a state batch
///
/// The state is:
/// - self.prim_fn for all non null, non filtered values
/// - null otherwise
///
fn convert_to_state(
&self,
values: &[ArrayRef],
opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
let values = values[0].as_primitive::<T>();
let mut state = PrimitiveBuilder::<T>::with_capacity(values.len())
.with_data_type(self.data_type.clone());

match opt_filter {
Some(filter) => {
values
.iter()
.zip(filter.iter())
.for_each(|(val, filter_val)| match (val, filter_val) {
(Some(val), Some(true)) => {
let mut state_val = self.starting_value;
(self.prim_fn)(&mut state_val, val);
state.append_value(state_val);
}
(_, _) => state.append_null(),
})
}
None => values.iter().for_each(|val| match val {
Some(val) => {
let mut state_val = self.starting_value;
(self.prim_fn)(&mut state_val, val);
state.append_value(state_val);
}
None => state.append_null(),
}),
};

Ok(vec![Arc::new(state.finish())])
}

fn convert_to_state_supported(&self) -> bool {
true
}

fn size(&self) -> usize {
self.values.capacity() * std::mem::size_of::<T::Native>() + self.null_state.size()
}
Expand Down
Loading
Loading