Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce clones of LogicalPlan in planner #7775

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ impl RunOpt {
println!("=== Logical plan ===\n{plan:?}\n");
}

let plan = state.optimize(&plan)?;
let plan = state.optimize(plan)?;
if debug {
println!("=== Optimized logical plan ===\n{plan:?}\n");
}
let physical_plan = state.create_physical_plan(&plan).await?;
let physical_plan = state.create_physical_plan(plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/rewrite_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ pub fn main() -> Result<()> {
let config = OptimizerContext::default().with_skip_failing_rules(false);
let analyzer = Analyzer::with_rules(vec![Arc::new(MyAnalyzerRule {})]);
let analyzed_plan =
analyzer.execute_and_check(&logical_plan, config.options(), |_, _| {})?;
analyzer.execute_and_check(logical_plan, config.options(), |_, _| {})?;
println!(
"Analyzed Logical Plan:\n\n{}\n",
analyzed_plan.display_indent()
);

// then run the optimizer with our custom rule
let optimizer = Optimizer::with_rules(vec![Arc::new(MyOptimizerRule {})]);
let optimized_plan = optimizer.optimize(&analyzed_plan, &config, observe)?;
let optimized_plan = optimizer.optimize(analyzed_plan, &config, observe)?;
println!(
"Optimized Logical Plan:\n\n{}\n",
optimized_plan.display_indent()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ impl From<DataFusionError> for io::Error {
}

impl DataFusionError {
const BACK_TRACE_SEP: &str = "\n\nbacktrace: ";
const BACK_TRACE_SEP: &'static str = "\n\nbacktrace: ";

/// Get deepest underlying [`DataFusionError`]
///
Expand Down
24 changes: 12 additions & 12 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl DataFrame {

/// Create a physical plan
pub async fn create_physical_plan(self) -> Result<Arc<dyn ExecutionPlan>> {
self.session_state.create_physical_plan(&self.plan).await
self.session_state.create_physical_plan(self.plan).await
}

/// Filter the DataFrame by column. Returns a new DataFrame only containing the
Expand Down Expand Up @@ -891,7 +891,7 @@ impl DataFrame {
/// operations may take place against a different state
pub fn into_optimized_plan(self) -> Result<LogicalPlan> {
// Optimize the plan first for better UX
self.session_state.optimize(&self.plan)
self.session_state.optimize(self.plan)
}

/// Converts this [`DataFrame`] into a [`TableProvider`] that can be registered
Expand Down Expand Up @@ -1302,7 +1302,7 @@ impl TableProvider for DataFrameTableProvider {
expr = expr.limit(0, Some(l))?
}
let plan = expr.build()?;
state.create_physical_plan(&plan).await
state.create_physical_plan(plan).await
}
}

Expand Down Expand Up @@ -1613,8 +1613,8 @@ mod tests {

#[tokio::test]
async fn registry() -> Result<()> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
let ctx = SessionContext::new();
register_aggregate_csv(&ctx, "aggregate_test_100").await?;

// declare the udf
let my_fn: ScalarFunctionImplementation =
Expand Down Expand Up @@ -1747,14 +1747,14 @@ mod tests {

/// Create a logical plan from a SQL query
async fn create_plan(sql: &str) -> Result<LogicalPlan> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, "aggregate_test_100").await?;
let ctx = SessionContext::new();
register_aggregate_csv(&ctx, "aggregate_test_100").await?;
Ok(ctx.sql(sql).await?.into_unoptimized_plan())
}

async fn test_table_with_name(name: &str) -> Result<DataFrame> {
let mut ctx = SessionContext::new();
register_aggregate_csv(&mut ctx, name).await?;
let ctx = SessionContext::new();
register_aggregate_csv(&ctx, name).await?;
ctx.table(name).await
}

Expand All @@ -1763,7 +1763,7 @@ mod tests {
}

async fn register_aggregate_csv(
ctx: &mut SessionContext,
ctx: &SessionContext,
table_name: &str,
) -> Result<()> {
let schema = test_util::aggr_test_schema();
Expand Down Expand Up @@ -2011,9 +2011,9 @@ mod tests {
"datafusion.sql_parser.enable_ident_normalization".to_owned(),
"false".to_owned(),
)]))?;
let mut ctx = SessionContext::new_with_config(config);
let ctx = SessionContext::new_with_config(config);
let name = "aggregate_test_100";
register_aggregate_csv(&mut ctx, name).await?;
register_aggregate_csv(&ctx, name).await?;
let df = ctx.table(name);

let df = df
Expand Down
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1942,7 +1942,7 @@ mod tests {
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
.create_physical_plan(&insert_into_table)
.create_physical_plan(insert_into_table.clone())
.await?;

// Execute the physical plan and collect the results
Expand Down Expand Up @@ -1986,7 +1986,7 @@ mod tests {
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
.create_physical_plan(&insert_into_table)
.create_physical_plan(insert_into_table)
.await?;

// Again, execute the physical plan and collect the results
Expand Down Expand Up @@ -2155,7 +2155,7 @@ mod tests {
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
.create_physical_plan(&insert_into_table)
.create_physical_plan(insert_into_table.clone())
.await?;
// Execute the physical plan and collect the results
let res = collect(plan, session_ctx.task_ctx()).await?;
Expand Down Expand Up @@ -2195,7 +2195,7 @@ mod tests {
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
.create_physical_plan(&insert_into_table)
.create_physical_plan(insert_into_table)
.await?;

// Again, execute the physical plan and collect the results
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ mod tests {
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
.create_physical_plan(&insert_into_table)
.create_physical_plan(insert_into_table)
.await?;

// Execute the physical plan and collect the results
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl TableProvider for ViewTable {
plan = plan.limit(0, Some(limit))?;
}

state.create_physical_plan(&plan.build()?).await
state.create_physical_plan(plan.build()?).await
}
}

Expand Down
28 changes: 14 additions & 14 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ impl SessionContext {
} = cmd;

let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
let input = self.state().optimize(&input)?;
let input = self.state().optimize(input)?;
let table = self.table(&name).await;
match (if_not_exists, or_replace, table) {
(true, false, Ok(_)) => self.return_empty_dataframe(),
Expand Down Expand Up @@ -1252,7 +1252,7 @@ impl SessionContext {
since = "23.0.0",
note = "Use SessionState::optimize to ensure a consistent state for planning and execution"
)]
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
pub fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
self.state.read().optimize(plan)
}

Expand All @@ -1263,7 +1263,7 @@ impl SessionContext {
)]
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
logical_plan: LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
self.state().create_physical_plan(logical_plan).await
}
Expand Down Expand Up @@ -1344,7 +1344,7 @@ pub trait QueryPlanner {
/// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
logical_plan: LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>>;
}
Expand All @@ -1357,7 +1357,7 @@ impl QueryPlanner for DefaultQueryPlanner {
/// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
logical_plan: LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let planner = DefaultPhysicalPlanner::default();
Expand Down Expand Up @@ -1858,13 +1858,13 @@ impl SessionState {
}

/// Optimizes the logical plan by applying optimizer rules.
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
pub fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan> {
if let LogicalPlan::Explain(e) = plan {
let mut stringified_plans = e.stringified_plans.clone();
let mut stringified_plans = e.stringified_plans;

// analyze & capture output of each rule
let analyzed_plan = match self.analyzer.execute_and_check(
e.plan.as_ref(),
e.plan.as_ref().clone(),
self.options(),
|analyzed_plan, analyzer| {
let analyzer_name = analyzer.name().to_string();
Expand Down Expand Up @@ -1895,7 +1895,7 @@ impl SessionState {

// optimize the child plan, capturing the output of each optimizer
let (plan, logical_optimization_succeeded) = match self.optimizer.optimize(
&analyzed_plan,
analyzed_plan,
self,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
Expand Down Expand Up @@ -1924,7 +1924,7 @@ impl SessionState {
let analyzed_plan =
self.analyzer
.execute_and_check(plan, self.options(), |_, _| {})?;
self.optimizer.optimize(&analyzed_plan, self, |_, _| {})
self.optimizer.optimize(analyzed_plan, self, |_, _| {})
}
}

Expand All @@ -1937,11 +1937,11 @@ impl SessionState {
/// DDL `CREATE TABLE` must be handled by another layer.
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
logical_plan: LogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
let logical_plan = self.optimize(logical_plan)?;
self.query_planner
.create_physical_plan(&logical_plan, self)
.create_physical_plan(logical_plan, self)
.await
}

Expand Down Expand Up @@ -2714,7 +2714,7 @@ mod tests {
impl PhysicalPlanner for MyPhysicalPlanner {
async fn create_physical_plan(
&self,
_logical_plan: &LogicalPlan,
_logical_plan: LogicalPlan,
_session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("query not supported")
Expand All @@ -2737,7 +2737,7 @@ mod tests {
impl QueryPlanner for MyQueryPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
logical_plan: LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
let physical_planner = MyPhysicalPlanner {};
Expand Down
Loading