diff --git a/.github/workflows/ibis-backends.yml b/.github/workflows/ibis-backends.yml
index 264532a305c2..263fff053dc6 100644
--- a/.github/workflows/ibis-backends.yml
+++ b/.github/workflows/ibis-backends.yml
@@ -141,6 +141,7 @@ jobs:
         extras:
           - mysql
           - geospatial
+          - polars
         sys-deps:
           - libgeos-dev
       - name: postgres
@@ -186,6 +187,7 @@ jobs:
         title: MS SQL Server
         extras:
           - mssql
+          - polars
         services:
           - mssql
         sys-deps:
@@ -216,6 +218,7 @@ jobs:
         serial: true
         extras:
           - oracle
+          - polars
         services:
           - oracle
       - name: flink
@@ -271,6 +274,7 @@ jobs:
         extras:
           - mysql
           - geospatial
+          - polars
         services:
           - mysql
         sys-deps:
@@ -352,6 +356,7 @@ jobs:
         title: MS SQL Server
         extras:
           - mssql
+          - polars
         services:
           - mssql
         sys-deps:
@@ -381,6 +386,7 @@ jobs:
         serial: true
         extras:
           - oracle
+          - polars
         services:
           - oracle
       - os: ubuntu-latest
diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py
index a90f45dc86bf..4961a9529516 100644
--- a/ibis/backends/bigquery/__init__.py
+++ b/ibis/backends/bigquery/__init__.py
@@ -13,7 +13,6 @@
 import google.auth.credentials
 import google.cloud.bigquery as bq
 import google.cloud.bigquery_storage_v1 as bqstorage
-import pandas as pd
 import pydata_google_auth
 import sqlglot as sg
 import sqlglot.expressions as sge
@@ -42,6 +41,8 @@
     from collections.abc import Callable, Iterable, Mapping
     from pathlib import Path
 
+    import pandas as pd
+    import polars as pl
     import pyarrow as pa
     from google.cloud.bigquery.table import RowIterator
 
@@ -940,7 +941,12 @@ def version(self):
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
@@ -1027,14 +1033,11 @@
             for name, value in (options or {}).items()
         )
 
-        if obj is not None:
-            import pyarrow as pa
-            import pyarrow_hotfix  # noqa: F401
+        if obj is not None and not isinstance(obj, ir.Table):
+            obj = ibis.memtable(obj, schema=schema)
 
-            if isinstance(obj, (pd.DataFrame, pa.Table)):
-                obj = ibis.memtable(obj, schema=schema)
-
-            self._register_in_memory_tables(obj)
+        # This is a no-op if there aren't any memtables
+        self._register_in_memory_tables(obj)
 
         if temp:
             dataset = self._session_dataset.dataset_id
diff --git a/ibis/backends/clickhouse/__init__.py b/ibis/backends/clickhouse/__init__.py
index 1385b40a03ba..c80db370aadb 100644
--- a/ibis/backends/clickhouse/__init__.py
+++ b/ibis/backends/clickhouse/__init__.py
@@ -34,6 +34,7 @@
     from pathlib import Path
 
     import pandas as pd
+    import polars as pl
 
 
 def _to_memtable(v):
@@ -586,7 +587,12 @@ def read_csv(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
diff --git a/ibis/backends/datafusion/__init__.py b/ibis/backends/datafusion/__init__.py
index 03bf78cf2fa3..921eaece720c 100644
--- a/ibis/backends/datafusion/__init__.py
+++ b/ibis/backends/datafusion/__init__.py
@@ -14,7 +14,6 @@
 import sqlglot as sg
 import sqlglot.expressions as sge
 
-import ibis
 import ibis.common.exceptions as com
 import ibis.expr.datatypes as dt
 import ibis.expr.operations as ops
@@ -24,6 +23,7 @@
 from ibis.backends.datafusion.compiler import DataFusionCompiler
 from ibis.backends.sql import SQLBackend
 from ibis.backends.sql.compiler import C
+from ibis.common.dispatch import lazy_singledispatch
 from ibis.expr.operations.udf import InputType
 from ibis.formats.pyarrow import PyArrowType
 from ibis.util import gen_name, normalize_filename
@@ -40,6 +40,7 @@
 
 if TYPE_CHECKING:
     import pandas as pd
+    import polars as pl
 
 
 class Backend(SQLBackend, CanCreateCatalog, CanCreateDatabase, CanCreateSchema, NoUrl):
@@ -272,7 +273,13 @@
         list[str]
             The list of the table names that match the pattern `like`.
         """
-        return self._filter_with_like(self.con.tables(), like)
+        database = database or "public"
+        query = (
+            sg.select("table_name")
+            .from_("information_schema.tables")
+            .where(sg.column("table_schema").eq(sge.convert(database)))
+        )
+        return self._filter_with_like(self.raw_sql(query).to_pydict()["table_name"], like)
 
     def get_schema(
         self,
@@ -550,7 +557,14 @@ def execute(self, expr: ir.Expr, **kwargs: Any):
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pa.RecordBatchReader
+        | pa.RecordBatch
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: sch.Schema | None = None,
         database: str | None = None,
@@ -589,12 +603,10 @@
         quoted = self.compiler.quoted
 
-        if obj is not None:
-            if not isinstance(obj, ir.Expr):
-                table = ibis.memtable(obj)
-            else:
-                table = obj
+        if isinstance(obj, ir.Expr):
+            table = obj
 
+            # If it's a memtable, it will get registered in the pre-execute hooks
             self._run_pre_execute_hooks(table)
 
             relname = "_"
 
@@ -610,10 +622,13 @@
                     sg.to_identifier(relname, quoted=quoted)
                 )
             )
+        elif obj is not None:
+            _read_in_memory(obj, name, self, overwrite=overwrite)
+            return self.table(name, database=database)
         else:
             query = None
 
-        table_ident = sg.to_identifier(name, quoted=quoted)
+        table_ident = sg.table(name, db=database, quoted=quoted)
 
         if query is None:
             column_defs = [
@@ -670,3 +685,92 @@ def truncate_table(
         ident = sg.table(name, db=db, catalog=catalog).sql(self.name)
         with self._safe_raw_sql(sge.delete(ident)):
             pass
+
+
+@contextlib.contextmanager
+def _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
+    """Work around the inability to overwrite tables in the dataframe API.
+
+    Datafusion has helper methods for loading in-memory data, but these
+    methods don't allow overwriting tables.
+    The SQL interface does allow creating tables from existing tables, so we
+    register the data as a table using the dataframe API, then run a
+
+        CREATE [OR REPLACE] TABLE table_name AS SELECT * FROM in_memory_thing
+
+    and that allows us to toggle the overwrite flag.
+ """ + src = sge.Create( + this=table_name, + kind="TABLE", + expression=sg.select("*").from_(tmp_name), + replace=overwrite, + ) + + yield + + _conn.raw_sql(src) + _conn.drop_table(tmp_name) + + +@lazy_singledispatch +def _read_in_memory( + source: Any, table_name: str, _conn: Backend, overwrite: bool = False +): + raise NotImplementedError("No support for source or imports missing") + + +@_read_in_memory.register(dict) +def _pydict(source, table_name, _conn, overwrite: bool = False): + tmp_name = gen_name("pydict") + with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite): + _conn.con.from_pydict(source, name=tmp_name) + + +@_read_in_memory.register("polars.DataFrame") +def _polars(source, table_name, _conn, overwrite: bool = False): + tmp_name = gen_name("polars") + with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite): + _conn.con.from_polars(source, name=tmp_name) + + +@_read_in_memory.register("polars.LazyFrame") +def _polars(source, table_name, _conn, overwrite: bool = False): + tmp_name = gen_name("polars") + with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite): + _conn.con.from_polars(source.collect(), name=tmp_name) + + +@_read_in_memory.register("pyarrow.Table") +def _pyarrow_table(source, table_name, _conn, overwrite: bool = False): + tmp_name = gen_name("pyarrow") + with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite): + _conn.con.from_arrow_table(source, name=tmp_name) + + +@_read_in_memory.register("pyarrow.RecordBatchReader") +def _pyarrow_rbr(source, table_name, _conn, overwrite: bool = False): + tmp_name = gen_name("pyarrow") + with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite): + _conn.con.from_arrow_table(source.read_all(), name=tmp_name) + + +@_read_in_memory.register("pyarrow.RecordBatch") +def _pyarrow_rb(source, table_name, _conn, overwrite: bool = False): + tmp_name = gen_name("pyarrow") + with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite): + _conn.con.register_record_batches(tmp_name, [[source]]) + + +@_read_in_memory.register("pyarrow.dataset.Dataset") +def _pyarrow_rb(source, table_name, _conn, overwrite: bool = False): + tmp_name = gen_name("pyarrow") + with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite): + _conn.con.register_dataset(tmp_name, source) + + +@_read_in_memory.register("pandas.DataFrame") +def _pandas(source: pd.DataFrame, table_name, _conn, overwrite: bool = False): + tmp_name = gen_name("pandas") + with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite): + _conn.con.from_pandas(source, name=tmp_name) diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index 51130755ee62..c55fb66c92b2 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -28,12 +28,14 @@ from ibis.backends.duckdb.converter import DuckDBPandasData from ibis.backends.sql import SQLBackend from ibis.backends.sql.compiler import STAR, C +from ibis.common.dispatch import lazy_singledispatch from ibis.expr.operations.udf import InputType if TYPE_CHECKING: from collections.abc import Iterable, Mapping, MutableMapping, Sequence import pandas as pd + import polars as pl import torch from fsspec import AbstractFileSystem @@ -121,7 +123,12 @@ def _to_sqlglot( def create_table( self, name: str, - obj: pd.DataFrame | pa.Table | ir.Table | None = None, + obj: ir.Table + | pd.DataFrame + | pa.Table + | pl.DataFrame + | pl.LazyFrame + | None = None, *, schema: ibis.Schema | None = None, 
diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py
index 51130755ee62..c55fb66c92b2 100644
--- a/ibis/backends/duckdb/__init__.py
+++ b/ibis/backends/duckdb/__init__.py
@@ -28,12 +28,14 @@
 from ibis.backends.duckdb.converter import DuckDBPandasData
 from ibis.backends.sql import SQLBackend
 from ibis.backends.sql.compiler import STAR, C
+from ibis.common.dispatch import lazy_singledispatch
 from ibis.expr.operations.udf import InputType
 
 if TYPE_CHECKING:
     from collections.abc import Iterable, Mapping, MutableMapping, Sequence
 
     import pandas as pd
+    import polars as pl
     import torch
     from fsspec import AbstractFileSystem
 
@@ -121,7 +123,12 @@ def _to_sqlglot(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
@@ -846,11 +853,19 @@
     # explicitly.
+    # TODO: deprecate this in favor of `create_table`
     def read_in_memory(
         self,
-        source: pd.DataFrame | pa.Table | pa.ipc.RecordBatchReader,
+        source: pd.DataFrame
+        | pa.Table
+        | pa.RecordBatchReader
+        | pl.DataFrame
+        | pl.LazyFrame,
         table_name: str | None = None,
     ) -> ir.Table:
-        """Register a Pandas DataFrame or pyarrow object as a table in the current database.
+        """Register an in-memory table object in the current database.
+
+        Supported objects include a pandas DataFrame, a Polars
+        DataFrame or LazyFrame, and a PyArrow Table or RecordBatchReader.
 
         Parameters
         ----------
@@ -867,13 +882,7 @@
         """
         table_name = table_name or util.gen_name("read_in_memory")
-        self.con.register(table_name, source)
-
-        if isinstance(source, pa.ipc.RecordBatchReader):
-            # Ensure the reader isn't marked as started, in case the name is
-            # being overwritten.
-            self._record_batch_readers_consumed[table_name] = False
-
+        _read_in_memory(source, table_name, self)
         return self.table(table_name)
 
     def read_delta(
@@ -1598,3 +1607,28 @@ def _get_temp_view_definition(self, name: str, definition: str) -> str:
     def _create_temp_view(self, table_name, source):
         with self._safe_raw_sql(self._get_temp_view_definition(table_name, source)):
             pass
+
+
+@lazy_singledispatch
+def _read_in_memory(source: Any, table_name: str, _conn: Backend, **kwargs: Any):
+    raise NotImplementedError(
+        f"The `{_conn.name}` backend currently does not support "
+        f"reading data of {type(source)!r}"
+    )
+
+
+@_read_in_memory.register("polars.DataFrame")
+@_read_in_memory.register("polars.LazyFrame")
+@_read_in_memory.register("pyarrow.Table")
+@_read_in_memory.register("pandas.DataFrame")
+@_read_in_memory.register("pyarrow.dataset.Dataset")
+def _default(source, table_name, _conn, **kwargs: Any):
+    _conn.con.register(table_name, source)
+
+
+@_read_in_memory.register("pyarrow.RecordBatchReader")
+def _pyarrow_rbr(source, table_name, _conn, **kwargs: Any):
+    _conn.con.register(table_name, source)
+    # Ensure the reader isn't marked as started, in case the name is
+    # being overwritten.
+    _conn._record_batch_readers_consumed[table_name] = False
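With the dispatch table in place, `create_table` and `read_in_memory` accept any of the registered in-memory types. A minimal usage sketch (in-memory DuckDB; the frames and table names are throwaway examples):

```python
import ibis
import polars as pl
import pyarrow as pa

con = ibis.duckdb.connect()  # in-memory database

# Polars frames and PyArrow objects now take the same path as pandas.
t1 = con.create_table("t_polars", pl.DataFrame({"a": ["a"], "b": [1]}))
t2 = con.read_in_memory(pa.table({"a": ["a"], "b": [1]}), table_name="t_arrow")

assert {"t_polars", "t_arrow"} <= set(con.list_tables())
```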
diff --git a/ibis/backends/duckdb/tests/test_register.py b/ibis/backends/duckdb/tests/test_register.py
index 33703ea97d81..7c8ba5ded109 100644
--- a/ibis/backends/duckdb/tests/test_register.py
+++ b/ibis/backends/duckdb/tests/test_register.py
@@ -311,16 +311,6 @@ def test_attach_sqlite(data_dir, tmp_path):
     assert dt.String(nullable=True) in set(types)
 
 
-def test_read_in_memory(con):
-    df_arrow = pa.table({"a": ["a"], "b": [1]})
-    df_pandas = pd.DataFrame({"a": ["a"], "b": [1]})
-    con.read_in_memory(df_arrow, table_name="df_arrow")
-    con.read_in_memory(df_pandas, table_name="df_pandas")
-
-    assert "df_arrow" in con.list_tables()
-    assert "df_pandas" in con.list_tables()
-
-
 def test_re_read_in_memory_overwrite(con):
     df_pandas_1 = pd.DataFrame({"a": ["a"], "b": [1], "d": ["hi"]})
     df_pandas_2 = pd.DataFrame({"a": [1], "c": [1.4]})
diff --git a/ibis/backends/exasol/__init__.py b/ibis/backends/exasol/__init__.py
index 04b79cd87e68..bbafb48430b3 100644
--- a/ibis/backends/exasol/__init__.py
+++ b/ibis/backends/exasol/__init__.py
@@ -27,6 +27,7 @@
     from collections.abc import Iterable, Mapping
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
     from ibis.backends import BaseBackend
@@ -286,7 +287,12 @@ def _clean_up_tmp_table(self, ident: sge.Identifier) -> None:
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: sch.Schema | None = None,
         database: str | None = None,
@@ -331,9 +337,11 @@
         quoted = self.compiler.quoted
 
+        temp_memtable_view = None
         if obj is not None:
             if not isinstance(obj, ir.Expr):
                 table = ibis.memtable(obj)
+                temp_memtable_view = table.op().name
             else:
                 table = obj
 
@@ -383,6 +391,10 @@
         )
 
         if schema is None:
+            # Clean up temporary memtable if we've created one
+            # for in-memory reads
+            if temp_memtable_view is not None:
+                self.drop_table(temp_memtable_view)
             return self.table(name, database=database)
 
         # preserve the input schema if it was provided
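The `temp_memtable_view` bookkeeping introduced here (and repeated in the mssql, mysql, oracle, pyspark, risingwave, snowflake, sqlite, and trino diffs below) exists because wrapping raw data in `ibis.memtable` registers a view under a generated name; dropping that view after the real table is created keeps it from leaking into `list_tables()`. A minimal sketch of the objects involved, assuming the polars support added by this patch:

```python
import ibis
import polars as pl

# Wrapping non-expression input in a memtable assigns it a generated name...
t = ibis.memtable(pl.DataFrame({"a": [1, 2]}))
memtable_name = t.op().name  # a generated name, e.g. "ibis_memtable_..."

# ...and that generated name is what each backend's create_table drops after
# the real table has been materialized:
#     if temp_memtable_view is not None:
#         self.drop_table(temp_memtable_view)
print(memtable_name)
```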
diff --git a/ibis/backends/impala/__init__.py b/ibis/backends/impala/__init__.py
index e00276d57634..0371d3fffe8c 100644
--- a/ibis/backends/impala/__init__.py
+++ b/ibis/backends/impala/__init__.py
@@ -46,6 +46,7 @@
     from pathlib import Path
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
     import ibis.expr.operations as ops
@@ -447,7 +448,12 @@ def table(self, name: str, database: str | None = None, **kwargs: Any) -> ir.Tab
     def create_table(
         self,
         name: str,
-        obj: ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema=None,
         database=None,
@@ -459,7 +465,7 @@
         partition=None,
         like_parquet=None,
     ) -> ir.Table:
-        """Create a new table in Impala using an Ibis table expression.
+        """Create a new table using an Ibis table expression or in-memory data.
 
         Parameters
         ----------
diff --git a/ibis/backends/mssql/__init__.py b/ibis/backends/mssql/__init__.py
index 316fecd81283..0bde35b60b52 100644
--- a/ibis/backends/mssql/__init__.py
+++ b/ibis/backends/mssql/__init__.py
@@ -31,6 +31,7 @@
     from collections.abc import Iterable, Mapping
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
 
@@ -435,7 +436,12 @@ def list_databases(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: sch.Schema | None = None,
         database: str | None = None,
@@ -457,9 +463,11 @@
         if temp:
             properties.append(sge.TemporaryProperty())
 
+        temp_memtable_view = None
         if obj is not None:
             if not isinstance(obj, ir.Expr):
                 table = ibis.memtable(obj)
+                temp_memtable_view = table.op().name
             else:
                 table = obj
 
@@ -513,6 +521,10 @@
                 cur.execute(f"EXEC sp_rename '{old}', '{new}'")
 
         if schema is None:
+            # Clean up temporary memtable if we've created one
+            # for in-memory reads
+            if temp_memtable_view is not None:
+                self.drop_table(temp_memtable_view)
             return self.table(name, database=database)
 
         # preserve the input schema if it was provided
diff --git a/ibis/backends/mysql/__init__.py b/ibis/backends/mysql/__init__.py
index 6be88633b5f5..aca0225f13f9 100644
--- a/ibis/backends/mysql/__init__.py
+++ b/ibis/backends/mysql/__init__.py
@@ -31,6 +31,7 @@
     from collections.abc import Mapping
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
 
@@ -360,7 +361,12 @@ def execute(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
@@ -382,9 +388,11 @@
         if temp:
             properties.append(sge.TemporaryProperty())
 
+        temp_memtable_view = None
         if obj is not None:
             if not isinstance(obj, ir.Expr):
                 table = ibis.memtable(obj)
+                temp_memtable_view = table.op().name
             else:
                 table = obj
 
@@ -436,6 +444,11 @@
         )
 
         if schema is None:
+            # Clean up temporary memtable if we've created one
+            # for in-memory reads
+            if temp_memtable_view is not None:
+                self.drop_table(temp_memtable_view)
+
             return self.table(name, database=database)
 
         # preserve the input schema if it was provided
diff --git a/ibis/backends/oracle/__init__.py b/ibis/backends/oracle/__init__.py
index f4f8b199e445..2f1dc277b8b8 100644
--- a/ibis/backends/oracle/__init__.py
+++ b/ibis/backends/oracle/__init__.py
@@ -28,7 +28,8 @@
 
 if TYPE_CHECKING:
     import pandas as pd
-    import pyrrow as pa
+    import polars as pl
+    import pyarrow as pa
 
 
 def metadata_row_to_type(
@@ -336,7 +337,12 @@ def get_schema(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
@@ -373,9 +379,11 @@
         if temp:
             properties.append(sge.TemporaryProperty())
 
+        temp_memtable_view = None
         if obj is not None:
             if not isinstance(obj, ir.Expr):
                 table = ibis.memtable(obj)
+                temp_memtable_view = table.op().name
             else:
                 table = obj
 
@@ -430,6 +438,10 @@
         )
 
         if schema is None:
+            # Clean up temporary memtable if we've created one
+            # for in-memory reads
+            if temp_memtable_view is not None:
+                self.drop_table(temp_memtable_view)
             return self.table(name, database=database)
 
         # preserve the input schema if it was provided
diff --git a/ibis/backends/pandas/__init__.py b/ibis/backends/pandas/__init__.py
index 0062bf66fa24..646bb0c49764 100644
--- a/ibis/backends/pandas/__init__.py
+++ b/ibis/backends/pandas/__init__.py
@@ -14,6 +14,7 @@
 import ibis.expr.types as ir
 from ibis import util
 from ibis.backends import BaseBackend, NoUrl
+from ibis.common.dispatch import lazy_singledispatch
 from ibis.formats.pandas import PandasData, PandasSchema
 from ibis.formats.pyarrow import PyArrowData
 
@@ -243,6 +244,7 @@ def drop_table(self, name: str, *, force: bool = False) -> None:
         del self.dictionary[name]
 
     def _convert_object(self, obj: Any) -> Any:
+        return _convert_object(obj, self)
         if isinstance(obj, pd.DataFrame):
             return obj
         elif isinstance(obj, ir.Table):
@@ -339,3 +341,38 @@ def execute(self, query, params=None, limit="default", **kwargs):
 
     def _load_into_cache(self, name, expr):
         self.create_table(name, expr.execute())
+
+
+@lazy_singledispatch
+def _convert_object(obj: Any, _conn):
+    raise com.BackendConversionError(
+        f"Unable to convert {obj.__class__} object "
+        f"to backend type: {_conn.__class__.backend_table_type}"
+    )
+
+
+@_convert_object.register("ibis.expr.types.Table")
+def _table(obj, _conn):
+    if isinstance(op := obj.op(), ops.InMemoryTable):
+        return op.data.to_frame()
+    else:
+        raise com.BackendConversionError(
+            f"Unable to convert {obj.__class__} object "
+            f"to backend type: {_conn.__class__.backend_table_type}"
+        )
+
+
+@_convert_object.register("polars.DataFrame")
+@_convert_object.register("pyarrow.Table")
+def _pa_polars(obj, _conn):
+    return obj.to_pandas()
+
+
+@_convert_object.register("polars.LazyFrame")
+def _polars_lazy(obj, _conn):
+    return obj.collect().to_pandas()
+
+
+@_convert_object.register("pandas.DataFrame")
+def _pandas(obj, _conn):
+    return obj
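The string-based registrations used throughout this patch come from `ibis.common.dispatch.lazy_singledispatch`, which defers resolving a `"module.Class"` name until such an object is actually seen, so optional dependencies like polars are never imported eagerly. A toy sketch of the pattern; the converter and its names are made up for illustration:

```python
from ibis.common.dispatch import lazy_singledispatch


@lazy_singledispatch
def to_pandas(obj):
    raise NotImplementedError(f"no converter for {type(obj)!r}")


# The string form means polars is only relevant if the caller already uses it.
@to_pandas.register("polars.DataFrame")
def _polars_df(obj):
    return obj.to_pandas()


@to_pandas.register("pyarrow.Table")
def _pyarrow_table(obj):
    return obj.to_pandas()
```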
diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py
index 4ce9ce7c5359..cb6a2ac1e2da 100644
--- a/ibis/backends/polars/__init__.py
+++ b/ibis/backends/polars/__init__.py
@@ -1,6 +1,6 @@
 from __future__ import annotations
 
-from collections.abc import Mapping
+from collections.abc import Iterable, Mapping
 from functools import lru_cache
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
@@ -20,6 +20,7 @@
 )
 from ibis.backends.polars.compiler import translate
 from ibis.backends.sql.dialects import Polars
+from ibis.common.dispatch import lazy_singledispatch
 from ibis.expr.rewrites import lower_stringslice
 from ibis.formats.polars import PolarsSchema
 from ibis.util import gen_name, normalize_filename, normalize_filenames
@@ -293,6 +294,7 @@ def read_pandas(
         """
         table_name = table_name or gen_name("read_in_memory")
+
         self._add_table(table_name, pl.from_pandas(source, **kwargs).lazy())
         return self.table(table_name)
 
@@ -347,16 +349,20 @@ def read_parquet(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pa.RecordBatchReader
+        | pa.RecordBatch
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
         temp: bool | None = None,
         overwrite: bool = False,
     ) -> ir.Table:
-        if schema is not None and obj is None:
-            obj = pl.LazyFrame([], schema=PolarsSchema.from_ibis(schema))
-
         if database is not None:
             raise com.IbisError(
                 "Passing `database` to the Polars backend create_table method has no "
@@ -374,13 +380,12 @@
                 f"Table {name} already exists. Use overwrite=True to clobber existing tables"
             )
 
-        if isinstance(obj, ir.Table):
-            obj = self.to_pyarrow(obj)
-
-        if not isinstance(obj, (pl.DataFrame, pl.LazyFrame)):
-            obj = pl.LazyFrame(obj)
+        if schema is not None and obj is None:
+            obj = pl.LazyFrame([], schema=PolarsSchema.from_ibis(schema))
+            self._add_table(name, obj)
+        else:
+            _read_in_memory(obj, name, self)
 
-        self._add_table(name, obj)
         return self.table(name)
 
     def create_view(
@@ -553,3 +558,34 @@ def _load_into_cache(self, name, expr):
 
     def _clean_up_cached_table(self, op):
         self._remove_table(op.name)
+
+
+@lazy_singledispatch
+def _read_in_memory(source: Any, table_name: str, _conn: Backend, **kwargs: Any):
+    raise NotImplementedError(
+        f"The `{_conn.name}` backend currently does not support "
+        f"reading data of {type(source)!r}"
+    )
+
+
+@_read_in_memory.register("ibis.expr.types.Table")
+def _table(source, table_name, _conn, **kwargs: Any):
+    _conn._add_table(table_name, source.to_polars())
+
+
+@_read_in_memory.register("polars.DataFrame")
+@_read_in_memory.register("polars.LazyFrame")
+def _polars(source, table_name, _conn, **kwargs: Any):
+    _conn._add_table(table_name, source)
+
+
+@_read_in_memory.register("pyarrow.Table")
+@_read_in_memory.register("pyarrow.RecordBatchReader")
+@_read_in_memory.register("pyarrow.RecordBatch")
+def _pyarrow(source, table_name, _conn, **kwargs: Any):
+    _conn._add_table(table_name, pl.from_arrow(source, **kwargs).lazy())
+
+
+@_read_in_memory.register("pandas.DataFrame")
+def _pandas(source: pd.DataFrame, table_name, _conn, **kwargs: Any):
+    _conn._add_table(table_name, pl.from_pandas(source, **kwargs).lazy())
diff --git a/ibis/backends/postgres/__init__.py b/ibis/backends/postgres/__init__.py
index fb6d233d6c52..bdd8f2467967 100644
--- a/ibis/backends/postgres/__init__.py
+++ b/ibis/backends/postgres/__init__.py
@@ -35,6 +35,7 @@
     from collections.abc import Callable
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
 
@@ -663,7 +664,12 @@ def drop_database(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py
index 8ef4ed7af09e..66a9dbe9fd57 100644
--- a/ibis/backends/pyspark/__init__.py
+++ b/ibis/backends/pyspark/__init__.py
@@ -32,6 +32,7 @@
     from collections.abc import Mapping, Sequence
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
 PYSPARK_LT_34 = vparse(pyspark.__version__) < vparse("3.4")
@@ -438,7 +439,12 @@ def get_schema(
     def create_table(
         self,
         name: str,
-        obj: ir.Table | pd.DataFrame | pa.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: sch.Schema | None = None,
         database: str | None = None,
@@ -487,8 +493,13 @@
         table_loc = self._to_sqlglot_table(database)
         catalog, db = self._to_catalog_db_tuple(table_loc)
 
+        temp_memtable_view = None
         if obj is not None:
-            table = obj if isinstance(obj, ir.Expr) else ibis.memtable(obj)
+            if isinstance(obj, ir.Expr):
+                table = obj
+            else:
+                table = ibis.memtable(obj)
+                temp_memtable_view = table.op().name
             query = self.compile(table)
             mode = "overwrite" if overwrite else "error"
             with self._active_catalog_database(catalog, db):
@@ -502,6 +513,11 @@
         else:
             raise com.IbisError("The schema or obj parameter is required")
 
+        # Clean up temporary memtable if we've created one
+        # for in-memory reads
+        if temp_memtable_view is not None:
+            self.drop_table(temp_memtable_view)
+
         return self.table(name, database=(catalog, db))
 
     def create_view(
diff --git a/ibis/backends/risingwave/__init__.py b/ibis/backends/risingwave/__init__.py
index 6f7d7af8e11b..a6ee2e696e0f 100644
--- a/ibis/backends/risingwave/__init__.py
+++ b/ibis/backends/risingwave/__init__.py
@@ -23,6 +23,7 @@
 
 if TYPE_CHECKING:
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
 
@@ -124,7 +125,12 @@ def do_connect(
     def create_table(
        self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
@@ -188,9 +194,11 @@
                 f"Creating temp tables is not supported by {self.name}"
             )
 
+        temp_memtable_view = None
         if obj is not None:
             if not isinstance(obj, ir.Expr):
                 table = ibis.memtable(obj)
+                temp_memtable_view = table.op().name
             else:
                 table = obj
 
@@ -252,6 +260,10 @@
         )
 
         if schema is None:
+            # Clean up temporary memtable if we've created one
+            # for in-memory reads
+            if temp_memtable_view is not None:
+                self.drop_table(temp_memtable_view)
             return self.table(name, database=database)
 
         # preserve the input schema if it was provided
diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py
index 165aa5206b79..9db3f388528c 100644
--- a/ibis/backends/snowflake/__init__.py
+++ b/ibis/backends/snowflake/__init__.py
@@ -41,6 +41,7 @@
     from collections.abc import Iterable, Iterator, Mapping
 
     import pandas as pd
+    import polars as pl
 
 
 _SNOWFLAKE_MAP_UDFS = {
@@ -738,7 +739,12 @@ def drop_database(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: sch.Schema | None = None,
         database: str | None = None,
@@ -808,9 +814,11 @@
         if comment is not None:
             properties.append(sge.SchemaCommentProperty(this=sge.convert(comment)))
 
+        temp_memtable_view = None
         if obj is not None:
             if not isinstance(obj, ir.Expr):
                 table = ibis.memtable(obj)
+                temp_memtable_view = table.op().name
             else:
                 table = obj
 
@@ -831,6 +839,11 @@
         with self._safe_raw_sql(create_stmt):
             pass
 
+        # Clean up temporary memtable if we've created one
+        # for in-memory reads
+        if temp_memtable_view is not None:
+            self.drop_table(temp_memtable_view)
+
         return self.table(name, database=(catalog, db))
 
     def read_csv(
diff --git a/ibis/backends/sqlite/__init__.py b/ibis/backends/sqlite/__init__.py
index d3825dee05d1..9e52f3128f55 100644
--- a/ibis/backends/sqlite/__init__.py
+++ b/ibis/backends/sqlite/__init__.py
@@ -27,6 +27,7 @@
     from pathlib import Path
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
 
@@ -386,7 +387,12 @@ def attach(self, name: str, path: str | Path) -> None:
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: ibis.Schema | None = None,
         database: str | None = None,
@@ -421,9 +427,11 @@
         if schema is not None:
             schema = ibis.schema(schema)
 
+        temp_memtable_view = None
         if obj is not None:
             if not isinstance(obj, ir.Expr):
                 obj = ibis.memtable(obj)
+                temp_memtable_view = obj.op().name
 
             self._run_pre_execute_hooks(obj)
 
@@ -479,6 +487,10 @@ def create_table(
         )
 
         if schema is None:
+            # Clean up temporary memtable if we've created one
+            # for in-memory reads
+            if temp_memtable_view is not None:
+                self.drop_table(temp_memtable_view)
             return self.table(name, database=database)
 
         # preserve the input schema if it was provided
diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py
index bba0f64de98d..ac794bf1e8f5 100644
--- a/ibis/backends/tests/test_client.py
+++ b/ibis/backends/tests/test_client.py
@@ -16,6 +16,7 @@
 import numpy as np
 import pandas as pd
 import pyarrow as pa
+import pyarrow.dataset
 import pytest
 import rich.console
 import toolz
@@ -42,6 +43,8 @@
 if TYPE_CHECKING:
     from ibis.backends import BaseBackend
 
+pl = pytest.importorskip("polars", reason="Polars is not installed")
+
 
 @pytest.fixture
 def new_schema():
@@ -883,24 +886,147 @@ def test_self_join_memory_table(backend, con, monkeypatch):
 @pytest.mark.parametrize(
-    ("arg", "func"),
+    "obj, table_name",
     [
-        ([("a", 1.0)], lambda arg: ibis.memtable(arg, columns=["a", "b"])),
-        (pd.DataFrame([("a", 1.0)], columns=["a", "b"]), ibis.memtable),
+        param(lambda: pa.table({"a": ["a"], "b": [1]}), "df_arrow", id="pyarrow table"),
+        param(
+            lambda: pa.table({"a": ["a"], "b": [1]}).to_reader(),
+            "df_arrow_batch_reader",
+            marks=[
+                pytest.mark.notimpl(
+                    [
+                        "bigquery",
+                        "clickhouse",
+                        "dask",
+                        "duckdb",
+                        "exasol",
+                        "impala",
+                        "mssql",
+                        "mysql",
+                        "oracle",
+                        "pandas",
+                        "postgres",
+                        "pyspark",
+                        "risingwave",
+                        "snowflake",
+                        "sqlite",
+                        "trino",
+                    ]
+                )
+            ],
+            id="pyarrow_rbr",
+        ),
+        param(
+            lambda: pa.table({"a": ["a"], "b": [1]}).to_batches()[0],
+            "df_arrow_single_batch",
+            marks=[
+                pytest.mark.notimpl(
+                    [
+                        "bigquery",
+                        "clickhouse",
+                        "dask",
+                        "duckdb",
+                        "exasol",
+                        "impala",
+                        "mssql",
+                        "mysql",
+                        "oracle",
+                        "pandas",
+                        "postgres",
+                        "pyspark",
+                        "risingwave",
+                        "snowflake",
+                        "sqlite",
+                        "trino",
+                    ]
+                )
+            ],
+            id="pyarrow_single_batch",
+        ),
+        param(
+            lambda: pa.dataset.dataset(pa.table({"a": ["a"], "b": [1]})),
+            "df_arrow_dataset",
+            marks=[
+                pytest.mark.notimpl(
+                    [
+                        "bigquery",
+                        "clickhouse",
+                        "dask",
+                        "duckdb",
+                        "exasol",
+                        "impala",
+                        "mssql",
+                        "mysql",
+                        "oracle",
+                        "pandas",
+                        "polars",
+                        "postgres",
+                        "pyspark",
+                        "risingwave",
+                        "snowflake",
+                        "sqlite",
+                        "trino",
+                    ]
+                ),
+            ],
+            id="pyarrow dataset",
+        ),
+        param(lambda: pd.DataFrame({"a": ["a"], "b": [1]}), "df_pandas", id="pandas"),
+        param(
+            lambda: pl.DataFrame({"a": ["a"], "b": [1]}),
+            "df_polars_eager",
+            id="polars dataframe",
+        ),
+        param(
+            lambda: pl.LazyFrame({"a": ["a"], "b": [1]}),
+            "df_polars_lazy",
+            id="polars lazyframe",
+        ),
+        param(
+            lambda: ibis.memtable([("a", 1)], columns=["a", "b"]),
+            "memtable",
+            id="memtable_list",
+        ),
+        param(
+            lambda: ibis.memtable(pa.table({"a": ["a"], "b": [1]})),
+            "memtable_pa",
+            id="memtable pyarrow",
+        ),
+        param(
+            lambda: ibis.memtable(pd.DataFrame({"a": ["a"], "b": [1]})),
+            "memtable_pandas",
+            id="memtable pandas",
+        ),
+        param(
+            lambda: ibis.memtable(pl.DataFrame({"a": ["a"], "b": [1]})),
+            "memtable_polars_eager",
+            id="memtable polars dataframe",
+        ),
+        param(
+            lambda: ibis.memtable(pl.LazyFrame({"a": ["a"], "b": [1]})),
+            "memtable_polars_lazy",
+            id="memtable polars lazyframe",
+        ),
     ],
-    ids=["python", "pandas"],
 )
 @pytest.mark.notimpl(["druid"])
 @pytest.mark.notimpl(
     ["flink"],
     reason="Flink backend supports creating only TEMPORARY VIEW for in-memory data.",
 )
-def test_create_from_in_memory_table(con, temp_table, arg, func, monkeypatch):
+def test_create_table_in_memory(con, obj, table_name, monkeypatch):
     monkeypatch.setattr(ibis.options, "default_backend", con)
+    obj = obj()
+    t = con.create_table(table_name, obj)
 
-    t = func(arg)
-    con.create_table(temp_table, t)
-    assert temp_table in con.list_tables()
+    result = pa.table({"a": ["a"], "b": [1]})
+    assert table_name in con.list_tables()
+
+    assert result.equals(t.to_pyarrow())
+
+    with contextlib.suppress(NotImplementedError):
+        # polars doesn't have drop_table
+        con.drop_table(table_name, force=True)
 
 
 def test_default_backend_option(con, monkeypatch):
diff --git a/ibis/backends/trino/__init__.py b/ibis/backends/trino/__init__.py
index 41bebebe6985..89eb08519c63 100644
--- a/ibis/backends/trino/__init__.py
+++ b/ibis/backends/trino/__init__.py
@@ -26,6 +26,7 @@
     from collections.abc import Iterator, Mapping
 
     import pandas as pd
+    import polars as pl
     import pyarrow as pa
 
     import ibis.expr.operations as ops
@@ -357,7 +358,12 @@ def drop_database(
     def create_table(
         self,
         name: str,
-        obj: pd.DataFrame | pa.Table | ir.Table | None = None,
+        obj: ir.Table
+        | pd.DataFrame
+        | pa.Table
+        | pl.DataFrame
+        | pl.LazyFrame
+        | None = None,
         *,
         schema: sch.Schema | None = None,
         database: str | None = None,
@@ -435,15 +441,13 @@
         if comment:
             property_list.append(sge.SchemaCommentProperty(this=sge.convert(comment)))
 
+        temp_memtable_view = None
         if obj is not None:
-            import pandas as pd
-            import pyarrow as pa
-            import pyarrow_hotfix  # noqa: F401
-
-            if isinstance(obj, (pd.DataFrame, pa.Table)):
-                table = ibis.memtable(obj, schema=schema)
-            else:
+            if isinstance(obj, ir.Table):
                 table = obj
+            else:
+                table = ibis.memtable(obj, schema=schema)
+                temp_memtable_view = table.op().name
 
             self._run_pre_execute_hooks(table)
 
@@ -487,6 +491,9 @@
                 ).sql(self.name)
             )
 
+        if temp_memtable_view is not None:
+            self.drop_table(temp_memtable_view)
+
         return self.table(orig_table_ref.name)
 
     def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
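Taken together, these changes are intended to make the same call work for pandas, polars, and pyarrow inputs alike. A minimal end-to-end sketch mirroring the new test (DuckDB chosen arbitrarily; any backend touched by this patch should behave the same way):

```python
import ibis
import pandas as pd
import polars as pl
import pyarrow as pa

con = ibis.duckdb.connect()
expected = pa.table({"a": ["a"], "b": [1]})

for name, obj in [
    ("t_pandas", pd.DataFrame({"a": ["a"], "b": [1]})),
    ("t_polars", pl.DataFrame({"a": ["a"], "b": [1]})),
    ("t_polars_lazy", pl.LazyFrame({"a": ["a"], "b": [1]})),
    ("t_arrow", expected),
]:
    t = con.create_table(name, obj, overwrite=True)
    assert t.to_pyarrow().equals(expected)
```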