From c87b609aa14b4edb8cde09e4cdadcaff9753b657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Wed, 21 Feb 2024 20:44:39 +0000 Subject: [PATCH 01/23] Add plugable function factory --- datafusion/core/src/execution/context/mod.rs | 46 ++++++++- datafusion/core/tests/sql/sql_api.rs | 95 ++++++++++++++++++- datafusion/expr/src/logical_plan/mod.rs | 4 +- datafusion/expr/src/logical_plan/statement.rs | 31 ++++++ datafusion/sql/src/statement.rs | 69 ++++++++++++-- 5 files changed, 233 insertions(+), 12 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 2144cd3c7736..6e3838940ce2 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -45,7 +45,7 @@ use datafusion_common::{ use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, - Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, + CreateFunction, Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, }; pub use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::var_provider::is_system_variables; @@ -495,6 +495,19 @@ impl SessionContext { self.set_variable(stmt).await } + LogicalPlan::Statement(Statement::CreateFunction(stmt)) => { + let function_factory = self.state.read().function_factory.clone(); + + match function_factory { + Some(f) => f.create(self.state.clone(), stmt).await?, + None => Err(DataFusionError::Configuration( + "Function factory has not been configured".into(), + ))?, + }; + + self.return_empty_dataframe() + } + plan => Ok(DataFrame::new(self.state(), plan)), } } @@ -1261,7 +1274,22 @@ impl QueryPlanner for DefaultQueryPlanner { .await } } - +/// Crates and registers a function from [CreateFunction] statement +#[async_trait] +pub trait FunctionFactory: Sync + Send { + // TODO: I don't like having RwLock Leaking here, who ever implements it + // has to depend ot `parking_lot`. I'f we expose &mut SessionState it + // may keep lock of too long. + // + // Not sure if there is better approach. + // + /// Crates and registers a function from [CreateFunction] statement + async fn create( + &self, + state: Arc>, + statement: CreateFunction, + ) -> Result<()>; +} /// Execution context for registering data sources and executing queries. /// See [`SessionContext`] for a higher level API. /// @@ -1306,6 +1334,9 @@ pub struct SessionState { table_factories: HashMap>, /// Runtime environment runtime_env: Arc, + // TODO: I don't like having `function_factory` here but i see no better place for + // it. Moving it somewhere else may introduce circular dependency, + function_factory: Option>, } impl Debug for SessionState { @@ -1392,6 +1423,7 @@ impl SessionState { execution_props: ExecutionProps::new(), runtime_env: runtime, table_factories, + function_factory: None, }; // register built in functions @@ -1568,6 +1600,16 @@ impl SessionState { self } + /// Set create function handler + // TODO: Add more details to method docs + pub fn with_function_factory( + mut self, + create_function_hook: Arc, + ) -> Self { + self.function_factory = Some(create_function_hook); + self + } + /// Replace the extension [`SerializerRegistry`] pub fn with_serializer_registry( mut self, diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index d7adc9611b2f..acaaddb68150 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -15,7 +15,19 @@ // specific language governing permissions and limitations // under the License. -use datafusion::prelude::*; +use std::sync::Arc; + +use arrow_array::ArrowNativeTypeOp; +use datafusion::{ + execution::context::{FunctionFactory, SessionState}, + prelude::*, +}; +use datafusion_execution::{ + runtime_env::{RuntimeConfig, RuntimeEnv}, + FunctionRegistry, +}; +use datafusion_expr::CreateFunction; +use parking_lot::RwLock; use tempfile::TempDir; #[tokio::test] @@ -39,6 +51,87 @@ async fn unsupported_ddl_returns_error() { ctx.sql_with_options(sql, options).await.unwrap(); } +struct MockFunctionFactory; +#[async_trait::async_trait] +impl FunctionFactory for MockFunctionFactory { + #[doc = r" Crates and registers a function from [CreateFunction] statement"] + #[must_use] + #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] + async fn create( + &self, + state: Arc>, + statement: CreateFunction, + ) -> datafusion::error::Result<()> { + // this function is a mock for testing + // `CreateFunction` should be used to derive this function + + let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| { + let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?; + let base = + datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed"); + let exponent = + datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed"); + + let array = base + .iter() + .zip(exponent.iter()) + .map(|(base, exponent)| match (base, exponent) { + (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)), + _ => None, + }) + .collect::(); + Ok(datafusion_expr::ColumnarValue::from( + Arc::new(array) as arrow_array::ArrayRef + )) + }); + + let args = statement.args.unwrap(); + let mock_udf = create_udf( + &statement.name, + vec![args[0].data_type.clone(), args[1].data_type.clone()], + Arc::new(statement.return_type.unwrap()), + datafusion_expr::Volatility::Immutable, + mock_add, + ); + + // we may need other infrastructure provided by state, for example: + // state.config().get_extension() + + // register mock udf for testing + state.write().register_udf(mock_udf.into())?; + Ok(()) + } +} + +#[tokio::test] +async fn create_user_defined_function_statement() { + let function_factory = Arc::new(MockFunctionFactory {}); + let runtime_config = RuntimeConfig::new(); + let runtime_environment = RuntimeEnv::new(runtime_config).unwrap(); + let session_config = + SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL"); + + let state = + SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment)) + .with_function_factory(function_factory); + + let ctx = SessionContext::new_with_state(state); + + let sql = r#" + CREATE FUNCTION better_add(DOUBLE, DOUBLE) + RETURNS DOUBLE + RETURN $1 + $2 + "#; + let _ = ctx.sql(sql).await.unwrap(); + + ctx.sql("select better_add(2.0, 2.0)") + .await + .unwrap() + .show() + .await + .unwrap(); +} + #[tokio::test] async fn unsupported_dml_returns_error() { let ctx = SessionContext::new(); diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index f6e6000897a5..7695491c57c7 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -40,8 +40,8 @@ pub use plan::{ TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, }; pub use statement::{ - SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd, - TransactionIsolationLevel, TransactionStart, + CreateFunction, OperateFunctionArg, SetVariable, Statement, TransactionAccessMode, + TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, }; pub use display::display_schema; diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs index 0b2f1bd383a0..afe9e1dcaf39 100644 --- a/datafusion/expr/src/logical_plan/statement.rs +++ b/datafusion/expr/src/logical_plan/statement.rs @@ -17,7 +17,9 @@ use std::fmt::{self, Display}; +use arrow::datatypes::DataType; use datafusion_common::DFSchemaRef; +use sqlparser::ast::{ArgMode, CreateFunctionBody, Expr, Ident}; /// Various types of Statements. /// @@ -34,6 +36,8 @@ pub enum Statement { TransactionEnd(TransactionEnd), /// Set a Variable SetVariable(SetVariable), + + CreateFunction(CreateFunction), } impl Statement { @@ -43,6 +47,7 @@ impl Statement { Statement::TransactionStart(TransactionStart { schema, .. }) => schema, Statement::TransactionEnd(TransactionEnd { schema, .. }) => schema, Statement::SetVariable(SetVariable { schema, .. }) => schema, + Statement::CreateFunction(CreateFunction { schema, .. }) => schema, } } @@ -53,6 +58,7 @@ impl Statement { Statement::TransactionStart(_) => "TransactionStart", Statement::TransactionEnd(_) => "TransactionEnd", Statement::SetVariable(_) => "SetVariable", + Statement::CreateFunction(_) => "CreateFunction", } } @@ -85,6 +91,9 @@ impl Statement { }) => { write!(f, "SetVariable: set {variable:?} to {value:?}") } + Statement::CreateFunction(CreateFunction { name, .. }) => { + write!(f, "CreateFunction: name {name:?}") + } } } } @@ -148,3 +157,25 @@ pub struct SetVariable { /// Dummy schema pub schema: DFSchemaRef, } +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct CreateFunction { + // TODO: There is open question should we expose sqlparser types or redefine them here? + // At the moment it make more sense to expose sqlparser types and leave + // user to convert them as needed + pub or_replace: bool, + pub temporary: bool, + pub name: String, + pub args: Option>, + pub return_type: Option, + pub params: CreateFunctionBody, + //pub body: String, + /// Dummy schema + pub schema: DFSchemaRef, +} +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct OperateFunctionArg { + pub mode: Option, + pub name: Option, + pub data_type: DataType, + pub default_expr: Option, +} diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index dfac8367e912..f5f0f472214f 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -43,12 +43,12 @@ use datafusion_expr::logical_plan::DdlStatement; use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{ cast, col, Analyze, CreateCatalog, CreateCatalogSchema, - CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView, - DescribeTable, DmlStatement, DropCatalogSchema, DropTable, DropView, EmptyRelation, - Explain, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare, - SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, - TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, - WriteOp, + CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateMemoryTable, + CreateView, DescribeTable, DmlStatement, DropCatalogSchema, DropTable, DropView, + EmptyRelation, Explain, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, + OperateFunctionArg, PlanType, Prepare, SetVariable, Statement as PlanStatement, + ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd, + TransactionIsolationLevel, TransactionStart, WriteOp, }; use sqlparser::ast; use sqlparser::ast::{ @@ -626,8 +626,63 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }); Ok(LogicalPlan::Statement(statement)) } + Statement::CreateFunction { + or_replace, + temporary, + name, + args, + return_type, + params, + } => { + let return_type = match return_type { + Some(t) => Some(self.convert_data_type(&t)?), + None => None, + }; - _ => not_impl_err!("Unsupported SQL statement: {sql:?}"), + let args = match args { + Some(a) => { + let a = a + .into_iter() + .map(|a| { + let data_type = self.convert_data_type(&a.data_type)?; + Ok(OperateFunctionArg { + mode: a.mode, + name: a.name, + default_expr: a.default_expr, + data_type, + }) + }) + .collect::>>(); + Some(a?) + } + None => None, + }; + + // Not sure if this is correct way to generate name + // postgresql function definition may have schema part as well + // datafusion at the moment does lookup based on given string + // `schema_name.function_name` will work even if there is no `schema_name` + let name: String = name + .0 + .into_iter() + .map(|i| i.value) + .collect::>() + .join("."); + let statement = PlanStatement::CreateFunction(CreateFunction { + or_replace, + temporary, + name, + return_type, + args, + params, + schema: DFSchemaRef::new(DFSchema::empty()), + }); + + Ok(LogicalPlan::Statement(statement)) + } + _ => { + not_impl_err!("Unsupported SQL statement: {sql:?}") + } } } From be5b23348ff96e7ca3fbec1a551b30824c042ae9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 24 Feb 2024 19:40:22 +0000 Subject: [PATCH 02/23] cover `DROP FUNCTION` as well ... ... partially, as `SessionState` does not expose unregister_udf at the moment. --- datafusion/core/src/execution/context/mod.rs | 30 +++++++++++-- datafusion/core/tests/sql/sql_api.rs | 15 ++++++- datafusion/expr/src/logical_plan/mod.rs | 5 ++- datafusion/expr/src/logical_plan/statement.rs | 16 ++++++- datafusion/sql/src/statement.rs | 44 ++++++++++++++++--- 5 files changed, 97 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 6e3838940ce2..b793ea3543df 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -45,7 +45,7 @@ use datafusion_common::{ use datafusion_execution::registry::SerializerRegistry; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, - CreateFunction, Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, + Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, }; pub use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::var_provider::is_system_variables; @@ -73,9 +73,10 @@ use crate::datasource::{ }; use crate::error::{DataFusionError, Result}; use crate::logical_expr::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, - CreateView, DropCatalogSchema, DropTable, DropView, Explain, LogicalPlan, - LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE, + CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, + CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable, DropView, + Explain, LogicalPlan, LogicalPlanBuilder, SetVariable, TableSource, TableType, + UNNAMED_TABLE, }; use crate::optimizer::OptimizerRule; use datafusion_sql::{ @@ -508,6 +509,19 @@ impl SessionContext { self.return_empty_dataframe() } + LogicalPlan::Statement(Statement::DropFunction(stmt)) => { + let function_factory = self.state.read().function_factory.clone(); + + match function_factory { + Some(f) => f.remove(self.state.clone(), stmt).await?, + None => Err(DataFusionError::Configuration( + "Function factory has not been configured".into(), + ))?, + }; + + self.return_empty_dataframe() + } + plan => Ok(DataFrame::new(self.state(), plan)), } } @@ -1289,6 +1303,14 @@ pub trait FunctionFactory: Sync + Send { state: Arc>, statement: CreateFunction, ) -> Result<()>; + + /// Removes function from [SessionState] + // drop would make more sense but its already occupied in rust + async fn remove( + &self, + state: Arc>, + statement: DropFunction, + ) -> Result<()>; } /// Execution context for registering data sources and executing queries. /// See [`SessionContext`] for a higher level API. diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index acaaddb68150..c0706dca7f65 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -20,13 +20,14 @@ use std::sync::Arc; use arrow_array::ArrowNativeTypeOp; use datafusion::{ execution::context::{FunctionFactory, SessionState}, + logical_expr::{CreateFunction, DropFunction}, prelude::*, }; use datafusion_execution::{ runtime_env::{RuntimeConfig, RuntimeEnv}, FunctionRegistry, }; -use datafusion_expr::CreateFunction; + use parking_lot::RwLock; use tempfile::TempDir; @@ -101,6 +102,16 @@ impl FunctionFactory for MockFunctionFactory { state.write().register_udf(mock_udf.into())?; Ok(()) } + + async fn remove( + &self, + _state: Arc>, + _statement: DropFunction, + ) -> datafusion::error::Result<()> { + // at the moment state does not support unregister + // ignoring for now + Ok(()) + } } #[tokio::test] @@ -130,6 +141,8 @@ async fn create_user_defined_function_statement() { .show() .await .unwrap(); + + ctx.sql("drop function better_add").await.unwrap(); } #[tokio::test] diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 7695491c57c7..cdf6592bb9bf 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -40,8 +40,9 @@ pub use plan::{ TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, }; pub use statement::{ - CreateFunction, OperateFunctionArg, SetVariable, Statement, TransactionAccessMode, - TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, + CreateFunction, DropFunction, OperateFunctionArg, SetVariable, Statement, + TransactionAccessMode, TransactionConclusion, TransactionEnd, + TransactionIsolationLevel, TransactionStart, }; pub use display::display_schema; diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs index afe9e1dcaf39..b258ff000f51 100644 --- a/datafusion/expr/src/logical_plan/statement.rs +++ b/datafusion/expr/src/logical_plan/statement.rs @@ -36,8 +36,10 @@ pub enum Statement { TransactionEnd(TransactionEnd), /// Set a Variable SetVariable(SetVariable), - + /// Create function statement CreateFunction(CreateFunction), + /// Drop function statement + DropFunction(DropFunction), } impl Statement { @@ -48,6 +50,7 @@ impl Statement { Statement::TransactionEnd(TransactionEnd { schema, .. }) => schema, Statement::SetVariable(SetVariable { schema, .. }) => schema, Statement::CreateFunction(CreateFunction { schema, .. }) => schema, + Statement::DropFunction(DropFunction { schema, .. }) => schema, } } @@ -59,6 +62,7 @@ impl Statement { Statement::TransactionEnd(_) => "TransactionEnd", Statement::SetVariable(_) => "SetVariable", Statement::CreateFunction(_) => "CreateFunction", + Statement::DropFunction(_) => "DropFunction", } } @@ -94,6 +98,9 @@ impl Statement { Statement::CreateFunction(CreateFunction { name, .. }) => { write!(f, "CreateFunction: name {name:?}") } + Statement::DropFunction(DropFunction { name, .. }) => { + write!(f, "CreateFunction: name {name:?}") + } } } } @@ -179,3 +186,10 @@ pub struct OperateFunctionArg { pub data_type: DataType, pub default_expr: Option, } + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct DropFunction { + pub name: String, + pub if_exists: bool, + pub schema: DFSchemaRef, +} diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index f5f0f472214f..4204e579175a 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -44,11 +44,12 @@ use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{ cast, col, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateMemoryTable, - CreateView, DescribeTable, DmlStatement, DropCatalogSchema, DropTable, DropView, - EmptyRelation, Explain, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, - OperateFunctionArg, PlanType, Prepare, SetVariable, Statement as PlanStatement, - ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd, - TransactionIsolationLevel, TransactionStart, WriteOp, + CreateView, DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, + DropView, EmptyRelation, Explain, ExprSchemable, Filter, LogicalPlan, + LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable, + Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, + TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, + WriteOp, }; use sqlparser::ast; use sqlparser::ast::{ @@ -680,6 +681,39 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Ok(LogicalPlan::Statement(statement)) } + Statement::DropFunction { + if_exists, + func_desc, + .. + } => { + // Not sure if this is correct way to generate name + // postgresql function definition may have schema part as well + // datafusion at the moment does lookup based on given string + // `schema_name.function_name` will work even if there is no `schema_name` + + // according to postgresql documentation it can be only one function + // specified in drop statement + + if let Some(desc) = func_desc.first() { + let name: String = desc + .name + .0 + .iter() + .map(|i| i.value.to_owned()) + .collect::>() + .join("."); + let statement = PlanStatement::DropFunction(DropFunction { + if_exists, + name, + schema: DFSchemaRef::new(DFSchema::empty()), + }); + Ok(LogicalPlan::Statement(statement)) + } else { + Err(DataFusionError::Execution( + "Function name not provided".into(), + )) + } + } _ => { not_impl_err!("Unsupported SQL statement: {sql:?}") } From 76dc3bf650d479332ec87c03d142fdf7ccc7cfa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Wed, 28 Feb 2024 18:04:39 +0000 Subject: [PATCH 03/23] update documentation --- datafusion/core/src/execution/context/mod.rs | 28 +++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b793ea3543df..928bc06482fc 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1289,6 +1289,16 @@ impl QueryPlanner for DefaultQueryPlanner { } } /// Crates and registers a function from [CreateFunction] statement +/// +/// It is intended to handle `CREATE FUNCTION` statements +/// and interact with [SessionState] to registers new udfs. +/// +/// Datafusion `SQL` dialect does not support `CREATE FUNCTION` +/// in generic dialect, so dialect should be changed to `PostgreSQL` +/// +/// ```rust, no_run +/// SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL"); +/// ``` #[async_trait] pub trait FunctionFactory: Sync + Send { // TODO: I don't like having RwLock Leaking here, who ever implements it @@ -1297,15 +1307,16 @@ pub trait FunctionFactory: Sync + Send { // // Not sure if there is better approach. // - /// Crates and registers a function from [CreateFunction] statement + + /// Handles creation of user defined function specified in [CreateFunction] statement async fn create( &self, state: Arc>, statement: CreateFunction, ) -> Result<()>; - /// Removes function from [SessionState] - // drop would make more sense but its already occupied in rust + /// Drops user defined function from [SessionState] + // Naming it `drop`` would make more sense but its already occupied in rust async fn remove( &self, state: Arc>, @@ -1356,8 +1367,11 @@ pub struct SessionState { table_factories: HashMap>, /// Runtime environment runtime_env: Arc, - // TODO: I don't like having `function_factory` here but i see no better place for - // it. Moving it somewhere else may introduce circular dependency, + /// [FunctionFactory] to support pluggable user defined function handler. + /// It is invoked on `CREATE FUNCTION` and `DROP FUNCTION` statements. + /// + /// Datafusion generic SQL dialect does not support `CRETE FUNCTION` statement + /// thus, changing dialect o PostgreSql is required function_factory: Option>, } @@ -1622,8 +1636,8 @@ impl SessionState { self } - /// Set create function handler - // TODO: Add more details to method docs + /// Registers `CREATE FUNCTION` statement handler implementing + /// [`FunctionFactory`] trait. pub fn with_function_factory( mut self, create_function_hook: Arc, From f265d68818f7abd53a242da2d75fd0e766c999ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Wed, 28 Feb 2024 19:18:50 +0000 Subject: [PATCH 04/23] fix doc test --- datafusion/core/src/execution/context/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 928bc06482fc..f165705671d4 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1297,6 +1297,7 @@ impl QueryPlanner for DefaultQueryPlanner { /// in generic dialect, so dialect should be changed to `PostgreSQL` /// /// ```rust, no_run +/// # use datafusion::execution::config::SessionConfig; /// SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL"); /// ``` #[async_trait] From f084201283e9b64d55f9166d5520a44792dadee4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Thu, 29 Feb 2024 10:48:26 +0000 Subject: [PATCH 05/23] Address PR comments (code organization) --- datafusion/core/src/execution/context/mod.rs | 54 ++++---- datafusion/core/tests/sql/sql_api.rs | 109 +--------------- .../user_defined_scalar_functions.rs | 120 +++++++++++++++++- datafusion/expr/src/logical_plan/ddl.rs | 83 +++++++++++- datafusion/expr/src/logical_plan/mod.rs | 8 +- datafusion/expr/src/logical_plan/statement.rs | 48 +------ datafusion/proto/src/logical_plan/mod.rs | 6 + datafusion/sql/src/statement.rs | 74 ++++++++--- 8 files changed, 297 insertions(+), 205 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index f165705671d4..b56e572d4787 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -490,38 +490,14 @@ impl SessionContext { DdlStatement::DropTable(cmd) => self.drop_table(cmd).await, DdlStatement::DropView(cmd) => self.drop_view(cmd).await, DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await, + DdlStatement::CreateFunction(cmd) => self.create_function(cmd).await, + DdlStatement::DropFunction(cmd) => self.drop_function(cmd).await, }, // TODO what about the other statements (like TransactionStart and TransactionEnd) LogicalPlan::Statement(Statement::SetVariable(stmt)) => { self.set_variable(stmt).await } - LogicalPlan::Statement(Statement::CreateFunction(stmt)) => { - let function_factory = self.state.read().function_factory.clone(); - - match function_factory { - Some(f) => f.create(self.state.clone(), stmt).await?, - None => Err(DataFusionError::Configuration( - "Function factory has not been configured".into(), - ))?, - }; - - self.return_empty_dataframe() - } - - LogicalPlan::Statement(Statement::DropFunction(stmt)) => { - let function_factory = self.state.read().function_factory.clone(); - - match function_factory { - Some(f) => f.remove(self.state.clone(), stmt).await?, - None => Err(DataFusionError::Configuration( - "Function factory has not been configured".into(), - ))?, - }; - - self.return_empty_dataframe() - } - plan => Ok(DataFrame::new(self.state(), plan)), } } @@ -821,6 +797,32 @@ impl SessionContext { Ok(false) } + async fn create_function(&self, stmt: CreateFunction) -> Result { + let function_factory = self.state.read().function_factory.clone(); + + match function_factory { + Some(f) => f.create(self.state.clone(), stmt).await?, + None => Err(DataFusionError::Configuration( + "Function factory has not been configured".into(), + ))?, + }; + + self.return_empty_dataframe() + } + + async fn drop_function(&self, stmt: DropFunction) -> Result { + let function_factory = self.state.read().function_factory.clone(); + + match function_factory { + Some(f) => f.remove(self.state.clone(), stmt).await?, + None => Err(DataFusionError::Configuration( + "Function factory has not been configured".into(), + ))?, + }; + + self.return_empty_dataframe() + } + /// Registers a variable provider within this context. pub fn register_variable( &self, diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index c0706dca7f65..cc8ea3118284 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -15,20 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - -use arrow_array::ArrowNativeTypeOp; -use datafusion::{ - execution::context::{FunctionFactory, SessionState}, - logical_expr::{CreateFunction, DropFunction}, - prelude::*, -}; -use datafusion_execution::{ - runtime_env::{RuntimeConfig, RuntimeEnv}, - FunctionRegistry, -}; - -use parking_lot::RwLock; +use datafusion::prelude::*; + use tempfile::TempDir; #[tokio::test] @@ -52,99 +40,6 @@ async fn unsupported_ddl_returns_error() { ctx.sql_with_options(sql, options).await.unwrap(); } -struct MockFunctionFactory; -#[async_trait::async_trait] -impl FunctionFactory for MockFunctionFactory { - #[doc = r" Crates and registers a function from [CreateFunction] statement"] - #[must_use] - #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] - async fn create( - &self, - state: Arc>, - statement: CreateFunction, - ) -> datafusion::error::Result<()> { - // this function is a mock for testing - // `CreateFunction` should be used to derive this function - - let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| { - let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?; - let base = - datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed"); - let exponent = - datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed"); - - let array = base - .iter() - .zip(exponent.iter()) - .map(|(base, exponent)| match (base, exponent) { - (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)), - _ => None, - }) - .collect::(); - Ok(datafusion_expr::ColumnarValue::from( - Arc::new(array) as arrow_array::ArrayRef - )) - }); - - let args = statement.args.unwrap(); - let mock_udf = create_udf( - &statement.name, - vec![args[0].data_type.clone(), args[1].data_type.clone()], - Arc::new(statement.return_type.unwrap()), - datafusion_expr::Volatility::Immutable, - mock_add, - ); - - // we may need other infrastructure provided by state, for example: - // state.config().get_extension() - - // register mock udf for testing - state.write().register_udf(mock_udf.into())?; - Ok(()) - } - - async fn remove( - &self, - _state: Arc>, - _statement: DropFunction, - ) -> datafusion::error::Result<()> { - // at the moment state does not support unregister - // ignoring for now - Ok(()) - } -} - -#[tokio::test] -async fn create_user_defined_function_statement() { - let function_factory = Arc::new(MockFunctionFactory {}); - let runtime_config = RuntimeConfig::new(); - let runtime_environment = RuntimeEnv::new(runtime_config).unwrap(); - let session_config = - SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL"); - - let state = - SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment)) - .with_function_factory(function_factory); - - let ctx = SessionContext::new_with_state(state); - - let sql = r#" - CREATE FUNCTION better_add(DOUBLE, DOUBLE) - RETURNS DOUBLE - RETURN $1 + $2 - "#; - let _ = ctx.sql(sql).await.unwrap(); - - ctx.sql("select better_add(2.0, 2.0)") - .await - .unwrap() - .show() - .await - .unwrap(); - - ctx.sql("drop function better_add").await.unwrap(); -} - #[tokio::test] async fn unsupported_dml_returns_error() { let ctx = SessionContext::new(); diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 0546ef59b1d8..845061cdab41 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -16,9 +16,12 @@ // under the License. use arrow::compute::kernels::numeric::add; -use arrow_array::{Array, ArrayRef, Float64Array, Int32Array, RecordBatch, UInt8Array}; +use arrow_array::{ + Array, ArrayRef, ArrowNativeTypeOp, Float64Array, Int32Array, RecordBatch, UInt8Array, +}; use arrow_schema::DataType::Float64; use arrow_schema::{DataType, Field, Schema}; +use datafusion::execution::context::{FunctionFactory, SessionState}; use datafusion::prelude::*; use datafusion::{execution::registry::FunctionRegistry, test_util}; use datafusion_common::cast::as_float64_array; @@ -26,10 +29,12 @@ use datafusion_common::{ assert_batches_eq, assert_batches_sorted_eq, cast::as_int32_array, not_impl_err, plan_err, ExprSchema, Result, ScalarValue, }; +use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_expr::{ - create_udaf, create_udf, Accumulator, ColumnarValue, ExprSchemable, - LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, + create_udaf, create_udf, Accumulator, ColumnarValue, CreateFunction, DropFunction, + ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; +use parking_lot::{Mutex, RwLock}; use rand::{thread_rng, Rng}; use std::any::Any; use std::iter; @@ -635,6 +640,115 @@ async fn verify_udf_return_type() -> Result<()> { Ok(()) } +#[derive(Debug, Default)] +struct MockFunctionFactory { + pub captured_expr: Mutex>, +} + +#[async_trait::async_trait] +impl FunctionFactory for MockFunctionFactory { + #[doc = r" Crates and registers a function from [CreateFunction] statement"] + #[must_use] + #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] + async fn create( + &self, + state: Arc>, + statement: CreateFunction, + ) -> datafusion::error::Result<()> { + // this function is a mock for testing + // `CreateFunction` should be used to derive this function + + let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| { + let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?; + let base = + datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed"); + let exponent = + datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed"); + + let array = base + .iter() + .zip(exponent.iter()) + .map(|(base, exponent)| match (base, exponent) { + (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)), + _ => None, + }) + .collect::(); + Ok(datafusion_expr::ColumnarValue::from( + Arc::new(array) as arrow_array::ArrayRef + )) + }); + + let args = statement.args.unwrap(); + let mock_udf = create_udf( + &statement.name, + vec![args[0].data_type.clone(), args[1].data_type.clone()], + Arc::new(statement.return_type.unwrap()), + datafusion_expr::Volatility::Immutable, + mock_add, + ); + + // capture expression so we can verify + // it has been parsed + *self.captured_expr.lock() = statement.params.return_; + + // we may need other infrastructure provided by state, for example: + // state.config().get_extension() + + // register mock udf for testing + state.write().register_udf(mock_udf.into())?; + Ok(()) + } + + async fn remove( + &self, + _state: Arc>, + _statement: DropFunction, + ) -> datafusion::error::Result<()> { + // at the moment state does not support unregister + // ignoring for now + Ok(()) + } +} + +#[tokio::test] +async fn create_scalar_function_from_sql_statement() { + let function_factory = Arc::new(MockFunctionFactory::default()); + let runtime_config = RuntimeConfig::new(); + let runtime_environment = RuntimeEnv::new(runtime_config).unwrap(); + let session_config = + SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL"); + + let state = + SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment)) + .with_function_factory(function_factory.clone()); + + let ctx = SessionContext::new_with_state(state); + + let sql = r#" + CREATE FUNCTION better_add(DOUBLE, DOUBLE) + RETURNS DOUBLE + RETURN $1 + $2 + "#; + let _ = ctx.sql(sql).await.unwrap(); + + ctx.sql("select better_add(2.0, 2.0)") + .await + .unwrap() + .show() + .await + .unwrap(); + + // sql expression should be convert to datafusion expression + // in this case + let captured_expression = function_factory.captured_expr.lock().clone().unwrap(); + + // is there some better way to test this + assert_eq!("$1 + $2", captured_expression.to_string()); + println!("{:?}", captured_expression); + + ctx.sql("drop function better_add").await.unwrap(); +} + fn create_udf_context() -> SessionContext { let ctx = SessionContext::new(); // register a custom UDF diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index e74992d99373..64072a3a4d1d 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -22,12 +22,14 @@ use std::{ hash::{Hash, Hasher}, }; -use crate::{Expr, LogicalPlan}; +use crate::{Expr, LogicalPlan, Volatility}; +use arrow::datatypes::DataType; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ Constraints, DFSchemaRef, OwnedSchemaReference, OwnedTableReference, }; +use sqlparser::ast::Ident; /// Various types of DDL (CREATE / DROP) catalog manipulation #[derive(Clone, PartialEq, Eq, Hash)] @@ -48,6 +50,10 @@ pub enum DdlStatement { DropView(DropView), /// Drops a catalog schema DropCatalogSchema(DropCatalogSchema), + /// Create function statement + CreateFunction(CreateFunction), + /// Drop function statement + DropFunction(DropFunction), } impl DdlStatement { @@ -66,6 +72,8 @@ impl DdlStatement { DdlStatement::DropTable(DropTable { schema, .. }) => schema, DdlStatement::DropView(DropView { schema, .. }) => schema, DdlStatement::DropCatalogSchema(DropCatalogSchema { schema, .. }) => schema, + DdlStatement::CreateFunction(CreateFunction { schema, .. }) => schema, + DdlStatement::DropFunction(DropFunction { schema, .. }) => schema, } } @@ -81,6 +89,8 @@ impl DdlStatement { DdlStatement::DropTable(_) => "DropTable", DdlStatement::DropView(_) => "DropView", DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema", + DdlStatement::CreateFunction(_) => "CreateFunction", + DdlStatement::DropFunction(_) => "DropFunction", } } @@ -97,6 +107,8 @@ impl DdlStatement { DdlStatement::DropTable(_) => vec![], DdlStatement::DropView(_) => vec![], DdlStatement::DropCatalogSchema(_) => vec![], + DdlStatement::CreateFunction(_) => vec![], + DdlStatement::DropFunction(_) => vec![], } } @@ -156,6 +168,12 @@ impl DdlStatement { }) => { write!(f, "DropCatalogSchema: {name:?} if not exist:={if_exists} cascade:={cascade}") } + DdlStatement::CreateFunction(CreateFunction { name, .. }) => { + write!(f, "CreateFunction: name {name:?}") + } + DdlStatement::DropFunction(DropFunction { name, .. }) => { + write!(f, "CreateFunction: name {name:?}") + } } } } @@ -303,3 +321,66 @@ pub struct DropCatalogSchema { /// Dummy schema pub schema: DFSchemaRef, } + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct CreateFunction { + // TODO: There is open question should we expose sqlparser types or redefine them here? + // At the moment it make more sense to expose sqlparser types and leave + // user to convert them as needed + pub or_replace: bool, + pub temporary: bool, + pub name: String, + pub args: Option>, + pub return_type: Option, + // TODO: move this to new struct here + pub params: CreateFunctionBody, + //pub body: String, + /// Dummy schema + pub schema: DFSchemaRef, +} +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct OperateFunctionArg { + // it is not really supported so no need to have it here + // currently + // pub mode: Option, + pub name: Option, + pub data_type: DataType, + pub default_expr: Option, +} +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct CreateFunctionBody { + /// LANGUAGE lang_name + pub language: Option, + /// IMMUTABLE | STABLE | VOLATILE + pub behavior: Option, + /// AS 'definition' + pub as_: Option, + /// RETURN expression + pub return_: Option, +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub enum FunctionDefinition { + SingleQuotedDef(String), + DoubleDollarDef(String), +} + +impl From for FunctionDefinition { + fn from(value: sqlparser::ast::FunctionDefinition) -> Self { + match value { + sqlparser::ast::FunctionDefinition::SingleQuotedDef(s) => { + Self::SingleQuotedDef(s) + } + sqlparser::ast::FunctionDefinition::DoubleDollarDef(s) => { + Self::DoubleDollarDef(s) + } + } + } +} + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct DropFunction { + pub name: String, + pub if_exists: bool, + pub schema: DFSchemaRef, +} diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index cdf6592bb9bf..d93af3c468d7 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -28,8 +28,9 @@ pub use builder::{ LogicalPlanBuilder, UNNAMED_TABLE, }; pub use ddl::{ - CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, - CreateView, DdlStatement, DropCatalogSchema, DropTable, DropView, + CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, + CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DropCatalogSchema, + DropFunction, DropTable, DropView, OperateFunctionArg, }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ @@ -40,8 +41,7 @@ pub use plan::{ TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, }; pub use statement::{ - CreateFunction, DropFunction, OperateFunctionArg, SetVariable, Statement, - TransactionAccessMode, TransactionConclusion, TransactionEnd, + SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, }; diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs index b258ff000f51..f294e7d3ea4c 100644 --- a/datafusion/expr/src/logical_plan/statement.rs +++ b/datafusion/expr/src/logical_plan/statement.rs @@ -15,11 +15,8 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::{self, Display}; - -use arrow::datatypes::DataType; use datafusion_common::DFSchemaRef; -use sqlparser::ast::{ArgMode, CreateFunctionBody, Expr, Ident}; +use std::fmt::{self, Display}; /// Various types of Statements. /// @@ -36,10 +33,6 @@ pub enum Statement { TransactionEnd(TransactionEnd), /// Set a Variable SetVariable(SetVariable), - /// Create function statement - CreateFunction(CreateFunction), - /// Drop function statement - DropFunction(DropFunction), } impl Statement { @@ -49,8 +42,6 @@ impl Statement { Statement::TransactionStart(TransactionStart { schema, .. }) => schema, Statement::TransactionEnd(TransactionEnd { schema, .. }) => schema, Statement::SetVariable(SetVariable { schema, .. }) => schema, - Statement::CreateFunction(CreateFunction { schema, .. }) => schema, - Statement::DropFunction(DropFunction { schema, .. }) => schema, } } @@ -61,8 +52,6 @@ impl Statement { Statement::TransactionStart(_) => "TransactionStart", Statement::TransactionEnd(_) => "TransactionEnd", Statement::SetVariable(_) => "SetVariable", - Statement::CreateFunction(_) => "CreateFunction", - Statement::DropFunction(_) => "DropFunction", } } @@ -95,12 +84,6 @@ impl Statement { }) => { write!(f, "SetVariable: set {variable:?} to {value:?}") } - Statement::CreateFunction(CreateFunction { name, .. }) => { - write!(f, "CreateFunction: name {name:?}") - } - Statement::DropFunction(DropFunction { name, .. }) => { - write!(f, "CreateFunction: name {name:?}") - } } } } @@ -164,32 +147,3 @@ pub struct SetVariable { /// Dummy schema pub schema: DFSchemaRef, } -#[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub struct CreateFunction { - // TODO: There is open question should we expose sqlparser types or redefine them here? - // At the moment it make more sense to expose sqlparser types and leave - // user to convert them as needed - pub or_replace: bool, - pub temporary: bool, - pub name: String, - pub args: Option>, - pub return_type: Option, - pub params: CreateFunctionBody, - //pub body: String, - /// Dummy schema - pub schema: DFSchemaRef, -} -#[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub struct OperateFunctionArg { - pub mode: Option, - pub name: Option, - pub data_type: DataType, - pub default_expr: Option, -} - -#[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub struct DropFunction { - pub name: String, - pub if_exists: bool, - pub schema: DFSchemaRef, -} diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 7c9ead27e3b5..7acad1844d48 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1657,6 +1657,12 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::Ddl(DdlStatement::DropCatalogSchema(_)) => Err(proto_error( "LogicalPlan serde is not yet implemented for DropCatalogSchema", )), + LogicalPlan::Ddl(DdlStatement::CreateFunction(_)) => Err(proto_error( + "LogicalPlan serde is not yet implemented for CreateFunction", + )), + LogicalPlan::Ddl(DdlStatement::DropFunction(_)) => Err(proto_error( + "LogicalPlan serde is not yet implemented for DropFunction", + )), LogicalPlan::Statement(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for Statement", )), diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 4204e579175a..0918842a1cb8 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -43,13 +43,13 @@ use datafusion_expr::logical_plan::DdlStatement; use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{ cast, col, Analyze, CreateCatalog, CreateCatalogSchema, - CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateMemoryTable, - CreateView, DescribeTable, DmlStatement, DropCatalogSchema, DropFunction, DropTable, - DropView, EmptyRelation, Explain, ExprSchemable, Filter, LogicalPlan, - LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable, + CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody, + CreateMemoryTable, CreateView, DescribeTable, DmlStatement, DropCatalogSchema, + DropFunction, DropTable, DropView, EmptyRelation, Explain, ExprSchemable, Filter, + LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, - WriteOp, + Volatility, WriteOp, }; use sqlparser::ast; use sqlparser::ast::{ @@ -639,22 +639,32 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { Some(t) => Some(self.convert_data_type(&t)?), None => None, }; + let mut planner_context = PlannerContext::new(); + let empty_schema = &DFSchema::empty(); let args = match args { - Some(a) => { - let a = a + Some(function_args) => { + let function_args = function_args .into_iter() - .map(|a| { - let data_type = self.convert_data_type(&a.data_type)?; + .map(|arg| { + let data_type = self.convert_data_type(&arg.data_type)?; + + let default_expr = match arg.default_expr { + Some(expr) => Some(self.sql_to_expr( + expr, + empty_schema, + &mut planner_context, + )?), + None => None, + }; Ok(OperateFunctionArg { - mode: a.mode, - name: a.name, - default_expr: a.default_expr, + name: arg.name, + default_expr, data_type, }) }) .collect::>>(); - Some(a?) + Some(function_args?) } None => None, }; @@ -669,7 +679,37 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|i| i.value) .collect::>() .join("."); - let statement = PlanStatement::CreateFunction(CreateFunction { + + // + // convert resulting expression to data fusion expression + // + let arg_types = args.as_ref().map(|arg| { + arg.iter().map(|t| t.data_type.clone()).collect::>() + }); + let mut planner_context = PlannerContext::new() + .with_prepare_param_data_types(arg_types.unwrap_or_default()); + + let result_expression = match params.return_ { + Some(r) => Some(self.sql_to_expr( + r, + &DFSchema::empty(), + &mut planner_context, + )?), + None => None, + }; + + let params = CreateFunctionBody { + language: params.language, + behavior: params.behavior.map(|b| match b { + ast::FunctionBehavior::Immutable => Volatility::Immutable, + ast::FunctionBehavior::Stable => Volatility::Stable, + ast::FunctionBehavior::Volatile => Volatility::Volatile, + }), + as_: params.as_.map(|m| m.into()), + return_: result_expression, + }; + + let statement = DdlStatement::CreateFunction(CreateFunction { or_replace, temporary, name, @@ -679,7 +719,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { schema: DFSchemaRef::new(DFSchema::empty()), }); - Ok(LogicalPlan::Statement(statement)) + Ok(LogicalPlan::Ddl(statement)) } Statement::DropFunction { if_exists, @@ -702,12 +742,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .map(|i| i.value.to_owned()) .collect::>() .join("."); - let statement = PlanStatement::DropFunction(DropFunction { + let statement = DdlStatement::DropFunction(DropFunction { if_exists, name, schema: DFSchemaRef::new(DFSchema::empty()), }); - Ok(LogicalPlan::Statement(statement)) + Ok(LogicalPlan::Ddl(statement)) } else { Err(DataFusionError::Execution( "Function name not provided".into(), From a8d54b6f8fc0c3046f876b325af45d639d1e3009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Thu, 29 Feb 2024 20:10:37 +0000 Subject: [PATCH 06/23] Address PR comments (factory interface) --- datafusion/core/src/execution/context/mod.rs | 79 ++++++++++++------- .../user_defined_scalar_functions.rs | 40 +++++----- 2 files changed, 73 insertions(+), 46 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b56e572d4787..ca281ac97c17 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -798,28 +798,48 @@ impl SessionContext { } async fn create_function(&self, stmt: CreateFunction) -> Result { - let function_factory = self.state.read().function_factory.clone(); + let function = { + let state = self.state.read().clone(); + let function_factory = &state.function_factory; + + match function_factory { + Some(f) => f.create(state.config(), stmt).await?, + _ => Err(DataFusionError::Configuration( + "Function factory has not been configured".into(), + ))?, + } + }; - match function_factory { - Some(f) => f.create(self.state.clone(), stmt).await?, - None => Err(DataFusionError::Configuration( - "Function factory has not been configured".into(), - ))?, + match function { + RegisterFunction::Scalar(f) => { + self.state.write().register_udf(f)?; + } + RegisterFunction::Aggregate(f) => { + self.state.write().register_udaf(f)?; + } + RegisterFunction::Window(f) => { + self.state.write().register_udwf(f)?; + } + RegisterFunction::Table(name, f) => self.register_udtf(&name, f), }; self.return_empty_dataframe() } async fn drop_function(&self, stmt: DropFunction) -> Result { - let function_factory = self.state.read().function_factory.clone(); - - match function_factory { - Some(f) => f.remove(self.state.clone(), stmt).await?, - None => Err(DataFusionError::Configuration( - "Function factory has not been configured".into(), - ))?, + let _function = { + let state = self.state.read().clone(); + let function_factory = &state.function_factory; + + match function_factory { + Some(f) => f.remove(state.config(), stmt).await?, + None => Err(DataFusionError::Configuration( + "Function factory has not been configured".into(), + ))?, + } }; + // TODO: Once we have unregister UDF we need to implement it here self.return_empty_dataframe() } @@ -1304,27 +1324,32 @@ impl QueryPlanner for DefaultQueryPlanner { /// ``` #[async_trait] pub trait FunctionFactory: Sync + Send { - // TODO: I don't like having RwLock Leaking here, who ever implements it - // has to depend ot `parking_lot`. I'f we expose &mut SessionState it - // may keep lock of too long. - // - // Not sure if there is better approach. - // - /// Handles creation of user defined function specified in [CreateFunction] statement async fn create( &self, - state: Arc>, + state: &SessionConfig, statement: CreateFunction, - ) -> Result<()>; + ) -> Result; /// Drops user defined function from [SessionState] - // Naming it `drop`` would make more sense but its already occupied in rust + // Naming it `drop` would make more sense but its already occupied in rust async fn remove( &self, - state: Arc>, + state: &SessionConfig, statement: DropFunction, - ) -> Result<()>; + ) -> Result; +} + +/// Type of function to create +pub enum RegisterFunction { + /// Scalar user defined function + Scalar(Arc), + /// Aggregate user defined function + Aggregate(Arc), + /// Window user defined function + Window(Arc), + /// Table user defined function + Table(String, Arc), } /// Execution context for registering data sources and executing queries. /// See [`SessionContext`] for a higher level API. @@ -1643,9 +1668,9 @@ impl SessionState { /// [`FunctionFactory`] trait. pub fn with_function_factory( mut self, - create_function_hook: Arc, + function_factory: Arc, ) -> Self { - self.function_factory = Some(create_function_hook); + self.function_factory = Some(function_factory); self } diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 845061cdab41..1c61342b6b7d 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -21,7 +21,7 @@ use arrow_array::{ }; use arrow_schema::DataType::Float64; use arrow_schema::{DataType, Field, Schema}; -use datafusion::execution::context::{FunctionFactory, SessionState}; +use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState}; use datafusion::prelude::*; use datafusion::{execution::registry::FunctionRegistry, test_util}; use datafusion_common::cast::as_float64_array; @@ -34,7 +34,7 @@ use datafusion_expr::{ create_udaf, create_udf, Accumulator, ColumnarValue, CreateFunction, DropFunction, ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; -use parking_lot::{Mutex, RwLock}; +use parking_lot::Mutex; use rand::{thread_rng, Rng}; use std::any::Any; use std::iter; @@ -652,9 +652,9 @@ impl FunctionFactory for MockFunctionFactory { #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] async fn create( &self, - state: Arc>, + _config: &SessionConfig, statement: CreateFunction, - ) -> datafusion::error::Result<()> { + ) -> datafusion::error::Result { // this function is a mock for testing // `CreateFunction` should be used to derive this function @@ -691,22 +691,25 @@ impl FunctionFactory for MockFunctionFactory { // it has been parsed *self.captured_expr.lock() = statement.params.return_; - // we may need other infrastructure provided by state, for example: - // state.config().get_extension() - - // register mock udf for testing - state.write().register_udf(mock_udf.into())?; - Ok(()) + Ok(RegisterFunction::Scalar(Arc::new(mock_udf))) } async fn remove( &self, - _state: Arc>, + _config: &SessionConfig, _statement: DropFunction, - ) -> datafusion::error::Result<()> { - // at the moment state does not support unregister - // ignoring for now - Ok(()) + ) -> datafusion::error::Result { + // TODO: I don't like that remove returns RegisterFunction + // we have to keep two states in FunctionFactory iml and + // SessionState + // + // It would be better to return (function_name, function type) tuple + + // at the moment state does not support unregister user defined functions + + Err(DataFusionError::NotImplemented( + "remove function has not been implemented".into(), + )) } } @@ -738,15 +741,14 @@ async fn create_scalar_function_from_sql_statement() { .await .unwrap(); - // sql expression should be convert to datafusion expression - // in this case + // check if we sql expr has been converted to datafusion expr let captured_expression = function_factory.captured_expr.lock().clone().unwrap(); // is there some better way to test this assert_eq!("$1 + $2", captured_expression.to_string()); - println!("{:?}", captured_expression); - ctx.sql("drop function better_add").await.unwrap(); + // no support at the moment + // ctx.sql("drop function better_add").await.unwrap(); } fn create_udf_context() -> SessionContext { From 262f0f601a0623f58f7899714641b8eaa4bb16f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Thu, 29 Feb 2024 21:40:59 +0000 Subject: [PATCH 07/23] fix test after rebase --- .../core/tests/user_defined/user_defined_scalar_functions.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 1c61342b6b7d..d178fbea4c00 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -704,10 +704,10 @@ impl FunctionFactory for MockFunctionFactory { // SessionState // // It would be better to return (function_name, function type) tuple - + // // at the moment state does not support unregister user defined functions - Err(DataFusionError::NotImplemented( + Err(datafusion_common::DataFusionError::NotImplemented( "remove function has not been implemented".into(), )) } From fe63c31c6f78d05d50af93f21cc5f323e296a1f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 07:28:45 +0000 Subject: [PATCH 08/23] `remove`'s gone from the trait ... ... `DROP FUNCTION` will look for function name in all available registries (udf, udaf, udwf). `remove` may be necessary if UDaF and UDwF do not get `simplify` method from #9304. --- datafusion/core/src/execution/context/mod.rs | 54 +++++++------------ .../user_defined_scalar_functions.rs | 45 ++++++---------- 2 files changed, 35 insertions(+), 64 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index ca281ac97c17..68fb82f83925 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -827,20 +827,23 @@ impl SessionContext { } async fn drop_function(&self, stmt: DropFunction) -> Result { - let _function = { - let state = self.state.read().clone(); - let function_factory = &state.function_factory; - - match function_factory { - Some(f) => f.remove(state.config(), stmt).await?, - None => Err(DataFusionError::Configuration( - "Function factory has not been configured".into(), - ))?, - } - }; - - // TODO: Once we have unregister UDF we need to implement it here - self.return_empty_dataframe() + // we don't know function type at this point + // decision has been made to drop all functions + let mut dropped = false; + dropped |= self.state.write().deregister_udf(&stmt.name)?.is_some(); + dropped |= self.state.write().deregister_udaf(&stmt.name)?.is_some(); + dropped |= self.state.write().deregister_udwf(&stmt.name)?.is_some(); + + // DROP FUNCTION IF EXISTS drops the specified function only if that + // function exists and in this way, it avoids error. While the DROP FUNCTION + // statement also performs the same function, it throws an + // error if the function does not exist. + + if !stmt.if_exists && !dropped { + Err(DataFusionError::Execution("Function does not exist".into())) + } else { + self.return_empty_dataframe() + } } /// Registers a variable provider within this context. @@ -1310,18 +1313,9 @@ impl QueryPlanner for DefaultQueryPlanner { .await } } -/// Crates and registers a function from [CreateFunction] statement -/// -/// It is intended to handle `CREATE FUNCTION` statements -/// and interact with [SessionState] to registers new udfs. -/// -/// Datafusion `SQL` dialect does not support `CREATE FUNCTION` -/// in generic dialect, so dialect should be changed to `PostgreSQL` -/// -/// ```rust, no_run -/// # use datafusion::execution::config::SessionConfig; -/// SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL"); -/// ``` +/// A pluggable interface to handle `CREATE FUNCTION` statements +/// and interact with [SessionState] to registers new udf, udaf or udwf. + #[async_trait] pub trait FunctionFactory: Sync + Send { /// Handles creation of user defined function specified in [CreateFunction] statement @@ -1330,14 +1324,6 @@ pub trait FunctionFactory: Sync + Send { state: &SessionConfig, statement: CreateFunction, ) -> Result; - - /// Drops user defined function from [SessionState] - // Naming it `drop` would make more sense but its already occupied in rust - async fn remove( - &self, - state: &SessionConfig, - statement: DropFunction, - ) -> Result; } /// Type of function to create diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index d178fbea4c00..f6bea0160f1f 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -31,8 +31,8 @@ use datafusion_common::{ }; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_expr::{ - create_udaf, create_udf, Accumulator, ColumnarValue, CreateFunction, DropFunction, - ExprSchemable, LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, + create_udaf, create_udf, Accumulator, ColumnarValue, CreateFunction, ExprSchemable, + LogicalPlanBuilder, ScalarUDF, ScalarUDFImpl, Signature, Volatility, }; use parking_lot::Mutex; use rand::{thread_rng, Rng}; @@ -693,28 +693,10 @@ impl FunctionFactory for MockFunctionFactory { Ok(RegisterFunction::Scalar(Arc::new(mock_udf))) } - - async fn remove( - &self, - _config: &SessionConfig, - _statement: DropFunction, - ) -> datafusion::error::Result { - // TODO: I don't like that remove returns RegisterFunction - // we have to keep two states in FunctionFactory iml and - // SessionState - // - // It would be better to return (function_name, function type) tuple - // - // at the moment state does not support unregister user defined functions - - Err(datafusion_common::DataFusionError::NotImplemented( - "remove function has not been implemented".into(), - )) - } } #[tokio::test] -async fn create_scalar_function_from_sql_statement() { +async fn create_scalar_function_from_sql_statement() -> Result<()> { let function_factory = Arc::new(MockFunctionFactory::default()); let runtime_config = RuntimeConfig::new(); let runtime_environment = RuntimeEnv::new(runtime_config).unwrap(); @@ -732,14 +714,9 @@ async fn create_scalar_function_from_sql_statement() { RETURNS DOUBLE RETURN $1 + $2 "#; - let _ = ctx.sql(sql).await.unwrap(); + let _ = ctx.sql(sql).await?; - ctx.sql("select better_add(2.0, 2.0)") - .await - .unwrap() - .show() - .await - .unwrap(); + ctx.sql("select better_add(2.0, 2.0)").await?.show().await?; // check if we sql expr has been converted to datafusion expr let captured_expression = function_factory.captured_expr.lock().clone().unwrap(); @@ -747,8 +724,16 @@ async fn create_scalar_function_from_sql_statement() { // is there some better way to test this assert_eq!("$1 + $2", captured_expression.to_string()); - // no support at the moment - // ctx.sql("drop function better_add").await.unwrap(); + // statement drops function + assert!(ctx.sql("drop function better_add").await.is_ok()); + // no function, it panics + assert!(ctx.sql("drop function better_add").await.is_err()); + // no function, it dies not care + assert!(ctx.sql("drop function if exists better_add").await.is_ok()); + // query should fail as there is no function + assert!(ctx.sql("select better_add(2.0, 2.0)").await.is_err()); + + Ok(()) } fn create_udf_context() -> SessionContext { From b975df7e491096188c421dc4aaabf9a06938d028 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 09:01:26 +0000 Subject: [PATCH 09/23] Rename FunctionDefinition and export it ... FunctionDefinition already exists, DefinitionStatement makes more sense. --- datafusion/expr/src/logical_plan/ddl.rs | 6 +++--- datafusion/expr/src/logical_plan/mod.rs | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 64072a3a4d1d..1ac4c67a20b0 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -354,18 +354,18 @@ pub struct CreateFunctionBody { /// IMMUTABLE | STABLE | VOLATILE pub behavior: Option, /// AS 'definition' - pub as_: Option, + pub as_: Option, /// RETURN expression pub return_: Option, } #[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub enum FunctionDefinition { +pub enum DefinitionStatement { SingleQuotedDef(String), DoubleDollarDef(String), } -impl From for FunctionDefinition { +impl From for DefinitionStatement { fn from(value: sqlparser::ast::FunctionDefinition) -> Self { match value { sqlparser::ast::FunctionDefinition::SingleQuotedDef(s) => { diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index d93af3c468d7..84781cb2e9ec 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -29,8 +29,8 @@ pub use builder::{ }; pub use ddl::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, - CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DropCatalogSchema, - DropFunction, DropTable, DropView, OperateFunctionArg, + CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DefinitionStatement, + DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg, }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ From 9d1d7155d9f665f06e1478dbd35eed40ecb0d67e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 11:30:45 +0000 Subject: [PATCH 10/23] Update datafusion/expr/src/logical_plan/ddl.rs Co-authored-by: Andrew Lamb --- datafusion/expr/src/logical_plan/ddl.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 1ac4c67a20b0..b4a7e957dc88 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -340,8 +340,7 @@ pub struct CreateFunction { } #[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct OperateFunctionArg { - // it is not really supported so no need to have it here - // currently + // TODO: figure out how to support mode // pub mode: Option, pub name: Option, pub data_type: DataType, From 7e7d89687a210af4380fb4e74e5aa2a279aa8e29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 11:30:57 +0000 Subject: [PATCH 11/23] Update datafusion/core/src/execution/context/mod.rs Co-authored-by: Andrew Lamb --- datafusion/core/src/execution/context/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 68fb82f83925..47b56439751d 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1650,8 +1650,7 @@ impl SessionState { self } - /// Registers `CREATE FUNCTION` statement handler implementing - /// [`FunctionFactory`] trait. + /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements pub fn with_function_factory( mut self, function_factory: Arc, From 0430c116c43b55f64390111e88a0f7bbfea5da39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 11:31:07 +0000 Subject: [PATCH 12/23] Update datafusion/core/tests/user_defined/user_defined_scalar_functions.rs Co-authored-by: Andrew Lamb --- .../tests/user_defined/user_defined_scalar_functions.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index f6bea0160f1f..5048d8e07243 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -655,9 +655,10 @@ impl FunctionFactory for MockFunctionFactory { _config: &SessionConfig, statement: CreateFunction, ) -> datafusion::error::Result { - // this function is a mock for testing - // `CreateFunction` should be used to derive this function - + // In this example, we always create a function that adds its arguments + // with the name specified in `CREATE FUNCTION`. In a real implementation + // the body of the created UDF would also likely be a function of the contents + // of the `CreateFunction` let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| { let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?; let base = From 210b194a52003b6c0f09573e651c59f078206782 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 11:31:44 +0000 Subject: [PATCH 13/23] Update datafusion/expr/src/logical_plan/ddl.rs Co-authored-by: Andrew Lamb --- datafusion/expr/src/logical_plan/ddl.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index b4a7e957dc88..8a58f9e69217 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -322,6 +322,9 @@ pub struct DropCatalogSchema { pub schema: DFSchemaRef, } +/// Arguments passed to `CREATE FUNCTION` +/// +/// Note this meant to be the same as from sqlparser's [`sqlparser::ast::Statement::CreateFunction`] #[derive(Clone, PartialEq, Eq, Hash, Debug)] pub struct CreateFunction { // TODO: There is open question should we expose sqlparser types or redefine them here? From 1d5d7391fef1b0d33a51cfffd1f470665d923e87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 12:00:14 +0000 Subject: [PATCH 14/23] resolve part of follow up comments --- .../user_defined/user_defined_scalar_functions.rs | 15 ++++++++++++--- datafusion/expr/src/logical_plan/ddl.rs | 2 -- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 5048d8e07243..af06c345c3a3 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -709,20 +709,29 @@ async fn create_scalar_function_from_sql_statement() -> Result<()> { .with_function_factory(function_factory.clone()); let ctx = SessionContext::new_with_state(state); + let options = SQLOptions::new().with_allow_ddl(false); let sql = r#" CREATE FUNCTION better_add(DOUBLE, DOUBLE) RETURNS DOUBLE RETURN $1 + $2 "#; - let _ = ctx.sql(sql).await?; + + // try to create function when sql options have allow ddl disabled + assert!(ctx.sql_with_options(sql, options.clone()).await.is_err()); + + // Create the `better_add` function dynamically via CREATE FUNCTION statement + assert!(ctx.sql(sql).await.is_ok()); + // try to drop function when sql options have allow ddl disabled + assert!(ctx + .sql_with_options("drop function better_add", options.clone()) + .await + .is_err()); ctx.sql("select better_add(2.0, 2.0)").await?.show().await?; // check if we sql expr has been converted to datafusion expr let captured_expression = function_factory.captured_expr.lock().clone().unwrap(); - - // is there some better way to test this assert_eq!("$1 + $2", captured_expression.to_string()); // statement drops function diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index 8a58f9e69217..968c40c8bf62 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -335,9 +335,7 @@ pub struct CreateFunction { pub name: String, pub args: Option>, pub return_type: Option, - // TODO: move this to new struct here pub params: CreateFunctionBody, - //pub body: String, /// Dummy schema pub schema: DFSchemaRef, } From 84b0fbd67d2688582472aa8b74a650344423560e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 12:28:25 +0000 Subject: [PATCH 15/23] Qualified functions are not supported anymore --- datafusion/sql/src/statement.rs | 45 +++++++++++++++------------------ 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 0918842a1cb8..0ca261497d3a 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -668,18 +668,16 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } None => None, }; - - // Not sure if this is correct way to generate name - // postgresql function definition may have schema part as well - // datafusion at the moment does lookup based on given string - // `schema_name.function_name` will work even if there is no `schema_name` - let name: String = name - .0 - .into_iter() - .map(|i| i.value) - .collect::>() - .join("."); - + // at the moment functions can't be qualified `schema.name` + let name = match &name.0[..] { + [] => Err(DataFusionError::Execution( + "Function should have name".into(), + ))?, + [n] => n.value.clone(), + [..] => Err(DataFusionError::NotImplemented( + "Qualified functions are not supported".into(), + ))?, + }; // // convert resulting expression to data fusion expression // @@ -726,22 +724,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { func_desc, .. } => { - // Not sure if this is correct way to generate name - // postgresql function definition may have schema part as well - // datafusion at the moment does lookup based on given string - // `schema_name.function_name` will work even if there is no `schema_name` - // according to postgresql documentation it can be only one function // specified in drop statement - if let Some(desc) = func_desc.first() { - let name: String = desc - .name - .0 - .iter() - .map(|i| i.value.to_owned()) - .collect::>() - .join("."); + // at the moment functions can't be qualified `schema.name` + let name = match &desc.name.0[..] { + [] => Err(DataFusionError::Execution( + "Function should have name".into(), + ))?, + [n] => n.value.clone(), + [..] => Err(DataFusionError::NotImplemented( + "Qualified functions are not supported".into(), + ))?, + }; let statement = DdlStatement::DropFunction(DropFunction { if_exists, name, From b64257058aaee654bbf06d1052202f6a8e9355bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 12:33:15 +0000 Subject: [PATCH 16/23] update docs and todos --- datafusion/core/src/execution/context/mod.rs | 4 ++-- .../core/tests/user_defined/user_defined_scalar_functions.rs | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 47b56439751d..3d7f2c5798f0 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1381,10 +1381,10 @@ pub struct SessionState { table_factories: HashMap>, /// Runtime environment runtime_env: Arc, + /// [FunctionFactory] to support pluggable user defined function handler. - /// It is invoked on `CREATE FUNCTION` and `DROP FUNCTION` statements. /// - /// Datafusion generic SQL dialect does not support `CRETE FUNCTION` statement + /// It will be invoked on `CREATE FUNCTION` statements. /// thus, changing dialect o PostgreSql is required function_factory: Option>, } diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index af06c345c3a3..f60a24246653 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -701,9 +701,10 @@ async fn create_scalar_function_from_sql_statement() -> Result<()> { let function_factory = Arc::new(MockFunctionFactory::default()); let runtime_config = RuntimeConfig::new(); let runtime_environment = RuntimeEnv::new(runtime_config).unwrap(); + + // TODO: remove dialect once new version of sql parser arrives let session_config = SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL"); - let state = SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment)) .with_function_factory(function_factory.clone()); From b8f8991fc65a5c109db042265ae0db5d77a5f9a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 13:21:44 +0000 Subject: [PATCH 17/23] fix clippy --- .../tests/user_defined/user_defined_scalar_functions.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index f60a24246653..9f88efc1f6f8 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -700,7 +700,7 @@ impl FunctionFactory for MockFunctionFactory { async fn create_scalar_function_from_sql_statement() -> Result<()> { let function_factory = Arc::new(MockFunctionFactory::default()); let runtime_config = RuntimeConfig::new(); - let runtime_environment = RuntimeEnv::new(runtime_config).unwrap(); + let runtime_environment = RuntimeEnv::new(runtime_config)?; // TODO: remove dialect once new version of sql parser arrives let session_config = @@ -719,13 +719,13 @@ async fn create_scalar_function_from_sql_statement() -> Result<()> { "#; // try to create function when sql options have allow ddl disabled - assert!(ctx.sql_with_options(sql, options.clone()).await.is_err()); + assert!(ctx.sql_with_options(sql, options).await.is_err()); // Create the `better_add` function dynamically via CREATE FUNCTION statement assert!(ctx.sql(sql).await.is_ok()); // try to drop function when sql options have allow ddl disabled assert!(ctx - .sql_with_options("drop function better_add", options.clone()) + .sql_with_options("drop function better_add", options) .await .is_err()); From 5a9ad09f165c6a4f87d47f749eef1e6fdf693eee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Sat, 2 Mar 2024 20:14:49 +0000 Subject: [PATCH 18/23] address additional comments --- datafusion/core/src/execution/context/mod.rs | 2 +- datafusion/core/tests/sql/sql_api.rs | 1 - .../user_defined_scalar_functions.rs | 17 +++++++++++ datafusion/sql/src/statement.rs | 28 ++++++------------- 4 files changed, 27 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 3d7f2c5798f0..d1477230b0f3 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -840,7 +840,7 @@ impl SessionContext { // error if the function does not exist. if !stmt.if_exists && !dropped { - Err(DataFusionError::Execution("Function does not exist".into())) + exec_err!("Function does not exist") } else { self.return_empty_dataframe() } diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index cc8ea3118284..d7adc9611b2f 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -16,7 +16,6 @@ // under the License. use datafusion::prelude::*; - use tempfile::TempDir; #[tokio::test] diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index 9f88efc1f6f8..dc3d8931d24a 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -744,6 +744,23 @@ async fn create_scalar_function_from_sql_statement() -> Result<()> { // query should fail as there is no function assert!(ctx.sql("select better_add(2.0, 2.0)").await.is_err()); + // tests expression parsing + // if expression is not correct + let bad_expression_sql = r#" + CREATE FUNCTION bad_expression_fun(DOUBLE, DOUBLE) + RETURNS DOUBLE + RETURN $1 $3 + "#; + assert!(ctx.sql(bad_expression_sql).await.is_err()); + + // tests bad function definition + let bad_definition_sql = r#" + CREATE FUNCTION bad_definition_fun(DOUBLE, DOUBLE) + RET BAD_TYPE + RETURN $1 + $3 + "#; + assert!(ctx.sql(bad_definition_sql).await.is_err()); + Ok(()) } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 0ca261497d3a..35063a6cfa06 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -31,10 +31,10 @@ use arrow_schema::DataType; use datafusion_common::file_options::StatementOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - not_impl_err, plan_datafusion_err, plan_err, schema_err, unqualified_field_not_found, - Column, Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, - OwnedTableReference, Result, ScalarValue, SchemaError, SchemaReference, - TableReference, ToDFSchema, + exec_err, not_impl_err, plan_datafusion_err, plan_err, schema_err, + unqualified_field_not_found, Column, Constraints, DFField, DFSchema, DFSchemaRef, + DataFusionError, OwnedTableReference, Result, ScalarValue, SchemaError, + SchemaReference, TableReference, ToDFSchema, }; use datafusion_expr::dml::{CopyOptions, CopyTo}; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; @@ -670,13 +670,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; // at the moment functions can't be qualified `schema.name` let name = match &name.0[..] { - [] => Err(DataFusionError::Execution( - "Function should have name".into(), - ))?, + [] => exec_err!("Function should have name")?, [n] => n.value.clone(), - [..] => Err(DataFusionError::NotImplemented( - "Qualified functions are not supported".into(), - ))?, + [..] => not_impl_err!("Qualified functions are not supported")?, }; // // convert resulting expression to data fusion expression @@ -729,13 +725,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { if let Some(desc) = func_desc.first() { // at the moment functions can't be qualified `schema.name` let name = match &desc.name.0[..] { - [] => Err(DataFusionError::Execution( - "Function should have name".into(), - ))?, + [] => exec_err!("Function should have name")?, [n] => n.value.clone(), - [..] => Err(DataFusionError::NotImplemented( - "Qualified functions are not supported".into(), - ))?, + [..] => not_impl_err!("Qualified functions are not supported")?, }; let statement = DdlStatement::DropFunction(DropFunction { if_exists, @@ -744,9 +736,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }); Ok(LogicalPlan::Ddl(statement)) } else { - Err(DataFusionError::Execution( - "Function name not provided".into(), - )) + exec_err!("Function name not provided") } } _ => { From 58479e3c1e5f9177c19dbb57ca1dd50cbf321048 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 3 Mar 2024 02:50:17 -0500 Subject: [PATCH 19/23] Add sqllogicteset for CREATE/DROP function --- .../test_files/create_function.slt | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/create_function.slt diff --git a/datafusion/sqllogictest/test_files/create_function.slt b/datafusion/sqllogictest/test_files/create_function.slt new file mode 100644 index 000000000000..a64e7f9ff1ec --- /dev/null +++ b/datafusion/sqllogictest/test_files/create_function.slt @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +## SQL tests for CREATE / DROP FUNCTION +## +## Note that DataFusion provides a pluggable system for creating functions +## but has no built in support for doing so. + +# Use PostgresSQL dialect (until we upgrade to sqlparser 0.44, where CREATE FUNCTION) +# is supported in the Generic dialect (the default) +statement ok +set datafusion.sql_parser.dialect = 'Postgres'; + +# Create function will fail unless a user supplied function factory is supplied +statement error DataFusion error: Invalid or Unsupported Configuration: Function factory has not been configured +CREATE FUNCTION foo (DOUBLE) RETURNS DOUBLE RETURN $1 + $2; + +# multi-part identifiers are not supported +statement error DataFusion error: This feature is not implemented: Qualified functions are not supported +CREATE FUNCTION foo.bar (DOUBLE) RETURNS DOUBLE RETURN $1 + $2; + +statement error DataFusion error: This feature is not implemented: Qualified functions are not supported +DROP FUNCTION foo.bar; + + +# Show it is possible to drop existing (UDF) functions +query I +select abs(-1); +---- +1 + +# drop the function +statement ok +DROP FUNCTION abs; + +# now the the query errors +query error +select abs(-1); +---- +DataFusion error: Error during planning: Invalid function 'abs'. +Did you mean 'cos'? + + +# Can't drop the function again +statement error DataFusion error: Execution error: Function does not exist +DROP FUNCTION abs; From 83acc8cec0e93aac8ba1788785a04e203a55e659 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 3 Mar 2024 03:33:16 -0500 Subject: [PATCH 20/23] Add coverage for DROP FUNCTION IF EXISTS --- datafusion/sqllogictest/test_files/create_function.slt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/sqllogictest/test_files/create_function.slt b/datafusion/sqllogictest/test_files/create_function.slt index a64e7f9ff1ec..3e56a8a360b2 100644 --- a/datafusion/sqllogictest/test_files/create_function.slt +++ b/datafusion/sqllogictest/test_files/create_function.slt @@ -59,3 +59,7 @@ Did you mean 'cos'? # Can't drop the function again statement error DataFusion error: Execution error: Function does not exist DROP FUNCTION abs; + +# But DROP IF EXISTS does not error +statement ok +DROP FUNCTION IF EXISTS abs; From 383602c52f9f022c85de0dfbbf1ce709ccce4658 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 3 Mar 2024 03:35:07 -0500 Subject: [PATCH 21/23] fix multiline error --- datafusion/sqllogictest/test_files/create_function.slt | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/datafusion/sqllogictest/test_files/create_function.slt b/datafusion/sqllogictest/test_files/create_function.slt index 3e56a8a360b2..baa40ac64afc 100644 --- a/datafusion/sqllogictest/test_files/create_function.slt +++ b/datafusion/sqllogictest/test_files/create_function.slt @@ -37,7 +37,6 @@ CREATE FUNCTION foo.bar (DOUBLE) RETURNS DOUBLE RETURN $1 + $2; statement error DataFusion error: This feature is not implemented: Qualified functions are not supported DROP FUNCTION foo.bar; - # Show it is possible to drop existing (UDF) functions query I select abs(-1); @@ -49,12 +48,8 @@ statement ok DROP FUNCTION abs; # now the the query errors -query error +query error Invalid function 'abs'. select abs(-1); ----- -DataFusion error: Error during planning: Invalid function 'abs'. -Did you mean 'cos'? - # Can't drop the function again statement error DataFusion error: Execution error: Function does not exist From 00b805834228fd1fdad4f58353d301982b9d932b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marko=20Milenkovi=C4=87?= Date: Tue, 5 Mar 2024 09:19:01 +0000 Subject: [PATCH 22/23] revert dialect back to generic in test ... ... as `create function` gets support in latest sqlparser. --- .../tests/user_defined/user_defined_scalar_functions.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index dc3d8931d24a..2a734b6e785c 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -702,9 +702,7 @@ async fn create_scalar_function_from_sql_statement() -> Result<()> { let runtime_config = RuntimeConfig::new(); let runtime_environment = RuntimeEnv::new(runtime_config)?; - // TODO: remove dialect once new version of sql parser arrives - let session_config = - SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL"); + let session_config = SessionConfig::new(); let state = SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment)) .with_function_factory(function_factory.clone()); @@ -718,12 +716,12 @@ async fn create_scalar_function_from_sql_statement() -> Result<()> { RETURN $1 + $2 "#; - // try to create function when sql options have allow ddl disabled + // try to `create function` when sql options have allow ddl disabled assert!(ctx.sql_with_options(sql, options).await.is_err()); // Create the `better_add` function dynamically via CREATE FUNCTION statement assert!(ctx.sql(sql).await.is_ok()); - // try to drop function when sql options have allow ddl disabled + // try to `drop function` when sql options have allow ddl disabled assert!(ctx .sql_with_options("drop function better_add", options) .await From 8a0f42f6c86e40d3f87d7b96c03ac5bad46f5210 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 5 Mar 2024 17:32:27 -0500 Subject: [PATCH 23/23] fmt --- .../tests/user_defined/user_defined_scalar_functions.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index e14a0f7c1f14..d9b60134b3d9 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -16,7 +16,10 @@ // under the License. use arrow::compute::kernels::numeric::add; -use arrow_array::{Array, ArrayRef, ArrowNativeTypeOp, Float32Array, Float64Array, Int32Array, RecordBatch, UInt8Array}; +use arrow_array::{ + Array, ArrayRef, ArrowNativeTypeOp, Float32Array, Float64Array, Int32Array, + RecordBatch, UInt8Array, +}; use arrow_schema::DataType::Float64; use arrow_schema::{DataType, Field, Schema}; use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState}; @@ -35,11 +38,11 @@ use datafusion_expr::{ }; use parking_lot::Mutex; +use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use rand::{thread_rng, Rng}; use std::any::Any; use std::iter; use std::sync::Arc; -use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; /// test that casting happens on udfs. /// c11 is f32, but `custom_sqrt` requires f64. Casting happens but the logical plan and