Refactor pycapsule export (#54)
kylebarron authored Jul 23, 2024
1 parent 13e9a18 · commit a8764e3
Showing 12 changed files with 129 additions and 80 deletions.
18 changes: 3 additions & 15 deletions pyo3-arrow/src/array.rs
@@ -1,8 +1,6 @@
use std::ffi::CString;
use std::fmt::Display;
use std::sync::Arc;

use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arrow_array::{make_array, Array, ArrayRef};
use arrow_schema::{Field, FieldRef};
use pyo3::intern;
@@ -11,6 +9,7 @@ use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::error::PyArrowResult;
use crate::ffi::from_python::utils::import_array_pycapsules;
use crate::ffi::to_array_pycapsules;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array;
use crate::interop::numpy::to_numpy::to_numpy;

@@ -124,20 +123,9 @@ impl PyArray {
pub fn __arrow_c_array__<'py>(
&'py self,
py: Python<'py>,
requested_schema: Option<PyObject>,
requested_schema: Option<Bound<PyCapsule>>,
) -> PyArrowResult<Bound<PyTuple>> {
let field = &self.field;
let ffi_schema = FFI_ArrowSchema::try_from(field)?;
let ffi_array = FFI_ArrowArray::new(&self.array.to_data());

let schema_capsule_name = CString::new("arrow_schema").unwrap();
let array_capsule_name = CString::new("arrow_array").unwrap();

let schema_capsule = PyCapsule::new_bound(py, ffi_schema, Some(schema_capsule_name))?;
let array_capsule = PyCapsule::new_bound(py, ffi_array, Some(array_capsule_name))?;
let tuple = PyTuple::new_bound(py, vec![schema_capsule, array_capsule]);

Ok(tuple)
to_array_pycapsules(py, self.field.clone(), &self.array, requested_schema)
}

pub fn __eq__(&self, other: &PyArray) -> bool {
17 changes: 7 additions & 10 deletions pyo3-arrow/src/chunked.rs
@@ -1,4 +1,3 @@
use std::ffi::CString;
use std::fmt::Display;
use std::sync::Arc;

@@ -13,8 +12,8 @@ use crate::error::{PyArrowError, PyArrowResult};
use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::chunked::ArrayIterator;
use crate::ffi::to_python::ffi_stream::new_stream;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
use crate::ffi::to_python::to_stream_pycapsule;
use crate::interop::numpy::to_numpy::chunked_to_numpy;

/// A Python-facing Arrow chunked array.
@@ -149,15 +148,13 @@ impl PyChunkedArray {
fn __arrow_c_stream__<'py>(
&'py self,
py: Python<'py>,
requested_schema: Option<PyObject>,
requested_schema: Option<Bound<PyCapsule>>,
) -> PyResult<Bound<'py, PyCapsule>> {
let field = self.field.clone();
let chunks = self.chunks.clone();

let array_reader = Box::new(ArrayIterator::new(chunks.into_iter().map(Ok), field));
let ffi_stream = new_stream(array_reader);
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
PyCapsule::new_bound(py, ffi_stream, Some(stream_capsule_name))
let array_reader = Box::new(ArrayIterator::new(
self.chunks.clone().into_iter().map(Ok),
self.field.clone(),
));
to_stream_pycapsule(py, array_reader, requested_schema)
}

pub fn __eq__(&self, other: &PyChunkedArray) -> bool {
8 changes: 6 additions & 2 deletions pyo3-arrow/src/ffi/mod.rs
@@ -1,2 +1,6 @@
pub mod from_python;
pub mod to_python;
pub(crate) mod from_python;
pub(crate) mod to_python;

pub use to_python::chunked::{ArrayIterator, ArrayReader};
pub use to_python::{to_array_pycapsules, to_schema_pycapsule, to_stream_pycapsule};
// pub use
1 change: 1 addition & 0 deletions pyo3-arrow/src/ffi/to_python/chunked.rs
@@ -19,6 +19,7 @@ impl<R: ArrayReader + ?Sized> ArrayReader for Box<R> {
}
}

/// An iterator of [`ArrayRef`] with an attached [`FieldRef`]
pub struct ArrayIterator<I>
where
I: IntoIterator<Item = Result<ArrayRef, ArrowError>>,
3 changes: 3 additions & 0 deletions pyo3-arrow/src/ffi/to_python/mod.rs
@@ -1,3 +1,6 @@
pub mod chunked;
pub mod ffi_stream;
pub mod nanoarrow;
mod utils;

pub use utils::{to_array_pycapsules, to_schema_pycapsule, to_stream_pycapsule};
55 changes: 55 additions & 0 deletions pyo3-arrow/src/ffi/to_python/utils.rs
@@ -0,0 +1,55 @@
use std::ffi::CString;

use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arrow_array::Array;
use arrow_schema::{ArrowError, FieldRef};
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple};

use crate::error::PyArrowResult;
use crate::ffi::to_python::ffi_stream::new_stream;
use crate::ffi::ArrayReader;

/// Export a [`arrow_schema::Schema`], [`arrow_schema::Field`], or [`arrow_schema::DataType`] to a
/// PyCapsule holding an Arrow C Schema pointer.
pub fn to_schema_pycapsule(
py: Python,
field: impl TryInto<FFI_ArrowSchema, Error = ArrowError>,
) -> PyArrowResult<Bound<PyCapsule>> {
let ffi_schema: FFI_ArrowSchema = field.try_into()?;
let schema_capsule_name = CString::new("arrow_schema").unwrap();
let schema_capsule = PyCapsule::new_bound(py, ffi_schema, Some(schema_capsule_name))?;
Ok(schema_capsule)
}

/// Export an [`Array`] and [`FieldRef`] to a tuple of PyCapsules holding an Arrow C Schema and
/// Arrow C Array pointers.
pub fn to_array_pycapsules<'py>(
py: Python<'py>,
field: FieldRef,
array: &dyn Array,
_requested_schema: Option<Bound<PyCapsule>>,
) -> PyArrowResult<Bound<'py, PyTuple>> {
let ffi_schema = FFI_ArrowSchema::try_from(&field)?;
let ffi_array = FFI_ArrowArray::new(&array.to_data());

let schema_capsule_name = CString::new("arrow_schema").unwrap();
let array_capsule_name = CString::new("arrow_array").unwrap();

let schema_capsule = PyCapsule::new_bound(py, ffi_schema, Some(schema_capsule_name))?;
let array_capsule = PyCapsule::new_bound(py, ffi_array, Some(array_capsule_name))?;
let tuple = PyTuple::new_bound(py, vec![schema_capsule, array_capsule]);

Ok(tuple)
}

/// Export an [`ArrayIterator`] to a PyCapsule holding an Arrow C Stream pointer.
pub fn to_stream_pycapsule<'py>(
py: Python<'py>,
array_reader: Box<dyn ArrayReader + Send>,
_requested_schema: Option<Bound<PyCapsule>>,
) -> PyResult<Bound<'py, PyCapsule>> {
let ffi_stream = new_stream(array_reader);
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
PyCapsule::new_bound(py, ffi_stream, Some(stream_capsule_name))
}
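
The three helpers above become the single export path for every wrapper type in the crate, and they are re-exported from `pyo3_arrow::ffi` (see `ffi/mod.rs` above). A minimal sketch of how a downstream extension might call them, assuming the same arrow/pyo3 versions as this commit; the `MyArray` pyclass and its fields are hypothetical, not part of the crate:

```rust
use arrow_array::ArrayRef;
use arrow_schema::FieldRef;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple};
use pyo3_arrow::error::PyArrowResult;
use pyo3_arrow::ffi::{to_array_pycapsules, to_schema_pycapsule};

/// A hypothetical downstream wrapper around a single Arrow array.
#[pyclass]
struct MyArray {
    field: FieldRef,
    array: ArrayRef,
}

#[pymethods]
impl MyArray {
    /// Export the field as an `arrow_schema` capsule.
    fn __arrow_c_schema__<'py>(&self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
        to_schema_pycapsule(py, self.field.as_ref())
    }

    /// Export the data as an (`arrow_schema`, `arrow_array`) capsule pair.
    fn __arrow_c_array__<'py>(
        &self,
        py: Python<'py>,
        requested_schema: Option<Bound<'py, PyCapsule>>,
    ) -> PyArrowResult<Bound<'py, PyTuple>> {
        to_array_pycapsules(py, self.field.clone(), &self.array, requested_schema)
    }
}
```

Note that `requested_schema` is threaded through but not yet acted on, matching the `_requested_schema` parameters above.
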
11 changes: 2 additions & 9 deletions pyo3-arrow/src/field.rs
@@ -1,8 +1,6 @@
use std::ffi::CString;
use std::fmt::Display;
use std::sync::Arc;

use arrow::ffi::FFI_ArrowSchema;
use arrow_schema::{Field, FieldRef};
use pyo3::exceptions::PyTypeError;
use pyo3::intern;
@@ -12,6 +10,7 @@ use pyo3::types::{PyCapsule, PyTuple, PyType};
use crate::error::PyArrowResult;
use crate::ffi::from_python::utils::import_schema_pycapsule;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_schema;
use crate::ffi::to_python::to_schema_pycapsule;

/// A Python-facing Arrow field.
///
@@ -93,13 +92,7 @@ impl PyField {
/// For example, you can call [`pyarrow.field()`][pyarrow.field] to convert this array
/// into a pyarrow field, without copying memory.
fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
let ffi_schema = FFI_ArrowSchema::try_from(self.0.as_ref())?;
let schema_capsule_name = CString::new("arrow_schema").unwrap();
Ok(PyCapsule::new_bound(
py,
ffi_schema,
Some(schema_capsule_name),
)?)
to_schema_pycapsule(py, self.0.as_ref())
}

pub fn __eq__(&self, other: &PyField) -> bool {
2 changes: 1 addition & 1 deletion pyo3-arrow/src/lib.rs
@@ -3,7 +3,7 @@
mod array;
mod chunked;
pub mod error;
mod ffi;
pub mod ffi;
mod field;
pub mod input;
mod interop;
31 changes: 17 additions & 14 deletions pyo3-arrow/src/record_batch.rs
@@ -1,11 +1,9 @@
use std::ffi::CString;
use std::fmt::Display;
use std::sync::Arc;

use arrow::array::AsArray;
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
use arrow_array::{Array, RecordBatch, StructArray};
use arrow_schema::{DataType, SchemaBuilder};
use arrow_array::{Array, ArrayRef, RecordBatch, StructArray};
use arrow_schema::{DataType, Field, SchemaBuilder};
use pyo3::exceptions::PyValueError;
use pyo3::intern;
use pyo3::prelude::*;
@@ -14,6 +12,7 @@ use pyo3::types::{PyCapsule, PyTuple, PyType};
use crate::error::PyArrowResult;
use crate::ffi::from_python::utils::import_array_pycapsules;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array;
use crate::ffi::to_python::to_array_pycapsules;
use crate::schema::display_schema;

/// A Python-facing Arrow record batch.
@@ -100,19 +99,23 @@ impl PyRecordBatch {
pub fn __arrow_c_array__<'py>(
&'py self,
py: Python<'py>,
requested_schema: Option<PyObject>,
requested_schema: Option<Bound<PyCapsule>>,
) -> PyArrowResult<Bound<'py, PyTuple>> {
let schema = self.0.schema();
let array = StructArray::from(self.0.clone());
let field = Field::new_struct("", self.0.schema_ref().fields().clone(), false);
let array: ArrayRef = Arc::new(StructArray::from(self.0.clone()));
to_array_pycapsules(py, field.into(), &array, requested_schema)

let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?;
let ffi_array = FFI_ArrowArray::new(&array.to_data());
// let schema = self.0.schema();
// let array = StructArray::from(self.0.clone());

let schema_capsule_name = CString::new("arrow_schema").unwrap();
let array_capsule_name = CString::new("arrow_array").unwrap();
let schema_capsule = PyCapsule::new_bound(py, ffi_schema, Some(schema_capsule_name))?;
let array_capsule = PyCapsule::new_bound(py, ffi_array, Some(array_capsule_name))?;
Ok(PyTuple::new_bound(py, vec![schema_capsule, array_capsule]))
// let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?;
// let ffi_array = FFI_ArrowArray::new(&array.to_data());

// let schema_capsule_name = CString::new("arrow_schema").unwrap();
// let array_capsule_name = CString::new("arrow_array").unwrap();
// let schema_capsule = PyCapsule::new_bound(py, ffi_schema, Some(schema_capsule_name))?;
// let array_capsule = PyCapsule::new_bound(py, ffi_array, Some(array_capsule_name))?;
// Ok(PyTuple::new_bound(py, vec![schema_capsule, array_capsule]))
}

pub fn __eq__(&self, other: &PyRecordBatch) -> bool {
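
`PyRecordBatch` now exports through the same array path by viewing the batch as a struct array under an anonymous, non-nullable top-level field. A minimal sketch of that mapping; the `record_batch_as_struct` helper is illustrative, not part of the crate:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StructArray};
use arrow_schema::{DataType, Field, Schema};

/// Illustrative helper mirroring the mapping in `__arrow_c_array__` above.
fn record_batch_as_struct(batch: &RecordBatch) -> (Field, ArrayRef) {
    // Anonymous, non-nullable struct field whose children are the batch's columns.
    let field = Field::new_struct("", batch.schema_ref().fields().clone(), false);
    let array: ArrayRef = Arc::new(StructArray::from(batch.clone()));
    (field, array)
}

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();

    let (field, array) = record_batch_as_struct(&batch);
    assert_eq!(array.len(), batch.num_rows());
    assert!(matches!(field.data_type(), DataType::Struct(_)));
}
```

The previous implementation built the schema capsule from the batch's `Schema` directly; the struct-field form is what `to_array_pycapsules` expects.
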
24 changes: 16 additions & 8 deletions pyo3-arrow/src/record_batch_reader.rs
@@ -1,17 +1,18 @@
use std::ffi::CString;
use std::fmt::Display;
use std::sync::Arc;

use arrow::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::RecordBatchReader;
use arrow_schema::SchemaRef;
use arrow_array::{ArrayRef, RecordBatchReader, StructArray};
use arrow_schema::{Field, SchemaRef};
use pyo3::exceptions::{PyIOError, PyValueError};
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::error::PyArrowResult;
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::chunked::ArrayIterator;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
use crate::ffi::to_python::to_stream_pycapsule;
use crate::schema::display_schema;
use crate::{PySchema, PyTable};

@@ -124,16 +125,23 @@ impl PyRecordBatchReader {
fn __arrow_c_stream__<'py>(
&'py mut self,
py: Python<'py>,
requested_schema: Option<PyObject>,
requested_schema: Option<Bound<PyCapsule>>,
) -> PyResult<Bound<'py, PyCapsule>> {
let reader = self
.0
.take()
.ok_or(PyIOError::new_err("Cannot read from closed stream"))?;

let ffi_stream = FFI_ArrowArrayStream::new(reader);
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
PyCapsule::new_bound(py, ffi_stream, Some(stream_capsule_name))
let schema = reader.schema().clone();
let array_reader = reader.into_iter().map(|maybe_batch| {
let arr: ArrayRef = Arc::new(StructArray::from(maybe_batch?));
Ok(arr)
});
let array_reader = Box::new(ArrayIterator::new(
array_reader,
Field::new_struct("", schema.fields().clone(), false).into(),
));
to_stream_pycapsule(py, array_reader, requested_schema)
}

pub fn __repr__(&self) -> String {
8 changes: 2 additions & 6 deletions pyo3-arrow/src/schema.rs
@@ -1,8 +1,6 @@
use std::ffi::CString;
use std::fmt::Display;
use std::sync::Arc;

use arrow::ffi::FFI_ArrowSchema;
use arrow_schema::{Schema, SchemaRef};
use pyo3::exceptions::PyTypeError;
use pyo3::intern;
@@ -12,6 +10,7 @@ use pyo3::types::{PyCapsule, PyTuple, PyType};
use crate::error::PyArrowResult;
use crate::ffi::from_python::utils::import_schema_pycapsule;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_schema;
use crate::ffi::to_python::to_schema_pycapsule;

/// A Python-facing Arrow schema.
///
@@ -98,10 +97,7 @@ impl PySchema {
/// For example, you can call [`pyarrow.schema()`][pyarrow.schema] to convert this array
/// into a pyarrow schema, without copying memory.
fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
let ffi_schema = FFI_ArrowSchema::try_from(self.as_ref())?;
let schema_capsule_name = CString::new("arrow_schema").unwrap();
let schema_capsule = PyCapsule::new_bound(py, ffi_schema, Some(schema_capsule_name))?;
Ok(schema_capsule)
to_schema_pycapsule(py, self.0.as_ref())
}

pub fn __repr__(&self) -> String {
31 changes: 16 additions & 15 deletions pyo3-arrow/src/table.rs
@@ -1,18 +1,19 @@
use std::ffi::CString;
use std::fmt::Display;
use std::sync::Arc;

use arrow::ffi_stream::ArrowArrayStreamReader as ArrowRecordBatchStreamReader;
use arrow::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::RecordBatchReader;
use arrow_array::{RecordBatch, RecordBatchIterator};
use arrow_schema::SchemaRef;
use arrow_array::RecordBatch;
use arrow_array::{ArrayRef, RecordBatchReader, StructArray};
use arrow_schema::{Field, SchemaRef};
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::chunked::ArrayIterator;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
use crate::ffi::to_python::to_stream_pycapsule;
use crate::schema::display_schema;
use crate::PySchema;

@@ -87,18 +88,18 @@ impl PyTable {
fn __arrow_c_stream__<'py>(
&'py self,
py: Python<'py>,
requested_schema: Option<PyObject>,
requested_schema: Option<Bound<PyCapsule>>,
) -> PyResult<Bound<'py, PyCapsule>> {
let batches = self.batches.clone();

let record_batch_reader = Box::new(RecordBatchIterator::new(
batches.into_iter().map(Ok),
self.schema.clone(),
let field = self.schema.fields().clone();
let array_reader = self.batches.clone().into_iter().map(|batch| {
let arr: ArrayRef = Arc::new(StructArray::from(batch));
Ok(arr)
});
let array_reader = Box::new(ArrayIterator::new(
array_reader,
Field::new_struct("", field, false).into(),
));
let ffi_stream = FFI_ArrowArrayStream::new(record_batch_reader);

let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
PyCapsule::new_bound(py, ffi_stream, Some(stream_capsule_name))
to_stream_pycapsule(py, array_reader, requested_schema)
}

pub fn __eq__(&self, other: &PyTable) -> bool {
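
`PyChunkedArray`, `PyRecordBatchReader`, and `PyTable` now share one stream export path: wrap the chunks (or the record batches converted to struct arrays) in an `ArrayIterator` and hand it to `to_stream_pycapsule`, instead of building an `FFI_ArrowArrayStream` by hand. A minimal sketch of the record-batch flavor; `batches_to_stream` is an illustrative free function standing in for the `__arrow_c_stream__` methods above:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, RecordBatch, StructArray};
use arrow_schema::{Field, SchemaRef};
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use pyo3_arrow::ffi::{to_stream_pycapsule, ArrayIterator};

/// Illustrative helper: export a set of record batches as a single
/// Arrow C Stream capsule of struct-typed chunks.
fn batches_to_stream<'py>(
    py: Python<'py>,
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
    requested_schema: Option<Bound<'py, PyCapsule>>,
) -> PyResult<Bound<'py, PyCapsule>> {
    // Each batch becomes one struct-typed chunk of the stream.
    let iter = batches.into_iter().map(|batch| {
        let arr: ArrayRef = Arc::new(StructArray::from(batch));
        Ok(arr)
    });
    let field = Field::new_struct("", schema.fields().clone(), false);
    let reader = Box::new(ArrayIterator::new(iter, field.into()));
    to_stream_pycapsule(py, reader, requested_schema)
}
```
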
