From e67ceb07a8a1af7f124235ef67164ac52d804854 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Fri, 15 Jul 2022 12:46:00 +0800 Subject: [PATCH 1/3] fix(planner): InSubquery returns error result --- .../transforms/hash_join/join_hash_table.rs | 117 ++++++------------ .../transforms/hash_join/result_blocks.rs | 24 ++-- .../processors/transforms/hash_join/row.rs | 13 ++ 3 files changed, 63 insertions(+), 91 deletions(-) diff --git a/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index 6f7ef452890c..894724efcfea 100644 --- a/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -108,7 +108,7 @@ pub enum HashTable { KeyU512HashTable(KeyU512HashTable), } -#[derive(Clone, Eq, PartialEq, Debug)] +#[derive(Clone, Copy, Eq, PartialEq, Debug)] pub enum MarkerKind { True, False, @@ -116,7 +116,6 @@ pub enum MarkerKind { } pub struct MarkJoinDesc { - pub(crate) marker: RwLock>, pub(crate) marker_index: Option, pub(crate) has_null: RwLock, } @@ -138,6 +137,7 @@ pub struct JoinHashTable { pub(crate) hash_table: RwLock, pub(crate) row_space: RowSpace, pub(crate) hash_join_desc: HashJoinDesc, + pub(crate) row_ptrs: RwLock>, } impl JoinHashTable { @@ -167,7 +167,6 @@ impl JoinHashTable { .map(Evaluator::eval_physical_scalar) .transpose()?, marker_join_desc: MarkJoinDesc { - marker: RwLock::new(vec![]), marker_index, has_null: RwLock::new(false), }, @@ -271,6 +270,7 @@ impl JoinHashTable { hash_join_desc, ctx, hash_table: RwLock::new(hash_table), + row_ptrs: RwLock::new(vec![]), }) } @@ -429,16 +429,13 @@ impl JoinHashTable { } macro_rules! insert_key { - ($table: expr, $method: expr, $chunk: expr, $columns: expr, $chunk_index: expr, ) => {{ + ($table: expr, $row_ptrs: expr, $method: expr, $chunk: expr, $columns: expr, $chunk_index: expr, ) => {{ let keys_state = $method.build_keys_state(&$columns, $chunk.num_rows())?; let build_keys_iter = $method.build_keys_iter(&keys_state)?; for (row_index, key) in build_keys_iter.enumerate().take($chunk.num_rows()) { let mut inserted = true; - let ptr = RowPtr { - chunk_index: $chunk_index as u32, - row_index: row_index as u32, - }; + let ptr = $row_ptrs[row_index].clone(); let entity = $table.insert_key(&key, &mut inserted); if inserted { entity.set_value(vec![ptr]); @@ -509,26 +506,30 @@ impl HashJoinState for JoinHashTable { fn finish(&self) -> Result<()> { let chunks = self.row_space.chunks.read().unwrap(); - let mut marker = self.hash_join_desc.marker_join_desc.marker.write(); - if self.hash_join_desc.join_type == Mark && self.hash_join_desc.other_predicate.is_some() { - let row_nums = chunks.iter().fold(0, |acc, chunk| acc + chunk.num_rows()); - marker.append(&mut vec![MarkerKind::False; row_nums]) - } for chunk_index in 0..chunks.len() { let chunk = &chunks[chunk_index]; let mut columns = vec![]; + let mut row_ptrs = vec![]; if let Some(cols) = chunk.cols.as_ref() { columns = Vec::with_capacity(cols.len()); + row_ptrs = Vec::with_capacity(cols.len()); + let mut marker = None; for col in cols.iter() { - if self.hash_join_desc.join_type == JoinType::Mark { - assert_eq!(cols.len(), 1); - for row_idx in 0..col.len() { - if col.get(row_idx) == DataValue::Null { - marker.push(MarkerKind::Null); + for row_index in 0..col.len() { + if self.hash_join_desc.join_type == Mark { + if col.get(row_index) == DataValue::Null + && self.hash_join_desc.other_predicate.is_none() + { + marker = Some(MarkerKind::Null); } else { - marker.push(MarkerKind::False); + marker = Some(MarkerKind::False); } } + row_ptrs.push(RowPtr { + chunk_index: chunk_index as u32, + row_index: row_index as u32, + marker: marker.clone(), + }) } columns.push(col); } @@ -540,10 +541,7 @@ impl HashJoinState for JoinHashTable { for (row_index, key) in build_keys_iter.enumerate().take(chunk.num_rows()) { let mut inserted = true; - let ptr = RowPtr { - chunk_index: chunk_index as u32, - row_index: row_index as u32, - }; + let ptr = row_ptrs[row_index].clone(); let keys_ref = KeysRef::create(key.as_ptr() as usize, key.len()); let entity = table.hash_table.insert_key(&keys_ref, &mut inserted); if inserted { @@ -556,6 +554,7 @@ impl HashJoinState for JoinHashTable { } HashTable::KeyU8HashTable(table) => insert_key! { &mut table.hash_table, + &row_ptrs, &table.hash_method, chunk, columns, @@ -563,6 +562,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU16HashTable(table) => insert_key! { &mut table.hash_table, + &row_ptrs, &table.hash_method, chunk, columns, @@ -570,6 +570,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU32HashTable(table) => insert_key! { &mut table.hash_table, + &row_ptrs, &table.hash_method, chunk, columns, @@ -577,6 +578,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU64HashTable(table) => insert_key! { &mut table.hash_table, + &row_ptrs, &table.hash_method, chunk, columns, @@ -584,6 +586,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU128HashTable(table) => insert_key! { &mut table.hash_table, + &row_ptrs, &table.hash_method, chunk, columns, @@ -591,6 +594,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU256HashTable(table) => insert_key! { &mut table.hash_table, + &row_ptrs, &table.hash_method, chunk, columns, @@ -598,33 +602,35 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU512HashTable(table) => insert_key! { &mut table.hash_table, + &row_ptrs, &table.hash_method, chunk, columns, chunk_index, }, } + let mut self_row_ptrs = self.row_ptrs.write(); + self_row_ptrs.append(&mut row_ptrs); } - Ok(()) } fn mark_join_blocks(&self) -> Result> { - let hash_table = self.hash_table.read(); - let mut marker = self.hash_join_desc.marker_join_desc.marker.write(); + let mut row_ptrs = self.row_ptrs.write(); let has_null = self.hash_join_desc.marker_join_desc.has_null.read(); - let mut validity = MutableBitmap::with_capacity(marker.len()); - let mut boolean_bit_map = MutableBitmap::with_capacity(marker.len()); - for m in marker.iter_mut() { - if m == &mut MarkerKind::False && *has_null { - *m = MarkerKind::Null; + let mut validity = MutableBitmap::with_capacity(row_ptrs.len()); + let mut boolean_bit_map = MutableBitmap::with_capacity(row_ptrs.len()); + for row_ptr in row_ptrs.iter_mut() { + let marker = row_ptr.marker.unwrap().clone(); + if marker == MarkerKind::False && *has_null { + row_ptr.marker = Some(MarkerKind::Null); } - if m == &mut MarkerKind::Null { + if marker == MarkerKind::Null { validity.push(false); } else { validity.push(true); } - if m == &mut MarkerKind::True { + if marker == MarkerKind::True { boolean_bit_map.push(true); } else { boolean_bit_map.push(false); @@ -644,50 +650,7 @@ impl HashJoinState for JoinHashTable { )]); let marker_block = DataBlock::create(DataSchemaRef::from(marker_schema), vec![marker_column]); - let mut build_indexes = Vec::new(); - match *hash_table { - HashTable::SerializerHashTable(ref hash_table) => { - for entity in hash_table.hash_table.iter() { - build_indexes.extend_from_slice(entity.get_value()); - } - } - HashTable::KeyU8HashTable(ref hash_table) => { - for entity in hash_table.hash_table.iter() { - build_indexes.extend_from_slice(entity.get_value()); - } - } - HashTable::KeyU16HashTable(ref hash_table) => { - for entity in hash_table.hash_table.iter() { - build_indexes.extend_from_slice(entity.get_value()); - } - } - HashTable::KeyU32HashTable(ref hash_table) => { - for entity in hash_table.hash_table.iter() { - build_indexes.extend_from_slice(entity.get_value()); - } - } - HashTable::KeyU64HashTable(ref hash_table) => { - for entity in hash_table.hash_table.iter() { - build_indexes.extend_from_slice(entity.get_value()); - } - } - HashTable::KeyU128HashTable(ref hash_table) => { - for entity in hash_table.hash_table.iter() { - build_indexes.extend_from_slice(entity.get_value()); - } - } - HashTable::KeyU256HashTable(ref hash_table) => { - for entity in hash_table.hash_table.iter() { - build_indexes.extend_from_slice(entity.get_value()); - } - } - HashTable::KeyU512HashTable(ref hash_table) => { - for entity in hash_table.hash_table.iter() { - build_indexes.extend_from_slice(entity.get_value()); - } - } - }; - let build_block = self.row_space.gather(&build_indexes)?; + let build_block = self.row_space.gather(&row_ptrs)?; Ok(vec![self.merge_eq_block(&marker_block, &build_block)?]) } } diff --git a/query/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/query/src/pipelines/processors/transforms/hash_join/result_blocks.rs index b920afaf678f..b5b17cb1fc5a 100644 --- a/query/src/pipelines/processors/transforms/hash_join/result_blocks.rs +++ b/query/src/pipelines/processors/transforms/hash_join/result_blocks.rs @@ -186,16 +186,10 @@ impl JoinHashTable { let probe_result_ptrs = v.get_value(); for ptr in probe_result_ptrs { // If find join partner, set the marker to true. - let offset = { - let chunks = self.row_space.chunks.read().unwrap(); - chunks - .iter() - .by_ref() - .take(ptr.chunk_index as usize) - .fold(0, |acc, c| acc + c.num_rows()) - }; - let mut marker = self.hash_join_desc.marker_join_desc.marker.write(); - marker[offset + ptr.row_index as usize] = MarkerKind::True; + let mut self_row_ptrs = self.row_ptrs.write(); + self_row_ptrs.iter_mut().find(|p| (*p).eq(&ptr)).map(|p| { + p.marker = Some(MarkerKind::True); + }); } } } @@ -209,7 +203,6 @@ impl JoinHashTable { ) -> Result<()> { let cross_join_blocks = self.probe_cross_join(input, probe_state)?; let func_ctx = self.ctx.try_get_function_context()?; - let mut marker = self.hash_join_desc.marker_join_desc.marker.write(); for block in cross_join_blocks.iter() { let type_vector = self .hash_join_desc @@ -219,16 +212,17 @@ impl JoinHashTable { .eval(&func_ctx, block)?; let filter_column = type_vector.vector(); assert_eq!(filter_column.len(), block.num_rows()); + let mut row_ptrs = self.row_ptrs.write(); for idx in 0..filter_column.len() { match filter_column.get(idx) { DataValue::Null => { - if (*marker)[idx] == MarkerKind::False { - (*marker)[idx] = MarkerKind::Null; + if row_ptrs[idx].marker == Some(MarkerKind::False) { + row_ptrs[idx].marker = Some(MarkerKind::Null); } } DataValue::Boolean(value) => { if value { - (*marker)[idx] = MarkerKind::True; + row_ptrs[idx].marker = Some(MarkerKind::True); } } _ => unreachable!(), @@ -305,6 +299,7 @@ impl JoinHashTable { build_indexs.push(RowPtr { chunk_index: 0, row_index: 0, + marker: None, }); probe_indexs.push(i as u32); @@ -388,6 +383,7 @@ impl JoinHashTable { build_indexs.push(RowPtr { chunk_index: 0, row_index: 0, + marker: None, }); probe_indexs.push(i as u32); validity.push(false); diff --git a/query/src/pipelines/processors/transforms/hash_join/row.rs b/query/src/pipelines/processors/transforms/hash_join/row.rs index 093f9f4d1441..753d03ea4771 100644 --- a/query/src/pipelines/processors/transforms/hash_join/row.rs +++ b/query/src/pipelines/processors/transforms/hash_join/row.rs @@ -20,6 +20,8 @@ use common_datavalues::ColumnRef; use common_datavalues::DataSchemaRef; use common_exception::Result; +use crate::pipelines::processors::transforms::hash_join::join_hash_table::MarkerKind; + pub type ColumnVector = Vec; pub struct Chunk { @@ -38,6 +40,7 @@ impl Chunk { pub struct RowPtr { pub chunk_index: u32, pub row_index: u32, + pub marker: Option, } pub struct RowSpace { @@ -110,3 +113,13 @@ impl RowSpace { } } } + +impl PartialEq for RowPtr { + fn eq(&self, other: &Self) -> bool { + if self.chunk_index == other.chunk_index && self.row_index == other.row_index { + true + } else { + false + } + } +} From 928303d6569dfa9517a3ed7fe788fd4f1cfbb588 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Fri, 15 Jul 2022 16:09:21 +0800 Subject: [PATCH 2/3] fix and add test --- .../transforms/hash_join/join_hash_table.rs | 158 +++++++++--------- .../transforms/hash_join/result_blocks.rs | 4 +- .../processors/transforms/hash_join/row.rs | 27 +-- .../20+_others/20_0001_planner_v2.result | 5 + .../20+_others/20_0001_planner_v2.sql | 1 + 5 files changed, 92 insertions(+), 103 deletions(-) diff --git a/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index 894724efcfea..2ecebf4e8ccd 100644 --- a/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -428,24 +428,6 @@ impl JoinHashTable { } } -macro_rules! insert_key { - ($table: expr, $row_ptrs: expr, $method: expr, $chunk: expr, $columns: expr, $chunk_index: expr, ) => {{ - let keys_state = $method.build_keys_state(&$columns, $chunk.num_rows())?; - let build_keys_iter = $method.build_keys_iter(&keys_state)?; - - for (row_index, key) in build_keys_iter.enumerate().take($chunk.num_rows()) { - let mut inserted = true; - let ptr = $row_ptrs[row_index].clone(); - let entity = $table.insert_key(&key, &mut inserted); - if inserted { - entity.set_value(vec![ptr]); - } else { - entity.get_mut_value().push(ptr); - } - } - }}; -} - impl HashJoinState for JoinHashTable { fn build(&self, input: DataBlock) -> Result<()> { let func_ctx = self.ctx.try_get_function_context()?; @@ -455,20 +437,7 @@ impl HashJoinState for JoinHashTable { .iter() .map(|expr| Ok(expr.eval(&func_ctx, &input)?.vector().clone())) .collect::>>()?; - - match &*self.hash_table.read() { - HashTable::SerializerHashTable(table) => { - let mut build_cols_ref = Vec::with_capacity(build_cols.len()); - for build_col in build_cols.iter() { - build_cols_ref.push(build_col); - } - let keys_state = table - .hash_method - .build_keys_state(&build_cols_ref, input.num_rows())?; - self.row_space.push_keys_state(input, keys_state) - } - _ => self.row_space.push_cols(input, build_cols), - } + self.row_space.push_cols(input, build_cols) } fn probe(&self, input: &DataBlock, probe_state: &mut ProbeState) -> Result> { @@ -505,56 +474,91 @@ impl HashJoinState for JoinHashTable { } fn finish(&self) -> Result<()> { - let chunks = self.row_space.chunks.read().unwrap(); + macro_rules! insert_key { + ($table: expr, $markers: expr, $method: expr, $chunk: expr, $columns: expr, $chunk_index: expr, ) => {{ + let keys_state = $method.build_keys_state(&$columns, $chunk.num_rows())?; + let build_keys_iter = $method.build_keys_iter(&keys_state)?; + + for (row_index, key) in build_keys_iter.enumerate().take($chunk.num_rows()) { + let mut inserted = true; + let ptr = RowPtr { + chunk_index: $chunk_index as u32, + row_index: row_index as u32, + marker: $markers[row_index], + }; + { + let mut self_row_ptrs = self.row_ptrs.write(); + self_row_ptrs.push(ptr.clone()); + } + let entity = $table.insert_key(&key, &mut inserted); + if inserted { + entity.set_value(vec![ptr]); + } else { + entity.get_mut_value().push(ptr); + } + } + }}; + } + + let mut chunks = self.row_space.chunks.write().unwrap(); for chunk_index in 0..chunks.len() { - let chunk = &chunks[chunk_index]; - let mut columns = vec![]; - let mut row_ptrs = vec![]; - if let Some(cols) = chunk.cols.as_ref() { - columns = Vec::with_capacity(cols.len()); - row_ptrs = Vec::with_capacity(cols.len()); - let mut marker = None; - for col in cols.iter() { - for row_index in 0..col.len() { - if self.hash_join_desc.join_type == Mark { - if col.get(row_index) == DataValue::Null - && self.hash_join_desc.other_predicate.is_none() - { - marker = Some(MarkerKind::Null); - } else { - marker = Some(MarkerKind::False); - } + let chunk = &mut chunks[chunk_index]; + let mut columns = Vec::with_capacity(chunk.cols.len()); + let mut markers = if self.hash_join_desc.join_type == Mark + && self.hash_join_desc.other_predicate.is_some() + { + vec![Some(MarkerKind::False); chunk.num_rows()] + } else { + vec![None; chunk.num_rows()] + }; + for col in chunk.cols.iter() { + for row_index in 0..col.len() { + if self.hash_join_desc.join_type == Mark { + if col.get(row_index) == DataValue::Null { + markers[row_index] = Some(MarkerKind::Null); + } else { + markers[row_index] = Some(MarkerKind::False); } - row_ptrs.push(RowPtr { - chunk_index: chunk_index as u32, - row_index: row_index as u32, - marker: marker.clone(), - }) } - columns.push(col); } + columns.push(col); } match (*self.hash_table.write()).borrow_mut() { HashTable::SerializerHashTable(table) => { - if let Some(keys_state) = chunk.keys_state.as_ref() { - let build_keys_iter = table.hash_method.build_keys_iter(keys_state)?; - - for (row_index, key) in build_keys_iter.enumerate().take(chunk.num_rows()) { - let mut inserted = true; - let ptr = row_ptrs[row_index].clone(); - let keys_ref = KeysRef::create(key.as_ptr() as usize, key.len()); - let entity = table.hash_table.insert_key(&keys_ref, &mut inserted); - if inserted { - entity.set_value(vec![ptr]); - } else { - entity.get_mut_value().push(ptr); - } + let mut build_cols_ref = Vec::with_capacity(chunk.cols.len()); + for build_col in chunk.cols.iter() { + build_cols_ref.push(build_col); + } + let keys_state = table + .hash_method + .build_keys_state(&build_cols_ref, chunk.num_rows())?; + chunk.keys_state = Some(keys_state); + let build_keys_iter = table + .hash_method + .build_keys_iter(chunk.keys_state.as_ref().unwrap())?; + for (row_index, key) in build_keys_iter.enumerate().take(chunk.num_rows()) { + let mut inserted = true; + let ptr = RowPtr { + chunk_index: chunk_index as u32, + row_index: row_index as u32, + marker: markers[row_index], + }; + { + let mut self_row_ptrs = self.row_ptrs.write(); + self_row_ptrs.push(ptr.clone()); + } + let keys_ref = KeysRef::create(key.as_ptr() as usize, key.len()); + let entity = table.hash_table.insert_key(&keys_ref, &mut inserted); + if inserted { + entity.set_value(vec![ptr]); + } else { + entity.get_mut_value().push(ptr); } } } HashTable::KeyU8HashTable(table) => insert_key! { &mut table.hash_table, - &row_ptrs, + &markers, &table.hash_method, chunk, columns, @@ -562,7 +566,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU16HashTable(table) => insert_key! { &mut table.hash_table, - &row_ptrs, + &markers, &table.hash_method, chunk, columns, @@ -570,7 +574,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU32HashTable(table) => insert_key! { &mut table.hash_table, - &row_ptrs, + &markers, &table.hash_method, chunk, columns, @@ -578,7 +582,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU64HashTable(table) => insert_key! { &mut table.hash_table, - &row_ptrs, + &markers, &table.hash_method, chunk, columns, @@ -586,7 +590,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU128HashTable(table) => insert_key! { &mut table.hash_table, - &row_ptrs, + &markers, &table.hash_method, chunk, columns, @@ -594,7 +598,7 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU256HashTable(table) => insert_key! { &mut table.hash_table, - &row_ptrs, + &markers, &table.hash_method, chunk, columns, @@ -602,15 +606,13 @@ impl HashJoinState for JoinHashTable { }, HashTable::KeyU512HashTable(table) => insert_key! { &mut table.hash_table, - &row_ptrs, + &markers, &table.hash_method, chunk, columns, chunk_index, }, } - let mut self_row_ptrs = self.row_ptrs.write(); - self_row_ptrs.append(&mut row_ptrs); } Ok(()) } diff --git a/query/src/pipelines/processors/transforms/hash_join/result_blocks.rs b/query/src/pipelines/processors/transforms/hash_join/result_blocks.rs index b5b17cb1fc5a..a255e7f2f93c 100644 --- a/query/src/pipelines/processors/transforms/hash_join/result_blocks.rs +++ b/query/src/pipelines/processors/transforms/hash_join/result_blocks.rs @@ -187,9 +187,9 @@ impl JoinHashTable { for ptr in probe_result_ptrs { // If find join partner, set the marker to true. let mut self_row_ptrs = self.row_ptrs.write(); - self_row_ptrs.iter_mut().find(|p| (*p).eq(&ptr)).map(|p| { + if let Some(p) = self_row_ptrs.iter_mut().find(|p| (*p).eq(&ptr)) { p.marker = Some(MarkerKind::True); - }); + } } } } diff --git a/query/src/pipelines/processors/transforms/hash_join/row.rs b/query/src/pipelines/processors/transforms/hash_join/row.rs index 753d03ea4771..aef31c3b1e79 100644 --- a/query/src/pipelines/processors/transforms/hash_join/row.rs +++ b/query/src/pipelines/processors/transforms/hash_join/row.rs @@ -26,7 +26,7 @@ pub type ColumnVector = Vec; pub struct Chunk { pub data_block: DataBlock, - pub cols: Option, + pub cols: ColumnVector, pub keys_state: Option, } @@ -36,7 +36,7 @@ impl Chunk { } } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] pub struct RowPtr { pub chunk_index: u32, pub row_index: u32, @@ -59,7 +59,7 @@ impl RowSpace { pub fn push_cols(&self, data_block: DataBlock, cols: ColumnVector) -> Result<()> { let chunk = Chunk { data_block, - cols: Some(cols), + cols, keys_state: None, }; @@ -72,21 +72,6 @@ impl RowSpace { Ok(()) } - pub fn push_keys_state(&self, data_block: DataBlock, keys_state: KeysState) -> Result<()> { - let chunk = Chunk { - data_block, - cols: None, - keys_state: Some(keys_state), - }; - - { - // Acquire write lock in current scope - let mut chunks = self.chunks.write().unwrap(); - chunks.push(chunk); - } - Ok(()) - } - pub fn datablocks(&self) -> Vec { let chunks = self.chunks.read().unwrap(); chunks.iter().map(|c| c.data_block.clone()).collect() @@ -116,10 +101,6 @@ impl RowSpace { impl PartialEq for RowPtr { fn eq(&self, other: &Self) -> bool { - if self.chunk_index == other.chunk_index && self.row_index == other.row_index { - true - } else { - false - } + self.chunk_index == other.chunk_index && self.row_index == other.row_index } } diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result index cfdbab89a8f5..99fc5f26bef6 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.result @@ -432,6 +432,11 @@ NULL NULL NULL 1 1 2 2 3 10000 +0 +1 +2 +3 +4 2 3 2 3 ====Database==== default default default diff --git a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql index 4a769d61f97a..3418a49c370b 100644 --- a/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql +++ b/tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql @@ -376,6 +376,7 @@ create table t3 as select * from numbers(10000); insert into t3 values(1); set enable_planner_v2 = 1; select count(*) from numbers(10000) as t4 where t4.number in (select t3.number from t3); +select * from numbers(10) where number in (select * from numbers(5)) order by number; create table t1_null(a int null , b int null); create table t2_null(a int null , b int null); From 8b7e78427053b31ab165058d9ed46ebcd304f1c4 Mon Sep 17 00:00:00 2001 From: xudong963 Date: Fri, 15 Jul 2022 16:31:52 +0800 Subject: [PATCH 3/3] fix clippy --- .../transforms/hash_join/join_hash_table.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index 2ecebf4e8ccd..3b89f8471202 100644 --- a/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/query/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -512,12 +512,13 @@ impl HashJoinState for JoinHashTable { vec![None; chunk.num_rows()] }; for col in chunk.cols.iter() { - for row_index in 0..col.len() { - if self.hash_join_desc.join_type == Mark { + if self.hash_join_desc.join_type == Mark { + assert_eq!(col.len(), markers.len()); + for (row_index, marker) in markers.iter_mut().enumerate() { if col.get(row_index) == DataValue::Null { - markers[row_index] = Some(MarkerKind::Null); + *marker = Some(MarkerKind::Null); } else { - markers[row_index] = Some(MarkerKind::False); + *marker = Some(MarkerKind::False); } } } @@ -545,7 +546,7 @@ impl HashJoinState for JoinHashTable { }; { let mut self_row_ptrs = self.row_ptrs.write(); - self_row_ptrs.push(ptr.clone()); + self_row_ptrs.push(ptr); } let keys_ref = KeysRef::create(key.as_ptr() as usize, key.len()); let entity = table.hash_table.insert_key(&keys_ref, &mut inserted); @@ -623,7 +624,7 @@ impl HashJoinState for JoinHashTable { let mut validity = MutableBitmap::with_capacity(row_ptrs.len()); let mut boolean_bit_map = MutableBitmap::with_capacity(row_ptrs.len()); for row_ptr in row_ptrs.iter_mut() { - let marker = row_ptr.marker.unwrap().clone(); + let marker = row_ptr.marker.unwrap(); if marker == MarkerKind::False && *has_null { row_ptr.marker = Some(MarkerKind::Null); }