diff --git a/ibis/backends/__init__.py b/ibis/backends/__init__.py
index cfd3fe118673..feef3d84e2ec 100644
--- a/ibis/backends/__init__.py
+++ b/ibis/backends/__init__.py
@@ -24,6 +24,7 @@
     from collections.abc import Iterable, Iterator, Mapping, MutableMapping
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
     import sqlglot as sg
     import torch
@@ -223,6 +224,43 @@ def to_pyarrow(
             table.rename_columns(table_expr.columns).cast(arrow_schema)
         )
 
+    @util.experimental
+    def to_polars(
+        self,
+        expr: ir.Expr,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        **kwargs: Any,
+    ) -> pl.DataFrame:
+        """Execute expression and return results as a polars DataFrame.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        expr
+            Ibis expression to export to polars.
+        params
+            Mapping of scalar parameter expressions to value.
+        limit
+            An integer to effect a specific row limit. A value of `None` means
+            "no limit". The default is in `ibis/config.py`.
+        kwargs
+            Keyword arguments
+
+        Returns
+        -------
+        dataframe
+            A polars DataFrame holding the results of the executed expression.
+
+        """
+        import polars as pl
+
+        table = self.to_pyarrow(expr.as_table(), params=params, limit=limit, **kwargs)
+        return expr.__polars_result__(pl.from_arrow(table))
+
     @util.experimental
     def to_pyarrow_batches(
         self,
diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py
index 1406c227e3ce..2d4bb372989e 100644
--- a/ibis/backends/polars/__init__.py
+++ b/ibis/backends/polars/__init__.py
@@ -412,14 +412,14 @@ def _get_sql_string_view_schema(self, name, table, query) -> sch.Schema:
     def _get_schema_using_query(self, query: str) -> sch.Schema:
         return PolarsSchema.to_ibis(self._context.execute(query).schema)
 
-    def execute(
+    def _to_dataframe(
         self,
         expr: ir.Expr,
         params: Mapping[ir.Expr, object] | None = None,
         limit: int | None = None,
         streaming: bool = False,
         **kwargs: Any,
-    ):
+    ) -> pl.DataFrame:
         lf = self.compile(expr, params=params, **kwargs)
         if limit == "default":
             limit = ibis.options.sql.default_limit
@@ -427,7 +427,19 @@ def execute(
             df = lf.fetch(limit, streaming=streaming)
         else:
             df = lf.collect(streaming=streaming)
+        return df
 
+    def execute(
+        self,
+        expr: ir.Expr,
+        params: Mapping[ir.Expr, object] | None = None,
+        limit: int | None = None,
+        streaming: bool = False,
+        **kwargs: Any,
+    ):
+        df = self._to_dataframe(
+            expr, params=params, limit=limit, streaming=streaming, **kwargs
+        )
         if isinstance(expr, (ir.Table, ir.Scalar)):
             return expr.__pandas_result__(df.to_pandas())
         else:
@@ -438,7 +450,7 @@ def execute(
             # note: skip frame-construction overhead
             return df.to_series().to_pandas()
 
-    def _to_pyarrow_table(
+    def to_polars(
         self,
         expr: ir.Expr,
         params: Mapping[ir.Expr, object] | None = None,
@@ -446,12 +458,22 @@ def _to_pyarrow_table(
         streaming: bool = False,
         **kwargs: Any,
     ):
-        lf = self.compile(expr, params=params, **kwargs)
-        if limit is not None:
-            df = lf.fetch(limit, streaming=streaming)
-        else:
-            df = lf.collect(streaming=streaming)
+        df = self._to_dataframe(
+            expr, params=params, limit=limit, streaming=streaming, **kwargs
+        )
+        return expr.__polars_result__(df)
 
+    def _to_pyarrow_table(
+        self,
+        expr: ir.Expr,
+        params: Mapping[ir.Expr, object] | None = None,
+        limit: int | None = None,
+        streaming: bool = False,
+        **kwargs: Any,
+    ):
+        df = self._to_dataframe(
+            expr, params=params, limit=limit, streaming=streaming, **kwargs
+        )
         table = df.to_arrow()
         if isinstance(expr, (ir.Table, ir.Value)):
             schema = expr.as_table().schema().to_pyarrow()
diff --git a/ibis/backends/tests/test_export.py b/ibis/backends/tests/test_export.py
index 03ee2ccebe01..b76eace141fd 100644
--- a/ibis/backends/tests/test_export.py
+++ b/ibis/backends/tests/test_export.py
@@ -494,3 +494,42 @@ def test_to_pandas_batches_scalar(backend, con):
 
     result2 = list(t.to_pandas_batches())
     assert result2 == [expected]
+
+
+@pytest.mark.parametrize("limit", limit_no_limit)
+def test_table_to_polars(limit, awards_players):
+    pl = pytest.importorskip("polars")
+    res = awards_players.to_polars(limit=limit)
+    assert isinstance(res, pl.DataFrame)
+    if limit is not None:
+        assert len(res) == limit
+
+    expected_schema = {
+        "playerID": pl.Utf8,
+        "awardID": pl.Utf8,
+        "yearID": pl.Int64,
+        "lgID": pl.Utf8,
+        "tie": pl.Utf8,
+        "notes": pl.Utf8,
+    }
+    assert res.schema == expected_schema
+
+
+@pytest.mark.parametrize("limit", limit_no_limit)
+def test_column_to_polars(limit, awards_players):
+    pl = pytest.importorskip("polars")
+    res = awards_players.awardID.to_polars(limit=limit)
+    assert isinstance(res, pl.Series)
+    if limit is not None:
+        assert len(res) == limit
+
+
+@pytest.mark.parametrize("limit", no_limit)
+def test_scalar_to_polars(limit, awards_players):
+    pytest.importorskip("polars")
+    scalar = awards_players.yearID.min().to_polars(limit=limit)
+    assert isinstance(scalar, int)
+
+    expr = awards_players.filter(awards_players.awardID == "DEADBEEF").yearID.min()
+    res = expr.to_polars(limit=limit)
+    assert res is None
diff --git a/ibis/expr/types/core.py b/ibis/expr/types/core.py
index a4f492bfa5d8..a665fbb397f7 100644
--- a/ibis/expr/types/core.py
+++ b/ibis/expr/types/core.py
@@ -22,6 +22,7 @@
     from pathlib import Path
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
     import torch
@@ -425,6 +426,38 @@ def to_pyarrow(
             self, params=params, limit=limit, **kwargs
         )
 
+    @experimental
+    def to_polars(
+        self,
+        *,
+        params: Mapping[ir.Scalar, Any] | None = None,
+        limit: int | str | None = None,
+        **kwargs: Any,
+    ) -> pl.DataFrame:
+        """Execute expression and return results as a polars dataframe.
+
+        This method is eager and will execute the associated expression
+        immediately.
+
+        Parameters
+        ----------
+        params
+            Mapping of scalar parameter expressions to value.
+        limit
+            An integer to effect a specific row limit. A value of `None` means
+            "no limit". The default is in `ibis/config.py`.
+        kwargs
+            Keyword arguments
+
+        Returns
+        -------
+        DataFrame
+            A polars dataframe holding the results of the executed expression.
+        """
+        return self._find_backend(use_default=True).to_polars(
+            self, params=params, limit=limit, **kwargs
+        )
+
     @experimental
     def to_pandas_batches(
         self,
diff --git a/ibis/expr/types/generic.py b/ibis/expr/types/generic.py
index 523ab2abb1ee..0c1dccfadc9f 100644
--- a/ibis/expr/types/generic.py
+++ b/ibis/expr/types/generic.py
@@ -18,6 +18,7 @@ if TYPE_CHECKING:
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
     import ibis.expr.types as ir
@@ -1203,6 +1204,11 @@ def __pandas_result__(self, df: pd.DataFrame) -> Any:
 
         return PandasData.convert_scalar(df, self.type())
 
+    def __polars_result__(self, df: pl.DataFrame) -> Any:
+        from ibis.formats.polars import PolarsData
+
+        return PolarsData.convert_scalar(df, self.type())
+
     def as_scalar(self):
         """Inform ibis that the expression should be treated as a scalar.
@@ -1332,6 +1338,11 @@ def __pandas_result__(self, df: pd.DataFrame) -> pd.Series:
         (column,) = df.columns
         return PandasData.convert_column(df.loc[:, column], self.type())
 
+    def __polars_result__(self, df: pl.DataFrame) -> pl.Series:
+        from ibis.formats.polars import PolarsData
+
+        return PolarsData.convert_column(df, self.type())
+
     def as_scalar(self) -> Scalar:
         """Inform ibis that the expression should be treated as a scalar.
diff --git a/ibis/expr/types/relations.py b/ibis/expr/types/relations.py
index e2c712d10c36..7d2c4bfe54d6 100644
--- a/ibis/expr/types/relations.py
+++ b/ibis/expr/types/relations.py
@@ -29,6 +29,7 @@ if TYPE_CHECKING:
     import pandas as pd
+    import polars as pl
    import pyarrow as pa
 
     import ibis.expr.types as ir
@@ -279,6 +280,11 @@ def __pandas_result__(self, df: pd.DataFrame) -> pd.DataFrame:
 
         return PandasData.convert_table(df, self.schema())
 
+    def __polars_result__(self, df: pl.DataFrame) -> Any:
+        from ibis.formats.polars import PolarsData
+
+        return PolarsData.convert_table(df, self.schema())
+
     def _bind_reduction_filter(self, where):
         if where is None or not isinstance(where, Deferred):
             return where
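Taken together, these changes give every expression an eager `to_polars()` export: the base backend builds it on `to_pyarrow` plus `pl.from_arrow`, the polars backend returns its native frame directly (reusing the shared `_to_dataframe` helper so the fetch/collect logic is not duplicated), and the `__polars_result__` hooks on table, column, and scalar expressions decide whether the caller gets a `pl.DataFrame`, a `pl.Series`, or a plain Python value. A minimal usage sketch, assuming polars and the default DuckDB backend are installed; the in-memory table and its column names are illustrative only:

```python
import ibis
import polars as pl

# Hypothetical in-memory table, used only to demonstrate the new API.
t = ibis.memtable({"yearID": [2001, 2002, 2003], "awardID": ["MVP", "MVP", "Cy Young"]})

df = t.to_polars()              # table expression  -> pl.DataFrame
s = t.awardID.to_polars()       # column expression -> pl.Series
v = t.yearID.min().to_polars()  # scalar expression -> plain Python value

assert isinstance(df, pl.DataFrame)
assert isinstance(s, pl.Series)
assert v == 2001
```

As with `to_pyarrow`, `params` and `limit` are forwarded to the backend, so `t.to_polars(limit=2)` returns at most two rows.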