Skip to content

Commit

Permalink
feat(api): add a to_polars() method for returning query results as …
Browse files Browse the repository at this point in the history
…`polars` objects
  • Loading branch information
jcrist committed Feb 27, 2024
1 parent 17f5e97 commit 53454c1
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 8 deletions.
38 changes: 38 additions & 0 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from collections.abc import Iterable, Iterator, Mapping, MutableMapping

import pandas as pd
import polars as pl
import pyarrow as pa
import sqlglot as sg
import torch
Expand Down Expand Up @@ -223,6 +224,43 @@ def to_pyarrow(
table.rename_columns(table_expr.columns).cast(arrow_schema)
)

@util.experimental
def to_polars(
    self,
    expr: ir.Expr,
    *,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    **kwargs: Any,
) -> pl.DataFrame:
    """Execute an expression and return its results as a polars DataFrame.

    This method is eager: the expression is executed as soon as it is
    called.

    Parameters
    ----------
    expr
        Ibis expression to export to polars.
    params
        Mapping of scalar parameter expressions to value.
    limit
        An integer to effect a specific row limit. A value of `None` means
        "no limit". The default is in `ibis/config.py`.
    kwargs
        Keyword arguments

    Returns
    -------
    dataframe
        A polars DataFrame holding the results of the executed expression.
    """
    # polars is an optional dependency, so import lazily at call time
    import polars as pl

    # Route through the pyarrow path, then hand the arrow table to polars;
    # __polars_result__ converts to the right type (DataFrame/Series/scalar)
    arrow_table = self.to_pyarrow(
        expr.as_table(), params=params, limit=limit, **kwargs
    )
    return expr.__polars_result__(pl.from_arrow(arrow_table))

