Skip to content

Commit

Permalink
implement hash join
Browse files Browse the repository at this point in the history
  • Loading branch information
leiysky committed May 7, 2022
1 parent 6985cc0 commit 8eafa22
Show file tree
Hide file tree
Showing 27 changed files with 1,564 additions and 143 deletions.
40 changes: 40 additions & 0 deletions common/datablocks/src/kernels/data_block_group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use common_datavalues::remove_nullable;
use common_datavalues::DataType;
use common_datavalues::DataTypeImpl;
use common_datavalues::TypeID;
use common_exception::Result;

Expand All @@ -28,6 +29,7 @@ use crate::HashMethod;
use crate::HashMethodSingleString;

impl DataBlock {
// TODO(leiysky): replace with `DataBlock::choose_hash_method_with_types` and deprecate this method
pub fn choose_hash_method(
block: &DataBlock,
column_names: &[String],
Expand Down Expand Up @@ -68,6 +70,44 @@ impl DataBlock {
}
}

/// Chooses a [`HashMethodKind`] from the data types of the hash keys alone,
/// without needing a `DataBlock`.
///
/// Selection rules:
/// * exactly one key whose type id is `TypeID::String` -> `SingleString`;
/// * every key is numeric or date/date-time once its nullable wrapper is
///   removed -> a fixed-width method sized to the packed key length
///   (`KeysU8` / `KeysU16` / `KeysU32` / `KeysU64`);
/// * anything else (including an empty key list, or a packed length above
///   8 bytes) -> `Serializer`.
///
/// # Errors
///
/// Propagates any error returned by `numeric_byte_size` for a key type.
pub fn choose_hash_method_with_types(
    hash_key_types: &[DataTypeImpl],
) -> Result<HashMethodKind> {
    // Fast path: a single string key gets its dedicated method.
    if let [typ] = hash_key_types {
        if typ.data_type_id() == TypeID::String {
            return Ok(HashMethodKind::SingleString(
                HashMethodSingleString::default(),
            ));
        }
    }

    let mut group_key_len = 0;
    for typ in hash_key_types {
        // Keep the original `typ` visible: nullability must be checked on the
        // declared key type, not on the unwrapped one. Shadowing `typ` here
        // made the null-flag branch below unreachable.
        let not_null_type = remove_nullable(typ);

        let is_fixed_width = not_null_type.data_type_id().is_numeric()
            || not_null_type.data_type_id().is_date_or_date_time();
        if !is_fixed_width {
            // Any key that is not fixed-width forces the generic serializer.
            return Ok(HashMethodKind::Serializer(HashMethodSerializer::default()));
        }

        group_key_len += not_null_type.data_type_id().numeric_byte_size()?;

        // Extra one byte for the null flag of a nullable key.
        if typ.is_nullable() {
            group_key_len += 1;
        }
    }

    match group_key_len {
        1 => Ok(HashMethodKind::KeysU8(HashMethodKeysU8::default())),
        2 => Ok(HashMethodKind::KeysU16(HashMethodKeysU16::default())),
        3..=4 => Ok(HashMethodKind::KeysU32(HashMethodKeysU32::default())),
        5..=8 => Ok(HashMethodKind::KeysU64(HashMethodKeysU64::default())),
        // TODO: support u128 and u256 keys; until then wider keys (and the
        // zero-length case) fall back to the serializer.
        _ => Ok(HashMethodKind::Serializer(HashMethodSerializer::default())),
    }
}

pub fn group_by_blocks(block: &DataBlock, column_names: &[String]) -> Result<Vec<DataBlock>> {
let method = Self::choose_hash_method(block, column_names)?;
Ok(match method {
Expand Down
13 changes: 11 additions & 2 deletions query/src/interpreters/interpreter_select_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_tracing::tracing;
use crate::interpreters::stream::ProcessorExecutorStream;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::pipelines::new::executor::PipelineExecutor;
use crate::pipelines::new::executor::PipelinePullingExecutor;
use crate::sessions::QueryContext;
use crate::sql::Planner;
Expand Down Expand Up @@ -52,9 +53,17 @@ impl Interpreter for SelectInterpreterV2 {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let mut planner = Planner::new(self.ctx.clone());
let pipeline = planner.plan_sql(self.query.as_str()).await?;
let (root_pipeline, pipelines) = planner.plan_sql(self.query.as_str()).await?;
let async_runtime = self.ctx.get_storage_runtime();
let executor = PipelinePullingExecutor::try_create(async_runtime, pipeline)?;

// Spawn sub-pipelines
for pipeline in pipelines {
let executor = PipelineExecutor::create(async_runtime.clone(), pipeline)?;
executor.execute()?;
}

// Spawn root pipeline
let executor = PipelinePullingExecutor::try_create(async_runtime, root_pipeline)?;
let executor_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
Ok(Box::pin(self.ctx.try_create_abortable(executor_stream)?))
}
Expand Down
4 changes: 4 additions & 0 deletions query/src/pipelines/new/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ pub use sources::SyncSourcer;
pub use transforms::AggregatorParams;
pub use transforms::AggregatorTransformParams;
pub use transforms::BlockCompactor;
pub use transforms::ChainHashTable;
pub use transforms::ExpressionTransform;
pub use transforms::HashJoinState;
pub use transforms::ProjectionTransform;
pub use transforms::SinkBuildHashTable;
pub use transforms::SortMergeCompactor;
pub use transforms::SubQueriesPuller;
pub use transforms::TransformAddOn;
Expand All @@ -59,6 +62,7 @@ pub use transforms::TransformCompact;
pub use transforms::TransformCreateSets;
pub use transforms::TransformDummy;
pub use transforms::TransformFilter;
pub use transforms::TransformHashJoinProbe;
pub use transforms::TransformHaving;
pub use transforms::TransformLimit;
pub use transforms::TransformLimitBy;
Expand Down
Loading

0 comments on commit 8eafa22

Please sign in to comment.