Skip to content

Commit

Permalink
Merge branch 'main' into fix-assume-role-retry
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 18, 2022
2 parents 9954fd9 + e1246f7 commit c291c91
Show file tree
Hide file tree
Showing 19 changed files with 478 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::iter::repeat;
use std::iter::TrustedLen;
use std::sync::atomic::Ordering;

use common_catalog::table_context::TableContext;
use common_datablocks::DataBlock;
use common_datavalues::BooleanViewer;
use common_datavalues::ScalarViewer;
use common_exception::ErrorCode;
use common_exception::Result;
use common_hashtable::HashMap;
use common_hashtable::HashTableKeyable;
Expand All @@ -39,74 +42,172 @@ impl JoinHashTable {
Key: HashTableKeyable + Clone + 'static,
IT: Iterator<Item = Key> + TrustedLen,
{
let valids = &probe_state.valids;
let mut block_size = self.ctx.get_settings().get_max_block_size()? as usize;

// `probe_column` is the subquery result column.
// For sql: select * from t1 where t1.a in (select t2.a from t2); t2.a is the `probe_column`.
let probe_column = input.column(0);
// Check if there is any null in the probe column.
if let Some(validity) = probe_column.validity().1 {
if validity.unset_bits() > 0 {
let mut has_null = self.hash_join_desc.marker_join_desc.has_null.write();
*has_null = true;
if matches!(probe_column.validity().1, Some(x) if x.unset_bits() > 0) {
let mut has_null = self.hash_join_desc.marker_join_desc.has_null.write();
*has_null = true;
}

// If find join partner, set the marker to true.
let mut self_row_ptrs = self.row_ptrs.write();

for (i, key) in keys_iter.enumerate() {
if (i & block_size) == 0 {
block_size <<= 1;

if self.interrupt.load(Ordering::Relaxed) {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}
}

let probe_result_ptr = match self.hash_join_desc.from_correlated_subquery {
true => hash_table.find_key(&key),
false => self.probe_key(hash_table, key, valids, i),
};

if let Some(v) = probe_result_ptr {
let probed_rows = v.get_value();

for probed_row in probed_rows {
if let Some(p) = self_row_ptrs.iter_mut().find(|p| (*p).eq(&probed_row)) {
p.marker = Some(MarkerKind::True);
}
}
}
}
let probe_indexs = &mut probe_state.probe_indexs;
let build_indexs = &mut probe_state.build_indexs;

Ok(vec![DataBlock::empty()])
}
/// Probes `hash_table` for a left-mark join that has an additional `other_predicate`.
///
/// For every key produced by `keys_iter`, the build rows found in the hash table are buffered
/// as (probe index, build row) pairs. Buffered pairs are materialised into a merged block,
/// `other_predicate` is evaluated on it, and the `marker` of the matching entries in
/// `self.row_ptrs` is updated from the predicate result.
///
/// On success a single empty `DataBlock` is returned; the visible effect is the marker update.
/// `ErrorCode::AbortedQuery` is returned when `self.interrupt` is observed as set while a full
/// batch is about to be flushed.
///
/// NOTE(review): this span appears to come from a rendered diff in which removed and added lines
/// sit next to each other (e.g. two `probe_result_ptr` bindings, `probe_indexs`/`build_indexs`
/// next to `probe_indexes`/`build_indexes`), so it does not compile as shown — confirm against
/// the real file before relying on any single statement.
pub(crate) fn probe_left_mark_join_with_conjunct<Key, IT>(
&self,
hash_table: &HashMap<Key, Vec<RowPtr>>,
probe_state: &mut ProbeState,
keys_iter: IT,
input: &DataBlock,
) -> Result<Vec<DataBlock>>
where
Key: HashTableKeyable + Clone + 'static,
IT: Iterator<Item = Key> + TrustedLen,
{
let valids = &probe_state.valids;
// Used below as the initial capacity of the two index buffers.
let block_size = self.ctx.get_settings().get_max_block_size()? as usize;

// `probe_column` is the subquery result column.
// For sql: select * from t1 where t1.a in (select t2.a from t2); t2.a is the `probe_column`.
let probe_column = input.column(0);
// Check if there is any null in the probe column.
if matches!(probe_column.validity().1, Some(x) if x.unset_bits() > 0) {
let mut has_null = self.hash_join_desc.marker_join_desc.has_null.write();
*has_null = true;
}

let func_ctx = self.ctx.try_get_function_context()?;
// The `unwrap` relies on the caller: the `JoinType::LeftMark` dispatch visible further down
// only calls this function when `other_predicate` is `Some`. Other callers are not visible here.
let other_predicate = self.hash_join_desc.other_predicate.as_ref().unwrap();

// This guard stays alive until the end of the function (presumably an RwLock write guard).
let mut row_ptrs = self.row_ptrs.write();
// Parallel buffers: `probe_indexes[k]` is the probe row paired with `build_indexes[k]`.
let mut probe_indexes = Vec::with_capacity(block_size);
let mut build_indexes = Vec::with_capacity(block_size);

for (i, key) in keys_iter.enumerate() {
// NOTE(review): the unterminated `if`/`else` form below looks like the removed side of the
// diff; the `match` that follows expresses the same choice.
let probe_result_ptr = if self.hash_join_desc.from_correlated_subquery {
hash_table.find_key(&key)
} else {
self.probe_key(hash_table, key, valids, i)
let probe_result_ptr = match self.hash_join_desc.from_correlated_subquery {
true => hash_table.find_key(&key),
false => self.probe_key(hash_table, key, valids, i),
};

if let Some(v) = probe_result_ptr {
// NOTE(review): the statements up to the `p.marker = Some(MarkerKind::True);` line use the
// `probe_indexs`/`build_indexs` names and take `self.row_ptrs.write()` a second time while
// the guard above is still held — presumably removed lines; verify.
let probe_result_ptrs = v.get_value();
build_indexs.extend_from_slice(probe_result_ptrs);
probe_indexs.extend(std::iter::repeat(i as u32).take(probe_result_ptrs.len()));
for ptr in probe_result_ptrs {
// If has other conditions, we'll process marker later
if self.hash_join_desc.other_predicate.is_none() {
// If find join partner, set the marker to true.
let mut self_row_ptrs = self.row_ptrs.write();
if let Some(p) = self_row_ptrs.iter_mut().find(|p| (*p).eq(&ptr)) {
p.marker = Some(MarkerKind::True);
let probed_rows = v.get_value();

// Fast path: all matches of this key fit strictly below the buffer capacity.
if probe_indexes.len() + probed_rows.len() < probe_indexes.capacity() {
build_indexes.extend_from_slice(probed_rows);
probe_indexes.extend(repeat(i as u32).take(probed_rows.len()));
} else {
// Slow path: split the matches of this key over several batches.
// `index` is the first unconsumed match, `remain` the number of unconsumed matches.
let mut index = 0_usize;
let mut remain = probed_rows.len();

while index < probed_rows.len() {
if probe_indexes.len() + remain < probe_indexes.capacity() {
// The tail fits: buffer it and leave the loop (`index` reaches the length).
build_indexes.extend_from_slice(&probed_rows[index..]);
probe_indexes.extend(std::iter::repeat(i as u32).take(remain));
index += remain;
} else {
// Cancellation is checked once per flushed batch.
if self.interrupt.load(Ordering::Relaxed) {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

// Number of free slots left in the buffers; fill them exactly to capacity.
// NOTE(review): if the capacity were 0 (a `max_block_size` of 0), `addition` would be 0
// and neither `index` nor `remain` would advance — presumably the setting is never 0.
let addition = probe_indexes.capacity() - probe_indexes.len();
let new_index = index + addition;

build_indexes.extend_from_slice(&probed_rows[index..new_index]);
probe_indexes.extend(repeat(i as u32).take(addition));

// Materialise the buffered pairs and evaluate the extra predicate on them.
let probe_block =
DataBlock::block_take_by_indices(input, &probe_indexes)?;
let build_block = self.row_space.gather(&build_indexes)?;
let merged_block = self.merge_eq_block(&build_block, &probe_block)?;

let type_vector = other_predicate.eval(&func_ctx, &merged_block)?;
let filter_column = type_vector.vector();
let filter_viewer = BooleanViewer::try_create(filter_column)?;

// Marker update per buffered pair:
//   predicate result not valid (NULL) -> a `False` marker becomes `Null`;
//   predicate result true             -> marker becomes `True`;
//   predicate result false            -> marker is left unchanged.
// Each lookup is a linear search over `row_ptrs` and panics if the row is not found.
for (idx, build_index) in build_indexes.iter().enumerate() {
let self_row_ptr =
row_ptrs.iter_mut().find(|p| (*p).eq(&build_index)).unwrap();
if !filter_viewer.valid_at(idx) {
if self_row_ptr.marker == Some(MarkerKind::False) {
self_row_ptr.marker = Some(MarkerKind::Null);
}
} else if filter_viewer.value_at(idx) {
self_row_ptr.marker = Some(MarkerKind::True);
}
}

index = new_index;
remain -= addition;

// Start the next batch with empty buffers.
build_indexes.clear();
probe_indexes.clear();
}
}
}
}
}
// NOTE(review): the early return and the `has_null` reset directly below are repeated near the
// end of the function, and the two `*_indexs` statements duplicate the `*_indexes` ones —
// presumably the removed side of the diff; verify.
if self.hash_join_desc.other_predicate.is_none() {
return Ok(vec![DataBlock::empty()]);
}

if self.hash_join_desc.from_correlated_subquery {
// Must be correlated ANY subquery, we won't need to check `has_null` in `mark_join_blocks`.
// In the following, if value is Null and Marker is False, we'll set the marker to Null
let mut has_null = self.hash_join_desc.marker_join_desc.has_null.write();
*has_null = false;
}
let probe_block = DataBlock::block_take_by_indices(input, probe_indexs)?;
let build_block = self.row_space.gather(build_indexs)?;
// Final flush: process whatever is still buffered after the last key.
let probe_block = DataBlock::block_take_by_indices(input, &probe_indexes)?;
let build_block = self.row_space.gather(&build_indexes)?;
let merged_block = self.merge_eq_block(&build_block, &probe_block)?;
let func_ctx = self.ctx.try_get_function_context()?;
let type_vector = self
.hash_join_desc
.other_predicate
.as_ref()
.unwrap()
.eval(&func_ctx, &merged_block)?;

let type_vector = other_predicate.eval(&func_ctx, &merged_block)?;
let filter_column = type_vector.vector();
let boolean_viewer = BooleanViewer::try_create(filter_column)?;
let mut row_ptrs = self.row_ptrs.write();
for (idx, build_index) in build_indexs.iter().enumerate() {
let filter_viewer = BooleanViewer::try_create(filter_column)?;

// Same marker update rule as for the batches flushed inside the loop.
for (idx, build_index) in build_indexes.iter().enumerate() {
let self_row_ptr = row_ptrs.iter_mut().find(|p| (*p).eq(&build_index)).unwrap();
if !boolean_viewer.valid_at(idx) {
if !filter_viewer.valid_at(idx) {
if self_row_ptr.marker == Some(MarkerKind::False) {
self_row_ptr.marker = Some(MarkerKind::Null);
}
} else if boolean_viewer.value_at(idx) {
} else if filter_viewer.value_at(idx) {
self_row_ptr.marker = Some(MarkerKind::True);
}
}

if self.hash_join_desc.from_correlated_subquery {
// Must be a correlated ANY subquery, so `has_null` does not need to be checked in `mark_join_blocks`.
// The marker loops above already turn a `False` marker into `Null` when the predicate result is NULL.
let mut has_null = self.hash_join_desc.marker_join_desc.has_null.write();
*has_null = false;
}

// The probe itself yields no rows; only the markers updated above carry the result.
Ok(vec![DataBlock::empty()])
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,15 @@ impl JoinHashTable {
// so equi-condition is t1.b = subquery_5, and non-equi-condition is t1.a = t2.a.
// 3. Correlated Exists subquery: only have one kind of join condition, equi-condition.
// equi-condition is subquery's outer columns with subquery's derived columns. (see the above example in correlated ANY subquery)
JoinType::LeftMark => {
self.probe_left_mark_join(hash_table, probe_state, keys_iter, input)
}
JoinType::LeftMark => match self.hash_join_desc.other_predicate.is_none() {
true => self.probe_left_mark_join(hash_table, probe_state, keys_iter, input),
false => self.probe_left_mark_join_with_conjunct(
hash_table,
probe_state,
keys_iter,
input,
),
},
JoinType::RightMark => match self.hash_join_desc.other_predicate.is_none() {
true => self.probe_right_mark_join(hash_table, probe_state, keys_iter, input),
false => self.probe_right_mark_join_with_conjunct(
Expand Down
Loading

0 comments on commit c291c91

Please sign in to comment.