@util.experimental
def to_pyarrow_batches(
self,
Expand Down
38 changes: 30 additions & 8 deletions ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,22 +412,34 @@ def _get_sql_string_view_schema(self, name, table, query) -> sch.Schema:
def _get_schema_using_query(self, query: str) -> sch.Schema:
return PolarsSchema.to_ibis(self._context.execute(query).schema)

def execute(
def _to_dataframe(
self,
expr: ir.Expr,
params: Mapping[ir.Expr, object] | None = None,
limit: int | None = None,
streaming: bool = False,
**kwargs: Any,
):
) -> pl.DataFrame:
lf = self.compile(expr, params=params, **kwargs)
if limit == "default":
limit = ibis.options.sql.default_limit
if limit is not None:
df = lf.fetch(limit, streaming=streaming)
else:
df = lf.collect(streaming=streaming)
return df

def execute(
self,
expr: ir.Expr,
params: Mapping[ir.Expr, object] | None = None,
limit: int | None = None,
streaming: bool = False,
**kwargs: Any,
):
df = self._to_dataframe(
expr, params=params, limit=limit, streaming=streaming, **kwargs
)
if isinstance(expr, (ir.Table, ir.Scalar)):
return expr.__pandas_result__(df.to_pandas())
else:
Expand All @@ -438,20 +450,30 @@ def execute(
# note: skip frame-construction overhead
return df.to_series().to_pandas()

def to_polars(
    self,
    expr: ir.Expr,
    params: Mapping[ir.Expr, object] | None = None,
    limit: int | None = None,
    streaming: bool = False,
    **kwargs: Any,
):
    """Execute ``expr`` and return the result as a polars object.

    Parameters
    ----------
    expr
        Ibis expression to execute.
    params
        Mapping of scalar parameter expressions to value.
    limit
        An integer to effect a specific row limit; ``None`` means no limit.
    streaming
        Whether to use polars' streaming engine when collecting.
    kwargs
        Additional keyword arguments forwarded to the compiler.

    Returns
    -------
    The polars result; ``expr.__polars_result__`` picks the concrete type
    (DataFrame for tables, Series for columns, plain value for scalars).
    """
    df = self._to_dataframe(
        expr, params=params, limit=limit, streaming=streaming, **kwargs
    )
    return expr.__polars_result__(df)

def _to_pyarrow_table(
self,
expr: ir.Expr,
params: Mapping[ir.Expr, object] | None = None,
limit: int | None = None,
streaming: bool = False,
**kwargs: Any,
):
df = self._to_dataframe(
expr, params=params, limit=limit, streaming=streaming, **kwargs
)
table = df.to_arrow()
if isinstance(expr, (ir.Table, ir.Value)):
schema = expr.as_table().schema().to_pyarrow()
Expand Down
39 changes: 39 additions & 0 deletions ibis/backends/tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,3 +494,42 @@ def test_to_pandas_batches_scalar(backend, con):

result2 = list(t.to_pandas_batches())
assert result2 == [expected]


@pytest.mark.parametrize("limit", limit_no_limit)
def test_table_to_polars(limit, awards_players):
    pl = pytest.importorskip("polars")

    result = awards_players.to_polars(limit=limit)
    assert isinstance(result, pl.DataFrame)
    if limit is not None:
        assert len(result) == limit

    # only yearID is numeric; every other column is a string
    assert result.schema == {
        "playerID": pl.Utf8,
        "awardID": pl.Utf8,
        "yearID": pl.Int64,
        "lgID": pl.Utf8,
        "tie": pl.Utf8,
        "notes": pl.Utf8,
    }


@pytest.mark.parametrize("limit", limit_no_limit)
def test_column_to_polars(limit, awards_players):
    pl = pytest.importorskip("polars")

    result = awards_players.awardID.to_polars(limit=limit)
    assert isinstance(result, pl.Series)
    if limit is None:
        return
    assert len(result) == limit


@pytest.mark.parametrize("limit", no_limit)
def test_scalar_to_polars(limit, awards_players):
    pytest.importorskip("polars")

    # a non-empty reduction comes back as a plain Python value
    assert isinstance(awards_players.yearID.min().to_polars(limit=limit), int)

    # a reduction over an empty filter result comes back as None
    empty = awards_players.filter(awards_players.awardID == "DEADBEEF")
    assert empty.yearID.min().to_polars(limit=limit) is None
33 changes: 33 additions & 0 deletions ibis/expr/types/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from pathlib import Path

import pandas as pd
import polars as pl
import pyarrow as pa
import torch

Expand Down Expand Up @@ -425,6 +426,38 @@ def to_pyarrow(
self, params=params, limit=limit, **kwargs
)

@experimental
def to_polars(
    self,
    *,
    params: Mapping[ir.Scalar, Any] | None = None,
    limit: int | str | None = None,
    **kwargs: Any,
) -> pl.DataFrame:
    """Execute expression and return results as a polars dataframe.

    This method is eager: the expression is executed as soon as it is
    called.

    Parameters
    ----------
    params
        Mapping of scalar parameter expressions to value.
    limit
        An integer to effect a specific row limit. A value of `None` means
        "no limit". The default is in `ibis/config.py`.
    kwargs
        Keyword arguments

    Returns
    -------
    DataFrame
        A polars dataframe holding the results of the executed expression.
    """
    # Delegate to whichever backend this expression is bound to
    # (falling back to the configured default backend).
    backend = self._find_backend(use_default=True)
    return backend.to_polars(self, params=params, limit=limit, **kwargs)

@experimental
def to_pandas_batches(
self,
Expand Down
11 changes: 11 additions & 0 deletions ibis/expr/types/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

if TYPE_CHECKING:
import pandas as pd
import polars as pl
import pyarrow as pa

import ibis.expr.types as ir
Expand Down Expand Up @@ -1203,6 +1204,11 @@ def __pandas_result__(self, df: pd.DataFrame) -> Any:

return PandasData.convert_scalar(df, self.type())

def __polars_result__(self, df: pl.DataFrame) -> Any:
    """Convert a single-value polars result frame into a plain scalar."""
    # local import: polars support is optional
    from ibis.formats.polars import PolarsData

    dtype = self.type()
    return PolarsData.convert_scalar(df, dtype)

def as_scalar(self):
"""Inform ibis that the expression should be treated as a scalar.
Expand Down Expand Up @@ -1332,6 +1338,11 @@ def __pandas_result__(self, df: pd.DataFrame) -> pd.Series:
(column,) = df.columns
return PandasData.convert_column(df.loc[:, column], self.type())

def __polars_result__(self, df: pl.DataFrame) -> pl.Series:
    """Convert a single-column polars result frame into a Series."""
    # local import: polars support is optional
    from ibis.formats.polars import PolarsData

    dtype = self.type()
    return PolarsData.convert_column(df, dtype)

def as_scalar(self) -> Scalar:
"""Inform ibis that the expression should be treated as a scalar.
Expand Down
6 changes: 6 additions & 0 deletions ibis/expr/types/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

if TYPE_CHECKING:
import pandas as pd
import polars as pl
import pyarrow as pa

import ibis.expr.types as ir
Expand Down Expand Up @@ -279,6 +280,11 @@ def __pandas_result__(self, df: pd.DataFrame) -> pd.DataFrame:

return PandasData.convert_table(df, self.schema())

def __polars_result__(self, df: pl.DataFrame) -> Any:
    """Coerce a polars result frame to this table's ibis schema."""
    # local import: polars support is optional
    from ibis.formats.polars import PolarsData

    schema = self.schema()
    return PolarsData.convert_table(df, schema)

def _bind_reduction_filter(self, where):
if where is None or not isinstance(where, Deferred):
return where
Expand Down

0 comments on commit 53454c1

Please sign in to comment.