Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(all): enable passing in-memory data to create_table #9251

Merged
merged 24 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e9e0adc
refactor(duckdb): add polars df as option, move test to backend suite
gforsyth Apr 11, 2024
1c6833d
feat(polars): enable passing in-memory data to create_table
gforsyth Apr 11, 2024
68a6c3c
feat(datafusion): enable passing in-memory data to create_table
gforsyth Apr 11, 2024
28231bd
feat(datafusion): use info_schema for list_tables
gforsyth May 23, 2024
0b3ea31
feat(duckdb): enable passing in-memory data to create_table
gforsyth May 24, 2024
66d2fb7
feat(postgres): allow passing in-memory data to create_table
gforsyth May 24, 2024
819abd1
feat(trino): allow passing in-memory data to create_table
gforsyth May 24, 2024
13bd22f
feat(mysql): allow passing in-memory data to create_table
gforsyth May 24, 2024
7545abf
feat(mssql): allow passing in-memory data to create_table
gforsyth May 24, 2024
c6815e7
feat(exasol): allow passing in-memory data to create_table
gforsyth May 24, 2024
7fe4130
feat(risingwave): allow passing in-memory data to create_table
gforsyth May 24, 2024
8ddcd9a
feat(sqlite): allow passing in-memory data to create_table
gforsyth May 24, 2024
235c03f
feat(clickhouse): enable passing in-memory data to create_table
gforsyth May 24, 2024
4486097
feat(oracle): enable passing in-memory data to create_table
gforsyth May 24, 2024
59b8b73
feat(snowflake): allow passing in-memory data to create_table
gforsyth May 24, 2024
a17e76e
feat(pyspark): enable passing in-memory data to create_table
gforsyth May 24, 2024
0219d63
feat(pandas,dask): allow passing in-memory data to create_table
gforsyth May 24, 2024
2f9252b
chore: apply suggestions
gforsyth May 28, 2024
f65379d
chore(polars,datafusion): remove nascent `read_in_memory`
gforsyth May 28, 2024
e9fef1c
feat(bigquery): allow passing in-memory data to create_table
gforsyth May 28, 2024
fe6b114
feat(impala): allow passing in-memory data to create_table
gforsyth May 28, 2024
a1d9227
test(create_table): create initial memtable on backend being tested
gforsyth May 28, 2024
4e45c10
test(create_table): use lambdas for all inputs
gforsyth May 28, 2024
9a3a698
chore(ci): add polars to mysql, mssql, and oracle
gforsyth May 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ibis-backends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ jobs:
extras:
- mysql
- geospatial
- polars
sys-deps:
- libgeos-dev
- name: postgres
Expand Down Expand Up @@ -186,6 +187,7 @@ jobs:
title: MS SQL Server
extras:
- mssql
- polars
services:
- mssql
sys-deps:
Expand Down Expand Up @@ -216,6 +218,7 @@ jobs:
serial: true
extras:
- oracle
- polars
services:
- oracle
- name: flink
Expand Down Expand Up @@ -271,6 +274,7 @@ jobs:
extras:
- mysql
- geospatial
- polars
services:
- mysql
sys-deps:
Expand Down Expand Up @@ -352,6 +356,7 @@ jobs:
title: MS SQL Server
extras:
- mssql
- polars
services:
- mssql
sys-deps:
Expand Down Expand Up @@ -381,6 +386,7 @@ jobs:
serial: true
extras:
- oracle
- polars
services:
- oracle
- os: ubuntu-latest
Expand Down
21 changes: 12 additions & 9 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import google.auth.credentials
import google.cloud.bigquery as bq
import google.cloud.bigquery_storage_v1 as bqstorage
import pandas as pd
import pydata_google_auth
import sqlglot as sg
import sqlglot.expressions as sge
Expand Down Expand Up @@ -42,6 +41,8 @@
from collections.abc import Callable, Iterable, Mapping
from pathlib import Path

import pandas as pd
import polars as pl
import pyarrow as pa
from google.cloud.bigquery.table import RowIterator

Expand Down Expand Up @@ -940,7 +941,12 @@
def create_table(
self,
name: str,
obj: pd.DataFrame | pa.Table | ir.Table | None = None,
obj: ir.Table
| pd.DataFrame
| pa.Table
| pl.DataFrame
| pl.LazyFrame
| None = None,
*,
schema: ibis.Schema | None = None,
database: str | None = None,
Expand Down Expand Up @@ -1027,14 +1033,11 @@
for name, value in (options or {}).items()
)

