Skip to content

Commit

Permalink
Merge pull request #6412 from xudong963/in_subquery
Browse files Browse the repository at this point in the history
feat(planner): support mark join, (not)in/any subquery, make tpch16 and tpch18 happy
  • Loading branch information
BohuTANG authored Jul 5, 2022
2 parents 243159a + 03473c9 commit ff8cf56
Show file tree
Hide file tree
Showing 23 changed files with 428 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use common_datavalues::DataField;
use common_datavalues::DataSchemaRef;
use common_datavalues::DataSchemaRefExt;
use common_datavalues::DataTypeImpl;
use common_datavalues::DataValue;
use common_exception::Result;
use common_hashtable::HashMap;
use common_hashtable::HashTableKeyable;
Expand All @@ -49,6 +50,7 @@ use crate::sessions::QueryContext;
use crate::sql::exec::ColumnID;
use crate::sql::exec::PhysicalScalar;
use crate::sql::planner::plans::JoinType;
use crate::sql::IndexType;

pub struct SerializerHashTable {
pub(crate) hash_table: HashMap<KeysRef, Vec<RowPtr>>,
Expand Down Expand Up @@ -101,21 +103,31 @@ pub enum HashTable {
KeyU512HashTable(KeyU512HashTable),
}

pub struct JoinHashTable {
/// Reference count
ref_count: Mutex<usize>,
is_finished: Mutex<bool>,
/// Three-valued marker (`True` / `False` / `Null`) tracked per row for mark joins.
///
/// `JoinHashTable::finish` seeds one marker per row of the build-side column when
/// the join type is `JoinType::Mark`: `Null` for rows whose value is
/// `DataValue::Null`, `False` for every other row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MarkerKind {
    /// NOTE(review): never assigned in the visible code; presumably set during
    /// probing when a matching row is found. Confirm against the probe path.
    True,
    /// Initial state for a row whose value is not NULL.
    False,
    /// Initial state for a row whose value is NULL.
    Null,
}

/// Description of a single hash join: the compiled key expressions, the join
/// type, the optional extra predicate and the mark-join bookkeeping.
///
/// Built once from the `PhysicalScalar` inputs when the join state is created
/// and then stored on `JoinHashTable` as `hash_join_desc`.
pub struct HashJoinDesc {
/// Key expressions evaluated against each build-side input block in `build`.
pub(crate) build_keys: Vec<EvalNode<ColumnID>>,
/// Key expressions evaluated against each probe-side input block.
pub(crate) probe_keys: Vec<EvalNode<ColumnID>>,
/// Join type; `probe` dispatches on it and `try_create` checks it for `Left`.
pub(crate) join_type: JoinType,
/// Optional additional predicate, compiled from the caller-supplied scalar.
/// NOTE(review): where it is applied is not visible in this hunk.
pub(crate) other_predicate: Option<EvalNode<ColumnID>>,
/// Markers for mark joins. Starts empty; `finish` appends one `MarkerKind`
/// per row of the build-side column when `join_type` is `JoinType::Mark`.
pub(crate) marker: RwLock<Vec<MarkerKind>>,
/// Index passed in by the caller for mark joins, `None` otherwise.
/// NOTE(review): presumably the index of the marker column in the output;
/// its use is not visible here, so confirm against the probe code.
pub(crate) marker_index: Option<IndexType>,
}

pub struct JoinHashTable {
pub(crate) ctx: Arc<QueryContext>,

/// Reference count
ref_count: Mutex<usize>,
is_finished: Mutex<bool>,
/// A shared big hash table stores all the rows from build side
pub(crate) hash_table: RwLock<HashTable>,
pub(crate) row_space: RowSpace,
pub(crate) join_type: JoinType,
pub(crate) other_predicate: Option<EvalNode<ColumnID>>,
pub(crate) hash_join_desc: HashJoinDesc,
}

impl JoinHashTable {
Expand All @@ -126,122 +138,112 @@ impl JoinHashTable {
probe_keys: &[PhysicalScalar],
other_predicate: Option<&PhysicalScalar>,
build_schema: DataSchemaRef,
marker_index: Option<IndexType>,
) -> Result<Arc<JoinHashTable>> {
let hash_key_types: Vec<DataTypeImpl> =
build_keys.iter().map(|expr| expr.data_type()).collect();
let method = DataBlock::choose_hash_method_with_types(&hash_key_types)?;
let hash_join_desc = HashJoinDesc {
build_keys: build_keys
.iter()
.map(Evaluator::eval_physical_scalar)
.collect::<Result<_>>()?,
probe_keys: probe_keys
.iter()
.map(Evaluator::eval_physical_scalar)
.collect::<Result<_>>()?,
join_type,
other_predicate: other_predicate
.map(Evaluator::eval_physical_scalar)
.transpose()?,
marker: RwLock::new(vec![]),
marker_index,
};
Ok(match method {
HashMethodKind::SingleString(_) | HashMethodKind::Serializer(_) => {
Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::SerializerHashTable(SerializerHashTable {
hash_table: HashMap::<KeysRef, Vec<RowPtr>>::create(),
hash_method: HashMethodSerializer::default(),
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
hash_join_desc,
)?)
}
HashMethodKind::KeysU8(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU8HashTable(KeyU8HashTable {
hash_table: HashMap::<u8, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
hash_join_desc,
)?),
HashMethodKind::KeysU16(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU16HashTable(KeyU16HashTable {
hash_table: HashMap::<u16, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
hash_join_desc,
)?),
HashMethodKind::KeysU32(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU32HashTable(KeyU32HashTable {
hash_table: HashMap::<u32, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
hash_join_desc,
)?),
HashMethodKind::KeysU64(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU64HashTable(KeyU64HashTable {
hash_table: HashMap::<u64, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
hash_join_desc,
)?),
HashMethodKind::KeysU128(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU128HashTable(KeyU128HashTable {
hash_table: HashMap::<u128, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
hash_join_desc,
)?),
HashMethodKind::KeysU256(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU256HashTable(KeyU256HashTable {
hash_table: HashMap::<U256, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
hash_join_desc,
)?),
HashMethodKind::KeysU512(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU512HashTable(KeyU512HashTable {
hash_table: HashMap::<U512, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
hash_join_desc,
)?),
})
}

pub fn try_create(
ctx: Arc<QueryContext>,
join_type: JoinType,
hash_table: HashTable,
build_keys: &[PhysicalScalar],
probe_keys: &[PhysicalScalar],
other_predicate: Option<&PhysicalScalar>,
mut build_data_schema: DataSchemaRef,
hash_join_desc: HashJoinDesc,
) -> Result<Self> {
if join_type == JoinType::Left {
if hash_join_desc.join_type == JoinType::Left {
let mut nullable_field = Vec::with_capacity(build_data_schema.fields().len());
for field in build_data_schema.fields().iter() {
nullable_field.push(DataField::new_nullable(
Expand All @@ -255,20 +257,9 @@ impl JoinHashTable {
row_space: RowSpace::new(build_data_schema),
ref_count: Mutex::new(0),
is_finished: Mutex::new(false),
build_keys: build_keys
.iter()
.map(Evaluator::eval_physical_scalar)
.collect::<Result<_>>()?,
probe_keys: probe_keys
.iter()
.map(Evaluator::eval_physical_scalar)
.collect::<Result<_>>()?,
other_predicate: other_predicate
.map(Evaluator::eval_physical_scalar)
.transpose()?,
hash_join_desc,
ctx,
hash_table: RwLock::new(hash_table),
join_type,
})
}

Expand Down Expand Up @@ -339,6 +330,7 @@ impl JoinHashTable {
) -> Result<Vec<DataBlock>> {
let func_ctx = self.ctx.try_get_function_context()?;
let probe_keys = self
.hash_join_desc
.probe_keys
.iter()
.map(|expr| Ok(expr.eval(&func_ctx, input)?.vector().clone()))
Expand Down Expand Up @@ -406,6 +398,7 @@ impl HashJoinState for JoinHashTable {
fn build(&self, input: DataBlock) -> Result<()> {
let func_ctx = self.ctx.try_get_function_context()?;
let build_cols = self
.hash_join_desc
.build_keys
.iter()
.map(|expr| Ok(expr.eval(&func_ctx, &input)?.vector().clone()))
Expand All @@ -428,12 +421,12 @@ impl HashJoinState for JoinHashTable {
}

fn probe(&self, input: &DataBlock, probe_state: &mut ProbeState) -> Result<Vec<DataBlock>> {
match self.join_type {
JoinType::Inner | JoinType::Semi | JoinType::Anti | JoinType::Left => {
match self.hash_join_desc.join_type {
JoinType::Inner | JoinType::Semi | JoinType::Anti | JoinType::Left | JoinType::Mark => {
self.probe_join(input, probe_state)
}
JoinType::Cross => self.probe_cross_join(input, probe_state),
_ => unimplemented!("{} is unimplemented", self.join_type),
_ => unimplemented!("{} is unimplemented", self.hash_join_desc.join_type),
}
}

Expand Down Expand Up @@ -461,13 +454,24 @@ impl HashJoinState for JoinHashTable {
}

fn finish(&self) -> Result<()> {
let chunks = self.row_space.chunks.write().unwrap();
let chunks = self.row_space.chunks.read().unwrap();
let mut marker = self.hash_join_desc.marker.write();
for chunk_index in 0..chunks.len() {
let chunk = &chunks[chunk_index];
let mut columns = vec![];
if let Some(cols) = chunk.cols.as_ref() {
columns = Vec::with_capacity(cols.len());
for col in cols.iter() {
if self.hash_join_desc.join_type == JoinType::Mark {
assert_eq!(cols.len(), 1);
for row_idx in 0..col.len() {
if col.get(row_idx) == DataValue::Null {
marker.push(MarkerKind::Null);
} else {
marker.push(MarkerKind::False);
}
}
}
columns.push(col);
}
}
Expand Down
Loading

0 comments on commit ff8cf56

Please sign in to comment.