Merge pull request #5175 from leiysky/hash-join
feat(planner): Implement hash inner join
BohuTANG authored May 7, 2022
2 parents 01d80ee + 6efe9c5 commit d824fd9
Showing 32 changed files with 1,597 additions and 168 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

40 changes: 40 additions & 0 deletions common/datablocks/src/kernels/data_block_group_by.rs
@@ -14,6 +14,7 @@

use common_datavalues::remove_nullable;
use common_datavalues::DataType;
use common_datavalues::DataTypeImpl;
use common_datavalues::TypeID;
use common_exception::Result;

@@ -28,6 +29,7 @@ use crate::HashMethod;
use crate::HashMethodSingleString;

impl DataBlock {
    // TODO(leiysky): replace with `DataBlock::choose_hash_method_with_types` and deprecate this method
    pub fn choose_hash_method(
        block: &DataBlock,
        column_names: &[String],
@@ -68,6 +70,44 @@ impl DataBlock {
        }
    }

    pub fn choose_hash_method_with_types(
        hash_key_types: &[DataTypeImpl],
    ) -> Result<HashMethodKind> {
        if hash_key_types.len() == 1 {
            let typ = &hash_key_types[0];
            if typ.data_type_id() == TypeID::String {
                return Ok(HashMethodKind::SingleString(
                    HashMethodSingleString::default(),
                ));
            }
        }

        let mut group_key_len = 0;
        for typ in hash_key_types {
            let typ = remove_nullable(typ);

            if typ.data_type_id().is_numeric() || typ.data_type_id().is_date_or_date_time() {
                group_key_len += typ.data_type_id().numeric_byte_size()?;

                // extra one byte for null flag
                if typ.is_nullable() {
                    group_key_len += 1;
                }
            } else {
                return Ok(HashMethodKind::Serializer(HashMethodSerializer::default()));
            }
        }

        match group_key_len {
            1 => Ok(HashMethodKind::KeysU8(HashMethodKeysU8::default())),
            2 => Ok(HashMethodKind::KeysU16(HashMethodKeysU16::default())),
            3..=4 => Ok(HashMethodKind::KeysU32(HashMethodKeysU32::default())),
            5..=8 => Ok(HashMethodKind::KeysU64(HashMethodKeysU64::default())),
            // TODO support u128, u256
            _ => Ok(HashMethodKind::Serializer(HashMethodSerializer::default())),
        }
    }

    pub fn group_by_blocks(block: &DataBlock, column_names: &[String]) -> Result<Vec<DataBlock>> {
        let method = Self::choose_hash_method(block, column_names)?;
        Ok(match method {
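For illustration only (not part of the commit): choose_hash_method_with_types packs all fixed-width key columns into the smallest unsigned integer that holds their concatenated bytes, with a nullable column paying one extra byte for its null flag, and it falls back to serialized keys for anything variable-width. A standalone sketch of that dispatch, under hypothetical names (pick_key_kind, KeyKind):

#[derive(Debug, PartialEq)]
enum KeyKind {
    U8,
    U16,
    U32,
    U64,
    Serialized,
}

// byte_sizes holds the fixed byte width of each key column (null flag
// already included); None marks a variable-width column such as String.
fn pick_key_kind(byte_sizes: &[Option<usize>]) -> KeyKind {
    let mut total = 0usize;
    for size in byte_sizes {
        match *size {
            Some(n) => total += n,
            None => return KeyKind::Serialized,
        }
    }
    match total {
        1 => KeyKind::U8,
        2 => KeyKind::U16,
        3..=4 => KeyKind::U32,
        5..=8 => KeyKind::U64,
        // Wider keys would need u128/u256; serialize until then.
        _ => KeyKind::Serialized,
    }
}

fn main() {
    // (i32, u16) join key: 4 + 2 = 6 bytes packs into a u64 key.
    assert_eq!(pick_key_kind(&[Some(4), Some(2)]), KeyKind::U64);
    // A string column forces full key serialization.
    assert_eq!(pick_key_kind(&[Some(4), None]), KeyKind::Serialized);
}

Note how the null flag shifts buckets: a single nullable u32 key is 4 + 1 = 5 bytes, so it lands in KeysU64 rather than KeysU32.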
1 change: 1 addition & 0 deletions query/Cargo.toml
@@ -111,6 +111,7 @@ time = "0.3.9"
tokio-rustls = "0.23.3"
tokio-stream = { version = "0.1.8", features = ["net"] }
tonic = "=0.6.2"
twox-hash = "1.6.2"
typetag = "0.1.8"
uuid = { version = "0.8.2", features = ["serde", "v4"] }
walkdir = "2.3.2"
13 changes: 11 additions & 2 deletions query/src/interpreters/interpreter_select_v2.rs
@@ -21,6 +21,7 @@ use common_tracing::tracing;
use crate::interpreters::stream::ProcessorExecutorStream;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::pipelines::new::executor::PipelineExecutor;
use crate::pipelines::new::executor::PipelinePullingExecutor;
use crate::sessions::QueryContext;
use crate::sql::Planner;
@@ -52,9 +53,17 @@ impl Interpreter for SelectInterpreterV2 {
        _input_stream: Option<SendableDataBlockStream>,
    ) -> Result<SendableDataBlockStream> {
        let mut planner = Planner::new(self.ctx.clone());
-       let pipeline = planner.plan_sql(self.query.as_str()).await?;
+       let (root_pipeline, pipelines) = planner.plan_sql(self.query.as_str()).await?;
        let async_runtime = self.ctx.get_storage_runtime();
-       let executor = PipelinePullingExecutor::try_create(async_runtime, pipeline)?;
+
+       // Spawn sub-pipelines
+       for pipeline in pipelines {
+           let executor = PipelineExecutor::create(async_runtime.clone(), pipeline)?;
+           executor.execute()?;
+       }
+
+       // Spawn root pipeline
+       let executor = PipelinePullingExecutor::try_create(async_runtime, root_pipeline)?;
        let executor_stream = Box::pin(ProcessorExecutorStream::create(executor)?);
        Ok(Box::pin(self.ctx.try_create_abortable(executor_stream)?))
    }
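For illustration only: plan_sql now returns the root pipeline together with a list of sub-pipelines, and each sub-pipeline (for example, the hash-join build side) is driven by its own executor while the root pipeline is pulled for result blocks. A standalone sketch of that handoff using plain std threads and channels, not Databend's executor API:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u64>>();

    // Sub-pipeline: produce the build-side data and hand it over.
    let builder = thread::spawn(move || {
        let build_side: Vec<u64> = (0..8).collect();
        tx.send(build_side).expect("probe side hung up");
    });

    // Root pipeline: block until the build side is ready, then probe.
    let build_side = rx.recv().expect("build side failed");
    builder.join().unwrap();
    println!("probing against {} build rows", build_side.len());
}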
4 changes: 4 additions & 0 deletions query/src/pipelines/new/processors/mod.rs
@@ -47,8 +47,11 @@ pub use sources::SyncSourcer;
pub use transforms::AggregatorParams;
pub use transforms::AggregatorTransformParams;
pub use transforms::BlockCompactor;
pub use transforms::ChainHashTable;
pub use transforms::ExpressionTransform;
pub use transforms::HashJoinState;
pub use transforms::ProjectionTransform;
pub use transforms::SinkBuildHashTable;
pub use transforms::SortMergeCompactor;
pub use transforms::SubQueriesPuller;
pub use transforms::TransformAddOn;
@@ -59,6 +62,7 @@ pub use transforms::TransformCompact;
pub use transforms::TransformCreateSets;
pub use transforms::TransformDummy;
pub use transforms::TransformFilter;
pub use transforms::TransformHashJoinProbe;
pub use transforms::TransformHaving;
pub use transforms::TransformLimit;
pub use transforms::TransformLimitBy;
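Among the new exports, SinkBuildHashTable, ChainHashTable, HashJoinState, and TransformHashJoinProbe name the two halves of the join: a build phase that chains build-side rows under their join keys, and a probe phase that emits one output pair per key match. A minimal conceptual sketch of chained build and probe for an inner join (the standard technique; not Databend's actual layout, and all names here are hypothetical):

use std::collections::HashMap;

// Build phase: chain every build-row index under its join key. A production
// table typically buckets by key hash and re-verifies key equality on probe;
// the keys here are plain u64s, so the map lookup is already exact.
fn build(build_keys: &[u64]) -> HashMap<u64, Vec<usize>> {
    let mut buckets: HashMap<u64, Vec<usize>> = HashMap::new();
    for (row, &key) in build_keys.iter().enumerate() {
        buckets.entry(key).or_default().push(row);
    }
    buckets
}

// Probe phase: emit one (probe_row, build_row) pair per match, i.e. an inner join.
fn probe(buckets: &HashMap<u64, Vec<usize>>, probe_keys: &[u64]) -> Vec<(usize, usize)> {
    let mut matches = Vec::new();
    for (probe_row, key) in probe_keys.iter().enumerate() {
        if let Some(chain) = buckets.get(key) {
            for &build_row in chain {
                matches.push((probe_row, build_row));
            }
        }
    }
    matches
}

fn main() {
    let buckets = build(&[1, 2, 2, 3]);
    let matches = probe(&buckets, &[2, 4, 1]);
    // Probe key 2 matches build rows 1 and 2; key 4 matches nothing; key 1 matches row 0.
    assert_eq!(matches, vec![(0, 1), (0, 2), (2, 0)]);
    println!("inner join pairs: {:?}", matches);
}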
59 changes: 59 additions & 0 deletions query/src/pipelines/new/processors/transforms/hash_join/hash.rs
@@ -0,0 +1,59 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hasher;

use common_datavalues::ColumnRef;
use common_datavalues::ColumnWithField;
use common_datavalues::DataField;
use common_datavalues::PrimitiveColumn;
use common_datavalues::Series;
use common_exception::Result;
use common_functions::scalars::FunctionContext;
use common_functions::scalars::FunctionFactory;
use twox_hash::XxHash64;

pub type HashVector = Vec<u64>;

pub struct HashUtil;

impl HashUtil {
    pub fn compute_hash(column: &ColumnRef) -> Result<HashVector> {
        let hash_function = FunctionFactory::instance().get("xxhash64", &[&column.data_type()])?;
        let field = DataField::new("", column.data_type());
        let result = hash_function.eval(
            FunctionContext::default(),
            &[ColumnWithField::new(column.clone(), field)],
            column.len(),
        )?;

        let result = Series::remove_nullable(&result);
        let result = Series::check_get::<PrimitiveColumn<u64>>(&result)?;
        Ok(result.values().to_vec())
    }

    pub fn combine_hashes(inputs: &[HashVector], size: usize) -> HashVector {
        static XXHASH_SEED: u64 = 0;

        let mut result = Vec::with_capacity(size);
        result.resize(size, XxHash64::with_seed(XXHASH_SEED));
        for input in inputs.iter() {
            assert_eq!(input.len(), size);
            for i in 0..size {
                result[i].write_u64(input[i]);
            }
        }
        result.into_iter().map(|h| h.finish()).collect()
    }
}
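For illustration only: a standalone check of the combine_hashes scheme above, using the same twox-hash crate this commit adds to query/Cargo.toml. Per-column hash vectors are folded row by row into a single XxHash64 state, so rows with equal multi-column keys get equal combined hashes:

use std::hash::Hasher;

use twox_hash::XxHash64;

fn combine(inputs: &[Vec<u64>], size: usize) -> Vec<u64> {
    let mut states = vec![XxHash64::with_seed(0); size];
    for input in inputs {
        assert_eq!(input.len(), size);
        for (state, &h) in states.iter_mut().zip(input) {
            state.write_u64(h);
        }
    }
    states.into_iter().map(|s| s.finish()).collect()
}

fn main() {
    // Two key columns, three rows; rows 0 and 2 carry identical per-column hashes.
    let col_a = vec![11, 22, 11];
    let col_b = vec![7, 8, 7];
    let combined = combine(&[col_a, col_b], 3);
    assert_eq!(combined[0], combined[2]);
    assert_ne!(combined[0], combined[1]);
    println!("combined hashes: {:?}", combined);
}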