
refine code
leiysky committed May 7, 2022
1 parent 7284836 commit cbf5ddd
Showing 7 changed files with 99 additions and 60 deletions.
41 changes: 40 additions & 1 deletion common/datablocks/src/kernels/data_block_group_by.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use common_datavalues::remove_nullable;
+use common_datavalues::{DataTypeImpl, remove_nullable};
 use common_datavalues::DataType;
 use common_datavalues::TypeID;
 use common_exception::Result;
@@ -28,6 +28,7 @@ use crate::HashMethod;
 use crate::HashMethodSingleString;

 impl DataBlock {
+    // TODO(leiysky): replace with `DataBlock::choose_hash_method_with_types` and deprecate this method
     pub fn choose_hash_method(
         block: &DataBlock,
         column_names: &[String],
@@ -68,6 +69,44 @@ impl DataBlock {
         }
     }

+    pub fn choose_hash_method_with_types(
+        hash_key_types: &[DataTypeImpl]
+    ) -> Result<HashMethodKind> {
+        if hash_key_types.len() == 1 {
+            let typ = &hash_key_types[0];
+            if typ.data_type_id() == TypeID::String {
+                return Ok(HashMethodKind::SingleString(
+                    HashMethodSingleString::default(),
+                ));
+            }
+        }
+
+        let mut group_key_len = 0;
+        for typ in hash_key_types {
+            let typ = remove_nullable(typ);
+
+            if typ.data_type_id().is_numeric() || typ.data_type_id().is_date_or_date_time() {
+                group_key_len += typ.data_type_id().numeric_byte_size()?;
+
+                // extra one byte for null flag
+                if typ.is_nullable() {
+                    group_key_len += 1;
+                }
+            } else {
+                return Ok(HashMethodKind::Serializer(HashMethodSerializer::default()));
+            }
+        }
+
+        match group_key_len {
+            1 => Ok(HashMethodKind::KeysU8(HashMethodKeysU8::default())),
+            2 => Ok(HashMethodKind::KeysU16(HashMethodKeysU16::default())),
+            3..=4 => Ok(HashMethodKind::KeysU32(HashMethodKeysU32::default())),
+            5..=8 => Ok(HashMethodKind::KeysU64(HashMethodKeysU64::default())),
+            // TODO support u128, u256
+            _ => Ok(HashMethodKind::Serializer(HashMethodSerializer::default())),
+        }
+    }
+
     pub fn group_by_blocks(block: &DataBlock, column_names: &[String]) -> Result<Vec<DataBlock>> {
         let method = Self::choose_hash_method(block, column_names)?;
         Ok(match method {
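The selection rule above is the core of the new method: a single String key gets the specialized single-string hash method; otherwise, when every key is numeric or date-like, the key widths (plus one extra byte per nullable key for the null flag) are summed and the smallest fixed-width integer key that fits is chosen, falling back to serialized keys past 8 bytes. Below is a standalone sketch of that rule; KeyKind, choose_key_kind, and the (byte_size, nullable) pairs are simplified stand-ins for the real HashMethodKind and DataTypeImpl APIs, not Databend code.

// Sketch of the fixed-width group-key selection. The real method also
// special-cases a single String key, and serializes whenever any key
// is neither numeric nor date-like.
#[derive(Debug, PartialEq)]
enum KeyKind {
    U8,
    U16,
    U32,
    U64,
    Serializer,
}

// Each key is described by (byte width, is nullable) — a stand-in for
// asking DataTypeImpl for numeric_byte_size() and is_nullable().
fn choose_key_kind(key_types: &[(usize, bool)]) -> KeyKind {
    let mut group_key_len = 0;
    for &(byte_size, nullable) in key_types {
        group_key_len += byte_size;
        // Extra one byte for the null flag, as in the commit.
        if nullable {
            group_key_len += 1;
        }
    }
    match group_key_len {
        1 => KeyKind::U8,
        2 => KeyKind::U16,
        3..=4 => KeyKind::U32,
        5..=8 => KeyKind::U64,
        // Wider keys (u128, u256) are still a TODO upstream.
        _ => KeyKind::Serializer,
    }
}

fn main() {
    // A non-nullable u32 and a nullable u8: 4 + (1 + 1) = 6 bytes -> u64 keys.
    assert_eq!(choose_key_kind(&[(4, false), (1, true)]), KeyKind::U64);
    // Three u64 keys exceed 8 bytes -> fall back to serialized keys.
    assert_eq!(
        choose_key_kind(&[(8, false), (8, false), (8, false)]),
        KeyKind::Serializer
    );
}

Packing the whole group key into at most 8 bytes is what lets the hash table use a plain integer key instead of an allocated, serialized buffer.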
@@ -23,7 +23,7 @@ use common_arrow::arrow::bitmap::Bitmap;
 use common_datablocks::DataBlock;
 use common_datablocks::HashMethod;
 use common_datablocks::HashMethodKind;
-use common_datavalues::combine_validities;
+use common_datavalues::{combine_validities, DataTypeImpl};
 use common_datavalues::ColumnRef;
 use common_datavalues::DataSchemaRef;
 use common_exception::Result;
@@ -82,13 +82,12 @@ impl ChainHashTable {
         _probe_data_schema: DataSchemaRef,
         ctx: Arc<QueryContext>,
     ) -> Result<Self> {
-        let sample_block = DataBlock::empty_with_schema(build_data_schema.clone());
         // TODO: support complicated expressions
-        let build_keys: Vec<String> = build_expressions
+        let build_keys = build_expressions
             .iter()
-            .map(|expr| expr.column_name())
-            .collect();
-        let hash_method_kind = DataBlock::choose_hash_method(&sample_block, &build_keys)?;
+            .map(|expr| expr.to_data_type(&build_data_schema))
+            .collect::<Result<Vec<DataTypeImpl>>>()?;
+        let hash_method_kind = DataBlock::choose_hash_method_with_types(&build_keys)?;
         Ok(Self {
             row_space: RowSpace::new(build_data_schema),
             ref_count: Mutex::new(0),
@@ -203,7 +202,7 @@ impl HashJoinState for ChainHashTable {
             null_bitmap = combine_validities(null_bitmap.as_ref(), bitmap);
         }

-        self.row_space.push(input, hash_values, null_bitmap)?;
+        self.row_space.push(input, hash_values)?;

         Ok(())
     }
@@ -253,6 +252,10 @@ impl HashJoinState for ChainHashTable {
         let mut results: Vec<DataBlock> = vec![];
         for (i, hash_value) in hash_values.iter().enumerate().take(input.num_rows()) {
             let probe_result_ptrs = self.get_matched_ptrs(*hash_value);
+            if probe_result_ptrs.is_empty() {
+                // No matched row for current probe row
+                continue;
+            }
             let result_block = self.row_space.gather(&probe_result_ptrs)?;

             let probe_block = DataBlock::block_take_by_indices(input, &[i as u32])?;
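Two things change in the join state: the hash method is now chosen from the build expressions' data types (via to_data_type against the build schema) instead of probing an empty sample block, and probe rows whose hash chains are empty are skipped before any gather work. A minimal model of that probe fast path follows; HashTable and RowPtr are simplified stand-ins for the real ChainHashTable/RowSpace types, not the Databend API.

// Minimal model of the probe-side fast path: probe rows with an empty
// hash chain produce no output at all, instead of an empty gathered block.
use std::collections::HashMap;

#[derive(Clone, Copy, Debug)]
struct RowPtr {
    chunk_index: u32,
    row_index: u32,
}

struct HashTable {
    buckets: HashMap<u64, Vec<RowPtr>>,
}

impl HashTable {
    fn get_matched_ptrs(&self, hash: u64) -> Vec<RowPtr> {
        self.buckets.get(&hash).cloned().unwrap_or_default()
    }

    // Returns (probe row index, matched build-side rows) per matching row.
    fn probe(&self, probe_hashes: &[u64]) -> Vec<(usize, Vec<RowPtr>)> {
        let mut results = Vec::new();
        for (i, hash) in probe_hashes.iter().enumerate() {
            let ptrs = self.get_matched_ptrs(*hash);
            if ptrs.is_empty() {
                // No matched row for the current probe row: skip it
                // before any gather/concat work happens.
                continue;
            }
            results.push((i, ptrs));
        }
        results
    }
}

fn main() {
    let mut buckets = HashMap::new();
    buckets.insert(42u64, vec![RowPtr { chunk_index: 0, row_index: 3 }]);
    let table = HashTable { buckets };
    // The middle probe row (hash 7) has no chain and produces no output.
    let matches = table.probe(&[42, 7, 42]);
    assert_eq!(matches.len(), 2);
    println!("{:?}", matches);
}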
24 changes: 12 additions & 12 deletions query/src/pipelines/new/processors/transforms/hash_join/row.rs
@@ -15,7 +15,6 @@
 use std::sync::Arc;
 use std::sync::RwLock;

-use common_arrow::arrow::bitmap::Bitmap;
 use common_datablocks::DataBlock;
 use common_datavalues::ColumnRef;
 use common_datavalues::DataField;
@@ -32,7 +31,6 @@ pub type HashVector = Vec<u64>;
 pub struct Chunk {
     pub data_block: DataBlock,
     pub hash_values: HashVector,
-    pub null_bitmap: Option<Bitmap>,
     pub next_ptr: Vec<Option<RowPtr>>,
 }

@@ -65,13 +63,11 @@ impl RowSpace {
         &self,
         data_block: DataBlock,
         hash_values: HashVector,
-        null_bitmap: Option<Bitmap>,
     ) -> Result<()> {
         let row_count = data_block.num_rows();
         let chunk = Chunk {
             data_block,
             hash_values,
-            null_bitmap,
             next_ptr: vec![None; row_count],
         };

@@ -106,17 +102,21 @@ impl RowSpace {
             let chunks = self.chunks.read().unwrap();
             for row_ptr in row_ptrs.iter() {
                 assert!((row_ptr.chunk_index as usize) < chunks.len());
-                data_blocks.push(
-                    self.gather_single_chunk(&chunks[row_ptr.chunk_index as usize], &[
-                        row_ptr.row_index
-                    ])?,
-                );
+                let block = self.gather_single_chunk(&chunks[row_ptr.chunk_index as usize], &[
+                    row_ptr.row_index
+                ])?;
+                if !block.is_empty() {
+                    data_blocks.push(block);
+                }
             }
         }

-        let data_block = DataBlock::concat_blocks(&data_blocks)?;
-
-        Ok(data_block)
+        if !data_blocks.is_empty() {
+            let data_block = DataBlock::concat_blocks(&data_blocks)?;
+            Ok(data_block)
+        } else {
+            Ok(DataBlock::empty_with_schema(self.data_schema.clone()))
+        }
     }

     fn gather_single_chunk(&self, chunk: &Chunk, indices: &[u32]) -> Result<DataBlock> {
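RowSpace::gather now guards both ends: per-chunk gathers that come back empty are dropped, and when nothing was gathered at all it returns an empty block built from the stored schema rather than calling concat_blocks on an empty slice, which has no block to copy a schema from. A toy model of that guard, with a one-column Block standing in for DataBlock (not the real API):

// Toy model of the empty-safe gather. `Block` holds one u32 column plus
// the column name as its "schema".
#[derive(Debug)]
struct Block {
    column_name: String,
    rows: Vec<u32>,
}

impl Block {
    fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    fn empty_with_schema(column_name: &str) -> Self {
        Block { column_name: column_name.to_string(), rows: Vec::new() }
    }

    // Like concat_blocks, this needs at least one input block to copy the
    // schema from, so the caller must handle the all-empty case itself.
    fn concat(blocks: &[Block]) -> Option<Block> {
        let first = blocks.first()?;
        let rows = blocks.iter().flat_map(|b| b.rows.iter().copied()).collect();
        Some(Block { column_name: first.column_name.clone(), rows })
    }
}

fn gather(gathered: Vec<Block>, schema: &str) -> Block {
    // Drop empty per-chunk results, then fall back to an empty block
    // with the right schema when nothing matched at all.
    let non_empty: Vec<Block> = gathered.into_iter().filter(|b| !b.is_empty()).collect();
    match Block::concat(&non_empty) {
        Some(block) => block,
        None => Block::empty_with_schema(schema),
    }
}

fn main() {
    let out = gather(vec![Block::empty_with_schema("col")], "col");
    assert!(out.is_empty()); // no rows, but the schema is preserved
    println!("{:?}", out);
}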
55 changes: 28 additions & 27 deletions query/src/servers/mysql/mysql_interactive_worker.rs
@@ -45,7 +45,7 @@ use crate::servers::mysql::MySQLFederated;
 use crate::servers::mysql::MYSQL_VERSION;
 use crate::sessions::QueryContext;
 use crate::sessions::SessionRef;
-use crate::sql::PlanParser;
+use crate::sql::{DfParser, DfStatement, PlanParser};
 use crate::users::CertifiedInfo;

 struct InteractiveWorkerBase<W: std::io::Write> {
struct InteractiveWorkerBase<W: std::io::Write> {
Expand Down Expand Up @@ -154,7 +154,7 @@ impl<W: std::io::Write + Send + Sync> AsyncMysqlShim<W> for InteractiveWorker<W>
}

async fn on_close<'a>(&'a mut self, id: u32)
where W: 'async_trait {
where W: 'async_trait {
self.base.do_close(id).await;
}

Expand Down Expand Up @@ -277,40 +277,41 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {
let context = self.session.create_query_context().await?;
context.attach_query_str(query);

let (plan, hints) = PlanParser::parse_with_hint(query, context.clone()).await;
if let (Some(hint_error_code), Err(error_code)) = (
hints
.iter()
.find(|v| v.error_code.is_some())
.and_then(|x| x.error_code),
&plan,
) {
// Pre-check if parsing error can be ignored
if hint_error_code == error_code.code() {
return Ok((vec![DataBlock::empty()], String::from("")));
}
}

let plan = match plan {
Ok(p) => p,
Err(e) => {
InterpreterQueryLog::fail_to_start(context, e.clone()).await;
return Err(e);
}
};
tracing::debug!("Get logic plan:\n{:?}", plan);

let settings = context.get_settings();

let (stmts, hints) = DfParser::parse_sql(query, context.get_current_session().get_type())?;

let interpreter: Arc<dyn Interpreter> =
if settings.get_enable_new_processor_framework()? != 0
&& context.get_cluster().is_empty()
&& settings.get_enable_planner_v2()? != 0
&& matches!(plan, PlanNode::Select(..))
&& matches!(stmts.get(0), Some(DfStatement::Query(_)))
{
// New planner is enabled, and the statement is ensured to be `SELECT` statement.
SelectInterpreterV2::try_create(context.clone(), query)?
} else {
let (plan, _) = PlanParser::parse_with_hint(query, context.clone()).await;
if let (Some(hint_error_code), Err(error_code)) = (
hints
.iter()
.find(|v| v.error_code.is_some())
.and_then(|x| x.error_code),
&plan,
) {
// Pre-check if parsing error can be ignored
if hint_error_code == error_code.code() {
return Ok((vec![DataBlock::empty()], String::from("")));
}
}

let plan = match plan {
Ok(p) => p,
Err(e) => {
InterpreterQueryLog::fail_to_start(context, e.clone()).await;
return Err(e);
}
};
tracing::debug!("Get logic plan:\n{:?}", plan);
InterpreterFactory::get(context.clone(), plan)?
};

@@ -372,7 +373,7 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {

             Ok::<Vec<DataBlock>, ErrorCode>(query_result)
         }
-            .in_current_span(),
+        .in_current_span(),
         )?;

         let query_result = query_result
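The dispatch is reordered: the query is first parsed into statements with DfParser, and SelectInterpreterV2 is used only when the new-processor and planner-v2 settings are enabled, the cluster is empty, and the first statement is a query; every other case falls through to the legacy PlanParser path, including the hint-based error pre-check, which now lives inside the else branch. A condensed sketch of the routing decision; Settings, Statement, and Interpreter here are hypothetical stand-ins, not the Databend types.

// Condensed model of the interpreter routing above. Only the decision
// structure mirrors the commit; all types are simplified stand-ins.
#[derive(Debug, PartialEq)]
enum Statement {
    Query(String),
    Other(String),
}

struct Settings {
    enable_new_processor_framework: bool,
    enable_planner_v2: bool,
    cluster_is_empty: bool,
}

#[derive(Debug, PartialEq)]
enum Interpreter {
    SelectV2,
    Legacy,
}

fn choose_interpreter(settings: &Settings, stmts: &[Statement]) -> Interpreter {
    if settings.enable_new_processor_framework
        && settings.cluster_is_empty
        && settings.enable_planner_v2
        && matches!(stmts.first(), Some(Statement::Query(_)))
    {
        // New planner handles plain SELECT queries only.
        Interpreter::SelectV2
    } else {
        // Everything else goes through the legacy plan parser.
        Interpreter::Legacy
    }
}

fn main() {
    let settings = Settings {
        enable_new_processor_framework: true,
        enable_planner_v2: true,
        cluster_is_empty: true,
    };
    let stmts = vec![Statement::Query("SELECT 1".into())];
    assert_eq!(choose_interpreter(&settings, &stmts), Interpreter::SelectV2);
    let ddl = vec![Statement::Other("CREATE TABLE t (a INT)".into())];
    assert_eq!(choose_interpreter(&settings, &ddl), Interpreter::Legacy);
}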
6 changes: 1 addition & 5 deletions query/src/sql/planner/binder/join.rs
@@ -211,11 +211,6 @@ impl<'a> JoinConditionResolver<'a> {
         // Only equi-predicate can be exploited by common join algorithms(e.g. sort-merge join, hash join).
         // For the predicates that aren't equi-predicate, we will lift them as a `Filter` operator.
         if let Some((left, right)) = split_equivalent_predicate(predicate) {
-            if !matches!(&left, Scalar::BoundColumnRef(_))
-                || !matches!(&right, Scalar::BoundColumnRef(_))
-            {
-                return Err(ErrorCode::UnImplement("Unsupported JOIN condition"));
-            }
             let left_used_columns = left.used_columns();
             let right_used_columns = right.used_columns();
             let left_columns: ColumnSet = self.left_context.all_column_bindings().iter().fold(
@@ -233,6 +228,7 @@
                 },
             );

+            // TODO(leiysky): bump types of left conditions and right conditions
             if left_used_columns.is_subset(&left_columns)
                 && right_used_columns.is_subset(&right_columns)
             {
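With the BoundColumnRef-only restriction removed, any equality predicate can become a hash-join condition as long as the columns used by one side all come from the left input and the other side's all come from the right — the subset test kept below the new TODO. A small sketch of that partition test; plain column-name sets stand in for ColumnSet and the bound Scalar expressions, and the swapped-orientation check is an assumption about the code the diff truncates.

use std::collections::HashSet;

type ColumnSet = HashSet<String>;

// Sketch of the subset test that decides whether an equality predicate
// can drive a hash join: each side must only use columns from one input.
fn is_equi_join_condition(
    left_used: &ColumnSet,
    right_used: &ColumnSet,
    left_input: &ColumnSet,
    right_input: &ColumnSet,
) -> bool {
    // Either orientation may match; the resolver can swap the sides.
    (left_used.is_subset(left_input) && right_used.is_subset(right_input))
        || (left_used.is_subset(right_input) && right_used.is_subset(left_input))
}

fn main() {
    let left_input: ColumnSet = ["t1.a", "t1.b"].iter().map(|s| s.to_string()).collect();
    let right_input: ColumnSet = ["t2.a"].iter().map(|s| s.to_string()).collect();
    // `t1.a + t1.b = t2.a` is now accepted: not a bare column reference,
    // but each side still binds to exactly one input.
    let lhs: ColumnSet = ["t1.a", "t1.b"].iter().map(|s| s.to_string()).collect();
    let rhs: ColumnSet = ["t2.a"].iter().map(|s| s.to_string()).collect();
    assert!(is_equi_join_condition(&lhs, &rhs, &left_input, &right_input));
}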
10 changes: 5 additions & 5 deletions query/src/sql/statements/query/query_schema_joined_analyzer.rs
@@ -71,11 +71,11 @@ impl JoinedSchemaAnalyzer {
             }
         }

-        // if analyzed_tables.len() != 1 {
-        //     return Err(ErrorCode::LogicalError(
-        //         "Logical error: this is relation rpn bug.",
-        //     ));
-        // }
+        if analyzed_tables.len() != 1 {
+            return Err(ErrorCode::LogicalError(
+                "Logical error: this is relation rpn bug.",
+            ));
+        }

         Ok(analyzed_tables.remove(0))
     }
6 changes: 3 additions & 3 deletions query/src/sql/statements/statement_select.rs
@@ -228,9 +228,9 @@ impl DfQueryStatement {

         let mut tables_desc = schema.take_tables_desc();

-        // if tables_desc.len() != 1 {
-        //     return Err(ErrorCode::UnImplement("Select join unimplemented yet."));
-        // }
+        if tables_desc.len() != 1 {
+            return Err(ErrorCode::UnImplement("Select join unimplemented yet."));
+        }

         match tables_desc.remove(0) {
             JoinedTableDesc::Table {
