diff --git a/ibis/backends/base/__init__.py b/ibis/backends/base/__init__.py
index 25ccbe1886fb..8b5fc07692df 100644
--- a/ibis/backends/base/__init__.py
+++ b/ibis/backends/base/__init__.py
@@ -20,12 +20,6 @@
     MutableMapping,
 )
 
-if TYPE_CHECKING:
-    import pandas as pd
-    import pyarrow as pa
-
-    import ibis.expr.schema as sch
-
 import ibis
 import ibis.common.exceptions as exc
 import ibis.config
@@ -33,6 +27,10 @@
 import ibis.expr.types as ir
 from ibis import util
 
+if TYPE_CHECKING:
+    import pandas as pd
+    import pyarrow as pa
+
 __all__ = ('BaseBackend', 'Database', 'connect')
 
 
@@ -223,16 +221,6 @@ def _import_pyarrow():
         else:
             return pyarrow
 
-    @staticmethod
-    def _table_or_column_schema(expr: ir.Expr) -> sch.Schema:
-        from ibis.backends.pyarrow.datatypes import sch
-
-        if isinstance(expr, ir.Table):
-            return expr.schema()
-        else:
-            # ColumnExpr has no schema method, define single-column schema
-            return sch.schema([(expr.get_name(), expr.type())])
-
     @util.experimental
     def to_pyarrow(
         self,
@@ -275,7 +263,7 @@ def to_pyarrow(
         except ValueError:
             # The pyarrow batches iterator is empty so pass in an empty
             # iterator and a pyarrow schema
-            schema = self._table_or_column_schema(expr)
+            schema = expr.as_table().schema()
             table = pa.Table.from_batches([], schema=schema.to_pyarrow())
 
         if isinstance(expr, ir.Table):
diff --git a/ibis/backends/base/sql/__init__.py b/ibis/backends/base/sql/__init__.py
index 791403f94beb..025cf65fd658 100644
--- a/ibis/backends/base/sql/__init__.py
+++ b/ibis/backends/base/sql/__init__.py
@@ -154,7 +154,7 @@ def to_pyarrow_batches(
         params: Mapping[ir.Scalar, Any] | None = None,
         limit: int | str | None = None,
         chunk_size: int = 1_000_000,
-        **kwargs: Any,
+        **_: Any,
     ) -> pa.ipc.RecordBatchReader:
         """Execute expression and return an iterator of pyarrow record batches.
 
@@ -172,31 +172,25 @@ def to_pyarrow_batches(
             Mapping of scalar parameter expressions to value.
         chunk_size
             Maximum number of rows in each returned record batch.
-        kwargs
-            Keyword arguments
 
         Returns
         -------
-        results
-            RecordBatchReader
+        RecordBatchReader
+            Collection of pyarrow `RecordBatch`s.
         """
         pa = self._import_pyarrow()
 
-        from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct
-
-        schema = self._table_or_column_schema(expr)
-
-        def _batches():
+        schema = expr.as_table().schema()
+        array_type = schema.as_struct().to_pyarrow()
+        arrays = (
+            pa.array(map(tuple, batch), type=array_type)
             for batch in self._cursor_batches(
                 expr, params=params, limit=limit, chunk_size=chunk_size
-            ):
-                struct_array = pa.array(
-                    map(tuple, batch),
-                    type=ibis_to_pyarrow_struct(schema),
-                )
-                yield pa.RecordBatch.from_struct_array(struct_array)
+            )
+        )
+        batches = map(pa.RecordBatch.from_struct_array, arrays)
 
-        return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), _batches())
+        return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batches)
 
     def execute(
         self,
diff --git a/ibis/backends/clickhouse/__init__.py b/ibis/backends/clickhouse/__init__.py
index b54e185e3046..8e8224733177 100644
--- a/ibis/backends/clickhouse/__init__.py
+++ b/ibis/backends/clickhouse/__init__.py
@@ -303,21 +303,16 @@ def to_pyarrow_batches(
         """
         pa = self._import_pyarrow()
 
-        from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct
-
-        schema = self._table_or_column_schema(expr)
-
-        def _batches():
+        schema = expr.as_table().schema()
+        array_type = schema.as_struct().to_pyarrow()
+        batches = (
+            pa.RecordBatch.from_struct_array(pa.array(batch, type=array_type))
             for batch in self._cursor_batches(
                 expr, params=params, limit=limit, chunk_size=chunk_size
-            ):
-                struct_array = pa.array(
-                    map(tuple, batch),
-                    type=ibis_to_pyarrow_struct(schema),
-                )
-                yield pa.RecordBatch.from_struct_array(struct_array)
-
-        return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), _batches())
+            )
+        )
+
+        return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batches)
 
     def _cursor_batches(
         self,
diff --git a/ibis/expr/schema.py b/ibis/expr/schema.py
index e42ecff893b1..ddc78c62112a 100644
--- a/ibis/expr/schema.py
+++ b/ibis/expr/schema.py
@@ -215,6 +215,9 @@ def to_pyarrow(self):
 
         return ibis_to_pyarrow_schema(self)
 
+    def as_struct(self) -> dt.Struct:
+        return dt.Struct(self.names, self.types)
+
     def __gt__(self, other: Schema) -> bool:
         """Return whether `self` is a strict superset of `other`."""
         return set(self.items()) > set(other.items())