Skip to content

Commit

Permalink
feat(datatypes): add as_struct method to convert schemas to structs
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Jan 20, 2023
1 parent a7aa3a2 commit 64be7b1
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 47 deletions.
22 changes: 5 additions & 17 deletions ibis/backends/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,17 @@
MutableMapping,
)

if TYPE_CHECKING:
import pandas as pd
import pyarrow as pa

import ibis.expr.schema as sch

import ibis
import ibis.common.exceptions as exc
import ibis.config
import ibis.expr.operations as ops
import ibis.expr.types as ir
from ibis import util

if TYPE_CHECKING:
import pandas as pd
import pyarrow as pa

__all__ = ('BaseBackend', 'Database', 'connect')


Expand Down Expand Up @@ -223,16 +221,6 @@ def _import_pyarrow():
else:
return pyarrow

@staticmethod
def _table_or_column_schema(expr: ir.Expr) -> sch.Schema:
    """Return the schema of a table expression, or a one-column schema.

    Parameters
    ----------
    expr
        Expression to derive a schema from.

    Returns
    -------
    sch.Schema
        `expr.schema()` when `expr` is an `ir.Table`; otherwise a schema
        holding the single entry `(expr.get_name(), expr.type())`.
    """
    # NOTE(review): `sch` is pulled in through the pyarrow datatypes module;
    # presumably it is the `ibis.expr.schema` module re-exported -- confirm.
    from ibis.backends.pyarrow.datatypes import sch

    if isinstance(expr, ir.Table):
        return expr.schema()
    else:
        # ColumnExpr has no schema method, define single-column schema
        return sch.schema([(expr.get_name(), expr.type())])

@util.experimental
def to_pyarrow(
self,
Expand Down Expand Up @@ -275,7 +263,7 @@ def to_pyarrow(
except ValueError:
# The pyarrow batches iterator is empty so pass in an empty
# iterator and a pyarrow schema
schema = self._table_or_column_schema(expr)
schema = expr.as_table().schema()
table = pa.Table.from_batches([], schema=schema.to_pyarrow())

if isinstance(expr, ir.Table):
Expand Down
28 changes: 11 additions & 17 deletions ibis/backends/base/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def to_pyarrow_batches(
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
**_: Any,
) -> pa.ipc.RecordBatchReader:
"""Execute expression and return an iterator of pyarrow record batches.
Expand All @@ -172,31 +172,25 @@ def to_pyarrow_batches(
Mapping of scalar parameter expressions to value.
chunk_size
Maximum number of rows in each returned record batch.
kwargs
Keyword arguments
Returns
-------
results
RecordBatchReader
RecordBatchReader
Collection of pyarrow `RecordBatch`s.
"""
pa = self._import_pyarrow()

from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct

schema = self._table_or_column_schema(expr)

def _batches():
schema = expr.as_table().schema()
array_type = schema.as_struct().to_pyarrow()
arrays = (
pa.array(map(tuple, batch), type=array_type)
for batch in self._cursor_batches(
expr, params=params, limit=limit, chunk_size=chunk_size
):
struct_array = pa.array(
map(tuple, batch),
type=ibis_to_pyarrow_struct(schema),
)
yield pa.RecordBatch.from_struct_array(struct_array)
)
)
batches = map(pa.RecordBatch.from_struct_array, arrays)

return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), _batches())
return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batches)

def execute(
self,
Expand Down
21 changes: 8 additions & 13 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,21 +303,16 @@ def to_pyarrow_batches(
"""
pa = self._import_pyarrow()

from ibis.backends.pyarrow.datatypes import ibis_to_pyarrow_struct

schema = self._table_or_column_schema(expr)

def _batches():
schema = expr.as_table().schema()
array_type = schema.as_struct().to_pyarrow()
batches = (
pa.RecordBatch.from_struct_array(pa.array(batch, type=array_type))
for batch in self._cursor_batches(
expr, params=params, limit=limit, chunk_size=chunk_size
):
struct_array = pa.array(
map(tuple, batch),
type=ibis_to_pyarrow_struct(schema),
)
yield pa.RecordBatch.from_struct_array(struct_array)

return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), _batches())
)
)

return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batches)

def _cursor_batches(
self,
Expand Down
3 changes: 3 additions & 0 deletions ibis/expr/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ def to_pyarrow(self):

return ibis_to_pyarrow_schema(self)

def as_struct(self) -> dt.Struct:
    """Convert this schema to a struct datatype.

    Returns
    -------
    dt.Struct
        Struct constructed from this schema's `names` and `types`.
    """
    return dt.Struct(self.names, self.types)

def __gt__(self, other: Schema) -> bool:
    """Return whether `self` is a strict superset of `other`."""
    # `>` on sets is the proper-superset test: every item of `other` must be
    # in `self`, and identical item sets compare as False.
    return set(self.items()) > set(other.items())
Expand Down

0 comments on commit 64be7b1

Please sign in to comment.