Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix(query): fix aggregate panic in cluster mode #16319

Merged
merged 14 commits into from
Aug 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ where Self: Sized + Clone
fn from_column(col: &Column, desc: &[SortColumnDescription]) -> Result<Self> {
Self::try_from_column(col, desc).ok_or_else(|| {
ErrorCode::BadDataValueType(format!(
"Order column type mismatched. Expecetd {} but got {}",
"Order column type mismatched. Expected {} but got {}",
Self::data_type(),
col.data_type()
))
Expand Down
15 changes: 6 additions & 9 deletions src/query/service/src/pipelines/builders/builder_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,12 @@ impl PipelineBuilder {
.settings
.get_enable_experimental_aggregate_hashtable()?;

let in_cluster = !self.ctx.get_cluster().is_empty();

let params = Self::build_aggregator_params(
aggregate.input.output_schema()?,
&aggregate.group_by,
&aggregate.agg_funcs,
enable_experimental_aggregate_hashtable,
in_cluster,
self.is_exchange_neighbor,
max_block_size as usize,
None,
)?;
Expand All @@ -129,7 +127,7 @@ impl PipelineBuilder {
let method = DataBlock::choose_hash_method(&sample_block, group_cols, efficiently_memory)?;

// Need a global atomic to read the max current radix bits hint
let partial_agg_config = if self.ctx.get_cluster().is_empty() {
let partial_agg_config = if !self.is_exchange_neighbor {
HashTableConfig::default().with_partial(true, max_threads as usize)
} else {
HashTableConfig::default()
Expand Down Expand Up @@ -164,7 +162,7 @@ impl PipelineBuilder {
})?;

// If cluster mode, spill write will be completed in exchange serialize, because we need scatter the block data first
if self.ctx.get_cluster().is_empty() {
if !self.is_exchange_neighbor {
let operator = DataOperator::instance().operator();
let location_prefix =
query_spill_prefix(self.ctx.get_tenant().tenant_name(), &self.ctx.get_id());
Expand Down Expand Up @@ -216,13 +214,12 @@ impl PipelineBuilder {
let enable_experimental_aggregate_hashtable = self
.settings
.get_enable_experimental_aggregate_hashtable()?;
let in_cluster = !self.ctx.get_cluster().is_empty();
let params = Self::build_aggregator_params(
aggregate.before_group_by_schema.clone(),
&aggregate.group_by,
&aggregate.agg_funcs,
enable_experimental_aggregate_hashtable,
in_cluster,
self.is_exchange_neighbor,
max_block_size as usize,
aggregate.limit,
)?;
Expand Down Expand Up @@ -288,7 +285,7 @@ impl PipelineBuilder {
group_by: &[IndexType],
agg_funcs: &[AggregateFunctionDesc],
enable_experimental_aggregate_hashtable: bool,
in_cluster: bool,
cluster_aggregator: bool,
max_block_size: usize,
limit: Option<usize>,
) -> Result<Arc<AggregatorParams>> {
Expand Down Expand Up @@ -330,7 +327,7 @@ impl PipelineBuilder {
&aggs,
&agg_args,
enable_experimental_aggregate_hashtable,
in_cluster,
cluster_aggregator,
max_block_size,
limit,
)?;
Expand Down
7 changes: 6 additions & 1 deletion src/query/service/src/pipelines/builders/builder_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ impl PipelineBuilder {

pub fn build_exchange_sink(&mut self, exchange_sink: &ExchangeSink) -> Result<()> {
// ExchangeSink will be appended by `ExchangeManager::execute_pipeline`
self.build_pipeline(&exchange_sink.input)
let is_exchange_neighbor = self.is_exchange_neighbor;

self.is_exchange_neighbor = true;
self.build_pipeline(&exchange_sink.input)?;
self.is_exchange_neighbor = is_exchange_neighbor;
Ok(())
}
}
6 changes: 5 additions & 1 deletion src/query/service/src/pipelines/builders/builder_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ impl PipelineBuilder {
.insert(build_cache_index, state.clone());
}
self.expand_build_side_pipeline(&join.build, join, state.clone())?;
self.build_join_probe(join, state)
self.build_join_probe(join, state)?;

// In the case of spilling, we need to share state among multiple threads. Fetch all data from this round quickly so that the next round can start promptly.
self.main_pipeline
.resize(self.main_pipeline.output_len(), true)
}

fn build_join_state(
Expand Down
23 changes: 22 additions & 1 deletion src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct PipelineBuilder {
pub hash_join_states: HashMap<usize, Arc<HashJoinState>>,

pub r_cte_scan_interpreters: Vec<CreateTableInterpreter>,
pub(crate) is_exchange_neighbor: bool,
}

impl PipelineBuilder {
Expand All @@ -78,6 +79,7 @@ impl PipelineBuilder {
join_state: None,
hash_join_states: HashMap::new(),
r_cte_scan_interpreters: vec![],
is_exchange_neighbor: false,
}
}

Expand Down Expand Up @@ -133,9 +135,25 @@ impl PipelineBuilder {
}
}

fn is_exchange_neighbor(&self, plan: &PhysicalPlan) -> bool {
let mut is_empty = true;
let mut all_exchange_source = true;
for children in plan.children() {
is_empty = false;
if !matches!(children, PhysicalPlan::ExchangeSource(_)) {
all_exchange_source = false;
}
}

!is_empty && all_exchange_source
}

#[recursive::recursive]
pub(crate) fn build_pipeline(&mut self, plan: &PhysicalPlan) -> Result<()> {
let _guard = self.add_plan_scope(plan)?;
let is_exchange_neighbor = self.is_exchange_neighbor;
self.is_exchange_neighbor |= self.is_exchange_neighbor(plan);

match plan {
PhysicalPlan::TableScan(scan) => self.build_table_scan(scan),
PhysicalPlan::CteScan(scan) => self.build_cte_scan(scan),
Expand Down Expand Up @@ -235,6 +253,9 @@ impl PipelineBuilder {
PhysicalPlan::ColumnMutation(column_mutation) => {
self.build_column_mutation(column_mutation)
}
}
}?;

self.is_exchange_neighbor = is_exchange_neighbor;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct AggregatorParams {
pub offsets_aggregate_states: Vec<usize>,

pub enable_experimental_aggregate_hashtable: bool,
pub in_cluster: bool,
pub cluster_aggregator: bool,
pub max_block_size: usize,
// Limit is push down to AggregatorTransform
pub limit: Option<usize>,
Expand All @@ -56,7 +56,7 @@ impl AggregatorParams {
agg_funcs: &[AggregateFunctionRef],
agg_args: &[Vec<usize>],
enable_experimental_aggregate_hashtable: bool,
in_cluster: bool,
cluster_aggregator: bool,
max_block_size: usize,
limit: Option<usize>,
) -> Result<Arc<AggregatorParams>> {
Expand All @@ -76,7 +76,7 @@ impl AggregatorParams {
layout: states_layout,
offsets_aggregate_states: states_offsets,
enable_experimental_aggregate_hashtable,
in_cluster,
cluster_aggregator,
max_block_size,
limit,
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<Method: HashMethodBounds, V: Copy + Send + Sync + 'static>
self.initialized_all_inputs = true;
// in a cluster where partitions are only 8 and 128,
// we need to pull all data where the partition equals 8 until the partition changes to 128 or there is no data available.
if self.params.in_cluster {
if self.params.cluster_aggregator {
for index in 0..self.inputs.len() {
if self.inputs[index].port.is_finished() {
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ impl DefaultSettings {
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("aggregate_spilling_memory_ratio", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
value: UserSettingValue::UInt64(60),
desc: "Sets the maximum memory usage ratio (as a percentage) that an aggregator can use before spilling data to storage during query execution.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=100)),
Expand Down
14 changes: 13 additions & 1 deletion tests/sqllogictests/suites/tpch/spill.test
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ set window_partition_spilling_memory_ratio = 1;
statement ok
set window_partition_spilling_bytes_threshold_per_proc = 1;

statement ok
set aggregate_spilling_memory_ratio = 1;

statement ok
set aggregate_spilling_bytes_threshold_per_proc = 1;

# TPC-H TEST
include ./queries.test

Expand Down Expand Up @@ -434,4 +440,10 @@ statement ok
set window_partition_spilling_memory_ratio = 60;

statement ok
set window_partition_spilling_bytes_threshold_per_proc = 0;
set window_partition_spilling_bytes_threshold_per_proc = 0;

statement ok
set aggregate_spilling_memory_ratio = 60;

statement ok
set aggregate_spilling_bytes_threshold_per_proc = 0;
Loading