Skip to content

Commit

Permalink
minor(sqlparser): encapsulate PlanerContext, reduce some clones (#5814)
Browse files Browse the repository at this point in the history
* minor(sqlparser): encapsulate PlanerContext, reduce some clones

* clippy
  • Loading branch information
alamb authored Apr 3, 2023
1 parent 2191a69 commit fd350fa
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 43 deletions.
8 changes: 2 additions & 6 deletions datafusion/sql/src/expr/identifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
Err(_) => {
// check the outer_query_schema and try to find a match
let outer_query_schema_opt =
planner_context.outer_query_schema.as_ref();
if let Some(outer) = outer_query_schema_opt {
if let Some(outer) = planner_context.outer_query_schema() {
match outer.field_with_unqualified_name(normalize_ident.as_str())
{
Ok(field) => {
Expand Down Expand Up @@ -159,10 +157,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"Unsupported compound identifier: {ids:?}"
)))
} else {
let outer_query_schema_opt =
planner_context.outer_query_schema.as_ref();
// check the outer_query_schema and try to find a match
if let Some(outer) = outer_query_schema_opt {
if let Some(outer) = planner_context.outer_query_schema() {
let search_result = search_dfschema(&ids, outer);
match search_result {
// found matching field with spare identifier(s) for nested field(s) in structure
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
) -> Result<Expr> {
match sql {
SQLExpr::Value(value) => {
self.parse_value(value, &planner_context.prepare_param_data_types)
self.parse_value(value, planner_context.prepare_param_data_types())
}
SQLExpr::Extract { field, expr } => Ok(Expr::ScalarFunction {
fun: BuiltinScalarFunction::DatePart,
Expand Down
18 changes: 9 additions & 9 deletions datafusion/sql/src/expr/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema = planner_context.outer_query_schema.clone();
planner_context.outer_query_schema = Some(input_schema.clone());
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone()));
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.outer_query_schema = old_outer_query_schema;
planner_context.set_outer_query_schema(old_outer_query_schema);
Ok(Expr::Exists {
subquery: Subquery {
subquery: Arc::new(sub_plan),
Expand All @@ -52,11 +52,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema = planner_context.outer_query_schema.clone();
planner_context.outer_query_schema = Some(input_schema.clone());
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone()));
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.outer_query_schema = old_outer_query_schema;
planner_context.set_outer_query_schema(old_outer_query_schema);
let expr = Box::new(self.sql_to_expr(expr, input_schema, planner_context)?);
Ok(Expr::InSubquery {
expr,
Expand All @@ -74,11 +74,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input_schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let old_outer_query_schema = planner_context.outer_query_schema.clone();
planner_context.outer_query_schema = Some(input_schema.clone());
let old_outer_query_schema =
planner_context.set_outer_query_schema(Some(input_schema.clone()));
let sub_plan = self.query_to_plan(subquery, planner_context)?;
let outer_ref_columns = sub_plan.all_out_ref_exprs();
planner_context.outer_query_schema = old_outer_query_schema;
planner_context.set_outer_query_schema(old_outer_query_schema);
Ok(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(sub_plan),
outer_ref_columns,
Expand Down
65 changes: 52 additions & 13 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,21 @@ impl Default for ParserOptions {
}
}

#[derive(Debug, Clone)]
/// Struct to store the states used by the Planner. The Planner will leverage the states to resolve
/// CTEs, Views, subqueries and PREPARE statements. The states include
/// Common Table Expression (CTE) provided with WITH clause and
/// Parameter Data Types provided with PREPARE statement and the query schema of the
/// outer query plan
#[derive(Debug, Clone)]
pub struct PlannerContext {
/// Data type provided with prepare statement
pub prepare_param_data_types: Vec<DataType>,
/// Map of CTE name to logical plan of the WITH clause
pub ctes: HashMap<String, LogicalPlan>,
/// Data types for numbered parameters ($1, $2, etc), if supplied
/// in `PREPARE` statement
prepare_param_data_types: Vec<DataType>,
/// Map of CTE name to logical plan of the WITH clause.
/// Use Arc<LogicalPlan> to allow cheap cloning
ctes: HashMap<String, Arc<LogicalPlan>>,
/// The query schema of the outer query plan, used to resolve the columns in subquery
pub outer_query_schema: Option<DFSchema>,
outer_query_schema: Option<DFSchema>,
}

impl Default for PlannerContext {
Expand All @@ -100,15 +102,52 @@ impl PlannerContext {
}
}

/// Create a new PlannerContext with provided prepare_param_data_types
pub fn new_with_prepare_param_data_types(
/// Update the PlannerContext with provided prepare_param_data_types
pub fn with_prepare_param_data_types(
mut self,
prepare_param_data_types: Vec<DataType>,
) -> Self {
Self {
prepare_param_data_types,
ctes: HashMap::new(),
outer_query_schema: None,
}
self.prepare_param_data_types = prepare_param_data_types;
self
}

// return a reference to the outer queries schema
pub fn outer_query_schema(&self) -> Option<&DFSchema> {
self.outer_query_schema.as_ref()
}

/// sets the outer query schema, returning the existing one, if
/// any
pub fn set_outer_query_schema(
&mut self,
mut schema: Option<DFSchema>,
) -> Option<DFSchema> {
std::mem::swap(&mut self.outer_query_schema, &mut schema);
schema
}

/// Return the types of parameters (`$1`, `$2`, etc) if known
pub fn prepare_param_data_types(&self) -> &[DataType] {
&self.prepare_param_data_types
}

/// returns true if there is a Common Table Expression (CTE) /
/// Subquery for the specified name
pub fn contains_cte(&self, cte_name: &str) -> bool {
self.ctes.contains_key(cte_name)
}

/// Inserts a LogicalPlan for the Common Table Expression (CTE) /
/// Subquery for the specified name
pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
let cte_name = cte_name.into();
self.ctes.insert(cte_name, Arc::new(plan));
}

/// Return a plan for the Common Table Expression (CTE) / Subquery for the
/// specified name
pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
self.ctes.get(cte_name).map(|cte| cte.as_ref())
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/sql/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
for cte in with.cte_tables {
// A `WITH` block can't use the same name more than once
let cte_name = normalize_ident(cte.alias.name.clone());
if planner_context.ctes.contains_key(&cte_name) {
if planner_context.contains_cte(&cte_name) {
return Err(DataFusionError::SQL(ParserError(format!(
"WITH query name {cte_name:?} specified more than once"
))));
Expand All @@ -68,7 +68,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// projection (e.g. "WITH table(t1, t2) AS SELECT 1, 2").
let logical_plan = self.apply_table_alias(logical_plan, cte.alias)?;

planner_context.ctes.insert(cte_name, logical_plan);
planner_context.insert_cte(cte_name, logical_plan);
}
}
let plan = self.set_expr_to_plan(*set_expr, planner_context)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// normalize name and alias
let table_ref = self.object_name_to_table_reference(name)?;
let table_name = table_ref.to_string();
let cte = planner_context.ctes.get(&table_name);
let cte = planner_context.get_cte(&table_name);
(
match (
cte,
Expand Down
12 changes: 5 additions & 7 deletions datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
match selection {
Some(predicate_expr) => {
let fallback_schemas = plan.fallback_normalize_schemas();
let outer_query_schema = planner_context.outer_query_schema.clone();
let outer_query_schema_vec =
if let Some(outer) = outer_query_schema.as_ref() {
vec![outer]
} else {
vec![]
};
let outer_query_schema = planner_context.outer_query_schema().cloned();
let outer_query_schema_vec = outer_query_schema
.as_ref()
.map(|schema| vec![schema])
.unwrap_or_else(Vec::new);

let filter_expr =
self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
match set_expr {
SetExpr::Select(s) => self.select_to_plan(*s, planner_context),
SetExpr::Values(v) => {
self.sql_values_to_plan(v, &planner_context.prepare_param_data_types)
self.sql_values_to_plan(v, planner_context.prepare_param_data_types())
}
SetExpr::SetOperation {
op,
Expand Down
6 changes: 3 additions & 3 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<_>>()?;

// Create planner context with parameters
let mut planner_context =
PlannerContext::new_with_prepare_param_data_types(data_types.clone());
let mut planner_context = PlannerContext::new()
.with_prepare_param_data_types(data_types.clone());

// Build logical plan for inner statement of the prepare statement
let plan = self.sql_statement_to_plan_with_context(
Expand Down Expand Up @@ -961,7 +961,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

// Projection
let mut planner_context =
PlannerContext::new_with_prepare_param_data_types(prepare_param_data_types);
PlannerContext::new().with_prepare_param_data_types(prepare_param_data_types);
let source = self.query_to_plan(*source, &mut planner_context)?;
if fields.len() != source.schema().fields().len() {
Err(DataFusionError::Plan(
Expand Down

0 comments on commit fd350fa

Please sign in to comment.