fix(snowflake): fix array printing by using a pyarrow extension type
cpcloud committed Nov 18, 2023
1 parent 8bac145 commit 7d8fe5a
Showing 6 changed files with 100 additions and 13 deletions.
6 changes: 3 additions & 3 deletions ibis/backends/snowflake/__init__.py
@@ -364,6 +364,8 @@ def to_pyarrow(
         limit: int | str | None = None,
         **_: Any,
     ) -> pa.Table:
+        from ibis.backends.snowflake.converter import SnowflakePyArrowData
+
         self._run_pre_execute_hooks(expr)
 
         query_ast = self.compiler.to_ast_ensure_limit(expr, limit, params=params)
@@ -375,9 +377,7 @@ def to_pyarrow(
         if res is None:
             res = target_schema.empty_table()
 
-        res = res.rename_columns(target_schema.names).cast(target_schema)
-
-        return expr.__pyarrow_result__(res)
+        return expr.__pyarrow_result__(res, data_mapper=SnowflakePyArrowData)
 
     def fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
         if (table := cursor.cursor.fetch_arrow_all()) is None:
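For context, a minimal sketch (not part of the commit) of the behavior this change fixes; the connection parameters below are hypothetical, and ARRAY_TYPES is the test-suite table used in the new test further down:

import ibis

ibis.options.interactive = True

# Hypothetical credentials; any Snowflake connection with an ARRAY column works.
con = ibis.snowflake.connect(
    user="me", password="...", account="my_account", database="ibis_testing/public"
)
t = con.tables.ARRAY_TYPES
# Snowflake returns ARRAY values as JSON-encoded strings, so casting the result
# to the target Arrow list type could fail and break printing. Routing the
# result through SnowflakePyArrowData instead makes this repr work.
print(repr(t.x))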
29 changes: 29 additions & 0 deletions ibis/backends/snowflake/converter.py
@@ -1,6 +1,15 @@
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from ibis.formats.pandas import PandasData
+from ibis.formats.pyarrow import PYARROW_JSON_TYPE, PyArrowData
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+    import ibis.expr.datatypes as dt
+    from ibis.expr.schema import Schema
 
 
 class SnowflakePandasData(PandasData):
@@ -10,3 +19,23 @@ def convert_JSON(s, dtype, pandas_type):
         return s.map(converter, na_action="ignore").astype("object")
 
     convert_Struct = convert_Array = convert_Map = convert_JSON
+
+
+class SnowflakePyArrowData(PyArrowData):
+    @classmethod
+    def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table:
+        import pyarrow as pa
+
+        columns = [cls.convert_column(table[name], typ) for name, typ in schema.items()]
+        return pa.Table.from_arrays(columns, names=schema.names)
+
+    @classmethod
+    def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
+        if dtype.is_json() or dtype.is_array() or dtype.is_map() or dtype.is_struct():
+            import pyarrow as pa
+
+            if isinstance(column, pa.ChunkedArray):
+                column = column.combine_chunks()
+
+            return pa.ExtensionArray.from_storage(PYARROW_JSON_TYPE, column)
+        return super().convert_column(column, dtype)
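A small, self-contained sketch of what convert_column does with a Snowflake result (assuming an ibis development environment; the sample data is made up):

import pyarrow as pa

import ibis
from ibis.backends.snowflake.converter import SnowflakePyArrowData

# Snowflake hands back ARRAY/MAP/STRUCT/VARIANT columns as JSON-encoded strings.
col = pa.chunked_array([pa.array(['[1, 2]', '[3]'])])

# Instead of casting string -> list (which fails), the column is wrapped in the
# ibis.json extension type, whose scalars decode the JSON on access.
converted = SnowflakePyArrowData.convert_column(col, ibis.dtype("array<int64>"))
print(converted.type)        # the registered ibis.json extension type
print(converted[0].as_py())  # [1, 2], decoded from the JSON string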
7 changes: 7 additions & 0 deletions ibis/backends/snowflake/tests/test_client.py
@@ -219,3 +219,10 @@ def test_read_parquet(con, data_dir):
     t = con.read_parquet(path)
 
     assert t.timestamp_col.type().is_timestamp()
+
+
+def test_array_repr(con, monkeypatch):
+    monkeypatch.setattr(ibis.options, "interactive", True)
+    t = con.tables.ARRAY_TYPES
+    expr = t.x
+    assert repr(expr)
19 changes: 13 additions & 6 deletions ibis/expr/types/generic.py
@@ -18,6 +18,7 @@
 
     import ibis.expr.builders as bl
     import ibis.expr.types as ir
+    from ibis.formats.pyarrow import PyArrowData
 
 
 @public
@@ -1204,10 +1205,13 @@ class Scalar(Value):
     def __interactive_rich_console__(self, console, options):
         return console.render(repr(self.execute()), options=options)
 
-    def __pyarrow_result__(self, table: pa.Table) -> pa.Scalar:
-        from ibis.formats.pyarrow import PyArrowData
+    def __pyarrow_result__(
+        self, table: pa.Table, data_mapper: type[PyArrowData] | None = None
+    ) -> pa.Scalar:
+        if data_mapper is None:
+            from ibis.formats.pyarrow import PyArrowData as data_mapper
 
-        return PyArrowData.convert_scalar(table[0][0], self.type())
+        return data_mapper.convert_scalar(table[0][0], self.type())
 
     def __pandas_result__(self, df: pd.DataFrame) -> Any:
         return df.iat[0, 0]
@@ -1275,10 +1279,13 @@ def __interactive_rich_console__(self, console, options):
         projection = named.as_table()
         return console.render(projection, options=options)
 
-    def __pyarrow_result__(self, table: pa.Table) -> pa.Array | pa.ChunkedArray:
-        from ibis.formats.pyarrow import PyArrowData
+    def __pyarrow_result__(
+        self, table: pa.Table, data_mapper: type[PyArrowData] | None = None
+    ) -> pa.Array | pa.ChunkedArray:
+        if data_mapper is None:
+            from ibis.formats.pyarrow import PyArrowData as data_mapper
 
-        return PyArrowData.convert_column(table[0], self.type())
+        return data_mapper.convert_column(table[0], self.type())
 
     def __pandas_result__(self, df: pd.DataFrame) -> pd.Series:
         from ibis.formats.pandas import PandasData
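The new keyword turns result conversion into a pluggable hook: a backend passes a PyArrowData subclass (the class itself, not an instance), and the default mapper is imported lazily to avoid an import cycle. A hypothetical backend-specific mapper would look like this sketch (names are illustrative, not from the commit):

from ibis.formats.pyarrow import PyArrowData

class MyBackendPyArrowData(PyArrowData):
    """Hypothetical mapper that overrides only the conversions that differ."""

    @classmethod
    def convert_column(cls, column, dtype):
        # Backend-specific fix-ups would go here before falling back
        # to the generic conversion.
        return super().convert_column(column, dtype)

# A backend would then call:
#     expr.__pyarrow_result__(table, data_mapper=MyBackendPyArrowData)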
10 changes: 7 additions & 3 deletions ibis/expr/types/relations.py
@@ -32,6 +32,7 @@
     from ibis.expr.types.groupby import GroupedTable
     from ibis.expr.types.tvf import WindowedTable
     from ibis.selectors import IfAnyAll, Selector
+    from ibis.formats.pyarrow import PyArrowData
 
 _ALIASES = (f"_ibis_view_{n:d}" for n in itertools.count())
 
@@ -158,10 +159,13 @@ def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True):
 
         return IbisDataFrame(self, nan_as_null=nan_as_null, allow_copy=allow_copy)
 
-    def __pyarrow_result__(self, table: pa.Table) -> pa.Table:
-        from ibis.formats.pyarrow import PyArrowData
+    def __pyarrow_result__(
+        self, table: pa.Table, data_mapper: type[PyArrowData] | None = None
+    ) -> pa.Table:
+        if data_mapper is None:
+            from ibis.formats.pyarrow import PyArrowData as data_mapper
 
-        return PyArrowData.convert_table(table, self.schema())
+        return data_mapper.convert_table(table, self.schema())
 
     def __pandas_result__(self, df: pd.DataFrame) -> pd.DataFrame:
         from ibis.formats.pandas import PandasData
42 changes: 41 additions & 1 deletion ibis/formats/pyarrow.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import json
 from typing import TYPE_CHECKING, Any
 
 import pyarrow as pa
@@ -12,6 +13,38 @@
 if TYPE_CHECKING:
     from collections.abc import Sequence
 
+
+class JSONScalar(pa.ExtensionScalar):
+    def as_py(self):
+        value = self.value
+        if value is None:
+            return value
+        else:
+            return json.loads(value.as_py())
+
+
+class JSONArray(pa.ExtensionArray):
+    pass
+
+
+class JSONType(pa.ExtensionType):
+    def __init__(self):
+        super().__init__(pa.string(), "ibis.json")
+
+    def __arrow_ext_serialize__(self):
+        return b""
+
+    @classmethod
+    def __arrow_ext_deserialize__(cls, storage_type, serialized):
+        return cls()
+
+    def __arrow_ext_class__(self):
+        return JSONArray
+
+    def __arrow_ext_scalar_class__(self):
+        return JSONScalar
+
+
 _from_pyarrow_types = {
     pa.int8(): dt.Int8,
     pa.int16(): dt.Int16,
@@ -57,7 +90,6 @@
     dt.Unknown: pa.string(),
     dt.MACADDR: pa.string(),
     dt.INET: pa.string(),
-    dt.JSON: pa.string(),
 }
 
 
@@ -95,6 +127,8 @@ def to_ibis(cls, typ: pa.DataType, nullable=True) -> dt.DataType:
             key_dtype = cls.to_ibis(typ.key_type, typ.key_field.nullable)
             value_dtype = cls.to_ibis(typ.item_type, typ.item_field.nullable)
             return dt.Map(key_dtype, value_dtype, nullable=nullable)
+        elif isinstance(typ, JSONType):
+            return dt.JSON()
         else:
             return _from_pyarrow_types[typ](nullable=nullable)
@@ -154,6 +188,8 @@ def from_ibis(cls, dtype: dt.DataType) -> pa.DataType:
                 nullable=dtype.value_type.nullable,
             )
             return pa.map_(key_field, value_field, keys_sorted=False)
+        elif dtype.is_json():
+            return PYARROW_JSON_TYPE
         else:
             try:
                 return _to_pyarrow_types[type(dtype)]
@@ -254,3 +290,7 @@ def convert_table(cls, table: pa.Table, schema: Schema) -> pa.Table:
             return table.cast(desired_schema)
         else:
             return table
+
+
+PYARROW_JSON_TYPE = JSONType()
+pa.register_extension_type(PYARROW_JSON_TYPE)
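A quick, self-contained illustration of what the extension type buys (a sketch, not from the commit; runnable against the ibis codebase after this change):

import pyarrow as pa

from ibis.formats.pyarrow import PYARROW_JSON_TYPE

# Wrap a plain string array of JSON documents in the extension type.
storage = pa.array(['[1, 2, 3]', '{"a": 1}', None], type=pa.string())
arr = pa.ExtensionArray.from_storage(PYARROW_JSON_TYPE, storage)

# JSONScalar.as_py decodes each element, so values round-trip as Python objects
# instead of raw JSON strings.
assert arr[0].as_py() == [1, 2, 3]
assert arr[1].as_py() == {"a": 1}
assert arr[2].as_py() is None

# Because the type is registered, it survives embedding in a table.
table = pa.table({"js": arr})
print(table["js"].type)  # still the ibis.json extension type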