Skip to content

Commit

Permalink
Merge pull request #5534 from xudong963/hash_join_1
Browse files Browse the repository at this point in the history
improvement(planner): use `DataBlock::gather_blocks` in hash join
  • Loading branch information
mergify[bot] authored May 23, 2022
2 parents 2f9e425 + 3d108ca commit 140507d
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use std::sync::Mutex;
use std::sync::RwLock;

use common_datablocks::DataBlock;
use common_datavalues::Column;
use common_datavalues::ColumnRef;
use common_datavalues::ConstColumn;
use common_datavalues::DataSchemaRef;
use common_datavalues::Series;
use common_datavalues::SmallVu8;
Expand Down Expand Up @@ -119,22 +121,25 @@ impl HashJoinState for ChainingHashTable {
}
let probe_result_ptrs = probe_result_ptr.unwrap().get_value();
// `result_block` is the block of build table
let result_blocks = self.row_space.gather(probe_result_ptrs)?;
let result_block = self.row_space.gather(probe_result_ptrs)?;
let probe_block = DataBlock::block_take_by_indices(input, &[i as u32])?;
let mut replicated_probe_block = DataBlock::empty();
for (i, col) in probe_block.columns().iter().enumerate() {
let replicated_col = ConstColumn::new(col.clone(), result_block.num_rows()).arc();

for result_block in result_blocks.iter() {
assert_eq!(result_block.clone().num_rows(), 1);
assert_eq!(probe_block.clone().num_rows(), 1);
let mut input_block = probe_block.clone();
for (col, field) in result_block
.columns()
.iter()
.zip(result_block.schema().fields().iter())
{
input_block = input_block.add_column(col.clone(), field.clone())?;
}
results.push(input_block);
replicated_probe_block = replicated_probe_block
.add_column(replicated_col, probe_block.schema().field(i).clone())?;
}
for (col, field) in result_block
.columns()
.iter()
.zip(result_block.schema().fields().iter())
{
replicated_probe_block =
replicated_probe_block.add_column(col.clone(), field.clone())?;
}

results.push(replicated_probe_block);
}

Ok(results)
Expand Down
18 changes: 15 additions & 3 deletions query/src/pipelines/new/processors/transforms/hash_join/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ impl RowSpace {
Ok(())
}

// TODO(leiysky): gather into multiple blocks, since there are possibly massive results
pub fn gather(&self, row_ptrs: &[RowPtr]) -> Result<Vec<DataBlock>> {
pub fn gather(&self, row_ptrs: &[RowPtr]) -> Result<DataBlock> {
let mut data_blocks = vec![];
let mut indices = vec![];

{
// Acquire read lock in current scope
Expand All @@ -76,12 +76,24 @@ impl RowSpace {
row_ptr.row_index,
])?;
if !block.is_empty() {
indices.push((data_blocks.len(), 0));
data_blocks.push(block);
}
}
}

Ok(data_blocks)
// If build_key doesn't have duplicated columns, the length of row_ptrs will be one.
// So we don't need to `gather_blocks`, directly return.
if data_blocks.len() == 1 {
return Ok(data_blocks[0].clone());
}

if !data_blocks.is_empty() {
let data_block = DataBlock::gather_blocks(&data_blocks, indices.as_slice())?;
Ok(data_block)
} else {
Ok(DataBlock::empty_with_schema(self.data_schema.clone()))
}
}

fn gather_single_chunk(&self, chunk: &Chunk, indices: &[u32]) -> Result<DataBlock> {
Expand Down
4 changes: 4 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.result
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,7 @@ default
6
4 3 4
6 5 6
===Inner Join with duplicate keys===
1 2 1 2
1 2 1 3
2 6 2 4
10 changes: 10 additions & 0 deletions tests/suites/0_stateless/20+_others/20_0001_planner_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,14 @@ select * from t1 natural join t2;
drop table t1;
drop table t2;

-- Join: right table with duplicate build keys
select '===Inner Join with duplicate keys===';
create table t1(a int, b int);
insert into t1 values(1, 2), (1, 3), (2, 4);
create table t2(c int, d int);
insert into t2 values(1, 2), (2, 6);
select * from t2 inner join t1 on t1.a = t2.c;
drop table t1;
drop table t2;

set enable_planner_v2 = 0;

0 comments on commit 140507d

Please sign in to comment.