From fa1c964b2cef16383876ee3249ad42cf9f3b45e2 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Sat, 8 Oct 2022 08:03:53 +0800 Subject: [PATCH 1/5] fix(processor): support interrupt join build side --- .../pipeline/core/src/processors/processor.rs | 8 ++++++++ .../sinks/src/processors/sinks/sync_sink.rs | 6 ++++++ .../processors/transforms/transform_compact.rs | 17 +++++++++++------ .../src/pipelines/executor/executor_graph.rs | 11 ++++------- .../src/pipelines/executor/pipeline_executor.rs | 1 + .../transforms/hash_join/hash_join_state.rs | 4 ++++ .../transforms/hash_join/join_hash_table.rs | 15 +++++++++++++++ .../transforms/transform_hash_join.rs | 4 ++++ .../src/sql/executor/pipeline_builder.rs | 5 ----- .../storages/fuse/src/operations/append.rs | 1 - .../storages/fuse/src/operations/compact.rs | 1 - .../storages/fuse/src/operations/recluster.rs | 3 --- 12 files changed, 53 insertions(+), 23 deletions(-) diff --git a/src/query/pipeline/core/src/processors/processor.rs b/src/query/pipeline/core/src/processors/processor.rs index fcde05701803..6ff6d549075c 100644 --- a/src/query/pipeline/core/src/processors/processor.rs +++ b/src/query/pipeline/core/src/processors/processor.rs @@ -42,6 +42,9 @@ pub trait Processor: Send { fn event(&mut self) -> Result; + // When the synchronization task needs to run for a long time, the interrupt function needs to be implemented. + fn interrupt(&self) {} + // Synchronous work. fn process(&mut self) -> Result<()> { Err(ErrorCode::UnImplement("Unimplemented process.")) @@ -96,6 +99,11 @@ impl ProcessorPtr { (*self.inner.get()).event() } + /// # Safety + pub unsafe fn interrupt(&self) { + (*self.inner.get()).interrupt() + } + /// # Safety pub unsafe fn process(&self) -> Result<()> { (*self.inner.get()).process() diff --git a/src/query/pipeline/sinks/src/processors/sinks/sync_sink.rs b/src/query/pipeline/sinks/src/processors/sinks/sync_sink.rs index faa3b5c450b5..9236767521f2 100644 --- a/src/query/pipeline/sinks/src/processors/sinks/sync_sink.rs +++ b/src/query/pipeline/sinks/src/processors/sinks/sync_sink.rs @@ -33,6 +33,8 @@ pub trait Sink: Send { Ok(()) } + fn interrupt(&self) {} + fn consume(&mut self, data_block: DataBlock) -> Result<()>; } @@ -94,6 +96,10 @@ impl Processor for Sinker { } } + fn interrupt(&self) { + self.inner.interrupt() + } + fn process(&mut self) -> Result<()> { if !self.called_on_start { self.called_on_start = true; diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs index 4ee8b979b926..02d3459b2b48 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::collections::VecDeque; +use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -27,14 +28,14 @@ use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_core::processors::Processor; +pub type Aborting = Arc bool + Send + Sync + 'static>>; + pub struct TransformCompact { state: ProcessorState, compactor: T, - aborting: Aborting, + aborting: Arc, } -pub type Aborting = Arc bool + Send + Sync + 'static>>; - /// Compactor is a trait that defines how to compact blocks. pub trait Compactor { fn name() -> &'static str; @@ -55,12 +56,10 @@ pub trait Compactor { impl TransformCompact { pub fn try_create( - ctx: Arc, input_port: Arc, output_port: Arc, compactor: T, ) -> Result { - let aborting = ctx.get_aborting(); let state = ProcessorState::Consume(ConsumeState { input_port, output_port, @@ -71,7 +70,7 @@ impl TransformCompact { Ok(ProcessorPtr::create(Box::new(Self { state, compactor, - aborting: Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))), + aborting: Arc::new(AtomicBool::new(false)), }))) } @@ -152,6 +151,10 @@ impl Processor for TransformCompact { } } + fn interrupt(&self) { + self.aborting.store(true, Ordering::Release); + } + fn process(&mut self) -> Result<()> { match &mut self.state { ProcessorState::Consume(state) => { @@ -166,6 +169,8 @@ impl Processor for TransformCompact { } ProcessorState::Compacting(state) => { let aborting = self.aborting.clone(); + let aborting = Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))); + let compacted_blocks = self.compactor.compact_final(&state.blocks, aborting)?; let mut temp_state = ProcessorState::Finished; diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index c60f46cd57a6..a8a9b94185d5 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -375,15 +375,12 @@ impl RunningGraph { Ok(schedule_queue) } - pub fn all_node_is_finished(&self) -> bool { - for node_index in self.0.graph.node_indices() { - let state = self.0.graph[node_index].state.lock().unwrap(); - if !matches!(&*state, State::Finished) { - return false; + pub fn interrupt_running_nodes(&self) { + unsafe { + for node_index in self.0.graph.node_indices() { + self.0.graph[node_index].processor.interrupt(); } } - - true } } diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 48767c505501..0a4c71ea4eed 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -146,6 +146,7 @@ impl PipelineExecutor { pub fn finish(&self, cause: Option) { *self.finished_error.lock() = cause; self.global_tasks_queue.finish(self.workers_condvar.clone()); + self.graph.interrupt_running_nodes(); self.finished_notify.notify_waiters(); } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 91087ee3310c..79a4c6c57a0e 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use common_datablocks::DataBlock; use common_exception::Result; use common_pipeline_transforms::processors::transforms::Aborting; @@ -27,6 +29,8 @@ pub trait HashJoinState: Send + Sync { /// Probe the hash table and retrieve matched rows as DataBlocks fn probe(&self, input: &DataBlock, probe_state: &mut ProbeState) -> Result>; + fn interrupt(&self); + /// Attach to state fn attach(&self) -> Result<()>; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index 81a79e24e15f..c8817808708d 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -15,6 +15,8 @@ use std::borrow::BorrowMut; use std::collections::HashSet; use std::fmt::Debug; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; @@ -132,6 +134,7 @@ pub struct JoinHashTable { pub(crate) row_ptrs: RwLock>, pub(crate) probe_schema: DataSchemaRef, finished_notify: Arc, + interrupt: Arc, } impl JoinHashTable { @@ -258,6 +261,7 @@ impl JoinHashTable { row_ptrs: RwLock::new(vec![]), probe_schema: probe_data_schema, finished_notify: Arc::new(Notify::new()), + interrupt: Arc::new(AtomicBool::new(false)), }) } @@ -484,6 +488,10 @@ impl HashJoinState for JoinHashTable { } } + fn interrupt(&self) { + self.interrupt.store(true, Ordering::Release); + } + fn attach(&self) -> Result<()> { let mut count = self.ref_count.lock().unwrap(); *count += 1; @@ -535,9 +543,16 @@ impl HashJoinState for JoinHashTable { }}; } + let interrupt = self.interrupt.clone(); let mut chunks = self.row_space.chunks.write().unwrap(); let mut has_null = false; for chunk_index in 0..chunks.len() { + if interrupt.load(Ordering::Relaxed) { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + let chunk = &mut chunks[chunk_index]; let mut columns = Vec::with_capacity(chunk.cols.len()); let markers = if matches!( diff --git a/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs index b6f28dfea15b..824ac5eb144b 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs @@ -49,6 +49,10 @@ impl Sink for SinkBuildHashTable { self.join_state.detach() } + fn interrupt(&self) { + self.join_state.interrupt() + } + fn consume(&mut self, data_block: DataBlock) -> Result<()> { self.join_state.build(data_block) } diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index d01e777c3441..4ed639b6488e 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -453,7 +453,6 @@ impl PipelineBuilder { // Merge self.main_pipeline.add_transform(|input, output| { TransformSortMerge::try_create( - self.ctx.clone(), input, output, SortMergeCompactor::new(sort.limit, sort_desc.clone()), @@ -465,7 +464,6 @@ impl PipelineBuilder { // Concat merge in single thread self.main_pipeline.add_transform(|input, output| { TransformSortMerge::try_create( - self.ctx.clone(), input, output, SortMergeCompactor::new(sort.limit, sort_desc.clone()), @@ -499,7 +497,6 @@ impl PipelineBuilder { self.main_pipeline.resize(1)?; self.main_pipeline.add_transform(|input, output| { TransformMarkJoin::try_create( - self.ctx.clone(), input, output, MarkJoinCompactor::create(state.clone()), @@ -511,7 +508,6 @@ impl PipelineBuilder { self.main_pipeline.resize(1)?; self.main_pipeline.add_transform(|input, output| { TransformRightJoin::try_create( - self.ctx.clone(), input, output, RightJoinCompactor::create(state.clone()), @@ -523,7 +519,6 @@ impl PipelineBuilder { self.main_pipeline.resize(1)?; self.main_pipeline.add_transform(|input, output| { TransformRightSemiAntiJoin::try_create( - self.ctx.clone(), input, output, RightSemiAntiJoinCompactor::create(state.clone()), diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index 9396e1379aac..a3c6d036484d 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ -48,7 +48,6 @@ impl FuseTable { let block_compactor = self.get_block_compactor(); pipeline.add_transform(|transform_input_port, transform_output_port| { TransformCompact::try_create( - ctx.clone(), transform_input_port, transform_output_port, block_compactor.to_compactor(false), diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index 01c09d7074c3..4e77a7d05ca1 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -94,7 +94,6 @@ impl FuseTable { pipeline.add_transform(|transform_input_port, transform_output_port| { TransformCompact::try_create( - ctx.clone(), transform_input_port, transform_output_port, block_compactor.to_compactor(false), diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 4ffd40dc9fb5..e7b353e32a25 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -156,7 +156,6 @@ impl FuseTable { })?; pipeline.add_transform(|transform_input_port, transform_output_port| { TransformSortMerge::try_create( - ctx.clone(), transform_input_port, transform_output_port, SortMergeCompactor::new(None, sort_descs.clone()), @@ -165,7 +164,6 @@ impl FuseTable { pipeline.resize(1)?; pipeline.add_transform(|transform_input_port, transform_output_port| { TransformSortMerge::try_create( - ctx.clone(), transform_input_port, transform_output_port, SortMergeCompactor::new(None, sort_descs.clone()), @@ -174,7 +172,6 @@ impl FuseTable { pipeline.add_transform(|transform_input_port, transform_output_port| { TransformCompact::try_create( - ctx.clone(), transform_input_port, transform_output_port, block_compactor.to_compactor(true), From 9867bf1ad2c69c307469891d2d55ac1a926965fe Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 10 Oct 2022 08:27:22 +0800 Subject: [PATCH 2/5] fix(processor): support interrupt inner join probe side --- .../transforms/transform_compact.rs | 4 +- .../transforms/hash_join/hash_join_state.rs | 2 - .../transforms/hash_join/join_hash_table.rs | 3 +- .../transforms/hash_join/result_blocks.rs | 186 +++++++++++------- .../transforms/transform_hash_join.rs | 4 + 5 files changed, 128 insertions(+), 71 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs index 02d3459b2b48..63bcdd97f6af 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs @@ -18,7 +18,6 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; @@ -169,7 +168,8 @@ impl Processor for TransformCompact { } ProcessorState::Compacting(state) => { let aborting = self.aborting.clone(); - let aborting = Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))); + let aborting: Aborting = + Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))); let compacted_blocks = self.compactor.compact_final(&state.blocks, aborting)?; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 79a4c6c57a0e..62828252d8d6 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use common_datablocks::DataBlock; use common_exception::Result; use common_pipeline_transforms::processors::transforms::Aborting; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index c8817808708d..53639306fe14 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -38,6 +38,7 @@ use common_datavalues::DataSchemaRefExt; use common_datavalues::DataType; use common_datavalues::DataTypeImpl; use common_datavalues::DataValue; +use common_exception::ErrorCode; use common_exception::Result; use common_hashtable::HashMap; use common_pipeline_transforms::processors::transforms::Aborting; @@ -133,8 +134,8 @@ pub struct JoinHashTable { pub(crate) hash_join_desc: HashJoinDesc, pub(crate) row_ptrs: RwLock>, pub(crate) probe_schema: DataSchemaRef, + pub(crate) interrupt: Arc, finished_notify: Arc, - interrupt: Arc, } impl JoinHashTable { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs index d556eb627597..220bb8ffc66e 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::iter::TrustedLen; +use std::sync::atomic::Ordering; use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::bitmap::MutableBitmap; @@ -60,13 +61,17 @@ impl JoinHashTable { Key: HashTableKeyable + Clone + 'static, IT: Iterator + TrustedLen, { - let probe_indexs = &mut probe_state.probe_indexs; - let build_indexs = &mut probe_state.build_indexs; let valids = &probe_state.valids; - let mut results: Vec = vec![]; match self.hash_join_desc.join_type { JoinType::Inner => { + let block_size = 8192; + + // The inner join will return multiple data blocks of similar size + let mut probed_blocks = vec![]; + let mut probe_indexes = Vec::with_capacity(block_size); + let mut build_indexes = Vec::with_capacity(block_size); + for (i, key) in keys_iter.enumerate() { // If the join is derived from correlated subquery, then null equality is safe. let probe_result_ptr = if self.hash_join_desc.from_correlated_subquery { @@ -74,101 +79,149 @@ impl JoinHashTable { } else { Self::probe_key(hash_table, key, valids, i) }; - match probe_result_ptr { - Some(v) => { - let probe_result_ptrs = v.get_value(); - build_indexs.extend_from_slice(probe_result_ptrs); - for _ in probe_result_ptrs { - probe_indexs.push(i as u32); + if let Some(v) = probe_result_ptr { + let probed_rows = v.get_value(); + + if probe_indexes.len() + probed_rows.len() < probe_indexes.capacity() { + // fast path + build_indexes.extend_from_slice(&probed_rows); + probe_indexes + .extend(std::iter::repeat(i as u32).take(probed_rows.len())); + } else { + let mut index = 0_usize; + let mut remain = probed_rows.len(); + + while index < probed_rows.len() { + if probe_indexes.len() + remain < probe_indexes.capacity() { + build_indexes.extend_from_slice(&probed_rows[index..]); + probe_indexes.extend(std::iter::repeat(i as u32).take(remain)); + index += remain; + } else { + if self.interrupt.load(Ordering::Relaxed) { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + + let append_rows = + probe_indexes.capacity() - probe_indexes.len(); + let new_index = index + append_rows; + + build_indexes.extend_from_slice(&probed_rows[index..new_index]); + probe_indexes + .extend(std::iter::repeat(i as u32).take(append_rows)); + + let build_block = self.row_space.gather(&build_indexes)?; + let probe_block = + DataBlock::block_take_by_indices(input, &probe_indexes)?; + probed_blocks + .push(self.merge_eq_block(&build_block, &probe_block)?); + + index = new_index; + remain -= append_rows; + + build_indexes.clear(); + probe_indexes.clear(); + } } } - None => continue, } } - let build_block = self.row_space.gather(build_indexs)?; - let probe_block = DataBlock::block_take_by_indices(input, probe_indexs)?; - let merged_block = self.merge_eq_block(&build_block, &probe_block)?; - match &self.hash_join_desc.other_predicate { + None => Ok(probed_blocks), Some(other_predicate) => { let func_ctx = self.ctx.try_get_function_context()?; - let filter_vector = other_predicate.eval(&func_ctx, &merged_block)?; - results.push(DataBlock::filter_block( - merged_block, - filter_vector.vector(), - )?); + let mut filtered_blocks = Vec::with_capacity(probed_blocks.len()); + + for probed_block in probed_blocks { + if self.interrupt.load(Ordering::Relaxed) { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + + let predicate = other_predicate.eval(&func_ctx, &probed_block)?; + let res = DataBlock::filter_block(probed_block, predicate.vector())?; + + if !res.is_empty() { + filtered_blocks.push(res); + } + } + + Ok(filtered_blocks) } - None => results.push(merged_block), } } JoinType::LeftSemi => { if self.hash_join_desc.other_predicate.is_none() { - let result = self.left_semi_anti_join::( + Ok(vec![self.left_semi_anti_join::( hash_table, probe_state, keys_iter, input, - )?; - return Ok(vec![result]); + )?]) } else { - let result = self.left_semi_anti_join_with_other_conjunct::( - hash_table, - probe_state, - keys_iter, - input, - )?; - return Ok(vec![result]); + Ok(vec![ + self.left_semi_anti_join_with_other_conjunct::( + hash_table, + probe_state, + keys_iter, + input, + )?, + ]) } } JoinType::LeftAnti => { if self.hash_join_desc.other_predicate.is_none() { - let result = self.left_semi_anti_join::( + Ok(vec![self.left_semi_anti_join::( hash_table, probe_state, keys_iter, input, - )?; - return Ok(vec![result]); + )?]) } else { - let result = self.left_semi_anti_join_with_other_conjunct::( - hash_table, - probe_state, - keys_iter, - input, - )?; - return Ok(vec![result]); + Ok(vec![ + self.left_semi_anti_join_with_other_conjunct::( + hash_table, + probe_state, + keys_iter, + input, + )?, + ]) } } - JoinType::RightSemi | JoinType::RightAnti => { - let result = self.right_join::<_, _>(hash_table, probe_state, keys_iter, input)?; - return Ok(vec![result]); - } + JoinType::RightSemi | JoinType::RightAnti => Ok(vec![self.right_join::<_, _>( + hash_table, + probe_state, + keys_iter, + input, + )?]), // Single join is similar to left join, but the result is a single row. JoinType::Left | JoinType::Single | JoinType::Full => { if self.hash_join_desc.other_predicate.is_none() { - let result = self.left_or_single_join::( + Ok(vec![self.left_or_single_join::( hash_table, probe_state, keys_iter, input, - )?; - return Ok(vec![result]); + )?]) } else { - let result = self.left_or_single_join::( + Ok(vec![self.left_or_single_join::( hash_table, probe_state, keys_iter, input, - )?; - return Ok(vec![result]); + )?]) } } - JoinType::Right => { - let result = self.right_join::<_, _>(hash_table, probe_state, keys_iter, input)?; - return Ok(vec![result]); - } + JoinType::Right => Ok(vec![self.right_join::<_, _>( + hash_table, + probe_state, + keys_iter, + input, + )?]), // Three cases will produce Mark join: // 1. uncorrelated ANY subquery: only have one kind of join condition, equi-condition or non-equi-condition. // 2. correlated ANY subquery: must have two kinds of join condition, one is equi-condition and the other is non-equi-condition. @@ -180,21 +233,22 @@ impl JoinHashTable { // 3. Correlated Exists subquery: only have one kind of join condition, equi-condition. // equi-condition is subquery's outer columns with subquery's derived columns. (see the above example in correlated ANY subquery) JoinType::LeftMark => { + let mut results: Vec = vec![]; results.push(DataBlock::empty()); self.left_mark_join(hash_table, probe_state, keys_iter, input)?; - } - JoinType::RightMark => { - let result = self.right_mark_join(hash_table, probe_state, keys_iter, input)?; - return Ok(vec![result]); - } - _ => { - return Err(ErrorCode::UnImplement(format!( - "{} is unimplemented", - self.hash_join_desc.join_type - ))); - } + Ok(results) + } + JoinType::RightMark => Ok(vec![self.right_mark_join( + hash_table, + probe_state, + keys_iter, + input, + )?]), + _ => Err(ErrorCode::UnImplement(format!( + "{} is unimplemented", + self.hash_join_desc.join_type + ))), } - Ok(results) } fn left_mark_join( diff --git a/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs index 824ac5eb144b..632111ef2209 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs @@ -153,6 +153,10 @@ impl Processor for TransformHashJoinProbe { } } + fn interrupt(&self) { + todo!() + } + fn process(&mut self) -> Result<()> { match self.step { HashJoinStep::Build => Ok(()), From 7f64d6d8e51d8d52958b2c408f93d56914bb3006 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 10 Oct 2022 12:27:25 +0800 Subject: [PATCH 3/5] fix(processor): support interrupt inner join probe side --- .../transforms/transform_block_compact.rs | 19 +++++++--- .../transforms/transform_compact.rs | 21 +++-------- .../transforms/transform_sort_merge.rs | 15 +++++++- .../transforms/hash_join/hash_join_state.rs | 11 ++---- .../transforms/hash_join/join_hash_table.rs | 11 ++---- .../transforms/hash_join/result_blocks.rs | 37 +++++++++---------- .../transforms/transform_hash_join.rs | 2 +- .../transforms/transform_mark_join.rs | 5 +-- .../transforms/transform_right_join.rs | 5 +-- .../transform_right_semi_anti_join.rs | 6 +-- 10 files changed, 65 insertions(+), 67 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_block_compact.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_block_compact.rs index be0fb636bf17..669b7fb8e602 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_block_compact.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_block_compact.rs @@ -12,13 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; + use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use super::Compactor; use super::TransformCompact; -use crate::processors::transforms::Aborting; pub struct BlockCompactor { max_rows_per_block: usize, @@ -27,6 +30,7 @@ pub struct BlockCompactor { // A flag denoting whether it is a recluster operation. // Will be removed later. is_recluster: bool, + aborting: Arc, } impl BlockCompactor { @@ -41,6 +45,7 @@ impl BlockCompactor { min_rows_per_block, max_bytes_per_block, is_recluster, + aborting: Arc::new(AtomicBool::new(false)), } } } @@ -54,6 +59,10 @@ impl Compactor for BlockCompactor { true } + fn interrupt(&self) { + self.aborting.store(true, Ordering::Release); + } + fn compact_partial(&self, blocks: &mut Vec) -> Result> { if blocks.is_empty() { return Ok(vec![]); @@ -108,13 +117,13 @@ impl Compactor for BlockCompactor { Ok(res) } - fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result> { + fn compact_final(&self, blocks: &[DataBlock]) -> Result> { let mut res = Vec::with_capacity(blocks.len()); let mut temp_blocks = vec![]; let mut accumulated_rows = 0; for block in blocks.iter() { - if aborting() { + if self.aborting.load(Ordering::Relaxed) { return Err(ErrorCode::AbortedQuery( "Aborted query, because the server is shutting down or the query was killed.", )); @@ -142,7 +151,7 @@ impl Compactor for BlockCompactor { temp_blocks.push(block); while accumulated_rows >= self.max_rows_per_block { - if aborting() { + if self.aborting.load(Ordering::Relaxed) { return Err(ErrorCode::AbortedQuery( "Aborted query, because the server is shutting down or the query was killed.", )); @@ -164,7 +173,7 @@ impl Compactor for BlockCompactor { } if accumulated_rows != 0 { - if aborting() { + if self.aborting.load(Ordering::Relaxed) { return Err(ErrorCode::AbortedQuery( "Aborted query, because the server is shutting down or the query was killed.", )); diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs index 63bcdd97f6af..07b6e1ff0020 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs @@ -14,8 +14,6 @@ use std::any::Any; use std::collections::VecDeque; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; use std::sync::Arc; use common_datablocks::DataBlock; @@ -32,7 +30,6 @@ pub type Aborting = Arc bool + Send + Sync + 'static>>; pub struct TransformCompact { state: ProcessorState, compactor: T, - aborting: Arc, } /// Compactor is a trait that defines how to compact blocks. @@ -44,13 +41,15 @@ pub trait Compactor { false } + fn interrupt(&self) {} + /// `compact_partial` is called when a new block is pushed and `use_partial_compact` is enabled fn compact_partial(&self, _blocks: &mut Vec) -> Result> { Ok(vec![]) } /// `compact_final` is called when all the blocks are pushed to finish the compaction - fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result>; + fn compact_final(&self, blocks: &[DataBlock]) -> Result>; } impl TransformCompact { @@ -66,11 +65,7 @@ impl TransformCompact { output_data_blocks: VecDeque::new(), }); - Ok(ProcessorPtr::create(Box::new(Self { - state, - compactor, - aborting: Arc::new(AtomicBool::new(false)), - }))) + Ok(ProcessorPtr::create(Box::new(Self { state, compactor }))) } #[inline(always)] @@ -151,7 +146,7 @@ impl Processor for TransformCompact { } fn interrupt(&self) { - self.aborting.store(true, Ordering::Release); + self.compactor.interrupt(); } fn process(&mut self) -> Result<()> { @@ -167,11 +162,7 @@ impl Processor for TransformCompact { Ok(()) } ProcessorState::Compacting(state) => { - let aborting = self.aborting.clone(); - let aborting: Aborting = - Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))); - - let compacted_blocks = self.compactor.compact_final(&state.blocks, aborting)?; + let compacted_blocks = self.compactor.compact_final(&state.blocks)?; let mut temp_state = ProcessorState::Finished; std::mem::swap(&mut self.state, &mut temp_state); diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index b1e432ce78a2..50890ec51f4e 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; + use common_datablocks::DataBlock; use common_datablocks::SortColumnDescription; use common_exception::Result; @@ -23,6 +27,7 @@ use crate::processors::transforms::Aborting; pub struct SortMergeCompactor { limit: Option, sort_columns_descriptions: Vec, + aborting: Arc, } impl SortMergeCompactor { @@ -33,6 +38,7 @@ impl SortMergeCompactor { SortMergeCompactor { limit, sort_columns_descriptions, + aborting: Arc::new(AtomicBool::new(false)), } } } @@ -42,10 +48,17 @@ impl Compactor for SortMergeCompactor { "SortMergeTransform" } - fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result> { + fn interrupt(&self) { + self.aborting.store(true, Ordering::Release); + } + + fn compact_final(&self, blocks: &[DataBlock]) -> Result> { if blocks.is_empty() { Ok(vec![]) } else { + let aborting = self.aborting.clone(); + let aborting: Aborting = Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))); + let block = DataBlock::merge_sort_blocks( blocks, &self.sort_columns_descriptions, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 62828252d8d6..0c8866efaeb3 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -14,7 +14,6 @@ use common_datablocks::DataBlock; use common_exception::Result; -use common_pipeline_transforms::processors::transforms::Aborting; use super::ProbeState; @@ -46,15 +45,11 @@ pub trait HashJoinState: Send + Sync { async fn wait_finish(&self) -> Result<()>; /// Get mark join results - fn mark_join_blocks(&self, flag: Aborting) -> Result>; + fn mark_join_blocks(&self) -> Result>; /// Get right join results - fn right_join_blocks(&self, blocks: &[DataBlock], flag: Aborting) -> Result>; + fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result>; /// Get right semi/anti join results - fn right_anti_semi_join_blocks( - &self, - blocks: &[DataBlock], - flag: Aborting, - ) -> Result>; + fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock]) -> Result>; } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index 53639306fe14..397a053235ab 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -41,7 +41,6 @@ use common_datavalues::DataValue; use common_exception::ErrorCode; use common_exception::Result; use common_hashtable::HashMap; -use common_pipeline_transforms::processors::transforms::Aborting; use common_planner::IndexType; use parking_lot::RwLock; use primitive_types::U256; @@ -682,7 +681,7 @@ impl HashJoinState for JoinHashTable { Ok(()) } - fn mark_join_blocks(&self, _aborting: Aborting) -> Result> { + fn mark_join_blocks(&self) -> Result> { let row_ptrs = self.row_ptrs.read(); let has_null = self.hash_join_desc.marker_join_desc.has_null.read(); @@ -692,7 +691,7 @@ impl HashJoinState for JoinHashTable { Ok(vec![self.merge_eq_block(&marker_block, &build_block)?]) } - fn right_join_blocks(&self, blocks: &[DataBlock], _flag: Aborting) -> Result> { + fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result> { let unmatched_build_indexes = self.find_unmatched_build_indexes()?; if unmatched_build_indexes.is_empty() && self.hash_join_desc.other_predicate.is_none() { return Ok(blocks.to_vec()); @@ -779,11 +778,7 @@ impl HashJoinState for JoinHashTable { Ok(vec![merged_block]) } - fn right_anti_semi_join_blocks( - &self, - blocks: &[DataBlock], - _flag: Aborting, - ) -> Result> { + fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock]) -> Result> { // Fast path for right anti join with non-equi conditions if self.hash_join_desc.other_predicate.is_none() && self.hash_join_desc.join_type == JoinType::RightAnti diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs index 220bb8ffc66e..6690534b5138 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::iter::repeat; use std::iter::TrustedLen; use std::sync::atomic::Ordering; @@ -84,10 +85,8 @@ impl JoinHashTable { let probed_rows = v.get_value(); if probe_indexes.len() + probed_rows.len() < probe_indexes.capacity() { - // fast path - build_indexes.extend_from_slice(&probed_rows); - probe_indexes - .extend(std::iter::repeat(i as u32).take(probed_rows.len())); + build_indexes.extend_from_slice(probed_rows); + probe_indexes.extend(repeat(i as u32).take(probed_rows.len())); } else { let mut index = 0_usize; let mut remain = probed_rows.len(); @@ -104,22 +103,19 @@ impl JoinHashTable { )); } - let append_rows = - probe_indexes.capacity() - probe_indexes.len(); - let new_index = index + append_rows; + let addition = probe_indexes.capacity() - probe_indexes.len(); + let new_index = index + addition; build_indexes.extend_from_slice(&probed_rows[index..new_index]); - probe_indexes - .extend(std::iter::repeat(i as u32).take(append_rows)); + probe_indexes.extend(repeat(i as u32).take(addition)); - let build_block = self.row_space.gather(&build_indexes)?; - let probe_block = - DataBlock::block_take_by_indices(input, &probe_indexes)?; - probed_blocks - .push(self.merge_eq_block(&build_block, &probe_block)?); + probed_blocks.push(self.merge_eq_block( + &self.row_space.gather(&build_indexes)?, + &DataBlock::block_take_by_indices(input, &probe_indexes)?, + )?); index = new_index; - remain -= append_rows; + remain -= addition; build_indexes.clear(); probe_indexes.clear(); @@ -129,6 +125,11 @@ impl JoinHashTable { } } + probed_blocks.push(self.merge_eq_block( + &self.row_space.gather(&build_indexes)?, + &DataBlock::block_take_by_indices(input, &probe_indexes)?, + )?); + match &self.hash_join_desc.other_predicate { None => Ok(probed_blocks), Some(other_predicate) => { @@ -233,10 +234,8 @@ impl JoinHashTable { // 3. Correlated Exists subquery: only have one kind of join condition, equi-condition. // equi-condition is subquery's outer columns with subquery's derived columns. (see the above example in correlated ANY subquery) JoinType::LeftMark => { - let mut results: Vec = vec![]; - results.push(DataBlock::empty()); self.left_mark_join(hash_table, probe_state, keys_iter, input)?; - Ok(results) + Ok(vec![DataBlock::empty()]) } JoinType::RightMark => Ok(vec![self.right_mark_join( hash_table, @@ -542,7 +541,7 @@ impl JoinHashTable { (Some(v), _) => { let probe_result_ptrs = v.get_value(); build_indexs.extend_from_slice(probe_result_ptrs); - probe_indexs.extend(std::iter::repeat(i as u32).take(probe_result_ptrs.len())); + probe_indexs.extend(repeat(i as u32).take(probe_result_ptrs.len())); if !SEMI { row_state[i] += probe_result_ptrs.len() as u32; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs index 632111ef2209..0a519394f6ea 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs @@ -154,7 +154,7 @@ impl Processor for TransformHashJoinProbe { } fn interrupt(&self) { - todo!() + self.join_state.interrupt() } fn process(&mut self) -> Result<()> { diff --git a/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs index f749a233e415..efcd11781136 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_exception::Result; -use common_pipeline_transforms::processors::transforms::Aborting; use crate::pipelines::processors::transforms::Compactor; use crate::pipelines::processors::HashJoinState; @@ -38,8 +37,8 @@ impl Compactor for MarkJoinCompactor { } // `compact_final` is called when all the blocks are pushed - fn compact_final(&self, _blocks: &[DataBlock], aborting: Aborting) -> Result> { - self.hash_join_state.mark_join_blocks(aborting) + fn compact_final(&self, _blocks: &[DataBlock]) -> Result> { + self.hash_join_state.mark_join_blocks() } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs index 2422c194e15e..8af9dfd65c79 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_exception::Result; -use common_pipeline_transforms::processors::transforms::Aborting; use crate::pipelines::processors::transforms::Compactor; use crate::pipelines::processors::HashJoinState; @@ -38,8 +37,8 @@ impl Compactor for RightJoinCompactor { } // `compact_final` is called when all the blocks are pushed - fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result> { - self.hash_join_state.right_join_blocks(blocks, aborting) + fn compact_final(&self, blocks: &[DataBlock]) -> Result> { + self.hash_join_state.right_join_blocks(blocks) } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs index fa22bec7ff06..1648a7e0c4cd 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_exception::Result; -use common_pipeline_transforms::processors::transforms::Aborting; use crate::pipelines::processors::transforms::Compactor; use crate::pipelines::processors::HashJoinState; @@ -38,9 +37,8 @@ impl Compactor for RightSemiAntiJoinCompactor { } // `compact_final` is called when all the blocks are pushed - fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result> { - self.hash_join_state - .right_anti_semi_join_blocks(blocks, aborting) + fn compact_final(&self, blocks: &[DataBlock]) -> Result> { + self.hash_join_state.right_anti_semi_join_blocks(blocks) } } From ff20e2e0bc4583a83d25dc87c3fa7fc2c9ca23d2 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 10 Oct 2022 13:21:11 +0800 Subject: [PATCH 4/5] fix(processor): get block size from settings --- .../processors/transforms/hash_join/result_blocks.rs | 6 +++++- .../pipelines/processors/transforms/transform_mark_join.rs | 4 ++++ .../pipelines/processors/transforms/transform_right_join.rs | 4 ++++ .../processors/transforms/transform_right_semi_anti_join.rs | 4 ++++ 4 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs index 6690534b5138..5636c18cbf10 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/result_blocks.rs @@ -66,7 +66,11 @@ impl JoinHashTable { match self.hash_join_desc.join_type { JoinType::Inner => { - let block_size = 8192; + let block_size = self + .ctx + .get_settings() + .get_max_block_size() + .unwrap_or(65535) as usize; // The inner join will return multiple data blocks of similar size let mut probed_blocks = vec![]; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs index efcd11781136..91f10353c500 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs @@ -36,6 +36,10 @@ impl Compactor for MarkJoinCompactor { "MarkJoin" } + fn interrupt(&self) { + self.hash_join_state.interrupt(); + } + // `compact_final` is called when all the blocks are pushed fn compact_final(&self, _blocks: &[DataBlock]) -> Result> { self.hash_join_state.mark_join_blocks() diff --git a/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs index 8af9dfd65c79..fbff5208fc56 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs @@ -36,6 +36,10 @@ impl Compactor for RightJoinCompactor { "RightJoinCompactor" } + fn interrupt(&self) { + self.hash_join_state.interrupt(); + } + // `compact_final` is called when all the blocks are pushed fn compact_final(&self, blocks: &[DataBlock]) -> Result> { self.hash_join_state.right_join_blocks(blocks) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs index 1648a7e0c4cd..e7e137050005 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs @@ -36,6 +36,10 @@ impl Compactor for RightSemiAntiJoinCompactor { "RightSemiAntiJoinCompactor" } + fn interrupt(&self) { + self.hash_join_state.interrupt(); + } + // `compact_final` is called when all the blocks are pushed fn compact_final(&self, blocks: &[DataBlock]) -> Result> { self.hash_join_state.right_anti_semi_join_blocks(blocks) From 04c3f3460080aedb877a47170e1c84f2b574073d Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Mon, 10 Oct 2022 15:03:18 +0800 Subject: [PATCH 5/5] fix(processor): apply review comment --- .../processors/transforms/transform_hash_join.rs | 8 ++++---- src/query/service/src/sql/executor/pipeline_builder.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs index 0a519394f6ea..c00b1db35347 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_hash_join.rs @@ -81,9 +81,9 @@ impl TransformHashJoinProbe { output_port: Arc, join_state: Arc, _output_schema: DataSchemaRef, - ) -> ProcessorPtr { - let default_block_size = ctx.get_settings().get_max_block_size().unwrap_or(102400); - ProcessorPtr::create(Box::new(TransformHashJoinProbe { + ) -> Result { + let default_block_size = ctx.get_settings().get_max_block_size()?; + Ok(ProcessorPtr::create(Box::new(TransformHashJoinProbe { input_data: None, output_data_blocks: VecDeque::new(), input_port, @@ -91,7 +91,7 @@ impl TransformHashJoinProbe { step: HashJoinStep::Build, join_state, probe_state: ProbeState::with_capacity(default_block_size as usize), - })) + }))) } fn probe(&mut self, block: &DataBlock) -> Result<()> { diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index 4ed639b6488e..2afadaef66f2 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -484,13 +484,13 @@ impl PipelineBuilder { self.build_pipeline(&join.probe)?; self.main_pipeline.add_transform(|input, output| { - Ok(TransformHashJoinProbe::create( + TransformHashJoinProbe::create( self.ctx.clone(), input, output, state.clone(), join.output_schema()?, - )) + ) })?; if join.join_type == JoinType::LeftMark {