Skip to content

Commit

Permalink
fix clippy, add annotations, remove tpch q8
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 committed Jul 4, 2022
1 parent 0e2de3f commit 883c08e
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,29 +105,29 @@ pub enum HashTable {

#[derive(Clone, Eq, PartialEq, Debug)]
pub enum MarkerKind {
TRUE,
FALSE,
NULL,
True,
False,
Null,
}

pub struct JoinHashTable {
/// Reference count
ref_count: Mutex<usize>,
is_finished: Mutex<bool>,

pub struct HashJoinUtil {
pub(crate) build_keys: Vec<EvalNode<ColumnID>>,
pub(crate) probe_keys: Vec<EvalNode<ColumnID>>,
pub(crate) join_type: JoinType,
pub(crate) other_predicate: Option<EvalNode<ColumnID>>,
pub(crate) marker: RwLock<Vec<MarkerKind>>,
pub(crate) marker_index: Option<IndexType>,
}

pub struct JoinHashTable {
pub(crate) ctx: Arc<QueryContext>,

/// Reference count
ref_count: Mutex<usize>,
is_finished: Mutex<bool>,
/// A shared big hash table stores all the rows from build side
pub(crate) hash_table: RwLock<HashTable>,
pub(crate) row_space: RowSpace,
pub(crate) join_type: JoinType,
pub(crate) other_predicate: Option<EvalNode<ColumnID>>,

pub(crate) marker: RwLock<Vec<MarkerKind>>,
pub(crate) marker_index: Option<IndexType>,
pub(crate) hash_join_util: HashJoinUtil,
}

impl JoinHashTable {
Expand All @@ -143,127 +143,107 @@ impl JoinHashTable {
let hash_key_types: Vec<DataTypeImpl> =
build_keys.iter().map(|expr| expr.data_type()).collect();
let method = DataBlock::choose_hash_method_with_types(&hash_key_types)?;
let hash_join_util = HashJoinUtil {
build_keys: build_keys
.iter()
.map(Evaluator::eval_physical_scalar)
.collect::<Result<_>>()?,
probe_keys: probe_keys
.iter()
.map(Evaluator::eval_physical_scalar)
.collect::<Result<_>>()?,
join_type,
other_predicate: other_predicate
.map(Evaluator::eval_physical_scalar)
.transpose()?,
marker: RwLock::new(vec![]),
marker_index,
};
Ok(match method {
HashMethodKind::SingleString(_) | HashMethodKind::Serializer(_) => {
Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::SerializerHashTable(SerializerHashTable {
hash_table: HashMap::<KeysRef, Vec<RowPtr>>::create(),
hash_method: HashMethodSerializer::default(),
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
marker_index,
hash_join_util,
)?)
}
HashMethodKind::KeysU8(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU8HashTable(KeyU8HashTable {
hash_table: HashMap::<u8, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
marker_index,
hash_join_util,
)?),
HashMethodKind::KeysU16(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU16HashTable(KeyU16HashTable {
hash_table: HashMap::<u16, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
marker_index,
hash_join_util,
)?),
HashMethodKind::KeysU32(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU32HashTable(KeyU32HashTable {
hash_table: HashMap::<u32, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
marker_index,
hash_join_util,
)?),
HashMethodKind::KeysU64(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU64HashTable(KeyU64HashTable {
hash_table: HashMap::<u64, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
marker_index,
hash_join_util,
)?),
HashMethodKind::KeysU128(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU128HashTable(KeyU128HashTable {
hash_table: HashMap::<u128, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
marker_index,
hash_join_util,
)?),
HashMethodKind::KeysU256(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU256HashTable(KeyU256HashTable {
hash_table: HashMap::<U256, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
marker_index,
hash_join_util,
)?),
HashMethodKind::KeysU512(hash_method) => Arc::new(JoinHashTable::try_create(
ctx,
join_type,
HashTable::KeyU512HashTable(KeyU512HashTable {
hash_table: HashMap::<U512, Vec<RowPtr>>::create(),
hash_method,
}),
build_keys,
probe_keys,
other_predicate,
build_schema,
marker_index,
hash_join_util,
)?),
})
}

pub fn try_create(
ctx: Arc<QueryContext>,
join_type: JoinType,
hash_table: HashTable,
build_keys: &[PhysicalScalar],
probe_keys: &[PhysicalScalar],
other_predicate: Option<&PhysicalScalar>,
mut build_data_schema: DataSchemaRef,
marker_index: Option<IndexType>,
hash_join_util: HashJoinUtil,
) -> Result<Self> {
if join_type == JoinType::Left {
if hash_join_util.join_type == JoinType::Left {
let mut nullable_field = Vec::with_capacity(build_data_schema.fields().len());
for field in build_data_schema.fields().iter() {
nullable_field.push(DataField::new_nullable(
Expand All @@ -277,22 +257,9 @@ impl JoinHashTable {
row_space: RowSpace::new(build_data_schema),
ref_count: Mutex::new(0),
is_finished: Mutex::new(false),
build_keys: build_keys
.iter()
.map(Evaluator::eval_physical_scalar)
.collect::<Result<_>>()?,
probe_keys: probe_keys
.iter()
.map(Evaluator::eval_physical_scalar)
.collect::<Result<_>>()?,
other_predicate: other_predicate
.map(Evaluator::eval_physical_scalar)
.transpose()?,
hash_join_util,
ctx,
hash_table: RwLock::new(hash_table),
join_type,
marker: RwLock::new(vec![]),
marker_index,
})
}

Expand Down Expand Up @@ -363,6 +330,7 @@ impl JoinHashTable {
) -> Result<Vec<DataBlock>> {
let func_ctx = self.ctx.try_get_function_context()?;
let probe_keys = self
.hash_join_util
.probe_keys
.iter()
.map(|expr| Ok(expr.eval(&func_ctx, input)?.vector().clone()))
Expand Down Expand Up @@ -430,6 +398,7 @@ impl HashJoinState for JoinHashTable {
fn build(&self, input: DataBlock) -> Result<()> {
let func_ctx = self.ctx.try_get_function_context()?;
let build_cols = self
.hash_join_util
.build_keys
.iter()
.map(|expr| Ok(expr.eval(&func_ctx, &input)?.vector().clone()))
Expand All @@ -452,12 +421,12 @@ impl HashJoinState for JoinHashTable {
}

fn probe(&self, input: &DataBlock, probe_state: &mut ProbeState) -> Result<Vec<DataBlock>> {
match self.join_type {
match self.hash_join_util.join_type {
JoinType::Inner | JoinType::Semi | JoinType::Anti | JoinType::Left | JoinType::Mark => {
self.probe_join(input, probe_state)
}
JoinType::Cross => self.probe_cross_join(input, probe_state),
_ => unimplemented!("{} is unimplemented", self.join_type),
_ => unimplemented!("{} is unimplemented", self.hash_join_util.join_type),
}
}

Expand Down Expand Up @@ -486,20 +455,20 @@ impl HashJoinState for JoinHashTable {

fn finish(&self) -> Result<()> {
let chunks = self.row_space.chunks.read().unwrap();
let mut marker = self.marker.write();
let mut marker = self.hash_join_util.marker.write();
for chunk_index in 0..chunks.len() {
let chunk = &chunks[chunk_index];
let mut columns = vec![];
if let Some(cols) = chunk.cols.as_ref() {
columns = Vec::with_capacity(cols.len());
for col in cols.iter() {
if self.join_type == JoinType::Mark {
if self.hash_join_util.join_type == JoinType::Mark {
assert_eq!(cols.len(), 1);
for row_idx in 0..col.len() {
if col.get(row_idx) == DataValue::Null {
marker.push(MarkerKind::NULL);
marker.push(MarkerKind::Null);
} else {
marker.push(MarkerKind::FALSE);
marker.push(MarkerKind::False);
}
}
}
Expand Down
Loading

0 comments on commit 883c08e

Please sign in to comment.