
feat(planner): Implement hash inner join #5175

Merged (2 commits) on May 7, 2022

Conversation

leiysky (Contributor) commented May 5, 2022

I hereby agree to the terms of the CLA available at: https://databend.rs/dev/policies/cla/

Summary

Initial version of hash join.

Only inner joins with an ON clause are supported, for example:

```sql
select * from t inner join t1 on t.a = t1.a
```
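For readers unfamiliar with the technique: a hash inner join has two phases, building a hash table on the join key of one input and probing it with the other. Below is a minimal self-contained sketch, not the PR's actual code; columns are simplified to plain `i64` vectors and a `HashMap` stands in for the real hash table.

```rust
use std::collections::HashMap;

// Build phase: hash the build side (right table) into a multimap
// from join key to row index.
fn build(build_keys: &[i64]) -> HashMap<i64, Vec<usize>> {
    let mut table: HashMap<i64, Vec<usize>> = HashMap::new();
    for (row, key) in build_keys.iter().enumerate() {
        table.entry(*key).or_default().push(row);
    }
    table
}

// Probe phase: for each probe-side row, emit every matching
// (probe_row, build_row) pair.
fn probe(table: &HashMap<i64, Vec<usize>>, probe_keys: &[i64]) -> Vec<(usize, usize)> {
    let mut out = Vec::new();
    for (probe_row, key) in probe_keys.iter().enumerate() {
        if let Some(rows) = table.get(key) {
            for build_row in rows {
                out.push((probe_row, *build_row));
            }
        }
    }
    out
}

fn main() {
    // t.a = [1, 2, 3] joined with t1.a = [2, 3, 4]: only keys 2 and 3 match.
    let table = build(&[2, 3, 4]);
    let pairs = probe(&table, &[1, 2, 3]);
    assert_eq!(pairs, vec![(1, 0), (2, 1)]);
}
```

For the example query above, only the rows whose keys appear on both sides survive the probe.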

Changelog

  • New Feature

Related Issues

Fixes #issue

@leiysky leiysky added the C-feature Category: feature label May 5, 2022
@leiysky leiysky requested a review from zhang2014 May 5, 2022 13:07

mergify bot commented May 5, 2022

Thanks for the contribution!
I have applied any labels matching special text in your PR Changelog.

Please review the labels and make any necessary changes.

@mergify mergify bot added the pr-feature this PR introduces a new feature to the codebase label May 5, 2022
Xuanwo (Member) commented May 5, 2022

Join the world!

Comment on lines +56 to +66
```rust
let (root_pipeline, pipelines) = planner.plan_sql(self.query.as_str()).await?;
let async_runtime = self.ctx.get_storage_runtime();
let executor = PipelinePullingExecutor::try_create(async_runtime, pipeline)?;

// Spawn sub-pipelines
for pipeline in pipelines {
    let executor = PipelineExecutor::create(async_runtime.clone(), pipeline)?;
    executor.execute()?;
}

// Spawn root pipeline
let executor = PipelinePullingExecutor::try_create(async_runtime, root_pipeline)?;
```
leiysky (Contributor, Author):

@zhang2014

I found it tricky to put the parallel pipelines into the same NewPipeline.

In the current implementation, it spawns sub-tasks (i.e. building the hash table) with PipelineExecutor, and spawns the root task (i.e. the pipeline that probes the hash table) with PipelinePullingExecutor.

The root task will be blocked until the hash table building is finished, which is achieved by polling the state of HashJoinState.
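The blocking described here can be sketched with a shared flag: the build sub-task publishes completion, and the root task polls until it sees it. `JoinState` below is a hypothetical stand-in for the PR's HashJoinState, and the sleep-poll loop is a simplification of whatever scheduling the real executor does.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Stand-in for HashJoinState: the probe side polls `build_done`
// until the build side has finished populating the hash table.
struct JoinState {
    build_done: AtomicBool,
}

// Returns true once the (simulated) build phase is observed as finished.
fn run_join_sync() -> bool {
    let state = Arc::new(JoinState { build_done: AtomicBool::new(false) });

    // Sub-task: build the hash table, then publish completion.
    let builder = {
        let s = Arc::clone(&state);
        thread::spawn(move || {
            // ... build hash table here ...
            s.build_done.store(true, Ordering::Release);
        })
    };

    // Root task: block (by polling) until the build phase is done.
    while !state.build_done.load(Ordering::Acquire) {
        thread::sleep(Duration::from_millis(1));
    }
    builder.join().unwrap();
    state.build_done.load(Ordering::Acquire)
}

fn main() {
    assert!(run_join_sync());
}
```

The Release/Acquire pair ensures that writes made while building the table are visible to the probing task once it observes the flag.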

Member:

Spawning sub-tasks is OK. I'll work on this task.

We need to add a NewPipe variant (zipping the nested pipeline's last pipe with the outer pipeline's previous pipe):

```rust
NewPipe::NestedPipeline {
    nested_pipeline: NewPipeline,
    processors: Vec<ProcessorPtr>,
    outputs_port: Vec<Arc<OutputPort>>,
    inputs_port: Vec<(Arc<InputPort>, Arc<InputPort>)>,
}
```

leiysky (Contributor, Author):

@zhang2014 That's amazing.

I suggest making nested_pipeline a Vec<NewPipeline>, so that in the future we can use it to support multi-way UNION and multi-way join.
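Sketched as a type, that suggestion would look roughly like the following. This is a hypothetical shape, not the merged code: the field name `nested_pipelines` is illustrative, and the ports/processors fields from the earlier proposal are omitted so the example stays self-contained.

```rust
// Placeholder for the real NewPipeline type.
struct NewPipeline {
    name: String,
}

// A pipe that owns several child pipelines, so one NestedPipeline
// can zip N inputs (multi-way UNION, multi-way join).
enum NewPipe {
    NestedPipeline {
        nested_pipelines: Vec<NewPipeline>,
    },
}

fn child_count(pipe: &NewPipe) -> usize {
    match pipe {
        NewPipe::NestedPipeline { nested_pipelines } => nested_pipelines.len(),
    }
}

fn main() {
    let pipe = NewPipe::NestedPipeline {
        nested_pipelines: vec![
            NewPipeline { name: "build".into() },
            NewPipeline { name: "probe".into() },
        ],
    };
    // A two-way join carries two child pipelines; a 3-way UNION would carry three.
    assert_eq!(child_count(&pipe), 2);
}
```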

query/src/sql/exec/mod.rs (outdated review thread, resolved)
```rust
let columns: Vec<&ColumnRef> = columns_vec.iter().collect();
Ok(match &self.hash_method_kind {
    HashMethodKind::Serializer(method) => method
        .build_keys(&columns, row_count)?
```
Member:

I did not find any reason to use hash_method_kind here.
Evaluating xxhash64 may be a simpler way to do it.

leiysky (Contributor, Author):

I was trying to reuse HashMethod, but it seems the hashing requirements here are different from Aggregate's.

We only need a hashed u64 value here. Maybe I can just compute a hash for each column and combine them together?
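That per-column hash-and-combine idea can be sketched as follows. This is an illustration, not the PR's code: std's DefaultHasher stands in for whichever hash function the planner ends up using, and the mixing constant is the 64-bit golden ratio familiar from hash_combine-style mixers.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hash a single column value to a u64.
fn hash_value<T: Hash>(v: &T) -> u64 {
    let mut h = DefaultHasher::new();
    v.hash(&mut h);
    h.finish()
}

// Fold a new column hash into the running row hash
// (in the spirit of boost::hash_combine, adapted to u64).
fn combine(seed: u64, h: u64) -> u64 {
    seed ^ h
        .wrapping_add(0x9e37_79b9_7f4a_7c15)
        .wrapping_add(seed << 6)
        .wrapping_add(seed >> 2)
}

// Single u64 hash for a two-column row (one integer, one string column).
fn row_hash(a: i64, b: &str) -> u64 {
    combine(combine(0, hash_value(&a)), hash_value(&b))
}

fn main() {
    // Deterministic per row, and sensitive to each column.
    assert_eq!(row_hash(1, "x"), row_hash(1, "x"));
    assert_ne!(row_hash(1, "x"), row_hash(2, "x"));
}
```

Unlike the keys HashMethod builds for aggregation, this yields only a u64, so equal hashes still require an equality check on the join keys to rule out collisions.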

sundy-li (Member) commented May 7, 2022:

HashMethod is optimized for the group-aggregation case: it produces unique keys (no collisions) but uses more memory.

Here you only need a hashed u64 value, so HashMethod may be unnecessary unless you store your state in a HashTable.

Member:

I got your point; right now the hash function may not work for multiple columns. Let's keep it as is for now and refactor it in the future.

leiysky (Contributor, Author) commented May 7, 2022:

@sundy-li I have removed HashMethod and used the hash function instead. PTAL

Maybe we can make HashMethod a more common component later.

```rust
let logical_inner_join: LogicalInnerJoin = plan.try_into()?;

let result = SExpr::create(
    PhysicalHashJoin {
```
Member:

In the future, when our statistics module matures, the build side can be determined based on statistical information.

leiysky (Contributor, Author):

Yes. For now we always choose the right side as the build side. The join commutativity rule can help us enumerate the candidates.
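The policy discussed here could eventually become a small cost rule: with cardinality estimates, build on the smaller input; without statistics, keep the fixed right-side choice. The sketch below is hypothetical, not part of this PR.

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum Side {
    Left,
    Right,
}

// Pick the build side of a hash join from optional row-count estimates.
fn choose_build_side(left_rows: Option<u64>, right_rows: Option<u64>) -> Side {
    match (left_rows, right_rows) {
        // With estimates for both inputs, build on the smaller relation:
        // the hash table then fits in less memory and builds faster.
        (Some(l), Some(r)) if l < r => Side::Left,
        // Without usable statistics, fall back to the PR's fixed choice.
        _ => Side::Right,
    }
}

fn main() {
    assert_eq!(choose_build_side(Some(10), Some(1_000)), Side::Left);
    assert_eq!(choose_build_side(None, None), Side::Right);
}
```

The commutativity rule supplies both join orders; a rule like this (or a full cost model) then keeps the cheaper one.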

```rust
    fields.push(field.clone());
}

DataSchemaRefExt::create(fields)
```
Member:

👍, other build_xxx methods can also use the API.

BohuTANG (Member) commented May 7, 2022

Oops, Conflicting files :\

leiysky (Contributor, Author) commented May 7, 2022

> Oops, Conflicting files :\

I'm fixing that.

@BohuTANG BohuTANG requested a review from sundy-li May 7, 2022 12:28
sundy-li (Member) commented May 7, 2022

/LGTM

@BohuTANG BohuTANG merged commit d824fd9 into databendlabs:main May 7, 2022
```sql
select * from t inner join t1 on cast(t.a as float) = t1.b;
select * from t inner join t2 on t.a = t2.c;
select * from t inner join t2 on t.a = t2.c + 1;
```
Member:

I did not get it: why does this query return empty results?

leiysky (Contributor, Author):

It looks like a bug; this query should output:

```
a c
----
2 1
3 2
```

just as the following query does.

I'll fix it.

Member:

`select null+1;` will panic in v2:

```
2022-05-07T14:07:37.182626Z ERROR common_tracing::panic_hook: panicked at 'called `Option::unwrap()` on a `None` value', query/src/sql/planner/binder/project.rs:44:59 backtrace=Backtrace [{ fn: "common_tracing::panic_hook::set_panic_hook::{{closure}}", file: "./common/tracing/src/panic_hook.rs", line: 25 }, { fn: "std::panicking::rust_panic_with_hook",
```

Labels: C-feature (Category: feature), need-review, pr-feature (this PR introduces a new feature to the codebase)

7 participants