chore(cubestore): Upgrade DF: fix join requirement extraction and PlanProperties for ClusterSend
paveltiunov committed Nov 30, 2024
1 parent 6316cfd commit 3793971
Showing 6 changed files with 84 additions and 71 deletions.
@@ -34,7 +34,6 @@ pub fn push_aggregate_to_workers(
// Router plan, replace partial aggregate with cluster send.
Ok(Arc::new(
cs.with_changed_schema(
agg.schema().clone(),
p.clone()
.with_new_children(vec![cs.input_for_optimizations.clone()])?,
),
@@ -43,7 +42,6 @@
// Worker plan, execute partial aggregate inside the worker.
Ok(Arc::new(WorkerExec {
input: p.clone().with_new_children(vec![w.input.clone()])?,
schema: agg.schema().clone(),
max_batch_rows: w.max_batch_rows,
limit_and_reverse: w.limit_and_reverse.clone(),
}))
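Both call sites above stop passing an explicit schema because the wrapped plan already carries it. A minimal sketch of the underlying idea with stand-in types (not the DataFusion `ExecutionPlan` trait; the actual change lets DataFusion derive the schema from the node's `PlanProperties`): the wrapper delegates `schema()` to its child instead of caching a copy that can drift out of sync.

```rust
use std::sync::Arc;

// Stand-ins for SchemaRef / ExecutionPlan, just to show the delegation shape.
type SchemaRef = Arc<Vec<String>>;

trait Plan {
    fn schema(&self) -> SchemaRef;
}

struct Worker {
    input: Arc<dyn Plan>, // no separate `schema` field anymore
}

impl Plan for Worker {
    fn schema(&self) -> SchemaRef {
        // The reported schema always follows the input, so it can never disagree with it.
        self.input.schema()
    }
}
```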
@@ -25,8 +25,23 @@ pub fn rewrite_plan_impl<'a, R: PlanRewriter>(
let updated_ctx = f.enter_node(&p, ctx);
let ctx = updated_ctx.as_ref().unwrap_or(ctx);

p.map_children(|c| rewrite_plan_impl(c, ctx, f))?
.transform_parent(|n| f.rewrite(n, ctx).map(|new| Transformed::yes(new)))
let join_context = match &p {
LogicalPlan::Join(Join { left, right, .. }) => vec![
(left.clone(), f.enter_join_left(&p, ctx)),
(right.clone(), f.enter_join_right(&p, ctx)),
],
_ => Vec::new(),
};

p.map_children(|c| {
let next_ctx = join_context
.iter()
.find(|(n, _)| n.as_ref() == &c)
.and_then(|(_, join_ctx)| join_ctx.as_ref())
.unwrap_or(ctx);
rewrite_plan_impl(c, next_ctx, f)
})?
.transform_parent(|n| f.rewrite(n, ctx).map(|new| Transformed::yes(new)))

// // First, update children.
// let updated = match p {
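The new `join_context` block is the core of the join requirement extraction fix: when the rewriter descends into a `Join`, each side can receive its own context from `enter_join_left` / `enter_join_right` instead of inheriting the parent's. Below is a self-contained sketch of that dispatch with simplified stand-in types (the real code works on DataFusion's `LogicalPlan`, matches each child against the recorded join inputs, and rewrites parents via `transform_parent`):

```rust
// Simplified stand-ins for LogicalPlan / PlanRewriter, to show the dispatch only.
#[derive(Clone, Debug)]
enum Node {
    Join { left: Box<Node>, right: Box<Node> },
    Leaf(String),
}

trait Rewriter {
    type Ctx: Clone;
    // Hooks mirrored from the diff: a side returns Some(ctx) to override the parent context.
    fn enter_join_left(&self, _join: &Node, _ctx: &Self::Ctx) -> Option<Self::Ctx> {
        None
    }
    fn enter_join_right(&self, _join: &Node, _ctx: &Self::Ctx) -> Option<Self::Ctx> {
        None
    }
    fn rewrite(&self, node: Node, ctx: &Self::Ctx) -> Node;
}

fn rewrite_node<R: Rewriter>(node: Node, ctx: &R::Ctx, r: &R) -> Node {
    let with_children = match node {
        Node::Join { left, right } => {
            let join = Node::Join { left: left.clone(), right: right.clone() };
            // Each join side is visited with its own context when the rewriter
            // provides one; otherwise it inherits the parent context.
            let left_ctx = r.enter_join_left(&join, ctx).unwrap_or_else(|| ctx.clone());
            let right_ctx = r.enter_join_right(&join, ctx).unwrap_or_else(|| ctx.clone());
            Node::Join {
                left: Box::new(rewrite_node(*left, &left_ctx, r)),
                right: Box::new(rewrite_node(*right, &right_ctx, r)),
            }
        }
        other => other, // leaves have no children to visit
    };
    // The parent itself is rewritten after its children, as transform_parent does.
    r.rewrite(with_children, ctx)
}
```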
1 change: 0 additions & 1 deletion rust/cubestore/cubestore/src/queryplanner/panic.rs
@@ -143,7 +143,6 @@ impl ExecutionPlan for PanicWorkerExec {
pub fn plan_panic_worker() -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
Ok(Arc::new(WorkerExec {
input: Arc::new(PanicWorkerExec::new()),
schema: Arc::new(Schema::empty()),
max_batch_rows: 1,
limit_and_reverse: None,
}))
19 changes: 4 additions & 15 deletions rust/cubestore/cubestore/src/queryplanner/planning.rs
@@ -613,7 +613,7 @@ impl PlanRewriter for CollectConstraints {
}
join_on
.iter()
.map(|(l, _)| match l {
.map(|(_, r)| match r {
Expr::Column(c) => Some(c.name.to_string()),
_ => None,
})
@@ -1593,7 +1593,6 @@ impl ExtensionPlanner for CubeExtensionPlanner {
Ok(Some(self.plan_cluster_send(
input.clone(),
&cs.snapshots,
input.schema(),
false,
usize::MAX,
cs.limit_and_reverse.clone(),
@@ -1617,18 +1616,16 @@ impl CubeExtensionPlanner {
&self,
mut input: Arc<dyn ExecutionPlan>,
snapshots: &Vec<Snapshots>,
schema: SchemaRef,
use_streaming: bool,
max_batch_rows: usize,
limit_and_reverse: Option<(usize, bool)>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
if snapshots.is_empty() {
return Ok(Arc::new(EmptyExec::new(schema)));
return Ok(Arc::new(EmptyExec::new(input.schema())));
}
// Note that MergeExecs are added automatically when needed.
if let Some(c) = self.cluster.as_ref() {
Ok(Arc::new(ClusterSendExec::new(
schema,
c.clone(),
self.serialized_plan.clone(),
snapshots,
@@ -1638,7 +1635,6 @@
} else {
Ok(Arc::new(WorkerExec {
input,
schema,
max_batch_rows,
limit_and_reverse,
}))
@@ -1651,9 +1647,6 @@
#[derive(Debug)]
pub struct WorkerExec {
pub input: Arc<dyn ExecutionPlan>,
// TODO: remove and use `self.input.schema()`
// This is a hacky workaround for wrong schema of joins after projection pushdown.
pub schema: SchemaRef,
pub max_batch_rows: usize,
pub limit_and_reverse: Option<(usize, bool)>,
}
@@ -1670,10 +1663,6 @@ impl ExecutionPlan for WorkerExec {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
@@ -1683,9 +1672,9 @@
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
assert_eq!(children.len(), 1);
let input = children.into_iter().next().unwrap();
Ok(Arc::new(WorkerExec {
input: children.into_iter().next().unwrap(),
schema: self.schema.clone(),
input,
max_batch_rows: self.max_batch_rows,
limit_and_reverse: self.limit_and_reverse.clone(),
}))
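The one-line change in `CollectConstraints` above is the other half of the commit title: the constraint is now built from the right-hand side of each equi-join pair rather than the left. A hedged illustration of what that extraction produces (`Expr` and `Column` are simplified stand-ins, and the trailing `.collect()` into `Option<Vec<_>>` is assumed from the `.map` shown in the diff):

```rust
// Simplified stand-ins for DataFusion's Expr / Column.
#[derive(Debug)]
enum Expr {
    Column(Column),
    Literal(i64),
}

#[derive(Debug)]
struct Column {
    name: String,
}

/// Collects the right-side column names of an equi-join; returns None if any
/// right-side key is not a plain column.
fn right_join_columns(join_on: &[(Expr, Expr)]) -> Option<Vec<String>> {
    join_on
        .iter()
        .map(|(_, r)| match r {
            Expr::Column(c) => Some(c.name.to_string()),
            _ => None,
        })
        .collect()
}

fn main() {
    // For `orders.customer_id = customers.id`, the requirement collected for the
    // right-hand input is now "id"; previously the left-side "customer_id" was used.
    let on = vec![(
        Expr::Column(Column { name: "customer_id".into() }),
        Expr::Column(Column { name: "id".into() }),
    )];
    assert_eq!(right_join_columns(&on), Some(vec!["id".to_string()]));
}
```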
39 changes: 23 additions & 16 deletions rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs
@@ -18,8 +18,10 @@ use std::sync::Arc;

use crate::queryplanner::check_memory::CheckMemoryExec;
use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec;
use crate::queryplanner::merge_sort::LastRowByUniqueKeyExec;
use crate::queryplanner::panic::{PanicWorkerExec, PanicWorkerNode};
use crate::queryplanner::planning::{ClusterSendNode, Snapshot, WorkerExec};
use crate::queryplanner::providers::InfoSchemaQueryCacheTableProvider;
use crate::queryplanner::query_executor::{
ClusterSendExec, CubeTable, CubeTableExec, InlineTableProvider,
};
@@ -31,13 +33,13 @@ use crate::queryplanner::trace_data_loaded::TraceDataLoadedExec;
use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::joins::{HashJoinExec, SortMergeJoinExec};
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::union::UnionExec;
use crate::queryplanner::providers::InfoSchemaQueryCacheTableProvider;

#[derive(Default, Clone, Copy)]
pub struct PPOptions {
@@ -306,7 +308,10 @@ fn pp_source(t: Arc<dyn TableProvider>) -> String {
format!("InlineTableProvider(data: {} rows)", t.get_data().len())
} else if let Some(t) = t.as_any().downcast_ref::<InfoSchemaTableProvider>() {
format!("InfoSchemaTableProvider(table: {:?})", t.table)
} else if let Some(_) = t.as_any().downcast_ref::<InfoSchemaQueryCacheTableProvider>() {
} else if let Some(_) = t
.as_any()
.downcast_ref::<InfoSchemaQueryCacheTableProvider>()
{
"InfoSchemaQueryCacheTableProvider".to_string()
} else {
panic!("unknown table provider");
@@ -400,7 +405,7 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
AggregateMode::Single => "Single",
AggregateMode::SinglePartitioned => "SinglePartitioned",
};
*out += &format!("{}{}Aggregate", mode, strat);
*out += &format!("{}{}Aggregate", strat, mode);
if o.show_aggregations {
*out += &format!(", aggs: {:?}", agg.aggr_expr())
}
@@ -484,18 +489,17 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
// TODO upgrade DF
// } else if let Some(_) = a.downcast_ref::<MergeExec>() {
// *out += "Merge";
// } else if let Some(_) = a.downcast_ref::<MergeSortExec>() {
// *out += "MergeSort";
} else if let Some(_) = a.downcast_ref::<SortPreservingMergeExec>() {
*out += "MergeSort";
// } else if let Some(_) = a.downcast_ref::<MergeReSortExec>() {
// *out += "MergeResort";
// } else if let Some(j) = a.downcast_ref::<MergeJoinExec>() {
// *out += &format!(
// "MergeJoin, on: [{}]",
// j.join_on()
// .iter()
// .map(|(l, r)| format!("{} = {}", l, r))
// .join(", ")
// );
} else if let Some(j) = a.downcast_ref::<SortMergeJoinExec>() {
*out += &format!(
"MergeJoin, on: [{}]",
j.on.iter()
.map(|(l, r)| format!("{} = {}", l, r))
.join(", ")
);
// } else if let Some(j) = a.downcast_ref::<CrossJoinExec>() {
// *out += &format!("CrossJoin, on: {}", j.on)
// } else if let Some(j) = a.downcast_ref::<CrossJoinAggExec>() {
@@ -522,8 +526,8 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
// *out += "SkipRows";
// } else if let Some(_) = a.downcast_ref::<RollingWindowAggExec>() {
// *out += "RollingWindowAgg";
// } else if let Some(_) = a.downcast_ref::<LastRowByUniqueKeyExec>() {
// *out += "LastRowByUniqueKey";
} else if let Some(_) = a.downcast_ref::<LastRowByUniqueKeyExec>() {
*out += "LastRowByUniqueKey";
} else if let Some(_) = a.downcast_ref::<MemoryExec>() {
*out += "MemoryScan";
} else if let Some(r) = a.downcast_ref::<RepartitionExec>() {
@@ -533,6 +537,9 @@ fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, ou
*out += &to_string.split(" ").next().unwrap_or(&to_string);
}

// TODO upgrade DF - remove
// *out += &format!(", schema: {}", p.schema());

// TODO upgrade DF
// if o.show_output_hints {
// let hints = p.output_hints();
75 changes: 40 additions & 35 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
@@ -360,13 +360,9 @@ impl QueryExecutorImpl {
0,
Arc::new(PreOptimizeRule::new(self.memory_handler.clone(), None)),
);
let config = Self::session_config();
let session_state = SessionStateBuilder::new()
.with_config(
SessionConfig::new()
.with_batch_size(4096)
// TODO upgrade DF fails if bigger than 1
.with_target_partitions(1),
)
.with_config(config)
.with_runtime_env(runtime)
.with_default_features()
.with_query_planner(Arc::new(CubeQueryPlanner::new_on_router(
@@ -394,13 +390,9 @@
data_loaded_size.clone(),
)),
);
let config = Self::session_config();
let session_state = SessionStateBuilder::new()
.with_config(
SessionConfig::new()
.with_batch_size(4096)
// TODO upgrade DF fails if bigger than 1
.with_target_partitions(1),
)
.with_config(config)
.with_runtime_env(runtime)
.with_default_features()
.with_query_planner(Arc::new(CubeQueryPlanner::new_on_worker(
@@ -413,6 +405,16 @@
let ctx = SessionContext::new_with_state(session_state);
Ok(Arc::new(ctx))
}

fn session_config() -> SessionConfig {
let mut config = SessionConfig::new()
.with_batch_size(4096)
// TODO upgrade DF if less than 2 then there will be no MergeJoin. Decide on repartitioning.
.with_target_partitions(2)
.with_prefer_existing_sort(true);
config.options_mut().optimizer.prefer_hash_join = false;
config
}
}

#[derive(Clone, Serialize, Deserialize)]
@@ -1144,7 +1146,6 @@ impl Debug for InlineTableProvider {
}

pub struct ClusterSendExec {
schema: SchemaRef,
properties: PlanProperties,
pub partitions: Vec<(
/*node*/ String,
@@ -1171,7 +1172,6 @@ pub enum InlineCompoundPartition {

impl ClusterSendExec {
pub fn new(
schema: SchemaRef,
cluster: Arc<dyn Cluster>,
serialized_plan: Arc<SerializedPlan>,
union_snapshots: &[Snapshots],
@@ -1183,13 +1183,10 @@ impl ClusterSendExec {
union_snapshots,
&serialized_plan.planning_meta().multi_part_subtree,
)?;
let eq_properties = EquivalenceProperties::new(schema.clone());
Ok(Self {
schema,
properties: PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(partitions.len()),
ExecutionMode::Bounded,
properties: Self::compute_properties(
input_for_optimizations.properties(),
partitions.len(),
),
partitions,
cluster,
@@ -1199,6 +1196,17 @@
})
}

fn compute_properties(
input_properties: &PlanProperties,
partitions_num: usize,
) -> PlanProperties {
PlanProperties::new(
input_properties.eq_properties.clone(),
Partitioning::UnknownPartitioning(partitions_num),
input_properties.execution_mode.clone(),
)
}

pub(crate) fn distribute_to_workers(
config: &dyn ConfigObj,
snapshots: &[Snapshots],
@@ -1406,14 +1414,12 @@ impl ClusterSendExec {
r
}

pub fn with_changed_schema(
&self,
schema: SchemaRef,
input_for_optimizations: Arc<dyn ExecutionPlan>,
) -> Self {
pub fn with_changed_schema(&self, input_for_optimizations: Arc<dyn ExecutionPlan>) -> Self {
ClusterSendExec {
schema,
properties: self.properties.clone(),
properties: Self::compute_properties(
input_for_optimizations.properties(),
self.partitions.len(),
),
partitions: self.partitions.clone(),
cluster: self.cluster.clone(),
serialized_plan: self.serialized_plan.clone(),
@@ -1462,10 +1468,6 @@ impl ExecutionPlan for ClusterSendExec {
self
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input_for_optimizations]
}
@@ -1479,8 +1481,10 @@
}
let input_for_optimizations = children.into_iter().next().unwrap();
Ok(Arc::new(ClusterSendExec {
schema: self.schema.clone(),
properties: self.properties.clone(),
properties: Self::compute_properties(
input_for_optimizations.properties(),
self.partitions.len(),
),
partitions: self.partitions.clone(),
cluster: self.cluster.clone(),
serialized_plan: self.serialized_plan.clone(),
@@ -1500,7 +1504,7 @@
let plan = self.serialized_plan_for_partitions(partitions);

let cluster = self.cluster.clone();
let schema = self.schema.clone();
let schema = self.properties.eq_properties.schema().clone();
let node_name = node_name.to_string();
if self.use_streaming {
// A future that yields a stream
@@ -1554,7 +1558,8 @@ impl fmt::Debug for ClusterSendExec {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), fmt::Error> {
f.write_fmt(format_args!(
"ClusterSendExec: {:?}: {:?}",
self.schema, self.partitions
self.properties.eq_properties.schema(),
self.partitions
))
}
}
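For reference, the `session_config` introduced earlier in this file is what both router and worker contexts now share. Below is a hedged sketch of how such a config becomes a `SessionContext`; module paths follow stock DataFusion of roughly this vintage, and the real code also wires in a runtime env and the `CubeQueryPlanner`, both omitted here.

```rust
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};

fn make_ctx() -> SessionContext {
    let mut config = SessionConfig::new()
        .with_batch_size(4096)
        // At least 2 target partitions, otherwise the planner never produces a SortMergeJoin.
        .with_target_partitions(2)
        // Keep already-sorted inputs sorted so merge joins can reuse existing orderings.
        .with_prefer_existing_sort(true);
    // Steer the optimizer away from hash joins and toward sort-merge joins.
    config.options_mut().optimizer.prefer_hash_join = false;

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();
    SessionContext::new_with_state(state)
}
```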
