From 9be85fcc56f2d8c11a23024c507313b8c41862c3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?=
Date: Tue, 10 Aug 2021 16:28:56 +0200
Subject: [PATCH 01/12] e

---
 python/Cargo.toml                              |   6 +-
 python/datafusion/__init__.py                  |   1 +
 python/{ => datafusion}/tests/__init__.py      |   0
 python/{ => datafusion}/tests/generic.py       |   0
 python/{ => datafusion}/tests/test_df.py       |   0
 .../tests/test_math_functions.py               |   0
 .../{ => datafusion}/tests/test_pa_types.py    |   0
 python/{ => datafusion}/tests/test_sql.py      |   0
 .../tests/test_string_functions.py             |   0
 python/{ => datafusion}/tests/test_udaf.py     |   0
 python/pyproject.toml                          |  14 ++
 python/rust-toolchain                          |   2 +-
 python/src/catalog.rs                          |   1 +
 python/src/context.rs                          |   4 +-
 python/src/dataframe.rs                        |  27 +--
 python/src/errors.rs                           |  19 +-
 python/src/lib.rs                              |   5 +-
 python/src/pyarrow.rs                          | 187 ++++++++++++++++++
 python/src/to_py.rs                            |  75 -------
 python/src/to_rust.rs                          |  57 ------
 python/src/types.rs                            |   1 +
 python/src/udaf.rs                             |   1 -
 python/src/udf.rs                              |  11 +-
 23 files changed, 246 insertions(+), 165 deletions(-)
 create mode 100644 python/datafusion/__init__.py
 rename python/{ => datafusion}/tests/__init__.py (100%)
 rename python/{ => datafusion}/tests/generic.py (100%)
 rename python/{ => datafusion}/tests/test_df.py (100%)
 rename python/{ => datafusion}/tests/test_math_functions.py (100%)
 rename python/{ => datafusion}/tests/test_pa_types.py (100%)
 rename python/{ => datafusion}/tests/test_sql.py (100%)
 rename python/{ => datafusion}/tests/test_string_functions.py (100%)
 rename python/{ => datafusion}/tests/test_udaf.py (100%)
 create mode 100644 python/src/catalog.rs
 create mode 100644 python/src/pyarrow.rs
 delete mode 100644 python/src/to_py.rs

diff --git a/python/Cargo.toml b/python/Cargo.toml
index fe84e5234c33..83973cc25627 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -30,16 +30,16 @@ edition = "2018"
 libc = "0.2"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
 rand = "0.7"
-pyo3 = { version = "0.14.1", features = ["extension-module"] }
+pyo3 = { version = "0.14.2", features = ["extension-module"] }
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "4d61196dee8526998aee7e7bb10ea88422e5f9e1" }

 [lib]
-name = "datafusion"
+name = "internals"
 crate-type = ["cdylib"]

 [package.metadata.maturin]
+name = "datafusion.internals"
 requires-dist = ["pyarrow>=1"]
-
 classifier = [
     "Development Status :: 2 - Pre-Alpha",
     "Intended Audience :: Developers",
diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
new file mode 100644
index 000000000000..a09dca093f54
--- /dev/null
+++ b/python/datafusion/__init__.py
@@ -0,0 +1 @@
+from .internals import *
diff --git a/python/tests/__init__.py b/python/datafusion/tests/__init__.py
similarity index 100%
rename from python/tests/__init__.py
rename to python/datafusion/tests/__init__.py
diff --git a/python/tests/generic.py b/python/datafusion/tests/generic.py
similarity index 100%
rename from python/tests/generic.py
rename to python/datafusion/tests/generic.py
diff --git a/python/tests/test_df.py b/python/datafusion/tests/test_df.py
similarity index 100%
rename from python/tests/test_df.py
rename to python/datafusion/tests/test_df.py
diff --git a/python/tests/test_math_functions.py b/python/datafusion/tests/test_math_functions.py
similarity index 100%
rename from python/tests/test_math_functions.py
rename to python/datafusion/tests/test_math_functions.py
diff --git a/python/tests/test_pa_types.py b/python/datafusion/tests/test_pa_types.py
similarity index 100%
rename from python/tests/test_pa_types.py
rename to python/datafusion/tests/test_pa_types.py
diff --git a/python/tests/test_sql.py b/python/datafusion/tests/test_sql.py
similarity index 100%
rename from python/tests/test_sql.py
rename to python/datafusion/tests/test_sql.py
diff --git a/python/tests/test_string_functions.py b/python/datafusion/tests/test_string_functions.py
similarity index 100%
rename from python/tests/test_string_functions.py
rename to python/datafusion/tests/test_string_functions.py
diff --git a/python/tests/test_udaf.py b/python/datafusion/tests/test_udaf.py
similarity index 100%
rename from python/tests/test_udaf.py
rename to python/datafusion/tests/test_udaf.py
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 1482129897fa..401426509c37 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -18,3 +18,17 @@
 [build-system]
 requires = ["maturin>=0.11,<0.12"]
 build-backend = "maturin"
+
+[project]
+name = "ebtelep"
+dependencies = [
+    "pyarrow"
+]
+
+[tool.isort]
+profile = "black"
+
+# [tool.mypy]
+# ignore_missing_imports = true
+# disallow_untyped_defs = true
+# files = "datafusion"
diff --git a/python/rust-toolchain b/python/rust-toolchain
index 6231a95e3036..2bf5ad0447d3 100644
--- a/python/rust-toolchain
+++ b/python/rust-toolchain
@@ -1 +1 @@
-nightly-2021-05-10
+stable
diff --git a/python/src/catalog.rs b/python/src/catalog.rs
new file mode 100644
index 000000000000..8b137891791f
--- /dev/null
+++ b/python/src/catalog.rs
@@ -0,0 +1 @@
+
diff --git a/python/src/context.rs b/python/src/context.rs
index 9acc14a5e260..ef8c45deb8e2 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -24,6 +24,7 @@ use rand::Rng;
 use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;

+use datafusion::arrow::datatypes::Schema;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::ExecutionContext as _ExecutionContext;
@@ -32,6 +33,7 @@ use datafusion::prelude::CsvReadOptions;
 use crate::dataframe;
 use crate::errors;
 use crate::functions;
+use crate::pyarrow::PyArrowConvert;
 use crate::to_rust;
 use crate::types::PyDataType;

@@ -121,7 +123,7 @@ impl ExecutionContext {
             .to_str()
             .ok_or(PyValueError::new_err("Unable to convert path to a string"))?;
         let schema = match schema {
-            Some(s) => Some(to_rust::to_rust_schema(s)?),
+            Some(s) => Some(Schema::from_pyarrow(s)?),
             None => None,
         };
         let delimiter = delimiter.as_bytes();
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index 4a50262ec329..2c5f9cb3c00c 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -18,7 +18,10 @@
 use std::sync::{Arc, Mutex};

 use logical_plan::LogicalPlan;
-use pyo3::{prelude::*, types::PyTuple};
+use pyo3::{
+    prelude::*,
+    types::{PyList, PyTuple},
+};
 use tokio::runtime::Runtime;

 use datafusion::execution::context::ExecutionContext as _ExecutionContext;
@@ -26,7 +29,7 @@
 use datafusion::logical_plan::{JoinType, LogicalPlanBuilder};
 use datafusion::physical_plan::collect;
 use datafusion::{execution::context::ExecutionContextState, logical_plan};

-use crate::{errors, to_py};
+use crate::errors;
 use crate::{errors::DataFusionError, expression};

 /// A DataFrame is a representation of a logical plan and an API to compose statements.
@@ -121,22 +124,22 @@ impl DataFrame {
     /// Unless some order is specified in the plan, there is no guarantee of the order of the result
     fn collect(&self, py: Python) -> PyResult<PyObject> {
         let ctx = _ExecutionContext::from(self.ctx_state.clone());
-        let plan = ctx
-            .optimize(&self.plan)
-            .map_err(|e| -> errors::DataFusionError { e.into() })?;
+        let plan = ctx.optimize(&self.plan).map_err(DataFusionError::from)?;
         let plan = ctx
             .create_physical_plan(&plan)
-            .map_err(|e| -> errors::DataFusionError { e.into() })?;
+            .map_err(DataFusionError::from)?;

         let rt = Runtime::new().unwrap();
         let batches = py.allow_threads(|| {
-            rt.block_on(async {
-                collect(plan)
-                    .await
-                    .map_err(|e| -> errors::DataFusionError { e.into() })
-            })
+            rt.block_on(async { collect(plan).await.map_err(DataFusionError::from) })
         })?;
-        to_py::to_py(&batches)
+
+        let mut py_batches = vec![];
+        for batch in batches {
+            py_batches.push(to_py_batch(batch, py, pyarrow)?);
+        }
+        let py_list = PyList::new(py, py_batches);
+        Ok(PyObject::from(py_list))
     }

     /// Returns the join of two DataFrames `on`.
diff --git a/python/src/errors.rs b/python/src/errors.rs
index fbe98037a030..cf99c1ccbae8 100644
--- a/python/src/errors.rs
+++ b/python/src/errors.rs
@@ -16,10 +16,15 @@
 // under the License.

 use core::fmt;
+//use std::result::Result;

 use datafusion::arrow::error::ArrowError;
 use datafusion::error::DataFusionError as InnerDataFusionError;
-use pyo3::{exceptions, PyErr};
+use pyo3::{
+    exceptions::{PyException, PyRuntimeError},
+    prelude::*,
+    PyErr,
+};

 #[derive(Debug)]
 pub enum DataFusionError {
@@ -38,9 +43,9 @@ impl fmt::Display for DataFusionError {
     }
 }

-impl From<DataFusionError> for PyErr {
-    fn from(err: DataFusionError) -> PyErr {
-        exceptions::PyException::new_err(err.to_string())
+impl From<ArrowError> for DataFusionError {
+    fn from(err: ArrowError) -> DataFusionError {
+        DataFusionError::ArrowError(err)
     }
 }

@@ -50,9 +55,9 @@ impl From<InnerDataFusionError> for DataFusionError {
     }
 }

-impl From<ArrowError> for DataFusionError {
-    fn from(err: ArrowError) -> DataFusionError {
-        DataFusionError::ArrowError(err)
+impl From<DataFusionError> for PyErr {
+    fn from(err: DataFusionError) -> PyErr {
+        PyException::new_err(err.to_string())
     }
 }
diff --git a/python/src/lib.rs b/python/src/lib.rs
index aecfe9994cd1..5102fa003b86 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -17,13 +17,14 @@

 use pyo3::prelude::*;

+mod catalog;
 mod context;
 mod dataframe;
 mod errors;
 mod expression;
 mod functions;
+mod pyarrow;
 mod scalar;
-mod to_py;
 mod to_rust;
 mod types;
 mod udaf;
@@ -31,7 +32,7 @@ mod udf;

 /// DataFusion.
 #[pymodule]
-fn datafusion(py: Python, m: &PyModule) -> PyResult<()> {
+fn internals(py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<context::ExecutionContext>()?;
     m.add_class::<dataframe::DataFrame>()?;
     m.add_class::<expression::Expression>()?;
diff --git a/python/src/pyarrow.rs b/python/src/pyarrow.rs
new file mode 100644
index 000000000000..339d2be1277f
--- /dev/null
+++ b/python/src/pyarrow.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::convert::TryFrom;
+use std::error;
+
+use libc::uintptr_t;
+use pyo3::prelude::*;
+use pyo3::wrap_pyfunction;
+
+use datafusion::arrow::array::{make_array_from_raw, ArrayRef};
+use datafusion::arrow::datatypes::{DataType, Field, Schema};
+use datafusion::arrow::ffi;
+use datafusion::arrow::ffi::FFI_ArrowSchema;
+use datafusion::arrow::record_batch::RecordBatch;
+
+use crate::errors::DataFusionError;
+
+pub trait PyArrowConvert: Sized {
+    fn from_pyarrow(value: &PyAny) -> PyResult<Self>;
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject>;
+}
+
+impl PyArrowConvert for DataType {
+    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
+        let c_schema = FFI_ArrowSchema::empty();
+        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
+        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
+        let dtype = DataType::try_from(&c_schema).map_err(DataFusionError::from)?;
+        Ok(dtype)
+    }
+
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let c_schema = FFI_ArrowSchema::try_from(self).map_err(DataFusionError::from)?;
+        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
+        let cls = py.import("pyarrow.DataType")?;
+        let dtype = cls.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
+        Ok(dtype.into())
+    }
+}
+
+impl PyArrowConvert for Field {
+    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
+        let c_schema = FFI_ArrowSchema::empty();
+        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
+        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
+        let field = Field::try_from(&c_schema).map_err(DataFusionError::from)?;
+        Ok(field)
+    }
+
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let c_schema = FFI_ArrowSchema::try_from(self).map_err(DataFusionError::from)?;
+        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
+        let cls = py.import("pyarrow.Field")?;
+        let dtype = cls.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
+        Ok(dtype.into())
+    }
+}
+
+impl PyArrowConvert for Schema {
+    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
+        let c_schema = FFI_ArrowSchema::empty();
+        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
+        value.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
+        let schema = Schema::try_from(&c_schema).map_err(DataFusionError::from)?;
+        Ok(schema)
+    }
+
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let c_schema = FFI_ArrowSchema::try_from(self).map_err(DataFusionError::from)?;
+        let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
+        let cls = py.import("pyarrow.Schema")?;
+        let schema = cls.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
+        Ok(schema.into())
+    }
+}
+
+impl PyArrowConvert for ArrayRef {
+    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
+        // prepare a pointer to receive the Array struct
+        let (array_pointer, schema_pointer) =
+            ffi::ArrowArray::into_raw(unsafe { ffi::ArrowArray::empty() });
+
+        // make the conversion through PyArrow's private API
+        // this changes the pointer's memory and is thus unsafe.
+        // In particular, `_export_to_c` can go out of bounds
+        value.call_method1(
+            "_export_to_c",
+            (array_pointer as uintptr_t, schema_pointer as uintptr_t),
+        )?;
+
+        let array = unsafe { make_array_from_raw(array_pointer, schema_pointer) }
+            .map_err(DataFusionError::from)?;
+        Ok(array)
+    }
+
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let (array_pointer, schema_pointer) =
+            self.to_raw().map_err(DataFusionError::from)?;
+
+        let cls = py.import("pyarrow.Array")?;
+        let array = cls.call_method1(
+            "_import_from_c",
+            (array_pointer as uintptr_t, schema_pointer as uintptr_t),
+        )?;
+        Ok(array.to_object(py))
+    }
+}
+
+impl PyArrowConvert for RecordBatch {
+    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
+        // TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches
+        let schema = value.getattr("schema")?;
+        let arrays = value.getattr("columns")?;
+
+        // let names = schema.getattr("names")?.extract::<Vec<&str>>()?;
+
+        // let fields = names
+        //     .iter()
+        //     .enumerate()
+        //     .map(|(i, name)| {
+        //         let field = schema.call_method1("field", (i,))?;
+        //         let nullable = field.getattr("nullable")?.extract::<bool>()?;
+        //         let py_data_type = field.getattr("type")?;
+        //         let data_type = py_data_type.extract::<PyDataType>()?.data_type;
+        //         Ok(Field::new(name, data_type, nullable))
+        //     })
+        //     .collect::<PyResult<Vec<Field>>>()?;
+
+        let schema = Schema::from_pyarrow(schema)?;
+
+        let arrays = (0..names.len())
+            .map(|i| {
+                let array = batch.call_method1("column", (i,))?;
+                to_rust(array)
+            })
+            .collect::<PyResult<Vec<ArrayRef>>>()?;
+
+        let batch =
+            RecordBatch::try_new(schema, arrays).map_err(DataFusionError::from)?;
+        Ok(batch)
+    }
+
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        let mut py_arrays = vec![];
+        let mut py_names = vec![];
+
+        let columns = self.columns().iter();
+        let fields = self.schema().fields().iter();
+
+        for (array, field) in columns.zip(fields) {
+            py_arrays.push(array.to_pyarrow(py)?);
+            py_names.push(field.name());
+        }
+
+        let cls = py.import("pyarrow.RecordBatch")?;
+        let record = cls.call_method1("from_arrays", (py_arrays, py_names))?;
+
+        Ok(PyObject::from(record))
+    }
+
+    // fn to_pyarrow(batches: &[RecordBatch]) -> PyResult<PyObject> {
+    //     Python::with_gil(|py| {
+    //         let pyarrow = PyModule::import(py, "pyarrow")?;
+    //         let mut py_batches = vec![];
+    //         for batch in batches {
+    //             py_batches.push(to_py_batch(batch, py, pyarrow)?);
+    //         }
+    //         let list = PyList::new(py, py_batches);
+    //         Ok(PyObject::from(list))
+    //     })
+    // }
+}
diff --git a/python/src/to_py.rs b/python/src/to_py.rs
deleted file mode 100644
index 6bc0581c8c70..000000000000
--- a/python/src/to_py.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::arrow::array::ArrayRef;
-use datafusion::arrow::record_batch::RecordBatch;
-use libc::uintptr_t;
-use pyo3::prelude::*;
-use pyo3::types::PyList;
-use pyo3::PyErr;
-use std::convert::From;
-
-use crate::errors;
-
-pub fn to_py_array(array: &ArrayRef, py: Python) -> PyResult<PyObject> {
-    let (array_pointer, schema_pointer) =
-        array.to_raw().map_err(errors::DataFusionError::from)?;
-
-    let pa = py.import("pyarrow")?;
-
-    let array = pa.getattr("Array")?.call_method1(
-        "_import_from_c",
-        (array_pointer as uintptr_t, schema_pointer as uintptr_t),
-    )?;
-    Ok(array.to_object(py))
-}
-
-fn to_py_batch<'a>(
-    batch: &RecordBatch,
-    py: Python,
-    pyarrow: &'a PyModule,
-) -> Result<PyObject, PyErr> {
-    let mut py_arrays = vec![];
-    let mut py_names = vec![];
-
-    let schema = batch.schema();
-    for (array, field) in batch.columns().iter().zip(schema.fields().iter()) {
-        let array = to_py_array(array, py)?;
-
-        py_arrays.push(array);
-        py_names.push(field.name());
-    }
-
-    let record = pyarrow
-        .getattr("RecordBatch")?
-        .call_method1("from_arrays", (py_arrays, py_names))?;
-
-    Ok(PyObject::from(record))
-}
-
-/// Converts a &[RecordBatch] into a Vec<RecordBatch> represented in PyArrow
-pub fn to_py(batches: &[RecordBatch]) -> PyResult<PyObject> {
-    Python::with_gil(|py| {
-        let pyarrow = PyModule::import(py, "pyarrow")?;
-        let mut py_batches = vec![];
-        for batch in batches {
-            py_batches.push(to_py_batch(batch, py, pyarrow)?);
-        }
-        let list = PyList::new(py, py_batches);
-        Ok(PyObject::from(list))
-    })
-}
diff --git a/python/src/to_rust.rs b/python/src/to_rust.rs
index 7977fe4ff8ce..5c7a1aeca5e4 100644
--- a/python/src/to_rust.rs
+++ b/python/src/to_rust.rs
@@ -31,55 +31,6 @@ use pyo3::prelude::*;

 use crate::{errors, types::PyDataType};

-/// converts a pyarrow Array into a Rust Array
-pub fn to_rust(ob: &PyAny) -> PyResult<ArrayRef> {
-    // prepare a pointer to receive the Array struct
-    let (array_pointer, schema_pointer) =
-        ffi::ArrowArray::into_raw(unsafe { ffi::ArrowArray::empty() });
-
-    // make the conversion through PyArrow's private API
-    // this changes the pointer's memory and is thus unsafe.
-    // In particular, `_export_to_c` can go out of bounds
-    ob.call_method1(
-        "_export_to_c",
-        (array_pointer as uintptr_t, schema_pointer as uintptr_t),
-    )?;
-
-    let array = unsafe { make_array_from_raw(array_pointer, schema_pointer) }
-        .map_err(errors::DataFusionError::from)?;
-    Ok(array)
-}
-
-/// converts a pyarrow batch into a RecordBatch
-pub fn to_rust_batch(batch: &PyAny) -> PyResult<RecordBatch> {
-    let schema = batch.getattr("schema")?;
-    let names = schema.getattr("names")?.extract::<Vec<&str>>()?;
-
-    let fields = names
-        .iter()
-        .enumerate()
-        .map(|(i, name)| {
-            let field = schema.call_method1("field", (i,))?;
-            let nullable = field.getattr("nullable")?.extract::<bool>()?;
-            let py_data_type = field.getattr("type")?;
-            let data_type = py_data_type.extract::<PyDataType>()?.data_type;
-            Ok(Field::new(name, data_type, nullable))
-        })
-        .collect::<PyResult<Vec<Field>>>()?;
-
-    let schema = Arc::new(Schema::new(fields));
-
-    let arrays = (0..names.len())
-        .map(|i| {
-            let array = batch.call_method1("column", (i,))?;
-            to_rust(array)
-        })
-        .collect::<PyResult<Vec<ArrayRef>>>()?;
-
-    let batch =
-        RecordBatch::try_new(schema, arrays).map_err(errors::DataFusionError::from)?;
-    Ok(batch)
-}
-
 /// converts a pyarrow Scalar into a Rust Scalar
 pub fn to_rust_scalar(ob: &PyAny) -> PyResult<ScalarValue> {
     let t = ob
         .getattr("__class__")?
         .getattr("__name__")?
         .extract::<&str>()?;

     let p = ob.call_method0("as_py")?;

     Ok(match t {
         "Int8Scalar" => ScalarValue::Int8(Some(p.extract::<i8>()?)),
         "Int16Scalar" => ScalarValue::Int16(Some(p.extract::<i16>()?)),
         "Int32Scalar" => ScalarValue::Int32(Some(p.extract::<i32>()?)),
         "Int64Scalar" => ScalarValue::Int64(Some(p.extract::<i64>()?)),
         "UInt8Scalar" => ScalarValue::UInt8(Some(p.extract::<u8>()?)),
         "UInt16Scalar" => ScalarValue::UInt16(Some(p.extract::<u16>()?)),
         "UInt32Scalar" => ScalarValue::UInt32(Some(p.extract::<u32>()?)),
         "UInt64Scalar" => ScalarValue::UInt64(Some(p.extract::<u64>()?)),
         "FloatScalar" => ScalarValue::Float32(Some(p.extract::<f32>()?)),
         "DoubleScalar" => ScalarValue::Float64(Some(p.extract::<f64>()?)),
         "BooleanScalar" => ScalarValue::Boolean(Some(p.extract::<bool>()?)),
         "StringScalar" => ScalarValue::Utf8(Some(p.extract::<String>()?)),
         "LargeStringScalar" => ScalarValue::LargeUtf8(Some(p.extract::<String>()?)),
         other => {
             return Err(errors::DataFusionError::Common(format!(
                 "Type \"{}\"not yet implemented",
                 other
             ))
             .into())
         }
     })
 }
-
-pub fn to_rust_schema(ob: &PyAny) -> PyResult<Schema> {
-    let c_schema = ffi::FFI_ArrowSchema::empty();
-    let c_schema_ptr = &c_schema as *const ffi::FFI_ArrowSchema;
-    ob.call_method1("_export_to_c", (c_schema_ptr as uintptr_t,))?;
-    let schema = Schema::try_from(&c_schema).map_err(errors::DataFusionError::from)?;
-    Ok(schema)
-}
diff --git a/python/src/types.rs b/python/src/types.rs
index bd6ef0d376e6..e9a183d7e6af 100644
--- a/python/src/types.rs
+++ b/python/src/types.rs
@@ -19,6 +19,7 @@ use datafusion::arrow::datatypes::DataType;
 use pyo3::{FromPyObject, PyAny, PyResult};

 use crate::errors;
+use crate::pyarrow::PyArrowConvert;

 /// utility struct to convert PyObj to native DataType
 #[derive(Debug, Clone)]
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 83e8be05db60..0f5bf9b88e96 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -28,7 +28,6 @@ use datafusion::{
 };

 use crate::scalar::Scalar;
-use crate::to_py::to_py_array;
 use crate::to_rust::to_rust_scalar;

 #[derive(Debug)]
diff --git a/python/src/udf.rs b/python/src/udf.rs
index 49a18d993241..d17f2b163fd2 100644
--- a/python/src/udf.rs
+++ b/python/src/udf.rs
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.

-use pyo3::{prelude::*, types::PyTuple};
-
+use datafusion::arrow::array::ArrayRef;
 use datafusion::{arrow::array, physical_plan::functions::make_scalar_function};
+use pyo3::{prelude::*, types::PyTuple};

 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::functions::ScalarFunctionImplementation;

-use crate::to_py::to_py_array;
-use crate::to_rust::to_rust;
+use crate::pyarrow::PyArrowConvert;

 /// creates a DataFusion's UDF implementation from a python function that expects pyarrow arrays
 /// This is more efficient as it performs a zero-copy of the contents.
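The patch above collapses the hand-written to_py/to_rust converters into a single
PyArrowConvert trait: each implementation round-trips a value through Arrow's C
data interface by handing a raw FFI_ArrowSchema or ArrowArray pointer to
pyarrow's private _export_to_c/_import_from_c methods. A minimal sketch of how a
caller could exercise the trait — the `use` paths come from this patch, the
function itself is illustrative only:

    use pyo3::prelude::*;
    use datafusion::arrow::datatypes::DataType;
    use crate::pyarrow::PyArrowConvert;

    fn roundtrip_datatype() -> PyResult<()> {
        Python::with_gil(|py| {
            // pyarrow.int64() -> Rust DataType::Int64 via the C data interface
            let pa = py.import("pyarrow")?;
            let py_dtype = pa.getattr("int64")?.call0()?;
            let dtype = DataType::from_pyarrow(py_dtype)?;
            assert_eq!(dtype, DataType::Int64);
            // ... and back to a pyarrow.DataType object
            let _py_dtype = dtype.to_pyarrow(py)?;
            Ok(())
        })
    }

Note that to_pyarrow as written at this stage imports "pyarrow.DataType" as if it
were a module; the module-then-getattr lookup that makes the export side work
arrives in patch 05.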
@@ -40,7 +39,7 @@ pub fn array_udf(func: PyObject) -> ScalarFunctionImplementation {
             .iter()
             .map(|arg| {
                 // remove unwrap
-                to_py_array(arg, py).unwrap()
+                arg.to_pyarrow(py).unwrap()
             })
             .collect::<Vec<PyObject>>();
         let py_args = PyTuple::new(py, py_args);
@@ -52,7 +51,7 @@
             Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
         }?;

-        let array = to_rust(value).unwrap();
+        let array = ArrayRef::from(value).unwrap();
         Ok(array)
     })
 },

From 54651d3137d05bda83f3d4dfcfb01575e15acf96 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?=
Date: Tue, 10 Aug 2021 16:44:24 +0200
Subject: [PATCH 02/12] f

---
 datafusion/Cargo.toml |  2 +-
 python/src/pyarrow.rs | 55 ++++++++++++++++++++++++++++++++++-------
 python/src/udf.rs     |  5 ++--
 3 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index bfb3a93e3249..368f76774d69 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -45,7 +45,7 @@ unicode_expressions = ["unicode-segmentation"]

 [dependencies]
 ahash = "0.7"
-hashbrown = "0.11"
+hashbrown = { version = "0.11", features = ["raw"] }
 arrow = { version = "5.1", features = ["prettyprint"] }
 parquet = { version = "5.1", features = ["arrow"] }
 sqlparser = "0.9.0"
diff --git a/python/src/pyarrow.rs b/python/src/pyarrow.rs
index 339d2be1277f..a5a56b22321b 100644
--- a/python/src/pyarrow.rs
+++ b/python/src/pyarrow.rs
@@ -17,11 +17,15 @@

 use std::convert::TryFrom;
 use std::error;
+use std::sync::Arc;

 use libc::uintptr_t;
 use pyo3::prelude::*;
 use pyo3::wrap_pyfunction;
+use pyo3::types::PyList;
+use pyo3::exceptions::PyValueError;

+use datafusion::scalar::ScalarValue;
 use datafusion::arrow::array::{make_array_from_raw, ArrayRef};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::ffi;
 use datafusion::arrow::ffi::FFI_ArrowSchema;
 use datafusion::arrow::record_batch::RecordBatch;
@@ -124,8 +128,6 @@ impl PyArrowConvert for RecordBatch {
     fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
         // TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches
-        let schema = value.getattr("schema")?;
-        let arrays = value.getattr("columns")?;

         // let names = schema.getattr("names")?.extract::<Vec<&str>>()?;
@@ -141,13 +143,12 @@ impl PyArrowConvert for RecordBatch {
         //     })
         //     .collect::<PyResult<Vec<Field>>>()?;

-        let schema = Schema::from_pyarrow(schema)?;
+        let schema = value.getattr("schema")?;
+        let schema = Arc::new(Schema::from_pyarrow(schema)?);

-        let arrays = (0..names.len())
-            .map(|i| {
-                let array = batch.call_method1("column", (i,))?;
-                to_rust(array)
-            })
+        let arrays = value.getattr("columns")?.downcast::<PyList>()?;
+        let arrays = arrays.iter()
+            .map(ArrayRef::from_pyarrow)
             .collect::<PyResult<Vec<ArrayRef>>>()?;

         let batch =
@@ -186,3 +186,41 @@
 }
+
+impl PyArrowConvert for ScalarValue {
+    fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
+        let t = ob
+            .getattr("__class__")?
+            .getattr("__name__")?
+            .extract::<&str>()?;
+
+        let p = ob.call_method0("as_py")?;
+
+        Ok(match t {
+            "Int8Scalar" => ScalarValue::Int8(Some(p.extract::<i8>()?)),
+            "Int16Scalar" => ScalarValue::Int16(Some(p.extract::<i16>()?)),
+            "Int32Scalar" => ScalarValue::Int32(Some(p.extract::<i32>()?)),
+            "Int64Scalar" => ScalarValue::Int64(Some(p.extract::<i64>()?)),
+            "UInt8Scalar" => ScalarValue::UInt8(Some(p.extract::<u8>()?)),
+            "UInt16Scalar" => ScalarValue::UInt16(Some(p.extract::<u16>()?)),
+            "UInt32Scalar" => ScalarValue::UInt32(Some(p.extract::<u32>()?)),
+            "UInt64Scalar" => ScalarValue::UInt64(Some(p.extract::<u64>()?)),
+            "FloatScalar" => ScalarValue::Float32(Some(p.extract::<f32>()?)),
+            "DoubleScalar" => ScalarValue::Float64(Some(p.extract::<f64>()?)),
+            "BooleanScalar" => ScalarValue::Boolean(Some(p.extract::<bool>()?)),
+            "StringScalar" => ScalarValue::Utf8(Some(p.extract::<String>()?)),
+            "LargeStringScalar" => ScalarValue::LargeUtf8(Some(p.extract::<String>()?)),
+            other => {
+                return Err(DataFusionError::Common(format!(
+                    "Type \"{}\"not yet implemented",
+                    other
+                ))
+                .into())
+            }
+        })
+    }
+
+    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+        Err(PyValueError::new_err("argument is wrong"))
+    }
+}
diff --git a/python/src/udf.rs b/python/src/udf.rs
index d17f2b163fd2..773f6b5320ea 100644
--- a/python/src/udf.rs
+++ b/python/src/udf.rs
@@ -17,10 +17,9 @@

 use datafusion::arrow::array::ArrayRef;
 use datafusion::{arrow::array, physical_plan::functions::make_scalar_function};
-use pyo3::{prelude::*, types::PyTuple};
-
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::functions::ScalarFunctionImplementation;
+use pyo3::{prelude::*, types::PyTuple};

 use crate::pyarrow::PyArrowConvert;
@@ -51,7 +50,7 @@
             Err(error) => Err(DataFusionError::Execution(format!("{:?}", error))),
         }?;

-        let array = ArrayRef::from(value).unwrap();
+        let array = ArrayRef::from_pyarrow(value).unwrap();
         Ok(array)
     })
 },
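Patch 02 extends the trait to ScalarValue. pyarrow scalars expose no C-data-
interface export hook, so from_pyarrow dispatches on the scalar's Python class
name and pulls the value out with as_py. The dispatch step in isolation, as a
hedged sketch (the helper name is illustrative, not part of the patch):

    use pyo3::prelude::*;

    /// Returns e.g. "Int64Scalar" or "StringScalar" for a pyarrow scalar.
    fn python_class_name(value: &PyAny) -> PyResult<String> {
        value.getattr("__class__")?.getattr("__name__")?.extract()
    }

The same class-name trick was already used by to_rust_scalar; folding it into
the trait is what lets patch 03 delete to_rust.rs wholesale.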
From 3ed8b99233cc8bceabde0f7cb55b55a235bb6347 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?=
Date: Tue, 10 Aug 2021 16:54:25 +0200
Subject: [PATCH 03/12] g

---
 python/src/context.rs |  1 -
 python/src/errors.rs  |  6 +---
 python/src/lib.rs     |  1 -
 python/src/pyarrow.rs | 40 ++++----------------
 python/src/scalar.rs  |  9 +++--
 python/src/to_rust.rs | 65 -------------------------------
 python/src/types.rs   | 36 ++----------------
 python/src/udaf.rs    | 13 +++++---
 python/src/udf.rs     |  2 +-
 9 files changed, 21 insertions(+), 152 deletions(-)
 delete mode 100644 python/src/to_rust.rs

diff --git a/python/src/context.rs b/python/src/context.rs
index ef8c45deb8e2..495df1d1bd44 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -34,7 +34,6 @@ use crate::dataframe;
 use crate::errors;
 use crate::functions;
 use crate::pyarrow::PyArrowConvert;
-use crate::to_rust;
 use crate::types::PyDataType;

 /// `ExecutionContext` is able to plan and execute DataFusion plans.
diff --git a/python/src/errors.rs b/python/src/errors.rs
index cf99c1ccbae8..cc181a98755d 100644
--- a/python/src/errors.rs
+++ b/python/src/errors.rs
@@ -20,11 +20,7 @@ use core::fmt;

 use datafusion::arrow::error::ArrowError;
 use datafusion::error::DataFusionError as InnerDataFusionError;
-use pyo3::{
-    exceptions::{PyException, PyRuntimeError},
-    prelude::*,
-    PyErr,
-};
+use pyo3::{exceptions::PyException, PyErr};

 #[derive(Debug)]
 pub enum DataFusionError {
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 5102fa003b86..f0075c7b2913 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -25,7 +25,6 @@ mod expression;
 mod functions;
 mod pyarrow;
 mod scalar;
-mod to_rust;
 mod types;
 mod udaf;
 mod udf;
diff --git a/python/src/pyarrow.rs b/python/src/pyarrow.rs
index a5a56b22321b..935489eb3aa3 100644
--- a/python/src/pyarrow.rs
+++ b/python/src/pyarrow.rs
@@ -16,21 +16,19 @@
 // under the License.

 use std::convert::TryFrom;
-use std::error;
 use std::sync::Arc;

 use libc::uintptr_t;
+use pyo3::exceptions::PyValueError;
 use pyo3::prelude::*;
-use pyo3::wrap_pyfunction;
 use pyo3::types::PyList;
-use pyo3::exceptions::PyValueError;

-use datafusion::scalar::ScalarValue;
 use datafusion::arrow::array::{make_array_from_raw, ArrayRef};
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::arrow::ffi;
 use datafusion::arrow::ffi::FFI_ArrowSchema;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::scalar::ScalarValue;

 use crate::errors::DataFusionError;
@@ -126,26 +126,12 @@ impl PyArrowConvert for RecordBatch {
     fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
         // TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches
-
-        // let names = schema.getattr("names")?.extract::<Vec<&str>>()?;
-
-        // let fields = names
-        //     .iter()
-        //     .enumerate()
-        //     .map(|(i, name)| {
-        //         let field = schema.call_method1("field", (i,))?;
-        //         let nullable = field.getattr("nullable")?.extract::<bool>()?;
-        //         let py_data_type = field.getattr("type")?;
-        //         let data_type = py_data_type.extract::<PyDataType>()?.data_type;
-        //         Ok(Field::new(name, data_type, nullable))
-        //     })
-        //     .collect::<PyResult<Vec<Field>>>()?;
-
         let schema = value.getattr("schema")?;
         let schema = Arc::new(Schema::from_pyarrow(schema)?);

         let arrays = value.getattr("columns")?.downcast::<PyList>()?;
-        let arrays = arrays.iter()
+        let arrays = arrays
+            .iter()
             .map(ArrayRef::from_pyarrow)
             .collect::<PyResult<Vec<ArrayRef>>>()?;
@@ -173,28 +157,16 @@ impl PyArrowConvert for RecordBatch {
         Ok(PyObject::from(record))
     }
-
-    // fn to_pyarrow(batches: &[RecordBatch]) -> PyResult<PyObject> {
-    //     Python::with_gil(|py| {
-    //         let pyarrow = PyModule::import(py, "pyarrow")?;
-    //         let mut py_batches = vec![];
-    //         for batch in batches {
-    //             py_batches.push(to_py_batch(batch, py, pyarrow)?);
-    //         }
-    //         let list = PyList::new(py, py_batches);
-    //         Ok(PyObject::from(list))
-    //     })
-    // }
 }

 impl PyArrowConvert for ScalarValue {
     fn from_pyarrow(value: &PyAny) -> PyResult<Self> {
-        let t = ob
+        let t = value
             .getattr("__class__")?
             .getattr("__name__")?
             .extract::<&str>()?;

-        let p = ob.call_method0("as_py")?;
+        let p = value.call_method0("as_py")?;

         Ok(match t {
             "Int8Scalar" => ScalarValue::Int8(Some(p.extract::<i8>()?)),
diff --git a/python/src/scalar.rs b/python/src/scalar.rs
index 0c562a940361..6a5079d9e2ae 100644
--- a/python/src/scalar.rs
+++ b/python/src/scalar.rs
@@ -17,20 +17,19 @@

 use pyo3::prelude::*;

-use datafusion::scalar::ScalarValue as _Scalar;
-
-use crate::to_rust::to_rust_scalar;
+use crate::pyarrow::PyArrowConvert;
+use datafusion::scalar::ScalarValue;

 /// An expression that can be used on a DataFrame
 #[derive(Debug, Clone)]
 pub(crate) struct Scalar {
-    pub(crate) scalar: _Scalar,
+    pub(crate) scalar: ScalarValue,
 }

 impl<'source> FromPyObject<'source> for Scalar {
     fn extract(ob: &'source PyAny) -> PyResult<Self> {
         Ok(Self {
-            scalar: to_rust_scalar(ob)?,
+            scalar: ScalarValue::from_pyarrow(ob)?,
         })
     }
 }
diff --git a/python/src/to_rust.rs b/python/src/to_rust.rs
deleted file mode 100644
index 5c7a1aeca5e4..000000000000
--- a/python/src/to_rust.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::convert::TryFrom;
-use std::sync::Arc;
-
-use datafusion::arrow::{
-    array::{make_array_from_raw, ArrayRef},
-    datatypes::Field,
-    datatypes::Schema,
-    ffi,
-    record_batch::RecordBatch,
-};
-use datafusion::scalar::ScalarValue;
-use libc::uintptr_t;
-use pyo3::prelude::*;
-
-use crate::{errors, types::PyDataType};
-
-/// converts a pyarrow Scalar into a Rust Scalar
-pub fn to_rust_scalar(ob: &PyAny) -> PyResult<ScalarValue> {
-    let t = ob
-        .getattr("__class__")?
-        .getattr("__name__")?
-        .extract::<&str>()?;
-
-    let p = ob.call_method0("as_py")?;
-
-    Ok(match t {
-        "Int8Scalar" => ScalarValue::Int8(Some(p.extract::<i8>()?)),
-        "Int16Scalar" => ScalarValue::Int16(Some(p.extract::<i16>()?)),
-        "Int32Scalar" => ScalarValue::Int32(Some(p.extract::<i32>()?)),
-        "Int64Scalar" => ScalarValue::Int64(Some(p.extract::<i64>()?)),
-        "UInt8Scalar" => ScalarValue::UInt8(Some(p.extract::<u8>()?)),
-        "UInt16Scalar" => ScalarValue::UInt16(Some(p.extract::<u16>()?)),
-        "UInt32Scalar" => ScalarValue::UInt32(Some(p.extract::<u32>()?)),
-        "UInt64Scalar" => ScalarValue::UInt64(Some(p.extract::<u64>()?)),
-        "FloatScalar" => ScalarValue::Float32(Some(p.extract::<f32>()?)),
-        "DoubleScalar" => ScalarValue::Float64(Some(p.extract::<f64>()?)),
-        "BooleanScalar" => ScalarValue::Boolean(Some(p.extract::<bool>()?)),
-        "StringScalar" => ScalarValue::Utf8(Some(p.extract::<String>()?)),
-        "LargeStringScalar" => ScalarValue::LargeUtf8(Some(p.extract::<String>()?)),
-        other => {
-            return Err(errors::DataFusionError::Common(format!(
-                "Type \"{}\"not yet implemented",
-                other
-            ))
-            .into())
-        }
-    })
-}
diff --git a/python/src/types.rs b/python/src/types.rs
index e9a183d7e6af..cfba0c639a47 100644
--- a/python/src/types.rs
+++ b/python/src/types.rs
@@ -18,7 +18,6 @@
 use datafusion::arrow::datatypes::DataType;
 use pyo3::{FromPyObject, PyAny, PyResult};

-use crate::errors;
 use crate::pyarrow::PyArrowConvert;

 /// utility struct to convert PyObj to native DataType
@@ -29,38 +28,7 @@ pub struct PyDataType {

 impl<'source> FromPyObject<'source> for PyDataType {
     fn extract(ob: &'source PyAny) -> PyResult<Self> {
-        let id = ob.getattr("id")?.extract::<i32>()?;
-        let data_type = data_type_id(&id)?;
-        Ok(PyDataType { data_type })
+        let dtype = DataType::from_pyarrow(ob)?;
+        Ok(PyDataType { data_type: dtype })
     }
 }
-
-fn data_type_id(id: &i32) -> Result<DataType, errors::DataFusionError> {
-    // see https://github.com/apache/arrow/blob/3694794bdfd0677b95b8c95681e392512f1c9237/python/pyarrow/includes/libarrow.pxd
-    // this is not ideal as it does not generalize for non-basic types
-    // Find a way to get a unique name from the pyarrow.DataType
-    Ok(match id {
-        1 => DataType::Boolean,
-        2 => DataType::UInt8,
-        3 => DataType::Int8,
-        4 => DataType::UInt16,
-        5 => DataType::Int16,
-        6 => DataType::UInt32,
-        7 => DataType::Int32,
-        8 => DataType::UInt64,
-        9 => DataType::Int64,
-        10 => DataType::Float16,
-        11 => DataType::Float32,
-        12 => DataType::Float64,
-        13 => DataType::Utf8,
-        14 => DataType::Binary,
-        34 => DataType::LargeUtf8,
-        35 => DataType::LargeBinary,
-        other => {
-            return Err(errors::DataFusionError::Common(format!(
-                "The type {} is not valid",
-                other
-            )))
-        }
-    })
-}
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 0f5bf9b88e96..60440e0c45a1 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -27,8 +27,8 @@ use datafusion::{
     scalar::ScalarValue,
 };

+use crate::pyarrow::PyArrowConvert;
 use crate::scalar::Scalar;
-use crate::to_rust::to_rust_scalar;

 #[derive(Debug)]
 struct PyAccumulator {
@@ -42,7 +42,7 @@ impl PyAccumulator {
 }

 impl Accumulator for PyAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>, InnerDataFusionError> {
+    fn state(&self) -> Result<Vec<ScalarValue>, InnerDataFusionError> {
         Python::with_gil(|py| {
             let state = self
                 .accum
@@ -66,7 +66,7 @@
         todo!()
     }

-    fn evaluate(&self) -> Result<ScalarValue, InnerDataFusionError> {
+    fn evaluate(&self) -> Result<ScalarValue, InnerDataFusionError> {
         Python::with_gil(|py| {
             let value = self
                 .accum
                 .call_method0("evaluate")
                 .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;

-            to_rust_scalar(value)
+            ScalarValue::from_pyarrow(value)
                 .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
         })
     }
@@ -89,7 +89,7 @@
             .iter()
             .map(|arg| {
                 // remove unwrap
-                to_py_array(arg, py).unwrap()
+                arg.to_pyarrow(py).unwrap()
             })
             .collect::<Vec<PyObject>>();
         let py_args = PyTuple::new(py, py_args);
@@ -110,7 +110,8 @@
         // 2. merge
         let state = &states[0];

-        let state = to_py_array(state, py)
+        let state = state
+            .to_pyarrow(py)
             .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;

         // 2.
diff --git a/python/src/udf.rs b/python/src/udf.rs
index 773f6b5320ea..849104d94656 100644
--- a/python/src/udf.rs
+++ b/python/src/udf.rs
@@ -16,9 +16,9 @@
 // under the License.

 use datafusion::arrow::array::ArrayRef;
-use datafusion::{arrow::array, physical_plan::functions::make_scalar_function};
 use datafusion::error::DataFusionError;
 use datafusion::physical_plan::functions::ScalarFunctionImplementation;
+use datafusion::{arrow::array, physical_plan::functions::make_scalar_function};
 use pyo3::{prelude::*, types::PyTuple};

 use crate::pyarrow::PyArrowConvert;
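With patch 03 the last hand-rolled converters are gone: Scalar and PyDataType
now implement FromPyObject by delegating straight to PyArrowConvert, and the
numeric type-id lookup table in types.rs is replaced by the FFI round-trip. The
delegation pattern generalizes to any newtype wrapper; a sketch under the
assumption that the trait from patch 01 is in scope (the Wrapper type is
illustrative, not in the patch):

    use pyo3::prelude::*;
    use crate::pyarrow::PyArrowConvert;

    struct Wrapper<T>(T);

    impl<'source, T: PyArrowConvert> FromPyObject<'source> for Wrapper<T> {
        fn extract(ob: &'source PyAny) -> PyResult<Self> {
            // One line: the whole conversion funnels through the trait.
            Ok(Wrapper(T::from_pyarrow(ob)?))
        }
    }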
From 65298ebac65387451f2d022df90f77e8e8bcb20c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?=
Date: Tue, 10 Aug 2021 17:00:24 +0200
Subject: [PATCH 04/12] h

---
 python/src/context.rs   | 59 ++++++++++++++++++++----------------------
 python/src/dataframe.rs |  3 ++-
 python/src/pyarrow.rs   |  5 ++--
 3 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/python/src/context.rs b/python/src/context.rs
index 495df1d1bd44..bd538d906e65 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -65,36 +65,35 @@ impl ExecutionContext {
         ))
     }

-    fn create_dataframe(
-        &mut self,
-        partitions: Vec<Vec<PyObject>>,
-        py: Python,
-    ) -> PyResult<dataframe::DataFrame> {
-        let partitions: Vec<Vec<RecordBatch>> = partitions
-            .iter()
-            .map(|batches| {
-                batches
-                    .iter()
-                    .map(|batch| to_rust::to_rust_batch(batch.as_ref(py)))
-                    .collect()
-            })
-            .collect::<PyResult<_>>()?;
-
-        let table =
-            errors::wrap(MemTable::try_new(partitions[0][0].schema(), partitions))?;
-
-        // generate a random (unique) name for this table
-        let name = rand::thread_rng()
-            .sample_iter(&Alphanumeric)
-            .take(10)
-            .collect::<String>();
-
-        errors::wrap(self.ctx.register_table(&*name, Arc::new(table)))?;
-        Ok(dataframe::DataFrame::new(
-            self.ctx.state.clone(),
-            errors::wrap(self.ctx.table(&*name))?.to_logical_plan(),
-        ))
-    }
+    // fn create_dataframe(
+    //     &mut self,
+    //     partitions: Vec<Vec<PyObject>>,
+    // ) -> PyResult<dataframe::DataFrame> {
+    //     let partitions: Vec<Vec<RecordBatch>> = partitions
+    //         .iter()
+    //         .map(|batches| {
+    //             batches
+    //                 .iter()
+    //                 .map(RecordBatch::from_pyarrow)
+    //                 .collect::<PyResult<Vec<RecordBatch>>>()
+    //         })
+    //         .collect::<PyResult<_>>()?;

+    //     let table =
+    //         errors::wrap(MemTable::try_new(partitions[0][0].schema(), partitions))?;

+    //     // generate a random (unique) name for this table
+    //     let name = rand::thread_rng()
+    //         .sample_iter(&Alphanumeric)
+    //         .take(10)
+    //         .collect::<String>();

+    //     errors::wrap(self.ctx.register_table(&*name, Arc::new(table)))?;
+    //     Ok(dataframe::DataFrame::new(
+    //         self.ctx.state.clone(),
+    //         errors::wrap(self.ctx.table(&*name))?.to_logical_plan(),
+    //     ))
+    // }

     fn register_parquet(&mut self, name: &str, path: &str) -> PyResult<()> {
         errors::wrap(self.ctx.register_parquet(name, path))?;
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index 2c5f9cb3c00c..aa8e4cd4a3a0 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -30,6 +30,7 @@ use datafusion::physical_plan::collect;
 use datafusion::{execution::context::ExecutionContextState, logical_plan};

 use crate::errors;
+use crate::pyarrow::PyArrowConvert;
 use crate::{errors::DataFusionError, expression};

 /// A DataFrame is a representation of a logical plan and an API to compose statements.
@@ -136,7 +137,7 @@ impl DataFrame {

         let mut py_batches = vec![];
         for batch in batches {
-            py_batches.push(to_py_batch(batch, py, pyarrow)?);
+            py_batches.push(batch.to_pyarrow(py)?);
         }
         let py_list = PyList::new(py, py_batches);
         Ok(PyObject::from(py_list))
diff --git a/python/src/pyarrow.rs b/python/src/pyarrow.rs
index 935489eb3aa3..a633e11c9d6e 100644
--- a/python/src/pyarrow.rs
+++ b/python/src/pyarrow.rs
@@ -144,8 +144,9 @@ impl PyArrowConvert for RecordBatch {
         let mut py_arrays = vec![];
         let mut py_names = vec![];

+        let schema = self.schema();
+        let fields = schema.fields().iter();
         let columns = self.columns().iter();
-        let fields = self.schema().fields().iter();

         for (array, field) in columns.zip(fields) {
             py_arrays.push(array.to_pyarrow(py)?);
@@ -192,7 +193,7 @@ impl PyArrowConvert for ScalarValue {
         })
     }

-    fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
+    fn to_pyarrow(&self, _py: Python) -> PyResult<PyObject> {
         Err(PyValueError::new_err("argument is wrong"))
     }
 }
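Patch 04 finishes the DataFrame::collect migration: each RecordBatch is exported
with to_pyarrow and gathered into a Python list, replacing the deleted
to_py::to_py helper. The conversion loop as a stand-alone sketch, assuming the
RecordBatch impl of PyArrowConvert from these patches (the function name is
illustrative):

    use pyo3::{prelude::*, types::PyList};
    use datafusion::arrow::record_batch::RecordBatch;
    use crate::pyarrow::PyArrowConvert;

    fn batches_to_pylist(py: Python, batches: &[RecordBatch]) -> PyResult<PyObject> {
        let mut py_batches = Vec::with_capacity(batches.len());
        for batch in batches {
            // Each batch crosses the FFI boundary as a pyarrow.RecordBatch.
            py_batches.push(batch.to_pyarrow(py)?);
        }
        Ok(PyObject::from(PyList::new(py, py_batches)))
    }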
crate::pyarrow::PyArrowConvert; use crate::{errors::DataFusionError, expression}; /// A DataFrame is a representation of a logical plan and an API to compose statements. @@ -136,7 +137,7 @@ impl DataFrame { let mut py_batches = vec![]; for batch in batches { - py_batches.push(to_py_batch(batch, py, pyarrow)?); + py_batches.push(batch.to_pyarrow(py)?); } let py_list = PyList::new(py, py_batches); Ok(PyObject::from(py_list)) diff --git a/python/src/pyarrow.rs b/python/src/pyarrow.rs index 935489eb3aa3..a633e11c9d6e 100644 --- a/python/src/pyarrow.rs +++ b/python/src/pyarrow.rs @@ -144,8 +144,9 @@ impl PyArrowConvert for RecordBatch { let mut py_arrays = vec![]; let mut py_names = vec![]; + let schema = self.schema(); + let fields = schema.fields().iter(); let columns = self.columns().iter(); - let fields = self.schema().fields().iter(); for (array, field) in columns.zip(fields) { py_arrays.push(array.to_pyarrow(py)?); @@ -192,7 +193,7 @@ impl PyArrowConvert for ScalarValue { }) } - fn to_pyarrow(&self, py: Python) -> PyResult { + fn to_pyarrow(&self, _py: Python) -> PyResult { Err(PyValueError::new_err("argument is wrong")) } } From f8cb1e23fa222703b01fe84e530dce38fdab5f78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kriszti=C3=A1n=20Sz=C5=B1cs?= Date: Tue, 10 Aug 2021 18:04:42 +0200 Subject: [PATCH 05/12] j --- python/datafusion/__init__.py | 7 ++- python/src/context.rs | 97 ++++++++++++++++---------------- python/src/dataframe.rs | 45 +++++++-------- python/src/expression.rs | 101 +++++++++++++++++----------------- python/src/functions.rs | 71 ++++++++++++------------ python/src/lib.rs | 6 +- python/src/pyarrow.rs | 26 +++++---- 7 files changed, 182 insertions(+), 171 deletions(-) diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py index a09dca093f54..6406cf11a9c6 100644 --- a/python/datafusion/__init__.py +++ b/python/datafusion/__init__.py @@ -1 +1,6 @@ -from .internals import * +from .internals import ( + PyExecutionContext as ExecutionContext, + PyDataFrame as DataFrame, + PyExpr as Expr, + functions +) diff --git a/python/src/context.rs b/python/src/context.rs index bd538d906e65..e89a0870a944 100644 --- a/python/src/context.rs +++ b/python/src/context.rs @@ -27,76 +27,77 @@ use pyo3::prelude::*; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; use datafusion::datasource::MemTable; -use datafusion::execution::context::ExecutionContext as _ExecutionContext; +use datafusion::execution::context::ExecutionContext; use datafusion::prelude::CsvReadOptions; -use crate::dataframe; -use crate::errors; +use crate::dataframe::PyDataFrame; +use crate::errors::DataFusionError; use crate::functions; use crate::pyarrow::PyArrowConvert; use crate::types::PyDataType; -/// `ExecutionContext` is able to plan and execute DataFusion plans. +/// `PyExecutionContext` is able to plan and execute DataFusion plans. /// It has a powerful optimizer, a physical planner for local execution, and a /// multi-threaded execution engine to perform the execution. #[pyclass(unsendable)] -pub(crate) struct ExecutionContext { - ctx: _ExecutionContext, +pub(crate) struct PyExecutionContext { + ctx: ExecutionContext, } #[pymethods] -impl ExecutionContext { +impl PyExecutionContext { #[new] fn new() -> Self { - ExecutionContext { - ctx: _ExecutionContext::new(), + PyExecutionContext { + ctx: ExecutionContext::new(), } } - /// Returns a DataFrame whose plan corresponds to the SQL statement. 
- fn sql(&mut self, query: &str) -> PyResult { - let df = self - .ctx - .sql(query) - .map_err(|e| -> errors::DataFusionError { e.into() })?; - Ok(dataframe::DataFrame::new( + /// Returns a PyDataFrame whose plan corresponds to the SQL statement. + fn sql(&mut self, query: &str) -> PyResult { + let df = self.ctx.sql(query).map_err(DataFusionError::from)?; + Ok(PyDataFrame::new( self.ctx.state.clone(), df.to_logical_plan(), )) } - // fn create_dataframe( - // &mut self, - // partitions: Vec>, - // ) -> PyResult { - // let partitions: Vec> = partitions - // .iter() - // .map(|batches| { - // batches - // .iter() - // .map(RecordBatch::from_pyarrow) - // .collect::>() - // }) - // .collect::>()?; - - // let table = - // errors::wrap(MemTable::try_new(partitions[0][0].schema(), partitions))?; - - // // generate a random (unique) name for this table - // let name = rand::thread_rng() - // .sample_iter(&Alphanumeric) - // .take(10) - // .collect::(); - - // errors::wrap(self.ctx.register_table(&*name, Arc::new(table)))?; - // Ok(dataframe::DataFrame::new( - // self.ctx.state.clone(), - // errors::wrap(self.ctx.table(&*name))?.to_logical_plan(), - // )) - // } + fn create_dataframe( + &mut self, + partitions: Vec>, + ) -> PyResult { + let partitions: Vec> = partitions + .into_iter() + .map(|batches| { + batches + .into_iter() + .map(RecordBatch::from_pyarrow) + .collect::>() + }) + .collect::>()?; + + let table = MemTable::try_new(partitions[0][0].schema(), partitions) + .map_err(DataFusionError::from)?; + + // generate a random (unique) name for this table + let name = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(10) + .collect::(); + + self.ctx + .register_table(&*name, Arc::new(table)) + .map_err(DataFusionError::from)?; + let table = self.ctx.table(&*name).map_err(DataFusionError::from)?; + + let df = PyDataFrame::new(self.ctx.state.clone(), table.to_logical_plan()); + Ok(df) + } fn register_parquet(&mut self, name: &str, path: &str) -> PyResult<()> { - errors::wrap(self.ctx.register_parquet(name, path))?; + self.ctx + .register_parquet(name, path) + .map_err(DataFusionError::from)?; Ok(()) } @@ -138,7 +139,9 @@ impl ExecutionContext { .file_extension(file_extension); options.schema = schema.as_ref(); - errors::wrap(self.ctx.register_csv(name, path, options))?; + self.ctx + .register_csv(name, path, options) + .map_err(DataFusionError::from)?; Ok(()) } diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs index aa8e4cd4a3a0..113fcfad4ecd 100644 --- a/python/src/dataframe.rs +++ b/python/src/dataframe.rs @@ -29,29 +29,30 @@ use datafusion::logical_plan::{JoinType, LogicalPlanBuilder}; use datafusion::physical_plan::collect; use datafusion::{execution::context::ExecutionContextState, logical_plan}; -use crate::errors; -use crate::pyarrow::PyArrowConvert; -use crate::{errors::DataFusionError, expression}; +use crate::{ + errors, errors::DataFusionError, expression, expression::PyExpr, + pyarrow::PyArrowConvert, +}; -/// A DataFrame is a representation of a logical plan and an API to compose statements. +/// A PyDataFrame is a representation of a logical plan and an API to compose statements. /// Use it to build a plan and `.collect()` to execute the plan and collect the result. /// The actual execution of a plan runs natively on Rust and Arrow on a multi-threaded environment. 
#[pyclass] -pub(crate) struct DataFrame { +pub(crate) struct PyDataFrame { ctx_state: Arc>, plan: LogicalPlan, } -impl DataFrame { - /// creates a new DataFrame +impl PyDataFrame { + /// creates a new PyDataFrame pub fn new(ctx_state: Arc>, plan: LogicalPlan) -> Self { Self { ctx_state, plan } } } #[pymethods] -impl DataFrame { - /// Select `expressions` from the existing DataFrame. +impl PyDataFrame { + /// Select `expressions` from the existing PyDataFrame. #[args(args = "*")] fn select(&self, args: &PyTuple) -> PyResult { let expressions = expression::from_tuple(args)?; @@ -60,30 +61,26 @@ impl DataFrame { errors::wrap(builder.project(expressions.into_iter().map(|e| e.expr)))?; let plan = errors::wrap(builder.build())?; - Ok(DataFrame { + Ok(PyDataFrame { ctx_state: self.ctx_state.clone(), plan, }) } /// Filter according to the `predicate` expression - fn filter(&self, predicate: expression::Expression) -> PyResult { + fn filter(&self, predicate: PyExpr) -> PyResult { let builder = LogicalPlanBuilder::from(self.plan.clone()); let builder = errors::wrap(builder.filter(predicate.expr))?; let plan = errors::wrap(builder.build())?; - Ok(DataFrame { + Ok(PyDataFrame { ctx_state: self.ctx_state.clone(), plan, }) } /// Aggregates using expressions - fn aggregate( - &self, - group_by: Vec, - aggs: Vec, - ) -> PyResult { + fn aggregate(&self, group_by: Vec, aggs: Vec) -> PyResult { let builder = LogicalPlanBuilder::from(self.plan.clone()); let builder = errors::wrap(builder.aggregate( group_by.into_iter().map(|e| e.expr), @@ -91,19 +88,19 @@ impl DataFrame { ))?; let plan = errors::wrap(builder.build())?; - Ok(DataFrame { + Ok(PyDataFrame { ctx_state: self.ctx_state.clone(), plan, }) } /// Sort by specified sorting expressions - fn sort(&self, exprs: Vec) -> PyResult { + fn sort(&self, exprs: Vec) -> PyResult { let exprs = exprs.into_iter().map(|e| e.expr); let builder = LogicalPlanBuilder::from(self.plan.clone()); let builder = errors::wrap(builder.sort(exprs))?; let plan = errors::wrap(builder.build())?; - Ok(DataFrame { + Ok(PyDataFrame { ctx_state: self.ctx_state.clone(), plan, }) @@ -115,7 +112,7 @@ impl DataFrame { let builder = errors::wrap(builder.limit(count))?; let plan = errors::wrap(builder.build())?; - Ok(DataFrame { + Ok(PyDataFrame { ctx_state: self.ctx_state.clone(), plan, }) @@ -143,8 +140,8 @@ impl DataFrame { Ok(PyObject::from(py_list)) } - /// Returns the join of two DataFrames `on`. - fn join(&self, right: &DataFrame, on: Vec<&str>, how: &str) -> PyResult { + /// Returns the join of two PyDataFrames `on`. 
+ fn join(&self, right: &PyDataFrame, on: Vec<&str>, how: &str) -> PyResult { let builder = LogicalPlanBuilder::from(self.plan.clone()); let join_type = match how { @@ -167,7 +164,7 @@ impl DataFrame { let plan = errors::wrap(builder.build())?; - Ok(DataFrame { + Ok(PyDataFrame { ctx_state: self.ctx_state.clone(), plan, }) diff --git a/python/src/expression.rs b/python/src/expression.rs index 4320b1d14c8b..f1a12d7ba802 100644 --- a/python/src/expression.rs +++ b/python/src/expression.rs @@ -19,90 +19,89 @@ use pyo3::{ basic::CompareOp, prelude::*, types::PyTuple, PyNumberProtocol, PyObjectProtocol, }; -use datafusion::logical_plan::Expr as _Expr; -use datafusion::physical_plan::udaf::AggregateUDF as _AggregateUDF; -use datafusion::physical_plan::udf::ScalarUDF as _ScalarUDF; +use datafusion::logical_plan::Expr; +use datafusion::physical_plan::{udaf::AggregateUDF, udf::ScalarUDF}; -/// An expression that can be used on a DataFrame +/// An PyExpr that can be used on a DataFrame #[pyclass] #[derive(Debug, Clone)] -pub(crate) struct Expression { - pub(crate) expr: _Expr, +pub(crate) struct PyExpr { + pub(crate) expr: Expr, } /// converts a tuple of expressions into a vector of Expressions -pub(crate) fn from_tuple(value: &PyTuple) -> PyResult> { +pub(crate) fn from_tuple(value: &PyTuple) -> PyResult> { value .iter() - .map(|e| e.extract::()) + .map(|e| e.extract::()) .collect::>() } #[pyproto] -impl PyNumberProtocol for Expression { - fn __add__(lhs: Expression, rhs: Expression) -> PyResult { - Ok(Expression { +impl PyNumberProtocol for PyExpr { + fn __add__(lhs: PyExpr, rhs: PyExpr) -> PyResult { + Ok(PyExpr { expr: lhs.expr + rhs.expr, }) } - fn __sub__(lhs: Expression, rhs: Expression) -> PyResult { - Ok(Expression { + fn __sub__(lhs: PyExpr, rhs: PyExpr) -> PyResult { + Ok(PyExpr { expr: lhs.expr - rhs.expr, }) } - fn __truediv__(lhs: Expression, rhs: Expression) -> PyResult { - Ok(Expression { + fn __truediv__(lhs: PyExpr, rhs: PyExpr) -> PyResult { + Ok(PyExpr { expr: lhs.expr / rhs.expr, }) } - fn __mul__(lhs: Expression, rhs: Expression) -> PyResult { - Ok(Expression { + fn __mul__(lhs: PyExpr, rhs: PyExpr) -> PyResult { + Ok(PyExpr { expr: lhs.expr * rhs.expr, }) } - fn __and__(lhs: Expression, rhs: Expression) -> PyResult { - Ok(Expression { + fn __and__(lhs: PyExpr, rhs: PyExpr) -> PyResult { + Ok(PyExpr { expr: lhs.expr.and(rhs.expr), }) } - fn __or__(lhs: Expression, rhs: Expression) -> PyResult { - Ok(Expression { + fn __or__(lhs: PyExpr, rhs: PyExpr) -> PyResult { + Ok(PyExpr { expr: lhs.expr.or(rhs.expr), }) } - fn __invert__(&self) -> PyResult { - Ok(Expression { + fn __invert__(&self) -> PyResult { + Ok(PyExpr { expr: self.expr.clone().not(), }) } } #[pyproto] -impl PyObjectProtocol for Expression { - fn __richcmp__(&self, other: Expression, op: CompareOp) -> Expression { +impl PyObjectProtocol for PyExpr { + fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr { match op { - CompareOp::Lt => Expression { + CompareOp::Lt => PyExpr { expr: self.expr.clone().lt(other.expr), }, - CompareOp::Le => Expression { + CompareOp::Le => PyExpr { expr: self.expr.clone().lt_eq(other.expr), }, - CompareOp::Eq => Expression { + CompareOp::Eq => PyExpr { expr: self.expr.clone().eq(other.expr), }, - CompareOp::Ne => Expression { + CompareOp::Ne => PyExpr { expr: self.expr.clone().not_eq(other.expr), }, - CompareOp::Gt => Expression { + CompareOp::Gt => PyExpr { expr: self.expr.clone().gt(other.expr), }, - CompareOp::Ge => Expression { + CompareOp::Ge => PyExpr { expr: 
self.expr.clone().gt_eq(other.expr), }, } @@ -110,39 +109,39 @@ impl PyObjectProtocol for Expression { } #[pymethods] -impl Expression { - /// assign a name to the expression - pub fn alias(&self, name: &str) -> PyResult { - Ok(Expression { +impl PyExpr { + /// assign a name to the PyExpr + pub fn alias(&self, name: &str) -> PyResult { + Ok(PyExpr { expr: self.expr.clone().alias(name), }) } - /// Create a sort expression from an existing expression. + /// Create a sort PyExpr from an existing PyExpr. #[args(ascending = true, nulls_first = true)] - pub fn sort(&self, ascending: bool, nulls_first: bool) -> PyResult { - Ok(Expression { + pub fn sort(&self, ascending: bool, nulls_first: bool) -> PyResult { + Ok(PyExpr { expr: self.expr.clone().sort(ascending, nulls_first), }) } } -/// Represents a ScalarUDF +/// Represents a PyScalarUDF #[pyclass] #[derive(Debug, Clone)] -pub struct ScalarUDF { - pub(crate) function: _ScalarUDF, +pub struct PyScalarUDF { + pub(crate) function: ScalarUDF, } #[pymethods] -impl ScalarUDF { - /// creates a new expression with the call of the udf +impl PyScalarUDF { + /// creates a new PyExpr with the call of the udf #[call] #[args(args = "*")] - fn __call__(&self, args: &PyTuple) -> PyResult { + fn __call__(&self, args: &PyTuple) -> PyResult { let args = from_tuple(args)?.iter().map(|e| e.expr.clone()).collect(); - Ok(Expression { + Ok(PyExpr { expr: self.function.call(args), }) } @@ -151,19 +150,19 @@ impl ScalarUDF { /// Represents a AggregateUDF #[pyclass] #[derive(Debug, Clone)] -pub struct AggregateUDF { - pub(crate) function: _AggregateUDF, +pub struct PyAggregateUDF { + pub(crate) function: AggregateUDF, } #[pymethods] -impl AggregateUDF { - /// creates a new expression with the call of the udf +impl PyAggregateUDF { + /// creates a new PyExpr with the call of the udf #[call] #[args(args = "*")] - fn __call__(&self, args: &PyTuple) -> PyResult { + fn __call__(&self, args: &PyTuple) -> PyResult { let args = from_tuple(args)?.iter().map(|e| e.expr.clone()).collect(); - Ok(Expression { + Ok(PyExpr { expr: self.function.call(args), }) } diff --git a/python/src/functions.rs b/python/src/functions.rs index 23f010a6ae45..350cac7dce04 100644 --- a/python/src/functions.rs +++ b/python/src/functions.rs @@ -15,46 +15,47 @@ // specific language governing permissions and limitations // under the License. -use crate::udaf; -use crate::udf; -use crate::{expression, types::PyDataType}; +use std::sync::Arc; + use datafusion::arrow::datatypes::DataType; use datafusion::logical_plan; use pyo3::{prelude::*, types::PyTuple, wrap_pyfunction}; -use std::sync::Arc; -/// Expression representing a column on the existing plan. +use crate::{ + expression, + expression::{PyAggregateUDF, PyExpr, PyScalarUDF}, + types::PyDataType, + udaf, udf, +}; + +/// PyExpr representing a column on the existing plan. 
#[pyfunction] #[pyo3(text_signature = "(name)")] -fn col(name: &str) -> expression::Expression { - expression::Expression { +fn col(name: &str) -> PyExpr { + PyExpr { expr: logical_plan::col(name), } } -/// Expression representing a constant value +/// PyExpr representing a constant value #[pyfunction] #[pyo3(text_signature = "(value)")] -fn lit(value: i32) -> expression::Expression { - expression::Expression { +fn lit(value: i32) -> PyExpr { + PyExpr { expr: logical_plan::lit(value), } } #[pyfunction] -fn array(value: Vec) -> expression::Expression { - expression::Expression { +fn array(value: Vec) -> PyExpr { + PyExpr { expr: logical_plan::array(value.into_iter().map(|x| x.expr).collect::>()), } } #[pyfunction] -fn in_list( - expr: expression::Expression, - value: Vec, - negated: bool, -) -> expression::Expression { - expression::Expression { +fn in_list(expr: PyExpr, value: Vec, negated: bool) -> PyExpr { + PyExpr { expr: logical_plan::in_list( expr.expr, value.into_iter().map(|x| x.expr).collect::>(), @@ -65,8 +66,8 @@ fn in_list( /// Current date and time #[pyfunction] -fn now() -> expression::Expression { - expression::Expression { +fn now() -> PyExpr { + PyExpr { // here lit(0) is a stub for conform to arity expr: logical_plan::now(logical_plan::lit(0)), } @@ -74,8 +75,8 @@ fn now() -> expression::Expression { /// Returns a random value in the range 0.0 <= x < 1.0 #[pyfunction] -fn random() -> expression::Expression { - expression::Expression { +fn random() -> PyExpr { + PyExpr { expr: logical_plan::random(), } } @@ -83,10 +84,10 @@ fn random() -> expression::Expression { /// Concatenates the text representations of all the arguments. /// NULL arguments are ignored. #[pyfunction(args = "*")] -fn concat(args: &PyTuple) -> PyResult { +fn concat(args: &PyTuple) -> PyResult { let expressions = expression::from_tuple(args)?; let args = expressions.into_iter().map(|e| e.expr).collect::>(); - Ok(expression::Expression { + Ok(PyExpr { expr: logical_plan::concat(&args), }) } @@ -95,10 +96,10 @@ fn concat(args: &PyTuple) -> PyResult { /// The first argument is used as the separator string, and should not be NULL. /// Other NULL arguments are ignored. #[pyfunction(sep, args = "*")] -fn concat_ws(sep: String, args: &PyTuple) -> PyResult { +fn concat_ws(sep: String, args: &PyTuple) -> PyResult { let expressions = expression::from_tuple(args)?; let args = expressions.into_iter().map(|e| e.expr).collect::>(); - Ok(expression::Expression { + Ok(PyExpr { expr: logical_plan::concat_ws(sep, &args), }) } @@ -107,8 +108,8 @@ macro_rules! define_unary_function { ($NAME: ident) => { #[doc = "This function is not documented yet"] #[pyfunction] - fn $NAME(value: expression::Expression) -> expression::Expression { - expression::Expression { + fn $NAME(value: PyExpr) -> PyExpr { + PyExpr { expr: logical_plan::$NAME(value.expr), } } @@ -116,8 +117,8 @@ macro_rules! 
     ($NAME: ident, $DOC: expr) => {
         #[doc = $DOC]
         #[pyfunction]
-        fn $NAME(value: expression::Expression) -> expression::Expression {
-            expression::Expression {
+        fn $NAME(value: PyExpr) -> PyExpr {
+            PyExpr {
                 expr: logical_plan::$NAME(value.expr),
             }
         }
@@ -205,12 +206,12 @@ pub(crate) fn create_udf(
     input_types: Vec<PyDataType>,
     return_type: PyDataType,
     name: &str,
-) -> expression::ScalarUDF {
+) -> PyScalarUDF {
     let input_types: Vec<DataType> =
         input_types.iter().map(|d| d.data_type.clone()).collect();
     let return_type = Arc::new(return_type.data_type);
 
-    expression::ScalarUDF {
+    PyScalarUDF {
         function: logical_plan::create_udf(
             name,
             input_types,
@@ -227,7 +228,7 @@ fn udf(
     input_types: Vec<PyDataType>,
     return_type: PyDataType,
     py: Python,
-) -> PyResult<expression::ScalarUDF> {
+) -> PyResult<PyScalarUDF> {
     let name = fun.getattr(py, "__qualname__")?.extract::<String>(py)?;
 
     Ok(create_udf(fun, input_types, return_type, &name))
@@ -241,7 +242,7 @@ fn udaf(
     return_type: PyDataType,
     state_type: Vec<PyDataType>,
     py: Python,
-) -> PyResult<expression::AggregateUDF> {
+) -> PyResult<PyAggregateUDF> {
     let name = accumulator
         .getattr(py, "__qualname__")?
         .extract::<String>(py)?;
@@ -250,7 +251,7 @@ fn udaf(
     let return_type = Arc::new(return_type.data_type);
     let state_type = Arc::new(state_type.into_iter().map(|t| t.data_type).collect());
 
-    Ok(expression::AggregateUDF {
+    Ok(PyAggregateUDF {
         function: logical_plan::create_udaf(
             &name,
             input_type,
diff --git a/python/src/lib.rs b/python/src/lib.rs
index f0075c7b2913..a8c839339bb4 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -32,9 +32,9 @@ mod udf;
 /// DataFusion.
 #[pymodule]
 fn internals(py: Python, m: &PyModule) -> PyResult<()> {
-    m.add_class::<context::ExecutionContext>()?;
-    m.add_class::<dataframe::DataFrame>()?;
-    m.add_class::<expression::Expression>()?;
+    m.add_class::<context::PyExecutionContext>()?;
+    m.add_class::<dataframe::PyDataFrame>()?;
+    m.add_class::<expression::PyExpr>()?;
 
     let functions = PyModule::new(py, "functions")?;
     functions::init(functions)?;
diff --git a/python/src/pyarrow.rs b/python/src/pyarrow.rs
index a633e11c9d6e..7b86edf29181 100644
--- a/python/src/pyarrow.rs
+++ b/python/src/pyarrow.rs
@@ -49,8 +49,9 @@ impl PyArrowConvert for DataType {
     fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
         let c_schema = FFI_ArrowSchema::try_from(self).map_err(DataFusionError::from)?;
         let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
-        let cls = py.import("pyarrow.DataType")?;
-        let dtype = cls.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("DataType")?;
+        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
         Ok(dtype.into())
     }
 }
@@ -67,8 +68,9 @@ impl PyArrowConvert for Field {
     fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
         let c_schema = FFI_ArrowSchema::try_from(self).map_err(DataFusionError::from)?;
         let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
-        let cls = py.import("pyarrow.Field")?;
-        let dtype = cls.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("Field")?;
+        let dtype = class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
         Ok(dtype.into())
     }
 }
@@ -85,8 +87,10 @@ impl PyArrowConvert for Schema {
     fn to_pyarrow(&self, py: Python) -> PyResult<PyObject> {
         let c_schema = FFI_ArrowSchema::try_from(self).map_err(DataFusionError::from)?;
         let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
-        let cls = py.import("pyarrow.Schema")?;
-        let schema = cls.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("Schema")?;
+        let schema =
+            class.call_method1("_import_from_c", (c_schema_ptr as uintptr_t,))?;
         Ok(schema.into())
     }
 }
@@ -114,8 +118,9 @@ impl PyArrowConvert for ArrayRef {
         let (array_pointer, schema_pointer) =
             self.to_raw().map_err(DataFusionError::from)?;
 
-        let cls = py.import("pyarrow.Array")?;
-        let array = cls.call_method1(
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("Array")?;
+        let array = class.call_method1(
             "_import_from_c",
             (array_pointer as uintptr_t, schema_pointer as uintptr_t),
         )?;
@@ -153,8 +158,9 @@ impl PyArrowConvert for RecordBatch {
             py_names.push(field.name());
         }
 
-        let cls = py.import("pyarrow.RecordBatch")?;
-        let record = cls.call_method1("from_arrays", (py_arrays, py_names))?;
+        let module = py.import("pyarrow")?;
+        let class = module.getattr("RecordBatch")?;
+        let record = class.call_method1("from_arrays", (py_arrays, py_names))?;
 
         Ok(PyObject::from(record))
     }

From f64bc8de09f0dd505bdb8967ea271537929036af Mon Sep 17 00:00:00 2001
From: Krisztián Szűcs
Date: Tue, 10 Aug 2021 18:40:14 +0200
Subject: [PATCH 06/12] k

---
 python/src/dataframe.rs | 8 +++-----
 python/src/pyarrow.rs   | 4 ++--
 2 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index 113fcfad4ecd..2dd42b64faaf 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -17,17 +17,15 @@
 
 use std::sync::{Arc, Mutex};
 
-use logical_plan::LogicalPlan;
 use pyo3::{
     prelude::*,
     types::{PyList, PyTuple},
 };
 use tokio::runtime::Runtime;
 
-use datafusion::execution::context::ExecutionContext as _ExecutionContext;
-use datafusion::logical_plan::{JoinType, LogicalPlanBuilder};
+use datafusion::execution::context::{ExecutionContext, ExecutionContextState};
+use datafusion::logical_plan::{JoinType, LogicalPlan, LogicalPlanBuilder};
 use datafusion::physical_plan::collect;
-use datafusion::{execution::context::ExecutionContextState, logical_plan};
 
 use crate::{
     errors, errors::DataFusionError, expression, expression::PyExpr,
@@ -121,7 +119,7 @@ impl PyDataFrame {
     /// Executes the plan, returning a list of `RecordBatch`es.
     /// Unless some order is specified in the plan, there is no guarantee of the order of the result
     fn collect(&self, py: Python) -> PyResult<PyObject> {
-        let ctx = _ExecutionContext::from(self.ctx_state.clone());
+        let ctx = ExecutionContext::from(self.ctx_state.clone());
         let plan = ctx.optimize(&self.plan).map_err(DataFusionError::from)?;
         let plan = ctx
             .create_physical_plan(&plan)
diff --git a/python/src/pyarrow.rs b/python/src/pyarrow.rs
index 7b86edf29181..81180aa11cd5 100644
--- a/python/src/pyarrow.rs
+++ b/python/src/pyarrow.rs
@@ -19,7 +19,7 @@ use std::convert::TryFrom;
 use std::sync::Arc;
 
 use libc::uintptr_t;
-use pyo3::exceptions::PyValueError;
+use pyo3::exceptions::PyNotImplementedError;
 use pyo3::prelude::*;
 use pyo3::types::PyList;
 
@@ -200,6 +200,6 @@ impl PyArrowConvert for ScalarValue {
     }
 
     fn to_pyarrow(&self, _py: Python) -> PyResult<PyObject> {
-        Err(PyValueError::new_err("argument is wrong"))
+        Err(PyNotImplementedError::new_err("Not implemented"))
     }
 }

From 5500e54f2bd5754207d1f95571267673973d46af Mon Sep 17 00:00:00 2001
From: Krisztián Szűcs
Date: Wed, 11 Aug 2021 14:18:48 +0200
Subject: [PATCH 07/12] remove scalar wrapper class

---
 python/src/scalar.rs | 35 -----------------------------------
 python/src/udaf.rs   | 21 +++++++++++----------
 2 files changed, 11 insertions(+), 45 deletions(-)
 delete mode 100644 python/src/scalar.rs

diff --git a/python/src/scalar.rs b/python/src/scalar.rs
deleted file mode 100644
index 6a5079d9e2ae..000000000000
--- a/python/src/scalar.rs
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use pyo3::prelude::*;
-
-use crate::pyarrow::PyArrowConvert;
-use datafusion::scalar::ScalarValue;
-
-/// An expression that can be used on a DataFrame
-#[derive(Debug, Clone)]
-pub(crate) struct Scalar {
-    pub(crate) scalar: ScalarValue,
-}
-
-impl<'source> FromPyObject<'source> for Scalar {
-    fn extract(ob: &'source PyAny) -> PyResult<Self> {
-        Ok(Self {
-            scalar: ScalarValue::from_pyarrow(ob)?,
-        })
-    }
-}
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 60440e0c45a1..316184e3a0e8 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -17,7 +17,10 @@
 
 use std::sync::Arc;
 
-use pyo3::{prelude::*, types::PyTuple};
+use pyo3::{
+    prelude::*,
+    types::{PyList, PyTuple},
+};
 
 use datafusion::arrow::array::ArrayRef;
 
@@ -28,7 +31,6 @@ use datafusion::{
 };
 
 use crate::pyarrow::PyArrowConvert;
-use crate::scalar::Scalar;
 
 #[derive(Debug)]
 struct PyAccumulator {
@@ -44,16 +46,15 @@ impl PyAccumulator {
 impl Accumulator for PyAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
         Python::with_gil(|py| {
-            let state = self
-                .accum
+            self.accum
                 .as_ref(py)
-                .call_method0("to_scalars")
-                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?
-                .extract::<Vec<Scalar>>()
-                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
-            Ok(state.into_iter().map(|v| v.scalar).collect::<Vec<ScalarValue>>())
+                .call_method0("to_scalars")?
+                .downcast::<PyList>()?
+                .iter()
+                .map(ScalarValue::from_pyarrow)
+                .collect::<PyResult<Vec<ScalarValue>>>()
         })
+        .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
     }
 
     fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {

From 3e239d7c6b98fc69b22e655db0844944ef53bef9 Mon Sep 17 00:00:00 2001
From: Krisztián Szűcs
Date: Wed, 11 Aug 2021 14:26:21 +0200
Subject: [PATCH 08/12] fix compilation

---
 python/src/lib.rs  | 1 -
 python/src/udaf.rs | 9 ++-------
 2 files changed, 2 insertions(+), 8 deletions(-)

diff --git a/python/src/lib.rs b/python/src/lib.rs
index a8c839339bb4..ca0a3ea5df9c 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -24,7 +24,6 @@ mod errors;
 mod expression;
 mod functions;
 mod pyarrow;
-mod scalar;
 mod types;
 mod udaf;
 mod udf;
diff --git a/python/src/udaf.rs b/python/src/udaf.rs
index 316184e3a0e8..ada75a035a28 100644
--- a/python/src/udaf.rs
+++ b/python/src/udaf.rs
@@ -69,15 +69,10 @@ impl Accumulator for PyAccumulator {
 
     fn evaluate(&self) -> Result<ScalarValue> {
         Python::with_gil(|py| {
-            let value = self
-                .accum
-                .as_ref(py)
-                .call_method0("evaluate")
-                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))?;
-
+            let value = self.accum.as_ref(py).call_method0("evaluate")?;
             ScalarValue::from_pyarrow(value)
-                .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
         })
+        .map_err(|e| InnerDataFusionError::Execution(format!("{}", e)))
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {

From 0a671fff20c55c676dce5aff5d630f057b29833f Mon Sep 17 00:00:00 2001
From: Krisztián Szűcs
Date: Wed, 11 Aug 2021 14:40:37 +0200
Subject: [PATCH 09/12] remove type wrapper

---
 python/src/context.rs   | 11 +++++------
 python/src/functions.rs | 43 ++++++++++++++++++++++++-----------------
 python/src/lib.rs       |  1 -
 python/src/types.rs     | 34 --------------------------------
 4 files changed, 30 insertions(+), 59 deletions(-)
 delete mode 100644 python/src/types.rs

diff --git a/python/src/context.rs b/python/src/context.rs
index e89a0870a944..027e402c7c93 100644
--- a/python/src/context.rs
+++ b/python/src/context.rs
@@ -34,7 +34,6 @@ use crate::dataframe::PyDataFrame;
 use crate::errors::DataFusionError;
 use crate::functions;
 use crate::pyarrow::PyArrowConvert;
-use crate::types::PyDataType;
 
 /// `PyExecutionContext` is able to plan and execute DataFusion plans.
 /// It has a powerful optimizer, a physical planner for local execution, and a
@@ -149,12 +148,12 @@ impl PyExecutionContext {
         &mut self,
         name: &str,
         func: PyObject,
-        args_types: Vec<PyDataType>,
-        return_type: PyDataType,
-    ) {
-        let function = functions::create_udf(func, args_types, return_type, name);
-
+        args_types: Vec<&PyAny>,
+        return_type: &PyAny,
+    ) -> PyResult<()> {
+        let function = functions::create_udf(func, args_types, return_type, name)?;
         self.ctx.register_udf(function.function);
+        Ok(())
     }
 
     fn tables(&self) -> HashSet<String> {
diff --git a/python/src/functions.rs b/python/src/functions.rs
index 350cac7dce04..f283e2fd838f 100644
--- a/python/src/functions.rs
+++ b/python/src/functions.rs
@@ -24,7 +24,7 @@ use pyo3::{prelude::*, types::PyTuple, wrap_pyfunction};
 
 use crate::{
     expression,
     expression::{PyAggregateUDF, PyExpr, PyScalarUDF},
-    types::PyDataType,
+    pyarrow::PyArrowConvert,
     udaf, udf,
 };
 
@@ -203,53 +203,60 @@ define_unary_function!(count);
 
 pub(crate) fn create_udf(
     fun: PyObject,
-    input_types: Vec<PyDataType>,
-    return_type: PyDataType,
+    input_types: Vec<&PyAny>,
+    return_type: &PyAny,
     name: &str,
-) -> PyScalarUDF {
-    let input_types: Vec<DataType> =
-        input_types.iter().map(|d| d.data_type.clone()).collect();
-    let return_type = Arc::new(return_type.data_type);
+) -> PyResult<PyScalarUDF> {
+    let input_types: Vec<DataType> = input_types
+        .into_iter()
+        .map(DataType::from_pyarrow)
+        .collect::<PyResult<_>>()?;
+    let return_type = Arc::new(DataType::from_pyarrow(return_type)?);
 
-    PyScalarUDF {
+    Ok(PyScalarUDF {
         function: logical_plan::create_udf(
             name,
             input_types,
             return_type,
             udf::array_udf(fun),
         ),
-    }
+    })
 }
 
 /// Creates a new udf.
 #[pyfunction]
 fn udf(
     fun: PyObject,
-    input_types: Vec<PyDataType>,
-    return_type: PyDataType,
+    input_types: Vec<&PyAny>,
+    return_type: &PyAny,
     py: Python,
 ) -> PyResult<PyScalarUDF> {
     let name = fun.getattr(py, "__qualname__")?.extract::<String>(py)?;
 
-    Ok(create_udf(fun, input_types, return_type, &name))
+    create_udf(fun, input_types, return_type, &name)
 }
 
 /// Creates a new udf.
 #[pyfunction]
 fn udaf(
     accumulator: PyObject,
-    input_type: PyDataType,
-    return_type: PyDataType,
-    state_type: Vec<PyDataType>,
+    input_type: &PyAny,
+    return_type: &PyAny,
+    state_type: Vec<&PyAny>,
     py: Python,
 ) -> PyResult<PyAggregateUDF> {
     let name = accumulator
         .getattr(py, "__qualname__")?
         .extract::<String>(py)?;
 
-    let input_type = input_type.data_type;
-    let return_type = Arc::new(return_type.data_type);
-    let state_type = Arc::new(state_type.into_iter().map(|t| t.data_type).collect());
+    let input_type = DataType::from_pyarrow(input_type)?;
+    let return_type = Arc::new(DataType::from_pyarrow(return_type)?);
+    let state_type = Arc::new(
+        state_type
+            .into_iter()
+            .map(DataType::from_pyarrow)
+            .collect::<PyResult<_>>()?,
+    );
 
     Ok(PyAggregateUDF {
         function: logical_plan::create_udaf(
diff --git a/python/src/lib.rs b/python/src/lib.rs
index ca0a3ea5df9c..85a5d61e4eda 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -24,7 +24,6 @@ mod errors;
 mod expression;
 mod functions;
 mod pyarrow;
-mod types;
 mod udaf;
 mod udf;
diff --git a/python/src/types.rs b/python/src/types.rs
deleted file mode 100644
index cfba0c639a47..000000000000
--- a/python/src/types.rs
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use datafusion::arrow::datatypes::DataType;
-use pyo3::{FromPyObject, PyAny, PyResult};
-
-use crate::pyarrow::PyArrowConvert;
-
-/// utility struct to convert PyObj to native DataType
-#[derive(Debug, Clone)]
-pub struct PyDataType {
-    pub data_type: DataType,
-}
-
-impl<'source> FromPyObject<'source> for PyDataType {
-    fn extract(ob: &'source PyAny) -> PyResult<Self> {
-        let dtype = DataType::from_pyarrow(ob)?;
-        Ok(PyDataType { data_type: dtype })
-    }
-}

From ad0747474d9a199603c552d17d5ad2a2cecd1691 Mon Sep 17 00:00:00 2001
From: Krisztián Szűcs
Date: Wed, 11 Aug 2021 15:12:15 +0200
Subject: [PATCH 10/12] isort

---
 python/datafusion/__init__.py                    | 10 ++++------
 python/datafusion/tests/test_df.py               |  1 +
 python/datafusion/tests/test_math_functions.py   |  1 +
 python/datafusion/tests/test_sql.py              |  1 +
 python/datafusion/tests/test_string_functions.py |  1 +
 python/datafusion/tests/test_udaf.py             |  1 +
 python/pyproject.toml                            |  5 -----
 7 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index 6406cf11a9c6..c6866ea18a3a 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -1,6 +1,4 @@
-from .internals import (
-    PyExecutionContext as ExecutionContext,
-    PyDataFrame as DataFrame,
-    PyExpr as Expr,
-    functions
-)
+from .internals import PyDataFrame as DataFrame
+from .internals import PyExecutionContext as ExecutionContext
+from .internals import PyExpr as Expr
+from .internals import functions
diff --git a/python/datafusion/tests/test_df.py b/python/datafusion/tests/test_df.py
index 5b6cbddbd74b..b04eba53f6fd 100644
--- a/python/datafusion/tests/test_df.py
+++ b/python/datafusion/tests/test_df.py
@@ -17,6 +17,7 @@
 
 import pyarrow as pa
 import pytest
+
 from datafusion import ExecutionContext
 from datafusion import functions as f
 
diff --git a/python/datafusion/tests/test_math_functions.py b/python/datafusion/tests/test_math_functions.py
index 98656b8c4f42..4e473c3de16a 100644
--- a/python/datafusion/tests/test_math_functions.py
+++ b/python/datafusion/tests/test_math_functions.py
@@ -18,6 +18,7 @@
 import numpy as np
 import pyarrow as pa
 import pytest
+
 from datafusion import ExecutionContext
 from datafusion import functions as f
 
diff --git a/python/datafusion/tests/test_sql.py b/python/datafusion/tests/test_sql.py
index 669f640529eb..d6a16f23b6c8 100644
--- a/python/datafusion/tests/test_sql.py
+++ b/python/datafusion/tests/test_sql.py
@@ -20,6 +20,7 @@
 import pytest
 
 from datafusion import ExecutionContext
+
 from . import generic as helpers
diff --git a/python/datafusion/tests/test_string_functions.py b/python/datafusion/tests/test_string_functions.py
index ea064a6b2e9f..4255d34805a0 100644
--- a/python/datafusion/tests/test_string_functions.py
+++ b/python/datafusion/tests/test_string_functions.py
@@ -17,6 +17,7 @@
 
 import pyarrow as pa
 import pytest
+
 from datafusion import ExecutionContext
 from datafusion import functions as f
 
diff --git a/python/datafusion/tests/test_udaf.py b/python/datafusion/tests/test_udaf.py
index e7044d6119e3..aca1215a7cb2 100644
--- a/python/datafusion/tests/test_udaf.py
+++ b/python/datafusion/tests/test_udaf.py
@@ -20,6 +20,7 @@
 import pyarrow as pa
 import pyarrow.compute as pc
 import pytest
+
 from datafusion import ExecutionContext
 from datafusion import functions as f
 
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 401426509c37..21319a0fb9c7 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -27,8 +27,3 @@ dependencies = [
 
 [tool.isort]
 profile = "black"
-
-# [tool.mypy]
-# ignore_missing_imports = true
-# disallow_untyped_defs = true
-# files = "datafusion"

From 3965075e147228518a0d0cb38bf3cfe4f3d92f99 Mon Sep 17 00:00:00 2001
From: Krisztián Szűcs
Date: Wed, 11 Aug 2021 15:44:59 +0200
Subject: [PATCH 11/12] fix project name

---
 python/pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyproject.toml b/python/pyproject.toml
index 21319a0fb9c7..ce33f58d2917 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -20,7 +20,7 @@ requires = ["maturin>=0.11,<0.12"]
 build-backend = "maturin"
 
 [project]
-name = "ebtelep"
+name = "datafusion"
 dependencies = [
     "pyarrow"
 ]

From 4f3698f66ebc6fe8593e35edb0e926c70990eb2d Mon Sep 17 00:00:00 2001
From: Krisztián Szűcs
Date: Wed, 11 Aug 2021 15:47:24 +0200
Subject: [PATCH 12/12] cleanup

---
 python/datafusion/__init__.py | 24 ++++++++++++++++++++++++
 python/src/catalog.rs         |  1 -
 python/src/lib.rs             |  1 -
 3 files changed, 24 insertions(+), 2 deletions(-)
 delete mode 100644 python/src/catalog.rs

diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index c6866ea18a3a..20bc3f22bfc5 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -1,4 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 from .internals import PyDataFrame as DataFrame
 from .internals import PyExecutionContext as ExecutionContext
 from .internals import PyExpr as Expr
 from .internals import functions
+
+__all__ = [
+    "DataFrame",
+    "ExecutionContext",
+    "Expr",
+    "functions"
+]
diff --git a/python/src/catalog.rs b/python/src/catalog.rs
deleted file mode 100644
index 8b137891791f..000000000000
--- a/python/src/catalog.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 85a5d61e4eda..19b7f8a2d1ff 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -17,7 +17,6 @@
 
 use pyo3::prelude::*;
 
-mod catalog;
 mod context;
 mod dataframe;
 mod errors;
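
Taken together, the series leaves the wheel shipping a `datafusion` package that
re-exports the `Py`-prefixed classes from `datafusion.internals` under their
public names, with UDFs accepting plain pyarrow types instead of the removed
`PyDataType` and `Scalar` wrappers. A minimal sketch of the resulting
user-facing API; the `create_dataframe`, `select`, and `collect` calls are
assumed to behave as exercised by the test suite renamed in PATCH 01, not shown
in full here:

    import pyarrow as pa

    from datafusion import ExecutionContext
    from datafusion import functions as f

    ctx = ExecutionContext()
    batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], names=["a"])
    df = ctx.create_dataframe([[batch]])

    # After PATCH 09, plain pyarrow DataTypes are passed where PyDataType
    # wrappers were previously required.
    is_null = f.udf(lambda arr: arr.is_null(), [pa.int64()], pa.bool_())

    # The UDF is callable because PyScalarUDF implements __call__.
    result = df.select(is_null(f.col("a"))).collect()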