diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs
index 57792449eebe..de0712229599 100644
--- a/src/query/ast/src/ast/statements/table.rs
+++ b/src/query/ast/src/ast/statements/table.rs
@@ -796,6 +796,7 @@ pub enum OptimizeTableAction {
     All,
     Purge { before: Option<TimeTravelPoint> },
     Compact { target: CompactTarget },
+    ClusterBy { exprs: Vec<Expr> },
 }
 
 impl Display for OptimizeTableAction {
@@ -820,6 +821,11 @@ impl Display for OptimizeTableAction {
                 }
                 Ok(())
             }
+            OptimizeTableAction::ClusterBy { exprs } => {
+                write!(f, "CLUSTER BY HILBERT(")?;
+                write_comma_separated_list(f, exprs)?;
+                write!(f, ")")
+            }
         }
     }
 }
diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs
index b92fabeb0516..3f601f881acf 100644
--- a/src/query/ast/src/parser/statement.rs
+++ b/src/query/ast/src/parser/statement.rs
@@ -3713,6 +3713,10 @@ pub fn optimize_table_action(i: Input) -> IResult<OptimizeTableAction> {
                 target: opt_segment.map_or(CompactTarget::Block, |_| CompactTarget::Segment),
             }
         }),
+        map(
+            rule! { CLUSTER ~ ^BY ~ HILBERT ~ ^"(" ~ ^#comma_separated_list1(expr) ~ ^")" },
+            |(_, _, _, _, exprs, _)| OptimizeTableAction::ClusterBy { exprs },
+        ),
     ))(i)
 }
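Note: with the grammar rule above, the new action is written as shown below (`t`, `a`, and `b` are hypothetical names); the Display impl prints the statement back in the same form, so it should round-trip through the parser:

    OPTIMIZE TABLE t CLUSTER BY HILBERT(a, b);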
diff --git a/src/query/expression/src/kernels/sort.rs b/src/query/expression/src/kernels/sort.rs
index 91bc7ebff299..02a6c1fcb579 100644
--- a/src/query/expression/src/kernels/sort.rs
+++ b/src/query/expression/src/kernels/sort.rs
@@ -16,6 +16,7 @@ use databend_common_exception::Result;
 
 use crate::types::DataType;
 use crate::visitor::ValueVisitor;
+use crate::Column;
 use crate::ColumnBuilder;
 use crate::DataBlock;
 use crate::Scalar;
@@ -126,8 +127,11 @@ pub fn compare_scalars(rows: Vec<Vec<Scalar>>, data_types: &[DataType]) -> Resul
         .into_iter()
         .map(|builder| builder.build())
         .collect::<Vec<_>>();
+    compare_columns(order_columns, length)
+}
 
-    let descriptions = order_columns
+pub fn compare_columns(columns: Vec<Column>, length: usize) -> Result<Vec<u32>> {
+    let descriptions = columns
         .iter()
         .enumerate()
         .map(|(idx, _)| SortColumnDescription {
@@ -139,7 +143,7 @@ pub fn compare_scalars(rows: Vec<Vec<Scalar>>, data_types: &[DataType]) -> Resul
 
     let mut sort_compare = SortCompare::new(descriptions, length, LimitType::None);
 
-    for array in order_columns {
+    for array in columns {
         sort_compare.visit_value(Value::Column(array))?;
         sort_compare.increment_column_index();
    }
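The refactor above extracts the column-comparison half of compare_scalars into a public compare_columns, so callers that already hold Columns can get the sort permutation without rebuilding scalars. A minimal sketch of the new entry point, assuming ascending order and the FromData test helper from databend_common_expression:

    use databend_common_exception::Result;
    use databend_common_expression::compare_columns;
    use databend_common_expression::types::Int32Type;
    use databend_common_expression::FromData;

    fn sorted_permutation() -> Result<Vec<u32>> {
        // Rows 1 and 2 hold the smallest values, so the ascending
        // permutation should come back as [1, 2, 0].
        let col = Int32Type::from_data(vec![3i32, 1, 2]);
        compare_columns(vec![col], 3)
    }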
diff --git a/src/query/functions/src/aggregates/aggregate_range_bound.rs b/src/query/functions/src/aggregates/aggregate_range_bound.rs
new file mode 100644
index 000000000000..19b79ba6655e
--- /dev/null
+++ b/src/query/functions/src/aggregates/aggregate_range_bound.rs
@@ -0,0 +1,362 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use borsh::BorshDeserialize;
+use borsh::BorshSerialize;
+use databend_common_exception::ErrorCode;
+use databend_common_exception::Result;
+use databend_common_expression::compare_columns;
+use databend_common_expression::types::array::ArrayColumnBuilder;
+use databend_common_expression::types::Bitmap;
+use databend_common_expression::types::*;
+use databend_common_expression::with_number_mapped_type;
+use databend_common_expression::AggregateFunctionRef;
+use databend_common_expression::Scalar;
+use ethnum::i256;
+use rand::prelude::SliceRandom;
+use rand::rngs::SmallRng;
+use rand::thread_rng;
+use rand::Rng;
+use rand::SeedableRng;
+
+use super::assert_unary_arguments;
+use super::AggregateUnaryFunction;
+use super::FunctionData;
+use super::UnaryState;
+use crate::aggregates::aggregate_function_factory::AggregateFunctionDescription;
+use crate::with_simple_no_number_mapped_type;
+
+struct RangeBoundData {
+    partitions: usize,
+    sample_size: usize,
+}
+
+impl FunctionData for RangeBoundData {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+#[derive(BorshSerialize, BorshDeserialize)]
+pub struct RangeBoundState<T>
+where
+    T: ValueType,
+    T::Scalar: BorshSerialize + BorshDeserialize,
+{
+    values: Vec<(u64, Vec<T::Scalar>)>,
+    total_rows: usize,
+    total_samples: usize,
+}
+
+impl<T> Default for RangeBoundState<T>
+where
+    T: ValueType,
+    T::Scalar: BorshSerialize + BorshDeserialize,
+{
+    fn default() -> Self {
+        RangeBoundState::<T> {
+            values: vec![],
+            total_rows: 0,
+            total_samples: 0,
+        }
+    }
+}
+
+impl<T> UnaryState<T, ArrayType<T>> for RangeBoundState<T>
+where
+    T: ArgType + Sync + Send,
+    T::Scalar: Ord + Sync + Send + BorshSerialize + BorshDeserialize,
+{
+    fn add(
+        &mut self,
+        other: T::ScalarRef<'_>,
+        function_data: Option<&dyn FunctionData>,
+    ) -> Result<()> {
+        let range_bound_data = unsafe {
+            function_data
+                .unwrap()
+                .as_any()
+                .downcast_ref_unchecked::<RangeBoundData>()
+        };
+
+        let total_sample_size = std::cmp::min(
+            range_bound_data.sample_size * range_bound_data.partitions,
+            10_000,
+        );
+
+        if self.values.is_empty() {
+            self.values.push((0, vec![]));
+        }
+        let (total_rows, samples) = &mut self.values[0];
+        *total_rows += 1;
+        self.total_rows += 1;
+        if samples.len() < total_sample_size {
+            self.total_samples += 1;
+            samples.push(T::to_owned_scalar(other));
+        } else {
+            let mut rng = thread_rng();
+            let replacement_index = rng.gen_range(0..*total_rows) as usize;
+            if replacement_index < total_sample_size {
+                // Replacing an existing sample leaves the sample count
+                // unchanged, so `total_samples` must not be incremented here;
+                // it is later used as the length passed to `compare_columns`.
+                samples[replacement_index] = T::to_owned_scalar(other);
+            }
+        }
+        Ok(())
+    }
+
+    fn add_batch(
+        &mut self,
+        other: T::Column,
+        validity: Option<&Bitmap>,
+        function_data: Option<&dyn FunctionData>,
+    ) -> Result<()> {
+        let column_len = T::column_len(&other);
+        let unset_bits = validity.map_or(0, |v| v.null_count());
+        if unset_bits == column_len {
+            return Ok(());
+        }
+
+        let range_bound_data = unsafe {
+            function_data
+                .unwrap()
+                .as_any()
+                .downcast_ref_unchecked::<RangeBoundData>()
+        };
+        let sample_size = std::cmp::max(
+            (column_len - unset_bits) / 100,
+            range_bound_data.sample_size,
+        );
+
+        let mut indices = validity.map_or_else(
+            || (0..column_len).collect::<Vec<_>>(),
+            |v| {
+                v.iter()
+                    .enumerate()
+                    .filter_map(|(i, v)| if v { Some(i) } else { None })
+                    .collect()
+            },
+        );
+
+        let valid_size = indices.len();
+        let sampled_indices = if valid_size > sample_size {
+            let mut rng = SmallRng::from_entropy();
+            indices.shuffle(&mut rng);
+            &indices[..sample_size]
+        } else {
+            &indices
+        };
+
+        let sample_values = sampled_indices
+            .iter()
+            .map(|i| T::to_owned_scalar(unsafe { T::index_column_unchecked(&other, *i) }))
+            .collect::<Vec<_>>();
+
+        self.total_rows += valid_size;
+        self.total_samples += sample_values.len();
+        self.values.push((valid_size as u64, sample_values));
+        Ok(())
+    }
+
+    fn merge(&mut self, rhs: &Self) -> Result<()> {
+        self.values.extend_from_slice(&rhs.values);
+        self.total_rows += rhs.total_rows;
+        self.total_samples += rhs.total_samples;
+        Ok(())
+    }
+
+    fn merge_result(
+        &mut self,
+        builder: &mut ArrayColumnBuilder<T>,
+        function_data: Option<&dyn FunctionData>,
+    ) -> Result<()> {
+        let range_bound_data = unsafe {
+            function_data
+                .unwrap()
+                .as_any()
+                .downcast_ref_unchecked::<RangeBoundData>()
+        };
+        let step = self.total_rows as f64 / range_bound_data.partitions as f64;
+
+        let values = std::mem::take(&mut self.values);
+        let mut data = Vec::with_capacity(self.total_samples);
+        let mut weights = Vec::with_capacity(self.total_samples);
+        for (num, values) in values.into_iter() {
+            let weight = num as f64 / values.len() as f64;
+            values.into_iter().for_each(|v| {
+                data.push(v);
+                weights.push(weight);
+            });
+        }
+        let col = T::upcast_column(T::column_from_vec(data.clone(), &[]));
+        let indices = compare_columns(vec![col], self.total_samples)?;
+
+        let mut cum_weight = 0.0;
+        let mut target = step;
+        let mut bounds = Vec::with_capacity(range_bound_data.partitions - 1);
+        let mut previous_bound = None;
+
+        let mut i = 0;
+        let mut j = 0;
+        while i < self.total_samples && j < range_bound_data.partitions - 1 {
+            let idx = indices[i] as usize;
+            let weight = weights[idx];
+            cum_weight += weight;
+            if cum_weight >= target {
+                let data = &data[idx];
+                if previous_bound.as_ref().map_or(true, |prev| data > prev) {
+                    bounds.push(data.clone());
+                    target += step;
+                    j += 1;
+                    previous_bound = Some(data.clone());
+                }
+            }
+            i += 1;
+        }
+
+        let col = T::column_from_vec(bounds, &[]);
+        builder.push(col);
+        Ok(())
+    }
+}
+
+pub fn try_create_aggregate_range_bound_function(
+    display_name: &str,
+    params: Vec<Scalar>,
+    arguments: Vec<DataType>,
+) -> Result<AggregateFunctionRef> {
+    assert_unary_arguments(display_name, arguments.len())?;
+    let data_type = arguments[0].clone().remove_nullable();
+    let function_data = get_partitions(&params, display_name)?;
+    let return_type = DataType::Array(Box::new(data_type.clone()));
+
+    with_simple_no_number_mapped_type!(|T| match data_type {
+        DataType::T => {
+            let func = AggregateUnaryFunction::<RangeBoundState<T>, T, ArrayType<T>>::try_create(
+                display_name,
+                return_type,
+                params,
+                arguments[0].clone(),
+            )
+            .with_function_data(Box::new(function_data))
+            .with_need_drop(true);
+            Ok(Arc::new(func))
+        }
+        DataType::Number(num_type) => {
+            with_number_mapped_type!(|NUM| match num_type {
+                NumberDataType::NUM => {
+                    let func = AggregateUnaryFunction::<
+                        RangeBoundState<NumberType<NUM>>,
+                        NumberType<NUM>,
+                        ArrayType<NumberType<NUM>>,
+                    >::try_create(
+                        display_name, return_type, params, arguments[0].clone()
+                    )
+                    .with_function_data(Box::new(function_data))
+                    .with_need_drop(true);
+                    Ok(Arc::new(func))
+                }
+            })
+        }
+        DataType::Decimal(DecimalDataType::Decimal128(_)) => {
+            let func = AggregateUnaryFunction::<
+                RangeBoundState<DecimalType<i128>>,
+                DecimalType<i128>,
+                ArrayType<DecimalType<i128>>,
+            >::try_create(
+                display_name, return_type, params, arguments[0].clone()
+            )
+            .with_function_data(Box::new(function_data))
+            .with_need_drop(true);
+            Ok(Arc::new(func))
+        }
+        DataType::Decimal(DecimalDataType::Decimal256(_)) => {
+            let func = AggregateUnaryFunction::<
+                RangeBoundState<DecimalType<i256>>,
+                DecimalType<i256>,
+                ArrayType<DecimalType<i256>>,
+            >::try_create(
+                display_name, return_type, params, arguments[0].clone()
+            )
+            .with_function_data(Box::new(function_data))
+            .with_need_drop(true);
+            Ok(Arc::new(func))
+        }
+        DataType::Binary => {
+            let func = AggregateUnaryFunction::<
+                RangeBoundState<BinaryType>,
+                BinaryType,
+                ArrayType<BinaryType>,
+            >::try_create(
+                display_name, return_type, params, arguments[0].clone()
+            )
+            .with_function_data(Box::new(function_data))
+            .with_need_drop(true);
+            Ok(Arc::new(func))
+        }
+        _ => Err(ErrorCode::BadDataValueType(format!(
+            "{} does not support type '{:?}'",
+            display_name, data_type
+        ))),
+    })
+}
+
+pub fn aggregate_range_bound_function_desc() -> AggregateFunctionDescription {
+    AggregateFunctionDescription::creator(Box::new(
+        crate::aggregates::try_create_aggregate_range_bound_function,
+    ))
+}
+
+fn get_partitions(params: &[Scalar], display_name: &str) -> Result<RangeBoundData> {
+    match params.len() {
+        0 => Ok(RangeBoundData {
+            partitions: 1000,
+            sample_size: 100,
+        }),
+        1 => {
+            let partitions = get_positive_integer(&params[0], display_name)?;
+            Ok(RangeBoundData {
+                partitions,
+                sample_size: 100,
+            })
+        }
+        2 => {
+            let partitions = get_positive_integer(&params[0], display_name)?;
+            let sample_size = get_positive_integer(&params[1], display_name)?;
+            Ok(RangeBoundData {
+                partitions,
+                sample_size,
+            })
+        }
+        _ => Err(ErrorCode::BadArguments(format!(
+            "The number of parameters in aggregate function {} must be 0, 1 or 2",
+            display_name,
+        ))),
+    }
+}
+
+fn get_positive_integer(val: &Scalar, display_name: &str) -> Result<usize> {
+    if let Scalar::Number(number) = val {
+        if let Some(number) = number.integer_to_i128() {
+            if number > 0 {
+                return Ok(number as usize);
+            }
+        }
+    }
+    Err(ErrorCode::BadDataValueType(format!(
+        "The parameter of aggregate function {} must be a positive integer",
+        display_name
+    )))
+}
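merge_result walks the samples in sorted order, accumulating each sample's weight (the number of rows it represents) until the next step boundary is crossed, and emits at most partitions - 1 strictly increasing bounds. Once registered as range_bound (next hunk), it can be exercised like this; t and c are hypothetical, and the exact bounds depend on the sampled data:

    -- Parameters come before the argument, Databend-style:
    -- range_bound(partitions[, sample_size])(expr).
    -- For c uniformly covering 1..1000 and 3 partitions, the two
    -- returned bounds should land somewhere near [334, 667].
    SELECT range_bound(3)(c) FROM t;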
diff --git a/src/query/functions/src/aggregates/aggregator.rs b/src/query/functions/src/aggregates/aggregator.rs
index e463ae7f8168..5dd1c0229def 100644
--- a/src/query/functions/src/aggregates/aggregator.rs
+++ b/src/query/functions/src/aggregates/aggregator.rs
@@ -32,6 +32,7 @@ use super::aggregate_min_max_any::aggregate_any_function_desc;
 use super::aggregate_min_max_any::aggregate_max_function_desc;
 use super::aggregate_min_max_any::aggregate_min_function_desc;
 use super::aggregate_mode::aggregate_mode_function_desc;
+use super::aggregate_range_bound_function_desc;
 use super::aggregate_stddev::aggregate_stddev_pop_function_desc;
 use super::aggregate_stddev::aggregate_stddev_samp_function_desc;
 use super::aggregate_window_funnel::aggregate_window_funnel_function_desc;
@@ -118,6 +119,8 @@ impl Aggregators {
         factory.register("skewness", aggregate_skewness_function_desc());
         factory.register("string_agg", aggregate_string_agg_function_desc());
 
+        factory.register("range_bound", aggregate_range_bound_function_desc());
+
         factory.register(
             "bitmap_and_count",
             aggregate_bitmap_and_count_function_desc(),
diff --git a/src/query/functions/src/aggregates/mod.rs b/src/query/functions/src/aggregates/mod.rs
index 3092cd526068..8e3065b8d9cc 100644
--- a/src/query/functions/src/aggregates/mod.rs
+++ b/src/query/functions/src/aggregates/mod.rs
@@ -39,6 +39,7 @@ mod aggregate_quantile_cont;
 mod aggregate_quantile_disc;
 mod aggregate_quantile_tdigest;
 mod aggregate_quantile_tdigest_weighted;
+mod aggregate_range_bound;
 mod aggregate_retention;
 mod aggregate_scalar_state;
 mod aggregate_skewness;
@@ -71,6 +72,7 @@ pub use aggregate_quantile_cont::*;
 pub use aggregate_quantile_disc::*;
 pub use aggregate_quantile_tdigest::*;
 pub use aggregate_quantile_tdigest_weighted::*;
+pub use aggregate_range_bound::*;
 pub use aggregate_retention::*;
 pub use aggregate_skewness::*;
 pub use aggregate_string_agg::*;
diff --git a/src/query/functions/src/scalars/hilbert.rs b/src/query/functions/src/scalars/hilbert.rs
index 67f426e6ffb1..b8a6e9a78f55 100644
--- a/src/query/functions/src/scalars/hilbert.rs
+++ b/src/query/functions/src/scalars/hilbert.rs
@@ -15,6 +15,8 @@
 use databend_common_expression::hilbert_index;
 use databend_common_expression::types::ArrayType;
 use databend_common_expression::types::BinaryType;
+use databend_common_expression::types::GenericType;
+use databend_common_expression::types::NullableType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::types::NumberType;
 use databend_common_expression::types::StringType;
@@ -56,12 +58,19 @@ pub fn register(registry: &mut FunctionRegistry) {
         })
     }
 
-    registry.register_passthrough_nullable_2_arg::<ArrayType<BinaryType>, NumberType<u64>, BinaryType, _, _>(
+    registry.register_combine_nullable_2_arg::<ArrayType<NullableType<BinaryType>>, NumberType<u64>, BinaryType, _, _>(
         "hilbert_index",
         |_, _, _| FunctionDomain::Full,
-        vectorize_with_builder_2_arg::<ArrayType<BinaryType>, NumberType<u64>, BinaryType>(
+        vectorize_with_builder_2_arg::<ArrayType<NullableType<BinaryType>>, NumberType<u64>, NullableType<BinaryType>>(
             |val, len, builder, ctx| {
-                let points = val.iter().collect::<Vec<_>>();
+                let mut points = Vec::with_capacity(val.len());
+                for a in val.iter() {
+                    if a.is_none() {
+                        builder.push_null();
+                        return;
+                    }
+                    points.push(a.unwrap());
+                }
                 let dimension = points.len();
 
                 if std::intrinsics::unlikely(len > 64) {
@@ -70,11 +79,28 @@ pub fn register(registry: &mut FunctionRegistry) {
                     ctx.set_error(builder.len(), "Dimension must between 2 and 5");
                 } else {
                     let slice = hilbert_index(&points, len as usize);
-                    builder.put_slice(&slice);
+                    builder.push(&slice);
                 }
-
-                builder.commit_row();
             },
         ),
     );
+
+    registry.register_passthrough_nullable_2_arg::<GenericType<0>, ArrayType<GenericType<0>>, NumberType<u64>, _, _>(
+        "range_partition_id",
+        |_, _, _| FunctionDomain::Full,
+        vectorize_with_builder_2_arg::<GenericType<0>, ArrayType<GenericType<0>>, NumberType<u64>>(|val, arr, builder, _| {
+            let mut low = 0;
+            let mut high = arr.len();
+            while low < high {
+                let mid = (high + low) / 2;
+                let bound = unsafe { arr.index_unchecked(mid) };
+                if val > bound {
+                    low = mid + 1;
+                } else {
+                    high = mid;
+                }
+            }
+            builder.push(low as u64);
+        }),
+    );
 }
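range_partition_id is a lower-bound binary search: it returns the number of bounds strictly below the probe value, which is exactly the partition index for a bounds array produced by range_bound. Hedged examples, with array literals standing in for a real bounds column:

    SELECT range_partition_id(5,  [10, 20, 30]); -- 0
    SELECT range_partition_id(25, [10, 20, 30]); -- 2
    SELECT range_partition_id(99, [10, 20, 30]); -- 3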
diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs
index 3281a19ef742..e8a0c838a317 100644
--- a/src/query/service/src/interpreters/access/privilege_access.rs
+++ b/src/query/service/src/interpreters/access/privilege_access.rs
@@ -36,6 +36,7 @@ use databend_common_sql::binder::MutationType;
 use databend_common_sql::optimizer::get_udf_names;
 use databend_common_sql::plans::InsertInputSource;
 use databend_common_sql::plans::Mutation;
+use databend_common_sql::plans::OptimizeClusterBy;
 use databend_common_sql::plans::OptimizeCompactBlock;
 use databend_common_sql::plans::PresignAction;
 use databend_common_sql::plans::Recluster;
@@ -982,6 +983,10 @@ impl AccessChecker for PrivilegeAccess {
                 let plan: OptimizeCompactBlock = s_expr.plan().clone().try_into()?;
                 self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false, false).await?
             },
+            Plan::OptimizeClusterBy { s_expr } => {
+                let plan: OptimizeClusterBy = s_expr.plan().clone().try_into()?;
+                self.validate_table_access(&plan.catalog_name, &plan.database_name, &plan.table_name, UserPrivilegeType::Super, false, false).await?
+            }
             Plan::VacuumTable(plan) => {
                 self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false, false).await?
             }
diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs
index aecf00f6c941..3939a04a1f4a 100644
--- a/src/query/service/src/interpreters/interpreter_factory.rs
+++ b/src/query/service/src/interpreters/interpreter_factory.rs
@@ -273,6 +273,9 @@ impl InterpreterFactory {
                     *need_purge,
                 )?))
             }
+            Plan::OptimizeClusterBy { s_expr } => Ok(Arc::new(
+                OptimizeClusterByInterpreter::try_create(ctx, *s_expr.clone())?,
+            )),
             Plan::VacuumTable(vacuum_table) => Ok(Arc::new(VacuumTableInterpreter::try_create(
                 ctx,
                 *vacuum_table.clone(),
diff --git a/src/query/service/src/interpreters/interpreter_optimize_cluster_by.rs b/src/query/service/src/interpreters/interpreter_optimize_cluster_by.rs
new file mode 100644
index 000000000000..6f1aca6e7ec1
--- /dev/null
+++ b/src/query/service/src/interpreters/interpreter_optimize_cluster_by.rs
@@ -0,0 +1,83 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use databend_common_exception::Result;
+use databend_common_sql::executor::PhysicalPlanBuilder;
+use databend_common_sql::optimizer::SExpr;
+use databend_common_sql::plans::OptimizeClusterBy;
+
+use crate::interpreters::Interpreter;
+use crate::pipelines::PipelineBuildResult;
+use crate::schedulers::build_query_pipeline_without_render_result_set;
+use crate::sessions::QueryContext;
+use crate::sessions::TableContext;
+
+pub struct OptimizeClusterByInterpreter {
+    ctx: Arc<QueryContext>,
+    s_expr: SExpr,
+}
+
+impl OptimizeClusterByInterpreter {
+    pub fn try_create(ctx: Arc<QueryContext>, s_expr: SExpr) -> Result<Self> {
+        Ok(OptimizeClusterByInterpreter { ctx, s_expr })
+    }
+}
+
+#[async_trait::async_trait]
+impl Interpreter for OptimizeClusterByInterpreter {
+    fn name(&self) -> &str {
+        "OptimizeClusterByInterpreter"
+    }
+
+    fn is_ddl(&self) -> bool {
+        true
+    }
+
+    #[async_backtrace::framed]
+    async fn execute2(&self) -> Result<PipelineBuildResult> {
+        let OptimizeClusterBy {
+            catalog_name,
+            database_name,
+            table_name,
+            metadata,
+            bind_context,
+        } = self.s_expr.plan().clone().try_into()?;
+
+        let table = self
+            .ctx
+            .get_table(&catalog_name, &database_name, &table_name)
+            .await?;
+
+        let mut builder = PhysicalPlanBuilder::new(metadata.clone(), self.ctx.clone(), false);
+        let physical_plan = builder
+            .build(&self.s_expr, bind_context.column_set())
+            .await?;
+        let mut build_res =
+            build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
+
+        table.append_data(self.ctx.clone(), &mut build_res.main_pipeline)?;
+        table.commit_insertion(
+            self.ctx.clone(),
+            &mut build_res.main_pipeline,
+            None,
+            vec![],
+            true,
+            None,
+            unsafe { self.ctx.get_settings().get_deduplicate_label()? },
+        )?;
+
+        Ok(build_res)
+    }
+}
diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs
index 9362b7092121..9eaaab9e726d 100644
--- a/src/query/service/src/interpreters/mod.rs
+++ b/src/query/service/src/interpreters/mod.rs
@@ -63,6 +63,7 @@ mod interpreter_notification_alter;
 mod interpreter_notification_create;
 mod interpreter_notification_desc;
 mod interpreter_notification_drop;
+mod interpreter_optimize_cluster_by;
 mod interpreter_optimize_compact_block;
 mod interpreter_optimize_compact_segment;
 mod interpreter_optimize_purge;
@@ -179,6 +180,7 @@ pub use interpreter_network_policy_alter::AlterNetworkPolicyInterpreter;
 pub use interpreter_network_policy_create::CreateNetworkPolicyInterpreter;
 pub use interpreter_network_policy_desc::DescNetworkPolicyInterpreter;
 pub use interpreter_network_policy_drop::DropNetworkPolicyInterpreter;
+pub use interpreter_optimize_cluster_by::OptimizeClusterByInterpreter;
 pub use interpreter_optimize_compact_block::OptimizeCompactBlockInterpreter;
 pub use interpreter_optimize_compact_segment::OptimizeCompactSegmentInterpreter;
 pub use interpreter_optimize_purge::OptimizePurgeInterpreter;
diff --git a/src/query/service/src/sessions/queue_mgr.rs b/src/query/service/src/sessions/queue_mgr.rs
index 231e05aff5b3..e1ffd4ef6704 100644
--- a/src/query/service/src/sessions/queue_mgr.rs
+++ b/src/query/service/src/sessions/queue_mgr.rs
@@ -408,6 +408,7 @@ impl QueryEntry {
             Plan::OptimizePurge(_)
             | Plan::OptimizeCompactSegment(_)
             | Plan::OptimizeCompactBlock { .. }
+            | Plan::OptimizeClusterBy { .. }
             | Plan::VacuumTable(_)
             | Plan::VacuumTemporaryFiles(_)
             | Plan::RefreshIndex(_)
diff --git a/src/query/sql/src/executor/physical_plan_builder.rs b/src/query/sql/src/executor/physical_plan_builder.rs
index e386d1d4b5d3..a8c1cdcf3493 100644
--- a/src/query/sql/src/executor/physical_plan_builder.rs
+++ b/src/query/sql/src/executor/physical_plan_builder.rs
@@ -142,6 +142,10 @@ impl PhysicalPlanBuilder {
             }
             RelOperator::Recluster(recluster) => self.build_recluster(recluster).await,
             RelOperator::CompactBlock(compact) => self.build_compact_block(compact).await,
+            RelOperator::OptimizeClusterBy(optimize) => {
+                self.build_optimize_cluster_by(s_expr, optimize, required)
+                    .await
+            }
         }
     }
diff --git a/src/query/sql/src/executor/physical_plans/mod.rs b/src/query/sql/src/executor/physical_plans/mod.rs
index 957443396364..7f39474d8286 100644
--- a/src/query/sql/src/executor/physical_plans/mod.rs
+++ b/src/query/sql/src/executor/physical_plans/mod.rs
@@ -43,6 +43,7 @@ mod physical_mutation_into_organize;
 mod physical_mutation_into_split;
 mod physical_mutation_manipulate;
 mod physical_mutation_source;
+mod physical_optimize_cluster_by;
 mod physical_project_set;
 mod physical_r_cte_scan;
 mod physical_range_join;
diff --git a/src/query/sql/src/executor/physical_plans/physical_optimize_cluster_by.rs b/src/query/sql/src/executor/physical_plans/physical_optimize_cluster_by.rs
new file mode 100644
index 000000000000..ed5f985352a2
--- /dev/null
+++ b/src/query/sql/src/executor/physical_plans/physical_optimize_cluster_by.rs
@@ -0,0 +1,33 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+
+use crate::executor::PhysicalPlan;
+use crate::executor::PhysicalPlanBuilder;
+use crate::optimizer::ColumnSet;
+use crate::optimizer::SExpr;
+
+impl PhysicalPlanBuilder {
+    pub async fn build_optimize_cluster_by(
+        &mut self,
+        s_expr: &SExpr,
+        _optimize: &crate::plans::OptimizeClusterBy,
+        required: ColumnSet,
+    ) -> Result<PhysicalPlan> {
+        let mut plan = self.build(s_expr.child(0)?, required).await?;
+        plan.adjust_plan_id(&mut 0);
+        Ok(plan)
+    }
+}
diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs
index 656b25e74e68..50947aa01c03 100644
--- a/src/query/sql/src/planner/binder/ddl/table.rs
+++ b/src/query/sql/src/planner/binder/ddl/table.rs
@@ -32,6 +32,7 @@ use databend_common_ast::ast::DescribeTableStmt;
 use databend_common_ast::ast::DropTableStmt;
 use databend_common_ast::ast::Engine;
 use databend_common_ast::ast::ExistsTableStmt;
+use databend_common_ast::ast::Expr;
 use databend_common_ast::ast::Identifier;
 use databend_common_ast::ast::InvertedIndexDefinition;
 use databend_common_ast::ast::ModifyColumnAction;
@@ -122,6 +123,7 @@ use crate::plans::ExistsTablePlan;
 use crate::plans::ModifyColumnAction as ModifyColumnActionInPlan;
 use crate::plans::ModifyTableColumnPlan;
 use crate::plans::ModifyTableCommentPlan;
+use crate::plans::OptimizeClusterBy;
 use crate::plans::OptimizeCompactBlock;
 use crate::plans::OptimizeCompactSegmentPlan;
 use crate::plans::OptimizePurgePlan;
@@ -1214,11 +1216,102 @@ impl Binder {
                 }))
             }
         },
+            AstOptimizeTableAction::ClusterBy { exprs } => {
+                self.bind_optimize_cluster_by(bind_context, exprs, catalog, database, table)
+                    .await?
+            }
         };
 
         Ok(plan)
     }
 
+    #[async_backtrace::framed]
+    pub(in crate::planner::binder) async fn bind_optimize_cluster_by(
+        &mut self,
+        bind_context: &mut BindContext,
+        exprs: &[Expr],
+        catalog_name: String,
+        database_name: String,
+        table_name: String,
+    ) -> Result<Plan> {
+        let schema = self
+            .ctx
+            .get_table(&catalog_name, &database_name, &table_name)
+            .await?
+            .schema();
+        let cluster_key_strs = self
+            .analyze_cluster_keys(
+                &ClusterOption {
+                    cluster_type: ClusterType::Hilbert,
+                    cluster_exprs: exprs.to_vec(),
+                },
+                schema,
+            )
+            .await?;
+
+        let keys_bounds_str = cluster_key_strs
+            .iter()
+            .map(|s| format!("range_bound(1000)({}) AS {}_bound", s, s))
+            .collect::<Vec<_>>()
+            .join(", ");
+        let hilbert_keys_str = cluster_key_strs
+            .iter()
+            .map(|s| {
+                format!(
+                    "hilbert_key(to_uint16(range_partition_id({table_name}.{s}, _keys_bound.{s}_bound)))"
+                )
+            })
+            .collect::<Vec<_>>()
+            .join(", ");
+        let query = format!(
+            "WITH _keys_bound AS MATERIALIZED ( \
+                SELECT \
+                    {keys_bounds_str} \
+                FROM {database_name}.{table_name} \
+            ), \
+            _source_data AS ( \
+                SELECT \
+                    {table_name}.*, \
+                    hilbert_index([{hilbert_keys_str}], 2) AS _hilbert_index \
+                FROM {database_name}.{table_name}, _keys_bound \
+            ) \
+            SELECT \
+                * EXCLUDE(_hilbert_index) \
+            FROM \
+                _source_data \
+            ORDER BY \
+                _hilbert_index \
+            "
+        );
+
+        let tokens = tokenize_sql(query.as_str())?;
+        let (stmt, _) = parse_sql(&tokens, self.dialect)?;
+        let Statement::Query(query) = &stmt else {
+            unreachable!()
+        };
+        let mut new_bind_context = BindContext::new();
+        let (s_expr, new_bind_context) = self.bind_query(&mut new_bind_context, query)?;
+        // Wrap `LogicalMaterializedCte` around `s_expr`.
+        let s_expr = new_bind_context.cte_context.wrap_m_cte(s_expr);
+        let new_bind_context = Box::new(new_bind_context);
+        bind_context.parent = Some(new_bind_context.clone());
+        let cluster_by = OptimizeClusterBy {
+            catalog_name,
+            database_name,
+            table_name,
+            metadata: self.metadata.clone(),
+            bind_context: new_bind_context,
+        };
+        let s_expr = SExpr::create_unary(
+            Arc::new(RelOperator::OptimizeClusterBy(cluster_by)),
+            Arc::new(s_expr),
+        );
+
+        Ok(Plan::OptimizeClusterBy {
+            s_expr: Box::new(s_expr),
+        })
+    }
+
     #[async_backtrace::framed]
     pub(in crate::planner::binder) async fn bind_vacuum_table(
         &mut self,
@@ -1656,7 +1749,7 @@
         // cluster keys cannot be a udf expression.
         scalar_binder.forbid_udf();
 
-        let mut cluster_keys = Vec::with_capacity(expr_len);
+        let mut cluster_key_strs = Vec::with_capacity(expr_len);
         for cluster_expr in cluster_exprs.iter() {
             let (cluster_key, _) = scalar_binder.bind(cluster_expr)?;
             if cluster_key.used_columns().len() != 1 || !cluster_key.evaluable() {
@@ -1687,10 +1780,10 @@
                 ctx: &self.name_resolution_ctx,
             };
             cluster_expr.drive_mut(&mut normalizer);
-            cluster_keys.push(format!("{:#}", &cluster_expr));
+            cluster_key_strs.push(format!("{:#}", &cluster_expr));
         }
 
-        Ok(cluster_keys)
+        Ok(cluster_key_strs)
     }
 
     fn valid_cluster_key_type(data_type: &DataType) -> bool {
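Instantiated for a hypothetical table db1.t with cluster keys (a, b), the format string in bind_optimize_cluster_by above should produce roughly this rewrite (reflowed here for readability):

    WITH _keys_bound AS MATERIALIZED (
        SELECT range_bound(1000)(a) AS a_bound,
               range_bound(1000)(b) AS b_bound
        FROM db1.t
    ),
    _source_data AS (
        SELECT t.*,
               hilbert_index([
                   hilbert_key(to_uint16(range_partition_id(t.a, _keys_bound.a_bound))),
                   hilbert_key(to_uint16(range_partition_id(t.b, _keys_bound.b_bound)))
               ], 2) AS _hilbert_index
        FROM db1.t, _keys_bound
    )
    SELECT * EXCLUDE(_hilbert_index)
    FROM _source_data
    ORDER BY _hilbert_index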
diff --git a/src/query/sql/src/planner/binder/util.rs b/src/query/sql/src/planner/binder/util.rs
index f082224649b4..9a372b2d3b2d 100644
--- a/src/query/sql/src/planner/binder/util.rs
+++ b/src/query/sql/src/planner/binder/util.rs
@@ -88,7 +88,8 @@ impl Binder {
             | RelOperator::Mutation(_)
             | RelOperator::Recluster(_)
             | RelOperator::MutationSource(_)
-            | RelOperator::CompactBlock(_) => {
+            | RelOperator::CompactBlock(_)
+            | RelOperator::OptimizeClusterBy(_) => {
                 return Err(ErrorCode::SyntaxException(format!(
                     "{:?} is not allowed in recursive cte",
                     expr.plan().rel_op()
diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs
index 31cc20f78735..f06cef854976 100644
--- a/src/query/sql/src/planner/format/display_plan.rs
+++ b/src/query/sql/src/planner/format/display_plan.rs
@@ -73,15 +73,18 @@ impl Plan {
             Plan::DropTableClusterKey(_) => Ok("DropTableClusterKey".to_string()),
             Plan::ReclusterTable { .. } => Ok("ReclusterTable".to_string()),
             Plan::TruncateTable(_) => Ok("TruncateTable".to_string()),
-            Plan::OptimizePurge(_) => Ok("OptimizePurge".to_string()),
-            Plan::OptimizeCompactSegment(_) => Ok("OptimizeCompactSegment".to_string()),
-            Plan::OptimizeCompactBlock { .. } => Ok("OptimizeCompactBlock".to_string()),
             Plan::VacuumTable(_) => Ok("VacuumTable".to_string()),
             Plan::VacuumDropTable(_) => Ok("VacuumDropTable".to_string()),
             Plan::VacuumTemporaryFiles(_) => Ok("VacuumTemporaryFiles".to_string()),
             Plan::AnalyzeTable(_) => Ok("AnalyzeTable".to_string()),
             Plan::ExistsTable(_) => Ok("ExistsTable".to_string()),
 
+            // Optimize
+            Plan::OptimizePurge(_) => Ok("OptimizePurge".to_string()),
+            Plan::OptimizeCompactSegment(_) => Ok("OptimizeCompactSegment".to_string()),
+            Plan::OptimizeCompactBlock { .. } => Ok("OptimizeCompactBlock".to_string()),
+            Plan::OptimizeClusterBy { .. } => Ok("OptimizeClusterBy".to_string()),
+
             // Views
             Plan::CreateView(_) => Ok("CreateView".to_string()),
             Plan::AlterView(_) => Ok("AlterView".to_string()),
diff --git a/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs b/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs
index 5a2b34ceec35..1db443c9c73b 100644
--- a/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs
+++ b/src/query/sql/src/planner/optimizer/decorrelate/subquery_rewriter.rs
@@ -200,6 +200,7 @@ impl SubqueryRewriter {
             | RelOperator::Mutation(_)
             | RelOperator::MutationSource(_)
             | RelOperator::Recluster(_)
+            | RelOperator::OptimizeClusterBy(_)
             | RelOperator::CompactBlock(_) => Ok(s_expr.clone()),
         }
     }
diff --git a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs
index a2363d69029b..bd1deef2843f 100644
--- a/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs
+++ b/src/query/sql/src/planner/optimizer/dynamic_sample/dynamic_sample.rs
@@ -93,6 +93,7 @@ pub async fn dynamic_sample(
         | RelOperator::RecursiveCteScan(_)
         | RelOperator::Mutation(_)
        | RelOperator::Recluster(_)
+        | RelOperator::OptimizeClusterBy(_)
         | RelOperator::CompactBlock(_)
         | RelOperator::MutationSource(_) => {
             s_expr.plan().derive_stats(&RelExpr::with_s_expr(s_expr))
diff --git a/src/query/sql/src/planner/optimizer/format.rs b/src/query/sql/src/planner/optimizer/format.rs
index b72f217ae4cb..9e9b63fb60fa 100644
--- a/src/query/sql/src/planner/optimizer/format.rs
+++ b/src/query/sql/src/planner/optimizer/format.rs
@@ -76,6 +76,7 @@ pub fn display_rel_op(rel_op: &RelOperator) -> String {
         RelOperator::Mutation(_) => "MergeInto".to_string(),
         RelOperator::MutationSource(_) => "MutationSource".to_string(),
         RelOperator::Recluster(_) => "Recluster".to_string(),
+        RelOperator::OptimizeClusterBy(_) => "OptimizeClusterBy".to_string(),
         RelOperator::CompactBlock(_) => "CompactBlock".to_string(),
     }
 }
diff --git a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
index bd968b999508..534a4f36d7d8 100644
--- a/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
+++ b/src/query/sql/src/planner/optimizer/hyper_dp/dphyp.rs
@@ -299,6 +299,7 @@ impl DPhpy {
             | RelOperator::Mutation(_)
             | RelOperator::MutationSource(_)
             | RelOperator::Recluster(_)
+            | RelOperator::OptimizeClusterBy(_)
             | RelOperator::CompactBlock(_) => Ok((Arc::new(s_expr.clone()), true)),
         }
     }
diff --git a/src/query/sql/src/planner/optimizer/optimizer.rs b/src/query/sql/src/planner/optimizer/optimizer.rs
index de1ea299a3fd..1177ba4582e0 100644
--- a/src/query/sql/src/planner/optimizer/optimizer.rs
+++ b/src/query/sql/src/planner/optimizer/optimizer.rs
@@ -338,6 +338,12 @@ pub async fn optimize(mut opt_ctx: OptimizerContext, plan: Plan) -> Result<Plan>
             }
             Ok(Plan::Replace(plan))
         }
+        Plan::OptimizeClusterBy { s_expr } => {
+            let input_s_expr = optimize_query(&mut opt_ctx, s_expr.child(0)?.clone()).await?;
+            Ok(Plan::OptimizeClusterBy {
+                s_expr: Box::new(s_expr.replace_children(vec![Arc::new(input_s_expr)])),
+            })
+        }
 
         Plan::CreateTable(mut plan) => {
             if let Some(p) = &plan.as_select {
@@ -507,7 +513,7 @@ async fn optimize_mutation(mut opt_ctx: OptimizerContext, s_expr: SExpr) -> Resu
     if let &RelOperator::Exchange(_) = input_s_expr.plan() {
         input_s_expr = input_s_expr.child(0)?.clone();
     }
-    // If there still exists a Exchange::Merge operator, we should disable distributed optimization and
+    // If there still exists an Exchange::Merge operator, we should disable distributed optimization and
     // optimize the input plan again.
     if input_s_expr.has_merge_exchange() {
         opt_ctx = opt_ctx.with_enable_distributed_optimization(false);
diff --git a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_semi_to_inner_join.rs b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_semi_to_inner_join.rs
index 29dfe4872f60..bc0595b7a724 100644
--- a/src/query/sql/src/planner/optimizer/rule/rewrite/rule_semi_to_inner_join.rs
+++ b/src/query/sql/src/planner/optimizer/rule/rewrite/rule_semi_to_inner_join.rs
@@ -142,6 +142,7 @@ fn find_group_by_keys(child: &SExpr, group_by_keys: &mut HashSet<IndexType>) ->
         | RelOperator::Mutation(_)
         | RelOperator::MutationSource(_)
         | RelOperator::Recluster(_)
+        | RelOperator::OptimizeClusterBy(_)
         | RelOperator::CompactBlock(_) => {}
     }
     Ok(())
diff --git a/src/query/sql/src/planner/optimizer/s_expr.rs b/src/query/sql/src/planner/optimizer/s_expr.rs
index d56f921e5053..d74bfacebe0f 100644
--- a/src/query/sql/src/planner/optimizer/s_expr.rs
+++ b/src/query/sql/src/planner/optimizer/s_expr.rs
@@ -340,6 +340,7 @@ impl SExpr {
             | RelOperator::RecursiveCteScan(_)
             | RelOperator::Mutation(_)
             | RelOperator::Recluster(_)
+            | RelOperator::OptimizeClusterBy(_)
             | RelOperator::CompactBlock(_) => {}
         };
         for child in &self.children {
@@ -441,6 +442,7 @@ fn find_subquery(rel_op: &RelOperator) -> bool {
         | RelOperator::RecursiveCteScan(_)
         | RelOperator::Mutation(_)
        | RelOperator::Recluster(_)
+        | RelOperator::OptimizeClusterBy(_)
         | RelOperator::CompactBlock(_) => false,
         RelOperator::Join(op) => {
             op.equi_conditions.iter().any(|condition| {
diff --git a/src/query/sql/src/planner/plans/operator.rs b/src/query/sql/src/planner/plans/operator.rs
index d7f6b20c5be5..cf6106e2d3e5 100644
--- a/src/query/sql/src/planner/plans/operator.rs
+++ b/src/query/sql/src/planner/plans/operator.rs
@@ -39,6 +39,7 @@ use crate::plans::Filter;
 use crate::plans::Join;
 use crate::plans::Limit;
 use crate::plans::Mutation;
+use crate::plans::OptimizeClusterBy;
 use crate::plans::OptimizeCompactBlock;
 use crate::plans::ProjectSet;
 use crate::plans::Recluster;
@@ -119,6 +120,7 @@ pub enum RelOp {
     RecursiveCteScan,
     MergeInto,
     Recluster,
+    OptimizeClusterBy,
     CompactBlock,
     MutationSource,
 
@@ -150,9 +152,10 @@ pub enum RelOperator {
     RecursiveCteScan(RecursiveCteScan),
     AsyncFunction(AsyncFunction),
     Mutation(Mutation),
+    MutationSource(MutationSource),
     Recluster(Recluster),
+    OptimizeClusterBy(OptimizeClusterBy),
     CompactBlock(OptimizeCompactBlock),
-    MutationSource(MutationSource),
 }
 
 impl Operator for RelOperator {
@@ -180,6 +183,7 @@ impl Operator for RelOperator {
             RelOperator::AsyncFunction(rel_op) => rel_op.rel_op(),
             RelOperator::Mutation(rel_op) => rel_op.rel_op(),
             RelOperator::Recluster(rel_op) => rel_op.rel_op(),
+            RelOperator::OptimizeClusterBy(rel_op) => rel_op.rel_op(),
             RelOperator::CompactBlock(rel_op) => rel_op.rel_op(),
             RelOperator::MutationSource(rel_op) => rel_op.rel_op(),
         }
@@ -209,6 +213,7 @@ impl Operator for RelOperator {
             RelOperator::AsyncFunction(rel_op) => rel_op.arity(),
             RelOperator::Mutation(rel_op) => rel_op.arity(),
             RelOperator::Recluster(rel_op) => rel_op.arity(),
+            RelOperator::OptimizeClusterBy(rel_op) => rel_op.arity(),
             RelOperator::CompactBlock(rel_op) => rel_op.arity(),
             RelOperator::MutationSource(rel_op) => rel_op.arity(),
         }
@@ -238,6 +243,7 @@ impl Operator for RelOperator {
             RelOperator::AsyncFunction(rel_op) => rel_op.derive_relational_prop(rel_expr),
             RelOperator::Mutation(rel_op) => rel_op.derive_relational_prop(rel_expr),
             RelOperator::Recluster(rel_op) => rel_op.derive_relational_prop(rel_expr),
+            RelOperator::OptimizeClusterBy(rel_op) => rel_op.derive_relational_prop(rel_expr),
             RelOperator::CompactBlock(rel_op) => rel_op.derive_relational_prop(rel_expr),
             RelOperator::MutationSource(rel_op) => rel_op.derive_relational_prop(rel_expr),
         }
@@ -267,6 +273,7 @@ impl Operator for RelOperator {
             RelOperator::AsyncFunction(rel_op) => rel_op.derive_physical_prop(rel_expr),
             RelOperator::Mutation(rel_op) => rel_op.derive_physical_prop(rel_expr),
             RelOperator::Recluster(rel_op) => rel_op.derive_physical_prop(rel_expr),
+            RelOperator::OptimizeClusterBy(rel_op) => rel_op.derive_physical_prop(rel_expr),
             RelOperator::CompactBlock(rel_op) => rel_op.derive_physical_prop(rel_expr),
             RelOperator::MutationSource(rel_op) => rel_op.derive_physical_prop(rel_expr),
         }
@@ -296,6 +303,7 @@ impl Operator for RelOperator {
             RelOperator::AsyncFunction(rel_op) => rel_op.derive_stats(rel_expr),
             RelOperator::Mutation(rel_op) => rel_op.derive_stats(rel_expr),
             RelOperator::Recluster(rel_op) => rel_op.derive_stats(rel_expr),
+            RelOperator::OptimizeClusterBy(rel_op) => rel_op.derive_stats(rel_expr),
             RelOperator::CompactBlock(rel_op) => rel_op.derive_stats(rel_expr),
             RelOperator::MutationSource(rel_op) => rel_op.derive_stats(rel_expr),
         }
@@ -375,6 +383,9 @@ impl Operator for RelOperator {
             RelOperator::Recluster(rel_op) => {
                 rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required)
             }
+            RelOperator::OptimizeClusterBy(rel_op) => {
+                rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required)
+            }
             RelOperator::CompactBlock(rel_op) => {
                 rel_op.compute_required_prop_child(ctx, rel_expr, child_index, required)
             }
@@ -457,6 +468,9 @@ impl Operator for RelOperator {
             RelOperator::Recluster(rel_op) => {
                 rel_op.compute_required_prop_children(ctx, rel_expr, required)
             }
+            RelOperator::OptimizeClusterBy(rel_op) => {
+                rel_op.compute_required_prop_children(ctx, rel_expr, required)
+            }
             RelOperator::CompactBlock(rel_op) => {
                 rel_op.compute_required_prop_children(ctx, rel_expr, required)
             }
@@ -874,6 +888,26 @@ impl TryFrom<RelOperator> for Recluster {
     }
 }
 
+impl From<OptimizeClusterBy> for RelOperator {
+    fn from(v: OptimizeClusterBy) -> Self {
+        Self::OptimizeClusterBy(v)
+    }
+}
+
+impl TryFrom<RelOperator> for OptimizeClusterBy {
+    type Error = ErrorCode;
+    fn try_from(value: RelOperator) -> Result<Self> {
+        if let RelOperator::OptimizeClusterBy(value) = value {
+            Ok(value)
+        } else {
+            Err(ErrorCode::Internal(format!(
+                "Cannot downcast {:?} to OptimizeClusterBy",
+                value.rel_op()
+            )))
+        }
+    }
+}
+
 impl From<OptimizeCompactBlock> for RelOperator {
     fn from(v: OptimizeCompactBlock) -> Self {
         Self::CompactBlock(v)
diff --git a/src/query/sql/src/planner/plans/optimize.rs b/src/query/sql/src/planner/plans/optimize.rs
index b71c1f2c653c..1330e49296a1 100644
--- a/src/query/sql/src/planner/plans/optimize.rs
+++ b/src/query/sql/src/planner/plans/optimize.rs
@@ -17,6 +17,8 @@ use databend_common_catalog::table::NavigationPoint;
 
 use crate::plans::Operator;
 use crate::plans::RelOp;
+use crate::BindContext;
+use crate::MetadataRef;
 
 #[derive(Clone, Debug)]
 pub struct OptimizePurgePlan {
@@ -48,3 +50,45 @@ impl Operator for OptimizeCompactBlock {
         RelOp::CompactBlock
     }
 }
+
+#[derive(Clone)]
+pub struct OptimizeClusterBy {
+    pub catalog_name: String,
+    pub database_name: String,
+    pub table_name: String,
+    pub metadata: MetadataRef,
+    pub bind_context: Box<BindContext>,
+}
+
+impl std::fmt::Debug for OptimizeClusterBy {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        f.debug_struct("OptimizeClusterBy")
+            .field("catalog", &self.catalog_name)
+            .field("database", &self.database_name)
+            .field("table", &self.table_name)
+            .finish()
+    }
+}
+
+impl Eq for OptimizeClusterBy {}
+
+impl PartialEq for OptimizeClusterBy {
+    fn eq(&self, other: &Self) -> bool {
+        self.catalog_name == other.catalog_name
+            && self.database_name == other.database_name
+            && self.table_name == other.table_name
+    }
+}
+
+impl std::hash::Hash for OptimizeClusterBy {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.database_name.hash(state);
+        self.table_name.hash(state);
+    }
+}
+
+impl Operator for OptimizeClusterBy {
+    fn rel_op(&self) -> RelOp {
+        RelOp::OptimizeClusterBy
+    }
+}
diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs
index da139fa99e92..c5388c23cdc3 100644
--- a/src/query/sql/src/planner/plans/plan.rs
+++ b/src/query/sql/src/planner/plans/plan.rs
@@ -232,6 +232,9 @@ pub enum Plan {
         s_expr: Box<SExpr>,
         need_purge: bool,
     },
+    OptimizeClusterBy {
+        s_expr: Box<SExpr>,
+    },
 
     // Insert
     Insert(Box<Insert>),
@@ -431,7 +434,8 @@ impl Plan {
             | Plan::DataMutation { .. }
             | Plan::OptimizePurge(_)
             | Plan::OptimizeCompactSegment(_)
-            | Plan::OptimizeCompactBlock { .. } => QueryKind::Update,
+            | Plan::OptimizeCompactBlock { .. }
+            | Plan::OptimizeClusterBy { .. } => QueryKind::Update,
             _ => QueryKind::Other,
         }
     }
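Taken together, the feature should be exercisable end to end like this (hypothetical schema; exact results depend on the data):

    CREATE TABLE t (a INT, b INT);
    INSERT INTO t SELECT number, number * 3 FROM numbers(1000);
    OPTIMIZE TABLE t CLUSTER BY HILBERT(a, b);

The statement binds to Plan::OptimizeClusterBy, is rewritten into the hilbert_index ordering query built in the binder, and the reordered rows are written back through append_data and commit_insertion in the new interpreter.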