refactor(formats): encapsulate conversions to TypeMapper, SchemaMapper and DataMapper subclasses
kszucs committed Jun 14, 2023
1 parent 6971a06 commit ab35311
Showing 54 changed files with 1,202 additions and 1,001 deletions.
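
At a glance, the commit folds the per-format free functions into classmethods on mapper subclasses: type conversions move onto TypeMapper subclasses (from_ibis/to_ibis), schema conversions onto classes such as BigQuerySchema, and DataFrame coercion onto PandasData.convert_table. A minimal sketch of the TypeMapper shape, with signatures taken from the AlchemyType diff below (the subclass name here is illustrative, not part of the commit):

import sqlalchemy.types as sat

import ibis.expr.datatypes as dt
from ibis.formats import TypeMapper


class ExampleType(TypeMapper):
    # Illustrative subclass: backends provide the two directions as classmethods.
    @classmethod
    def from_ibis(cls, dtype: dt.DataType) -> sat.TypeEngine:
        ...  # ibis type -> backend-native type

    @classmethod
    def to_ibis(cls, typ: sat.TypeEngine, nullable: bool = True) -> dt.DataType:
        ...  # backend-native type -> ibis type
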
6 changes: 0 additions & 6 deletions ibis/backends/base/__init__.py
@@ -516,12 +516,6 @@ def __init__(self, *args, **kwargs):
key=lambda expr: expr.op(),
)

@functools.cached_property
def _pandas_converter(self):
from ibis.formats.pandas import PandasConverter

return PandasConverter

def __getstate__(self):
return dict(_con_args=self._con_args, _con_kwargs=self._con_kwargs)

3 changes: 2 additions & 1 deletion ibis/backends/base/sql/alchemy/__init__.py
@@ -37,6 +37,7 @@
AlchemyContext,
AlchemyExprTranslator,
)
from ibis.formats.pandas import PandasData

if TYPE_CHECKING:
import pandas as pd
@@ -200,7 +201,7 @@ def fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
# artificially locked tables
cursor.close()
raise
df = self._pandas_converter.convert_frame(df, schema)
df = PandasData.convert_table(df, schema)
if not df.empty and geospatial_supported:
return self._to_geodataframe(df, schema)
return df
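
For reference, a minimal sketch of the PandasData call now used in fetch_from_cursor above (column names, values, and the exact coercion behaviour are illustrative assumptions):

import pandas as pd

import ibis
from ibis.formats.pandas import PandasData

schema = ibis.schema({"a": "int64", "ts": "timestamp"})
raw = pd.DataFrame({"a": [1, 2], "ts": ["2023-06-14", "2023-06-15"]})
# Coerce the raw cursor output to the ibis schema; the "ts" column is expected
# to come back as a datetime column rather than plain strings.
df = PandasData.convert_table(raw, schema)
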
192 changes: 93 additions & 99 deletions ibis/backends/base/sql/alchemy/datatypes.py
@@ -1,6 +1,6 @@
from __future__ import annotations

from typing import Callable, Mapping, Optional
from typing import Mapping

import sqlalchemy as sa
import sqlalchemy.types as sat
@@ -10,6 +10,7 @@
import ibis.expr.datatypes as dt
from ibis.backends.base.sql.alchemy.geospatial import geospatial_supported
from ibis.common.collections import FrozenDict
from ibis.formats import TypeMapper

if geospatial_supported:
import geoalchemy2 as ga
@@ -195,53 +196,6 @@ class Unknown(sa.Text):
dt.UUID: UUID,
}


def dtype_to_sqlalchemy(
dtype: dt.DataType,
converter: Optional[Callable[[dt.DataType], sa.TypeEngine]] = None,
):
"""Convert an Ibis type to a SQLAlchemy type.
Parameters
----------
dtype
Ibis type to convert.
converter
Converter function to use for nested types. If not provided, this function
will be used recursively. Should only be used when defining new converter for
dialects.
Returns
-------
sa.TypeEngine
"""
convert = converter or dtype_to_sqlalchemy

if dtype.is_decimal():
return sat.NUMERIC(dtype.precision, dtype.scale)
elif dtype.is_timestamp():
return sat.TIMESTAMP(timezone=bool(dtype.timezone))
elif dtype.is_array():
return ArrayType(convert(dtype.value_type))
elif dtype.is_struct():
fields = {k: convert(v) for k, v in dtype.fields.items()}
return StructType(fields)
elif dtype.is_map():
return MapType(convert(dtype.key_type), convert(dtype.value_type))
elif dtype.is_geospatial():
if geospatial_supported:
if dtype.geotype == 'geometry':
return ga.Geometry
elif dtype.geotype == 'geography':
return ga.Geography
else:
return ga.types._GISType
else:
raise TypeError("geospatial types are not supported")
else:
return _to_sqlalchemy_types[type(dtype)]


_FLOAT_PREC_TO_TYPE = {
11: dt.Float16,
24: dt.Float32,
@@ -260,54 +214,94 @@ def dtype_to_sqlalchemy(
}


def dtype_from_sqlalchemy(typ, nullable=True, converter=None):
"""Convert a SQLAlchemy type to an Ibis type.
Parameters
----------
typ
SQLAlchemy type to convert.
nullable : bool, optional
Whether the returned type should be nullable.
converter
Converter function to use for nested types. If not provided, this function
will be used recursively. Should only be used when defining new converter for
dialects.
Returns
-------
dt.DataType
"""
convert = converter or dtype_from_sqlalchemy

if dtype := _from_sqlalchemy_types.get(type(typ)):
return dtype(nullable=nullable)
elif isinstance(typ, sat.Float):
if (float_typ := _FLOAT_PREC_TO_TYPE.get(typ.precision)) is not None:
return float_typ(nullable=nullable)
return dt.Decimal(typ.precision, typ.scale, nullable=nullable)
elif isinstance(typ, sat.Numeric):
return dt.Decimal(typ.precision, typ.scale, nullable=nullable)
elif isinstance(typ, ArrayType):
return dt.Array(convert(typ.value_type), nullable=nullable)
elif isinstance(typ, sat.ARRAY):
ndim = typ.dimensions
if ndim is not None and ndim != 1:
raise NotImplementedError("Nested array types not yet supported")
return dt.Array(convert(typ.item_type), nullable=nullable)
elif isinstance(typ, StructType):
fields = {k: convert(v) for k, v in typ.fields.items()}
return dt.Struct(fields, nullable=nullable)
elif isinstance(typ, MapType):
return dt.Map(convert(typ.key_type), convert(typ.value_type), nullable=nullable)
elif isinstance(typ, sa.DateTime):
timezone = "UTC" if typ.timezone else None
return dt.Timestamp(timezone, nullable=nullable)
elif geospatial_supported and isinstance(typ, ga.types._GISType):
name = typ.geometry_type.upper()
try:
return _GEOSPATIAL_TYPES[name](geotype=typ.name, nullable=nullable)
except KeyError:
raise ValueError(f"Unrecognized geometry type: {name}")
else:
raise TypeError(f"Unable to convert type: {typ!r}")
class AlchemyType(TypeMapper):
@classmethod
def from_ibis(cls, dtype: dt.DataType) -> sat.TypeEngine:
"""Convert an Ibis type to a SQLAlchemy type.
Parameters
----------
dtype
Ibis type to convert.
Returns
-------
SQLAlchemy type.
"""
if dtype.is_decimal():
return sat.NUMERIC(dtype.precision, dtype.scale)
elif dtype.is_timestamp():
return sat.TIMESTAMP(timezone=bool(dtype.timezone))
elif dtype.is_array():
return ArrayType(cls.from_ibis(dtype.value_type))
elif dtype.is_struct():
fields = {k: cls.from_ibis(v) for k, v in dtype.fields.items()}
return StructType(fields)
elif dtype.is_map():
return MapType(
cls.from_ibis(dtype.key_type), cls.from_ibis(dtype.value_type)
)
elif dtype.is_geospatial():
if geospatial_supported:
if dtype.geotype == 'geometry':
return ga.Geometry
elif dtype.geotype == 'geography':
return ga.Geography
else:
return ga.types._GISType
else:
raise TypeError("geospatial types are not supported")
else:
return _to_sqlalchemy_types[type(dtype)]

@classmethod
def to_ibis(cls, typ: sat.TypeEngine, nullable: bool = True) -> dt.DataType:
"""Convert a SQLAlchemy type to an Ibis type.
Parameters
----------
typ
SQLAlchemy type to convert.
nullable : bool, optional
Whether the returned type should be nullable.
Returns
-------
Ibis type.
"""

if dtype := _from_sqlalchemy_types.get(type(typ)):
return dtype(nullable=nullable)
elif isinstance(typ, sat.Float):
if (float_typ := _FLOAT_PREC_TO_TYPE.get(typ.precision)) is not None:
return float_typ(nullable=nullable)
return dt.Decimal(typ.precision, typ.scale, nullable=nullable)
elif isinstance(typ, sat.Numeric):
return dt.Decimal(typ.precision, typ.scale, nullable=nullable)
elif isinstance(typ, ArrayType):
return dt.Array(cls.to_ibis(typ.value_type), nullable=nullable)
elif isinstance(typ, sat.ARRAY):
ndim = typ.dimensions
if ndim is not None and ndim != 1:
raise NotImplementedError("Nested array types not yet supported")
return dt.Array(cls.to_ibis(typ.item_type), nullable=nullable)
elif isinstance(typ, StructType):
fields = {k: cls.to_ibis(v) for k, v in typ.fields.items()}
return dt.Struct(fields, nullable=nullable)
elif isinstance(typ, MapType):
return dt.Map(
cls.to_ibis(typ.key_type),
cls.to_ibis(typ.value_type),
nullable=nullable,
)
elif isinstance(typ, sa.DateTime):
timezone = "UTC" if typ.timezone else None
return dt.Timestamp(timezone, nullable=nullable)
elif geospatial_supported and isinstance(typ, ga.types._GISType):
name = typ.geometry_type.upper()
try:
return _GEOSPATIAL_TYPES[name](geotype=typ.name, nullable=nullable)
except KeyError:
raise ValueError(f"Unrecognized geometry type: {name}")
else:
raise TypeError(f"Unable to convert type: {typ!r}")
15 changes: 9 additions & 6 deletions ibis/backends/base/sql/alchemy/translator.py
@@ -9,10 +9,7 @@
import ibis
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
from ibis.backends.base.sql.alchemy.datatypes import (
dtype_from_sqlalchemy,
dtype_to_sqlalchemy,
)
from ibis.backends.base.sql.alchemy.datatypes import AlchemyType
from ibis.backends.base.sql.alchemy.registry import (
fixed_arity,
sqlalchemy_operation_registry,
@@ -45,6 +42,7 @@ class AlchemyExprTranslator(ExprTranslator):
_registry = sqlalchemy_operation_registry
_rewrites = ExprTranslator._rewrites.copy()

type_mapper = AlchemyType
context_class = AlchemyContext

_bool_aggs_need_cast_to_int32 = True
@@ -74,8 +72,13 @@ def integer_to_timestamp(self, arg, tz: str | None = None):

supports_unnest_in_select = True

get_sqla_type = staticmethod(dtype_to_sqlalchemy)
get_ibis_type = staticmethod(dtype_from_sqlalchemy)
@classmethod
def get_sqla_type(cls, ibis_type):
return cls.type_mapper.from_ibis(ibis_type)

@classmethod
def get_ibis_type(cls, sqla_type, nullable=True):
return cls.type_mapper.to_ibis(sqla_type, nullable=nullable)

@functools.cached_property
def dialect(self) -> sa.engine.interfaces.Dialect:
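
With both directions routed through the type_mapper hook above, a dialect-specific translator only needs to point type_mapper at its own mapper class; a sketch with hypothetical names:

from ibis.backends.base.sql.alchemy.datatypes import AlchemyType
from ibis.backends.base.sql.alchemy.translator import AlchemyExprTranslator


class MyDialectType(AlchemyType):
    """Hypothetical dialect mapper; override from_ibis/to_ibis for dialect quirks."""


class MyDialectExprTranslator(AlchemyExprTranslator):
    type_mapper = MyDialectType  # get_sqla_type/get_ibis_type now delegate to this
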
9 changes: 5 additions & 4 deletions ibis/backends/bigquery/__init__.py
@@ -27,7 +27,8 @@
schema_from_bigquery_table,
)
from ibis.backends.bigquery.compiler import BigQueryCompiler
from ibis.backends.bigquery.datatypes import schema_from_bigquery, schema_to_bigquery
from ibis.backends.bigquery.datatypes import BigQuerySchema
from ibis.formats.pandas import PandasData

with contextlib.suppress(ImportError):
from ibis.backends.bigquery.udf import udf # noqa: F401
@@ -251,7 +252,7 @@ def _fully_qualified_name(self, name, database):
def _get_schema_using_query(self, query):
job_config = bq.QueryJobConfig(dry_run=True, use_query_cache=False)
job = self.client.query(query, job_config=job_config)
return schema_from_bigquery(job.schema)
return BigQuerySchema.to_ibis(job.schema)

def _get_table_schema(self, qualified_name):
dataset, table = qualified_name.rsplit(".", 1)
@@ -336,7 +337,7 @@ def execute(self, expr, params=None, limit="default", **kwargs):
def fetch_from_cursor(self, cursor, schema):
arrow_t = self._cursor_to_arrow(cursor)
df = arrow_t.to_pandas(timestamp_as_object=True)
return self._pandas_converter.convert_frame(df, schema)
return PandasData.convert_table(df, schema)

def _cursor_to_arrow(
self,
@@ -458,7 +459,7 @@ def create_table(
)
if schema is not None:
table_id = self._fully_qualified_name(name, database)
table = bq.Table(table_id, schema=schema_to_bigquery(schema))
table = bq.Table(table_id, schema=BigQuerySchema.from_ibis(schema))
self.client.create_table(table)
else:
project_id, dataset = self._parse_project_and_dataset(database)
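
For reference, a minimal sketch of the schema mapper used above (requires the BigQuery backend dependencies; the round trip shown is an expectation, not something this commit asserts):

import ibis
from ibis.backends.bigquery.datatypes import BigQuerySchema

sch = ibis.schema({"id": "int64", "name": "string"})
# Ibis schema -> BigQuery schema fields, as passed to bq.Table(...) above.
bq_fields = BigQuerySchema.from_ibis(sch)
# BigQuery schema fields -> ibis schema, as used in _get_schema_using_query above.
roundtripped = BigQuerySchema.to_ibis(bq_fields)
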
6 changes: 3 additions & 3 deletions ibis/backends/bigquery/client.py
@@ -10,13 +10,13 @@
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
from ibis.backends.bigquery.datatypes import dtype_to_bigquery, schema_from_bigquery
from ibis.backends.bigquery.datatypes import BigQuerySchema, BigQueryType

NATIVE_PARTITION_COL = "_PARTITIONTIME"


def schema_from_bigquery_table(table):
schema = schema_from_bigquery(table.schema)
schema = BigQuerySchema.to_ibis(table.schema)

# Check for partitioning information
partition_info = table._properties.get("timePartitioning", None)
@@ -84,7 +84,7 @@ def bq_param_array(dtype: dt.Array, value, name):
value_type = dtype.value_type

try:
bigquery_type = dtype_to_bigquery(value_type)
bigquery_type = BigQueryType.from_ibis(value_type)
except NotImplementedError:
raise com.UnsupportedBackendType(dtype)
else:
(The remaining changed files are not rendered in this view.)