if obj is not None:
import pyarrow as pa
import pyarrow_hotfix # noqa: F401
if obj is not None and not isinstance(obj, ir.Table):
obj = ibis.memtable(obj, schema=schema)

Check warning on line 1037 in ibis/backends/bigquery/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/bigquery/__init__.py#L1037

Added line #L1037 was not covered by tests

if isinstance(obj, (pd.DataFrame, pa.Table)):
obj = ibis.memtable(obj, schema=schema)

self._register_in_memory_tables(obj)
# This is a no-op if there aren't any memtables
self._register_in_memory_tables(obj)

Check warning on line 1040 in ibis/backends/bigquery/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/bigquery/__init__.py#L1040

Added line #L1040 was not covered by tests

if temp:
dataset = self._session_dataset.dataset_id
Expand Down
8 changes: 7 additions & 1 deletion ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from pathlib import Path

import pandas as pd
import polars as pl


def _to_memtable(v):
Expand Down Expand Up @@ -586,7 +587,12 @@ def read_csv(
def create_table(
self,
name: str,
obj: pd.DataFrame | pa.Table | ir.Table | None = None,
obj: ir.Table
| pd.DataFrame
| pa.Table
| pl.DataFrame
| pl.LazyFrame
| None = None,
*,
schema: ibis.Schema | None = None,
database: str | None = None,
Expand Down
122 changes: 113 additions & 9 deletions ibis/backends/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import sqlglot as sg
import sqlglot.expressions as sge

import ibis
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
Expand All @@ -24,6 +23,7 @@
from ibis.backends.datafusion.compiler import DataFusionCompiler
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compiler import C
from ibis.common.dispatch import lazy_singledispatch
from ibis.expr.operations.udf import InputType
from ibis.formats.pyarrow import PyArrowType
from ibis.util import gen_name, normalize_filename
Expand All @@ -40,6 +40,7 @@

if TYPE_CHECKING:
import pandas as pd
import polars as pl


class Backend(SQLBackend, CanCreateCatalog, CanCreateDatabase, CanCreateSchema, NoUrl):
Expand Down Expand Up @@ -272,7 +273,13 @@
list[str]
The list of the table names that match the pattern `like`.
"""
return self._filter_with_like(self.con.tables(), like)
database = database or "public"
query = (
sg.select("table_name")
.from_("information_schema.tables")
.where(sg.column("table_schema").eq(sge.convert(database)))
)
return self.raw_sql(query).to_pydict()["table_name"]

def get_schema(
self,
Expand Down Expand Up @@ -550,7 +557,14 @@
def create_table(
self,
name: str,
obj: pd.DataFrame | pa.Table | ir.Table | None = None,
obj: ir.Table
| pd.DataFrame
| pa.Table
| pa.RecordBatchReader
| pa.RecordBatch
| pl.DataFrame
| pl.LazyFrame
| None = None,
*,
schema: sch.Schema | None = None,
database: str | None = None,
Expand Down Expand Up @@ -589,12 +603,10 @@

quoted = self.compiler.quoted

if obj is not None:
if not isinstance(obj, ir.Expr):
table = ibis.memtable(obj)
else:
table = obj
if isinstance(obj, ir.Expr):
table = obj

Check warning on line 607 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L607

Added line #L607 was not covered by tests

# If it's a memtable, it will get registered in the pre-execute hooks
self._run_pre_execute_hooks(table)

relname = "_"
Expand All @@ -610,10 +622,13 @@
sg.to_identifier(relname, quoted=quoted)
)
)
elif obj is not None:
_read_in_memory(obj, name, self, overwrite=overwrite)
return self.table(name, database=database)

Check warning on line 627 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L626-L627

Added lines #L626 - L627 were not covered by tests
else:
query = None

table_ident = sg.to_identifier(name, quoted=quoted)
table_ident = sg.table(name, db=database, quoted=quoted)

if query is None:
column_defs = [
Expand Down Expand Up @@ -670,3 +685,92 @@
ident = sg.table(name, db=db, catalog=catalog).sql(self.name)
with self._safe_raw_sql(sge.delete(ident)):
pass


@contextlib.contextmanager
def _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
    """Stage an in-memory registration, then CTAS it into ``table_name``.

    Datafusion's dataframe-API loaders cannot overwrite an existing table.
    The caller registers its data under ``tmp_name`` inside the ``with``
    body; on exit we issue

        CREATE [OR REPLACE] TABLE table_name AS SELECT * FROM tmp_name

    with ``replace`` driven by ``overwrite``, then drop the scratch table —
    which lets us honor the overwrite flag despite the API limitation.
    """
    # Hand control back so the caller can register the source as ``tmp_name``.
    yield

    ctas = sge.Create(
        this=table_name,
        kind="TABLE",
        expression=sg.select("*").from_(tmp_name),
        replace=overwrite,
    )
    _conn.raw_sql(ctas)
    _conn.drop_table(tmp_name)

Check warning on line 713 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L712-L713

Added lines #L712 - L713 were not covered by tests


@lazy_singledispatch
def _read_in_memory(
    source: Any, table_name: str, _conn: Backend, overwrite: bool = False
):
    """Fallback handler for in-memory sources with no registered loader.

    The message names the offending type (matching the duckdb backend's
    equivalent dispatcher) instead of the previous vague
    "No support for source or imports missing", so users can tell an
    unsupported source apart from a missing optional dependency.
    """
    raise NotImplementedError(
        f"The `{_conn.name}` backend currently does not support "
        f"reading data of {type(source)!r}"
    )


@_read_in_memory.register(dict)
def _pydict(source, table_name, _conn, overwrite: bool = False):
    """Load a plain ``dict`` of columns through datafusion's pydict loader."""
    scratch = gen_name("pydict")
    with _create_and_drop_memtable(_conn, table_name, scratch, overwrite):
        _conn.con.from_pydict(source, name=scratch)

Check warning on line 727 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L727

Added line #L727 was not covered by tests


@_read_in_memory.register("polars.DataFrame")
def _polars(source, table_name, _conn, overwrite: bool = False):
    """Load an eager polars DataFrame via datafusion's ``from_polars``."""
    scratch = gen_name("polars")
    with _create_and_drop_memtable(_conn, table_name, scratch, overwrite):
        _conn.con.from_polars(source, name=scratch)

Check warning on line 734 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L734

Added line #L734 was not covered by tests


@_read_in_memory.register("polars.LazyFrame")
def _polars_lazy(source, table_name, _conn, overwrite: bool = False):
    """Collect a polars LazyFrame, then load it via ``from_polars``.

    Renamed from ``_polars``: the original redefined (and shadowed) the
    eager ``polars.DataFrame`` handler of the same name defined just above
    (ruff F811). Dispatch registration is unaffected by the name, but the
    shadowing confuses readers and tooling.
    """
    tmp_name = gen_name("polars")
    with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
        # LazyFrames must be materialized before datafusion can ingest them.
        _conn.con.from_polars(source.collect(), name=tmp_name)

Check warning on line 741 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L741

Added line #L741 was not covered by tests


@_read_in_memory.register("pyarrow.Table")
def _pyarrow_table(source, table_name, _conn, overwrite: bool = False):
    """Load a pyarrow Table via datafusion's ``from_arrow_table``."""
    scratch = gen_name("pyarrow")
    with _create_and_drop_memtable(_conn, table_name, scratch, overwrite):
        _conn.con.from_arrow_table(source, name=scratch)

Check warning on line 748 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L748

Added line #L748 was not covered by tests


@_read_in_memory.register("pyarrow.RecordBatchReader")
def _pyarrow_rbr(source, table_name, _conn, overwrite: bool = False):
    """Drain a RecordBatchReader into a Table, then load that table."""
    scratch = gen_name("pyarrow")
    with _create_and_drop_memtable(_conn, table_name, scratch, overwrite):
        # Readers are single-use: read everything up front.
        _conn.con.from_arrow_table(source.read_all(), name=scratch)

Check warning on line 755 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L755

Added line #L755 was not covered by tests


@_read_in_memory.register("pyarrow.RecordBatch")
def _pyarrow_rb(source, table_name, _conn, overwrite: bool = False):
    """Load a single RecordBatch via ``register_record_batches``."""
    scratch = gen_name("pyarrow")
    with _create_and_drop_memtable(_conn, table_name, scratch, overwrite):
        # register_record_batches expects a list of batch "partitions",
        # each itself a list of batches; wrap our lone batch accordingly.
        _conn.con.register_record_batches(scratch, [[source]])

Check warning on line 762 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L762

Added line #L762 was not covered by tests


@_read_in_memory.register("pyarrow.dataset.Dataset")
def _pyarrow_dataset(source, table_name, _conn, overwrite: bool = False):
    """Register a pyarrow Dataset via datafusion's ``register_dataset``.

    Renamed from ``_pyarrow_rb``: the original redefined (and shadowed) the
    ``pyarrow.RecordBatch`` handler of the same name defined above
    (ruff F811). Dispatch registration is unaffected by the name, but the
    shadowing confuses readers and tooling.
    """
    tmp_name = gen_name("pyarrow")
    with _create_and_drop_memtable(_conn, table_name, tmp_name, overwrite):
        _conn.con.register_dataset(tmp_name, source)

Check warning on line 769 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L769

Added line #L769 was not covered by tests


@_read_in_memory.register("pandas.DataFrame")
def _pandas(source: pd.DataFrame, table_name, _conn, overwrite: bool = False):
    """Load a pandas DataFrame via datafusion's ``from_pandas``."""
    scratch = gen_name("pandas")
    with _create_and_drop_memtable(_conn, table_name, scratch, overwrite):
        _conn.con.from_pandas(source, name=scratch)

Check warning on line 776 in ibis/backends/datafusion/__init__.py

View check run for this annotation

Codecov / codecov/patch

ibis/backends/datafusion/__init__.py#L776

Added line #L776 was not covered by tests
54 changes: 44 additions & 10 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
from ibis.backends.duckdb.converter import DuckDBPandasData
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compiler import STAR, C
from ibis.common.dispatch import lazy_singledispatch
from ibis.expr.operations.udf import InputType

if TYPE_CHECKING:
from collections.abc import Iterable, Mapping, MutableMapping, Sequence

import pandas as pd
import polars as pl
import torch
from fsspec import AbstractFileSystem

Expand Down Expand Up @@ -121,7 +123,12 @@ def _to_sqlglot(
def create_table(
self,
name: str,
obj: pd.DataFrame | pa.Table | ir.Table | None = None,
obj: ir.Table
| pd.DataFrame
| pa.Table
| pl.DataFrame
| pl.LazyFrame
| None = None,
*,
schema: ibis.Schema | None = None,
database: str | None = None,
Expand Down Expand Up @@ -846,11 +853,19 @@ def _read_parquet_pyarrow_dataset(
# explicitly.

def read_in_memory(
# TODO: deprecate this in favor of `create_table`
self,
source: pd.DataFrame | pa.Table | pa.ipc.RecordBatchReader,
source: pd.DataFrame
| pa.Table
| pa.RecordBatchReader
| pl.DataFrame
| pl.LazyFrame,
table_name: str | None = None,
) -> ir.Table:
"""Register a Pandas DataFrame or pyarrow object as a table in the current database.
"""Register an in-memory table object in the current database.

Supported objects include pandas DataFrame, a Polars
DataFrame/LazyFrame, or a PyArrow Table or RecordBatchReader.

Parameters
----------
Expand All @@ -867,13 +882,7 @@ def read_in_memory(

"""
table_name = table_name or util.gen_name("read_in_memory")
self.con.register(table_name, source)

if isinstance(source, pa.ipc.RecordBatchReader):
# Ensure the reader isn't marked as started, in case the name is
# being overwritten.
self._record_batch_readers_consumed[table_name] = False

_read_in_memory(source, table_name, self)
return self.table(table_name)

def read_delta(
Expand Down Expand Up @@ -1598,3 +1607,28 @@ def _get_temp_view_definition(self, name: str, definition: str) -> str:
def _create_temp_view(self, table_name, source):
with self._safe_raw_sql(self._get_temp_view_definition(table_name, source)):
pass


@lazy_singledispatch
def _read_in_memory(source: Any, table_name: str, _conn: Backend, **kwargs: Any):
    """Fallback for in-memory source types with no registered duckdb loader."""
    msg = (
        f"The `{_conn.name}` backend currently does not support "
        f"reading data of {type(source)!r}"
    )
    raise NotImplementedError(msg)


@_read_in_memory.register("pyarrow.dataset.Dataset")
@_read_in_memory.register("pandas.DataFrame")
@_read_in_memory.register("pyarrow.Table")
@_read_in_memory.register("polars.LazyFrame")
@_read_in_memory.register("polars.DataFrame")
def _default(source, table_name, _conn, **kwargs: Any):
    """duckdb's ``register`` accepts all of these in-memory types directly."""
    _conn.con.register(table_name, source)


@_read_in_memory.register("pyarrow.RecordBatchReader")
def _pyarrow_rbr(source, table_name, _conn, **kwargs: Any):
    """Register a RecordBatchReader and reset its consumed-state flag."""
    _conn.con.register(table_name, source)
    # A reader can only be consumed once; if this name overwrites an
    # existing registration, make sure it isn't marked as already started.
    _conn._record_batch_readers_consumed[table_name] = False
Loading