From 833c6989a4ff8f3a7baeab9a326d399f2edf2e96 Mon Sep 17 00:00:00 2001 From: Phillip Cloud <417981+cpcloud@users.noreply.github.com> Date: Fri, 10 Mar 2023 06:39:36 -0500 Subject: [PATCH] feat(api): make create_table uniform This commit addresses most outstanding DDL API discrepancy issues including: - `create_table`/`create_view` for pandas, dask and polars - making the various DDL APIs as uniform as possible (see clickhouse for an example of divergence) - deprecation of `load_data` (except on impala, since it's significantly different from the others) - add clickhouse implementations of `create_table`/`create_view`/`create_database` - standardization of APIs for creating tables During the process of getting all of this to work, I uncovered multiple issues with `snowflake-sqlalchemy`'s quoting behavior and had to monkey patch in `normalize_name` to avoid the broken heuristic they are using. Additionally, to avoid having to solve the "which case should I use?" problem in multiple places, I decided to remove our backend-scoped use of `sqlalchemy.MetaData`. Without removing it, we'd have to deal with identifiers' case not matching. It's possible there's a performance hit, but removing this maintenance burden until someone comes along saying it's slow is worth it IMO. BREAKING CHANGE: Snowflake identifiers are now kept **as is** from the database. Many table names and column names may now be in SHOUTING CASE. Adjust code accordingly. --- ibis/backends/base/__init__.py | 44 ++--- ibis/backends/base/sql/alchemy/__init__.py | 169 ++++++++++-------- ibis/backends/bigquery/__init__.py | 33 +++- ibis/backends/clickhouse/__init__.py | 95 ++++++++++ ibis/backends/dask/__init__.py | 2 +- ibis/backends/dask/tests/test_client.py | 3 +- ibis/backends/datafusion/__init__.py | 12 ++ ibis/backends/duckdb/__init__.py | 48 ++--- ibis/backends/impala/__init__.py | 73 +++++--- ibis/backends/impala/client.py | 1 + ibis/backends/mssql/__init__.py | 10 +- ibis/backends/pandas/__init__.py | 56 +++++- ibis/backends/pandas/tests/test_client.py | 3 +- ibis/backends/polars/__init__.py | 56 +++++- ibis/backends/postgres/tests/test_client.py | 23 --- .../backends/postgres/tests/test_functions.py | 2 +- ibis/backends/pyspark/__init__.py | 87 +++++---- ibis/backends/pyspark/tests/test_ddl.py | 4 +- ibis/backends/snowflake/__init__.py | 13 +- ibis/backends/sqlite/__init__.py | 20 ++- ibis/backends/sqlite/tests/test_functions.py | 14 +- ibis/backends/tests/test_client.py | 55 +++--- ibis/backends/tests/test_map.py | 2 +- ibis/tests/expr/mocks.py | 22 ++- ibis/tests/sql/test_sqlalchemy.py | 4 +- poetry.lock | 14 +- pyproject.toml | 18 +- 27 files changed, 557 insertions(+), 326 deletions(-) diff --git a/ibis/backends/base/__init__.py b/ibis/backends/base/__init__.py index 63c81454e836..20b50d7a6ffc 100644 --- a/ibis/backends/base/__init__.py +++ b/ibis/backends/base/__init__.py @@ -769,17 +769,19 @@ def create_database(self, name: str, force: bool = False) -> None: f'Backend "{self.name}" does not implement "create_database"' ) + @abc.abstractmethod def create_table( self, name: str, obj: pd.DataFrame | ir.Table | None = None, + *, schema: ibis.Schema | None = None, database: str | None = None, + temp: bool = False, + overwrite: bool = False, ) -> ir.Table: """Create a new table. - Not all backends implement this method. - Parameters ---------- name @@ -794,19 +796,22 @@ def create_table( database Name of the database where the table will be created, if not the default. 
+ temp + Whether a table is temporary or not + overwrite + Whether to clobber existing data Returns ------- Table The table that was created. """ - raise NotImplementedError( - f'Backend "{self.name}" does not implement "create_table"' - ) + @abc.abstractmethod def drop_table( self, name: str, + *, database: str | None = None, force: bool = False, ) -> None: @@ -825,36 +830,38 @@ def drop_table( f'Backend "{self.name}" does not implement "drop_table"' ) + @abc.abstractmethod def create_view( self, name: str, - expr: ir.Table, + obj: ir.Table, + *, database: str | None = None, + overwrite: bool = False, ) -> ir.Table: - """Create a view. + """Create a new view from an expression. Parameters ---------- name - Name for the new view. - expr - An Ibis table expression that will be used to extract the query - of the view. + Name of the new view. + obj + An Ibis table expression that will be used to create the view. database - Name of the database where the view will be created, if not the - default. + Name of the database where the view will be created, if not + provided the database's default is used. + overwrite + Whether to clobber an existing view with the same name Returns ------- Table The view that was created. """ - raise NotImplementedError( - f'Backend "{self.name}" does not implement "create_view"' - ) + @abc.abstractmethod def drop_view( - self, name: str, database: str | None = None, force: bool = False + self, name: str, *, database: str | None = None, force: bool = False ) -> None: """Drop a view. @@ -867,9 +874,6 @@ def drop_view( force If `False`, an exception is raised if the view does not exist. """ - raise NotImplementedError( - f'Backend "{self.name}" does not implement "drop_view"' - ) @classmethod def has_operation(cls, operation: type[ops.Value]) -> bool: diff --git a/ibis/backends/base/sql/alchemy/__init__.py b/ibis/backends/base/sql/alchemy/__init__.py index 8b43563c237d..e3f67f1c084a 100644 --- a/ibis/backends/base/sql/alchemy/__init__.py +++ b/ibis/backends/base/sql/alchemy/__init__.py @@ -11,7 +11,7 @@ import sqlalchemy as sa import ibis -import ibis.common.exceptions as exc +import ibis.common.exceptions as com import ibis.expr.operations as ops import ibis.expr.schema as sch import ibis.expr.types as ir @@ -92,7 +92,6 @@ def _current_schema(self) -> str | None: def do_connect(self, con: sa.engine.Engine) -> None: self.con = con self._inspector = sa.inspect(self.con) - self.meta = sa.MetaData() self._schemas: dict[str, sch.Schema] = {} self._temp_views: set[str] = set() @@ -170,11 +169,12 @@ def begin(self): def create_table( self, name: str, - expr: pd.DataFrame | ir.Table | None = None, + obj: pd.DataFrame | ir.Table | None = None, + *, schema: sch.Schema | None = None, database: str | None = None, - force: bool = False, temp: bool = False, + overwrite: bool = False, ) -> ir.Table: """Create a table. @@ -182,7 +182,7 @@ def create_table( ---------- name Name of the new table. - expr + obj An Ibis table expression or pandas table that will be used to extract the schema and the data of the new table. If not provided, `schema` must be given. @@ -192,20 +192,23 @@ def create_table( database Name of the database where the table will be created, if not the default. - force - Check whether a table exists before creating it temp Should the table be temporary for the session. + overwrite + Clobber existing data Returns ------- Table The table that was created. 
""" + if obj is None and schema is None: + raise com.IbisError("The schema or obj parameter is required") + import pandas as pd - if isinstance(expr, pd.DataFrame): - expr = ibis.memtable(expr) + if isinstance(obj, pd.DataFrame): + obj = ibis.memtable(obj) if database == self.current_database: # avoid fully qualified name @@ -213,20 +216,17 @@ def create_table( if database is not None: raise NotImplementedError( - 'Creating tables from a different database is not yet implemented' + "Creating tables from a different database is not yet implemented" ) - if expr is None and schema is None: - raise ValueError('You must pass either an expression or a schema') - - if expr is not None and schema is not None: - if not expr.schema().equals(ibis.schema(schema)): - raise TypeError( + if obj is not None and schema is not None: + if not obj.schema().equals(ibis.schema(schema)): + raise com.IbisTypeError( 'Expression schema is not equal to passed schema. ' 'Try passing the expression without the schema' ) if schema is None: - schema = expr.schema() + schema = obj.schema() self._schemas[self._fully_qualified_name(name, database)] = schema table = self._table_from_schema( @@ -236,15 +236,17 @@ def create_table( temp=temp, ) - if has_expr := expr is not None: + if has_expr := obj is not None: # this has to happen outside the `begin` block, so that in-memory # tables are visible inside the transaction created by it - self._register_in_memory_tables(expr) + self._register_in_memory_tables(obj) with self.begin() as bind: - table.create(bind=bind, checkfirst=force) + if overwrite: + table.drop(bind=bind, checkfirst=True) + table.create(bind=bind) if has_expr: - method = self._get_insert_method(expr) + method = self._get_insert_method(obj) bind.execute(method(table.insert())) return self.table(name, database=database) @@ -260,8 +262,12 @@ def _get_insert_method(self, expr): and isinstance(expr.op(), ops.InMemoryTable) ): (from_,) = compiled.get_final_froms() - (rows,) = from_._data - return methodcaller("values", rows) + try: + (rows,) = from_._data + except AttributeError: + return methodcaller("from_select", list(expr.columns), from_) + else: + return methodcaller("values", rows) return methodcaller("from_select", list(expr.columns), compiled) @@ -278,31 +284,28 @@ def _columns_from_schema(self, name: str, schema: sch.Schema) -> list[sa.Column] ] def _table_from_schema( - self, - name: str, - schema: sch.Schema, - database: str | None = None, - temp: bool = False, + self, name: str, schema: sch.Schema, temp: bool = False, **_: Any ) -> sa.Table: prefixes = [] if temp: prefixes.append('TEMPORARY') columns = self._columns_from_schema(name, schema) return sa.Table( - name, self.meta, *columns, prefixes=prefixes, quote=self.quote_table_names + name, + sa.MetaData(), + *columns, + prefixes=prefixes, + quote=self.quote_table_names, ) def drop_table( - self, - table_name: str, - database: str | None = None, - force: bool = False, + self, name: str, *, database: str | None = None, force: bool = False ) -> None: """Drop a table. 
Parameters ---------- - table_name + name Table to drop database Database to drop table from @@ -314,26 +317,23 @@ def drop_table( database = None if database is not None: - raise NotImplementedError( - 'Dropping tables from a different database is not yet implemented' + raise com.IbisInputError( + "Dropping tables from a different database is not yet implemented" ) - t = self._get_sqla_table(table_name, schema=database, autoload=False) + t = self._get_sqla_table(name, schema=database, autoload=False) with self.begin() as bind: t.drop(bind=bind, checkfirst=force) - assert not self.inspector.has_table( - table_name - ), f"Something went wrong during DROP of table {table_name!r}" - - self.meta.remove(t) - - qualified_name = self._fully_qualified_name(table_name, database) + qualified_name = self._fully_qualified_name(name, database) with contextlib.suppress(KeyError): # schemas won't be cached if created with raw_sql del self._schemas[qualified_name] + @util.deprecated( + as_of="5.0", removed_in="6.0", instead="Use create_table(overwrite=True)" + ) def load_data( self, table_name: str, @@ -378,12 +378,8 @@ def load_data( schema=self._current_schema, ) - def truncate_table( - self, - table_name: str, - database: str | None = None, - ) -> None: - t = self._get_sqla_table(table_name, schema=database) + def truncate_table(self, name: str, database: str | None = None) -> None: + t = self._get_sqla_table(name, schema=database) with self.begin() as con: con.execute(t.delete()) @@ -420,17 +416,13 @@ def _get_sqla_table( ) -> sa.Table: # If the underlying table (or more likely, view) has changed, remove it # to ensure a correct reflection - if autoload and self.inspector.has_table(name, schema=schema): - self.meta.remove( - sa.Table(name, self.meta, schema=schema, quote=self.quote_table_names) - ) with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message="Did not recognize type", category=sa.exc.SAWarning ) table = sa.Table( name, - self.meta, + sa.MetaData(), schema=schema, autoload_with=self.con if autoload else None, quote=self.quote_table_names, @@ -520,7 +512,7 @@ def table( """ if database is not None: if not isinstance(database, str): - raise exc.IbisTypeError( + raise com.IbisTypeError( f"`database` must be a string; got {type(database)}" ) if database != self.current_database: @@ -531,15 +523,13 @@ def table( def _insert_dataframe( self, table_name: str, df: pd.DataFrame, overwrite: bool ) -> None: - if not self.inspector.has_table(table_name): - raise exc.IbisError(f"Cannot insert into non-existent table {table_name}") - df.to_sql( - table_name, - self.con, - index=False, - if_exists='replace' if overwrite else 'append', - schema=self._current_schema, - ) + schema = self._current_schema + + t = self._get_sqla_table(table_name, schema=schema) + with self.con.begin() as con: + if overwrite: + con.execute(t.delete()) + con.execute(t.insert(), df.to_dict(orient="records")) def insert( self, @@ -607,12 +597,12 @@ def insert( with self.begin() as bind: if from_table_expr is not None: - bind.execute( - to_table.insert().from_select( - list(from_table_expr.columns), - from_table_expr.compile(), - ) - ) + compiled = from_table_expr.compile() + columns = [ + self.con.dialect.normalize_name(c) + for c in from_table_expr.columns + ] + bind.execute(to_table.insert().from_select(columns, compiled)) elif isinstance(obj, (list, dict)): to_table = self._get_sqla_table(table_name, schema=database) @@ -688,3 +678,40 @@ def _load_into_cache(self, name, expr): def _clean_up_cached_table(self, 
op): self.drop_table(op.name) + + def create_view( + self, + name: str, + obj: ir.Table, + *, + database: str | None = None, + overwrite: bool = False, + ) -> ir.Table: + import sqlalchemy_views as sav + + source = self.compile(obj) + view = sav.CreateView( + sa.Table( + name, sa.MetaData(), schema=database, quote=self.quote_table_names + ), + source, + or_replace=overwrite, + ) + with self.begin() as con: + con.execute(view) + return self.table(name, database=database) + + def drop_view( + self, name: str, *, database: str | None = None, force: bool = False + ) -> None: + import sqlalchemy_views as sav + + view = sav.DropView( + sa.Table( + name, sa.MetaData(), schema=database, quote=self.quote_table_names + ), + if_exists=not force, + ) + + with self.begin() as con: + con.execute(view) diff --git a/ibis/backends/bigquery/__init__.py b/ibis/backends/bigquery/__init__.py index 929c05acd525..0c317f547e03 100644 --- a/ibis/backends/bigquery/__init__.py +++ b/ibis/backends/bigquery/__init__.py @@ -15,6 +15,7 @@ from pydata_google_auth import cache import ibis +import ibis.common.exceptions as com import ibis.expr.operations as ops import ibis.expr.schema as sch import ibis.expr.types as ir @@ -441,11 +442,22 @@ def create_table( self, name: str, obj: pd.DataFrame | ir.Table | None = None, + *, schema: ibis.Schema | None = None, database: str | None = None, + temp: bool | None = None, + overwrite: bool = False, ) -> ir.Table: if obj is None and schema is None: - raise ValueError("The schema or obj parameter is required") + raise com.IbisError("The schema or obj parameter is required") + if temp is True: + raise NotImplementedError( + "BigQuery backend does not yet support temporary tables" + ) + if overwrite is not False: + raise NotImplementedError( + "BigQuery backend does not yet support overwriting tables" + ) if schema is not None: table_id = self._fully_qualified_name(name, database) bigquery_schema = ibis_schema_to_bigquery_schema(schema) @@ -463,23 +475,28 @@ def create_table( return self.table(name, database=database) def drop_table( - self, name: str, database: str | None = None, force: bool = False + self, name: str, *, database: str | None = None, force: bool = False ) -> None: table_id = self._fully_qualified_name(name, database) self.client.delete_table(table_id, not_found_ok=not force) def create_view( - self, name: str, expr: ir.Table, database: str | None = None + self, + name: str, + obj: ir.Table, + *, + database: str | None = None, + overwrite: bool = False, ) -> ir.Table: + or_replace = "OR REPLACE " * overwrite + sql_select = self.compile(obj) table_id = self._fully_qualified_name(name, database) - sql_select = self.compile(expr) - table = bq.Table(table_id) - table.view_query = sql_select - self.client.create_table(table) + code = f"CREATE {or_replace}VIEW {table_id} AS {sql_select}" + self.raw_sql(code) return self.table(name, database=database) def drop_view( - self, name: str, database: str | None = None, force: bool = False + self, name: str, *, database: str | None = None, force: bool = False ) -> None: self.drop_table(name=name, database=database, force=force) diff --git a/ibis/backends/clickhouse/__init__.py b/ibis/backends/clickhouse/__init__.py index d22d11624e3f..c4dd8b1f40a3 100644 --- a/ibis/backends/clickhouse/__init__.py +++ b/ibis/backends/clickhouse/__init__.py @@ -11,6 +11,7 @@ import toolz import ibis +import ibis.common.exceptions as com import ibis.config import ibis.expr.analysis as an import ibis.expr.operations as ops @@ -489,3 +490,97 @@ def 
has_operation(cls, operation: type[ops.Value]) -> bool: from ibis.backends.clickhouse.compiler.values import translate_val return operation in translate_val.registry + + def create_database( + self, name: str, *, force: bool = False, engine: str = "Atomic" + ) -> None: + self.raw_sql( + f"CREATE DATABASE {'IF NOT EXISTS ' * force}{name} ENGINE = {engine}" + ) + + def drop_database(self, name: str, *, force: bool = False) -> None: + self.raw_sql(f"DROP DATABASE {'IF EXISTS ' * force}{name}") + + def truncate_table(self, name: str, database: str | None = None) -> None: + ident = ".".join(filter(None, (database, name))) + self.raw_sql(f"DELETE FROM {ident}") + + def drop_table( + self, name: str, database: str | None = None, force: bool = False + ) -> None: + ident = ".".join(filter(None, (database, name))) + self.raw_sql(f"DROP TABLE {'IF EXISTS ' * force}{ident}") + + def create_table( + self, + name: str, + obj: pd.DataFrame | ir.Table | None = None, + *, + schema: ibis.Schema | None = None, + database: str | None = None, + temp: bool = False, + overwrite: bool = False, + # backend specific arguments + engine: str | None, + order_by: Iterable[str] | None = None, + partition_by: Iterable[str] | None = None, + sample_by: str | None = None, + settings: Mapping[str, Any] | None = None, + ) -> ir.Table: + tmp = "TEMPORARY " * temp + replace = "OR REPLACE " * overwrite + code = f"CREATE {replace}{tmp}TABLE {name}" + + if obj is None and schema is None: + raise com.IbisError("The schema or obj parameter is required") + + if schema is not None: + code += f" ({schema})" + + if isinstance(obj, pd.DataFrame): + obj = ibis.memtable(obj, schema=schema) + + if obj is not None: + self._register_in_memory_tables(obj) + query = self.compile(obj) + code += f" AS {query}" + + code += f" ENGINE = {engine}" + + if order_by is not None: + code += f" ORDER BY {', '.join(util.promote_list(order_by))}" + + if partition_by is not None: + code += f" PARTITION BY {', '.join(util.promote_list(partition_by))}" + + if sample_by is not None: + code += f" SAMPLE BY {sample_by}" + + if settings: + kvs = ", ".join(f"{name}={value!r}" for name, value in settings.items()) + code += f" SETTINGS {kvs}" + + self.raw_sql(code) + return self.table(name, database=database) + + def create_view( + self, + name: str, + obj: ir.Table, + *, + database: str | None = None, + overwrite: bool = False, + ) -> ir.Table: + name = ".".join(filter(None, (database, name))) + replace = "OR REPLACE " * overwrite + query = self.compile(obj) + code = f"CREATE {replace}VIEW {name} AS {query}" + self.raw_sql(code) + return self.table(name, database=database) + + def drop_view( + self, name: str, *, database: str | None = None, force: bool = False + ) -> None: + name = ".".join(filter(None, (database, name))) + if_not_exists = "IF EXISTS " * force + self.raw_sql(f"DROP VIEW {if_not_exists}{name}") diff --git a/ibis/backends/dask/__init__.py b/ibis/backends/dask/__init__.py index 7d773b4ae328..b1746ca7a273 100644 --- a/ibis/backends/dask/__init__.py +++ b/ibis/backends/dask/__init__.py @@ -128,7 +128,7 @@ def _convert_object(cls, obj: dd.DataFrame) -> dd.DataFrame: return obj def _load_into_cache(self, name, expr): - self.load_data(name, self.compile(expr).persist()) + self.create_table(name, self.compile(expr).persist()) def _clean_up_cached_table(self, op): del self.dictionary[op.name] diff --git a/ibis/backends/dask/tests/test_client.py b/ibis/backends/dask/tests/test_client.py index 837ad6649dcd..c13dd35a50ec 100644 --- 
a/ibis/backends/dask/tests/test_client.py +++ b/ibis/backends/dask/tests/test_client.py @@ -52,7 +52,8 @@ def test_client_table_repr(table): def test_load_data(client, npartitions): - client.load_data('testing', make_dask_data_frame(npartitions)) + with pytest.warns(FutureWarning): + client.load_data('testing', make_dask_data_frame(npartitions)) assert 'testing' in client.list_tables() assert client.get_schema('testing') diff --git a/ibis/backends/datafusion/__init__.py b/ibis/backends/datafusion/__init__.py index 249fac396ad0..3c619c26f42d 100644 --- a/ibis/backends/datafusion/__init__.py +++ b/ibis/backends/datafusion/__init__.py @@ -284,3 +284,15 @@ def has_operation(cls, operation: type[ops.Value]) -> bool: return operation in op_classes or any( issubclass(operation, op_impl) for op_impl in op_classes ) + + def create_table(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) + + def create_view(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) + + def drop_table(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) + + def drop_view(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) diff --git a/ibis/backends/duckdb/__init__.py b/ibis/backends/duckdb/__init__.py index 48f29d7db1b2..be77364b4ae0 100644 --- a/ibis/backends/duckdb/__init__.py +++ b/ibis/backends/duckdb/__init__.py @@ -43,18 +43,6 @@ def normalize_filenames(source_list): return list(map(util.normalize_filename, source_list)) -def _create_view(*args, **kwargs): - import sqlalchemy_views as sav - - return sav.CreateView(*args, **kwargs) - - -def _drop_view(*args, **kwargs): - import sqlalchemy_views as sav - - return sav.DropView(*args, **kwargs) - - def _format_kwargs(kwargs: Mapping[str, Any]): bindparams, pieces = [], [] for name, value in kwargs.items(): @@ -256,6 +244,7 @@ def read_json( Table An ibis table expression """ + import sqlalchemy_views as sav from packaging.version import parse as vparse if (version := vparse(self.version)) < vparse("0.7.0"): @@ -265,7 +254,7 @@ def read_json( if not table_name: table_name = f"ibis_read_json_{next(json_n)}" - view = _create_view( + view = sav.CreateView( sa.table(table_name), sa.select(sa.literal_column("*")).select_from( sa.func.read_json_auto( @@ -305,6 +294,8 @@ def read_csv( ir.Table The just-registered table """ + import sqlalchemy_views as sav + source_list = normalize_filenames(source_list) if not table_name: @@ -319,7 +310,7 @@ def read_csv( source = sa.select(sa.literal_column("*")).select_from( sa.func.read_csv(sa.func.list_value(*source_list), _format_kwargs(kwargs)) ) - view = _create_view(sa.table(table_name), source, or_replace=True) + view = sav.CreateView(sa.table(table_name), source, or_replace=True) with self.begin() as con: con.execute(view) return self.table(table_name) @@ -349,6 +340,8 @@ def read_parquet( ir.Table The just-registered table """ + import sqlalchemy_views as sav + source_list = normalize_filenames(source_list) if any(source.startswith("s3://") for source in source_list): @@ -383,7 +376,7 @@ def read_parquet( sa.func.list_value(*source_list), _format_kwargs(kwargs) ) ) - view = _create_view(sa.table(table_name), source, or_replace=True) + view = sav.CreateView(sa.table(table_name), source, or_replace=True) with self.begin() as con: con.execute(view) @@ -439,6 +432,8 @@ def read_postgres(self, uri, table_name: str | None = None, schema: str = "publi ir.Table The just-registered table. 
""" + import sqlalchemy_views as sav + if table_name is None: raise ValueError( "`table_name` is required when registering a postgres table" @@ -447,7 +442,7 @@ def read_postgres(self, uri, table_name: str | None = None, schema: str = "publi source = sa.select(sa.literal_column("*")).select_from( sa.func.postgres_scan_pushdown(uri, schema, table_name) ) - view = _create_view(sa.table(table_name), source, or_replace=True) + view = sav.CreateView(sa.table(table_name), source, or_replace=True) with self.begin() as con: con.execute(view) @@ -481,13 +476,15 @@ def read_sqlite(self, path: str | Path, table_name: str | None = None) -> ir.Tab 3 0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63 4 0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75 """ + import sqlalchemy_views as sav + if table_name is None: raise ValueError("`table_name` is required when registering a sqlite table") self._load_extensions(["sqlite"]) source = sa.select(sa.literal_column("*")).select_from( sa.func.sqlite_scan(str(path), table_name) ) - view = _create_view(sa.table(table_name), source, or_replace=True) + view = sav.CreateView(sa.table(table_name), source, or_replace=True) with self.begin() as con: con.execute(view) @@ -721,23 +718,6 @@ def _get_compiled_statement(self, view: sa.Table, definition: sa.sql.Selectable) view, definition, compile_kwargs={"literal_binds": True} ) - def create_view( - self, name: str, expr: ir.Table, database: str | None = None - ) -> ir.Table: - source = self.compile(expr) - view = _create_view(sa.table(name), source, or_replace=True) - with self.begin() as con: - con.execute(view) - return self.table(name, database=database) - - def drop_view( - self, name: str, database: str | None = None, force: bool = False - ) -> None: - view = _drop_view(sa.table(name), if_exists=not force) - - with self.begin() as con: - con.execute(view) - def _insert_dataframe( self, table_name: str, df: pd.DataFrame, overwrite: bool ) -> None: diff --git a/ibis/backends/impala/__init__.py b/ibis/backends/impala/__init__.py index 0bf9d6170eab..7961572a3eb4 100644 --- a/ibis/backends/impala/__init__.py +++ b/ibis/backends/impala/__init__.py @@ -512,11 +512,16 @@ def set_compression_codec(self, codec): self.set_options({'COMPRESSION_CODEC': codec}) def create_view( - self, name: str, expr: ir.Table, database: str | None = None + self, + name: str, + obj: ir.Table, + *, + database: str | None = None, + overwrite: bool = False, ) -> ir.Table: - ast = self.compiler.to_ast(expr) + ast = self.compiler.to_ast(obj) select = ast.queries[0] - statement = CreateView(name, select, database=database) + statement = CreateView(name, select, database=database, can_exist=overwrite) self.raw_sql(statement) return self.table(name, database=database) @@ -536,12 +541,14 @@ def _setup_insert(self, obj): def create_table( self, - table_name: str, + name: str, obj: ir.Table | None = None, + *, schema=None, database=None, - external=False, - force=False, + temp: bool | None = None, + overwrite: bool = False, + external: bool = False, # HDFS options format='parquet', location=None, @@ -554,7 +561,7 @@ def create_table( Parameters ---------- - table_name + name Table name obj If passed, creates table from select statement results @@ -563,7 +570,9 @@ def create_table( particular schema database Database name - force + temp + Whether a table is temporary + overwrite Do not create table if table with indicated name already exists external Create an external table; Impala will not delete the underlying @@ -578,11 +587,14 @@ def create_table( expression. 
like_parquet Can specify instead of a schema - - Examples - -------- - >>> con.create_table('new_table_name', table_expr) # doctest: +SKIP """ + if obj is None and schema is None: + raise com.IbisError("The schema or obj parameter is required") + + if temp is not None: + raise NotImplementedError( + "Impala backend does not yet support temporary tables" + ) if like_parquet is not None: raise NotImplementedError @@ -591,34 +603,34 @@ def create_table( ast = self.compiler.to_ast(to_insert) select = ast.queries[0] + if overwrite: + self.drop_table(name, force=True) self.raw_sql( CTAS( - table_name, + name, select, database=database, - can_exist=force, format=format, external=external, partition=partition, path=location, ) ) - elif schema is not None: + else: # schema is not None + if overwrite: + self.drop_table(name, force=True) self.raw_sql( CreateTableWithSchema( - table_name, + name, schema, database=database, format=format, - can_exist=force, external=external, path=location, partition=partition, ) ) - else: - raise com.IbisError('Must pass obj or schema') - return self.table(table_name, database=database) + return self.table(name, database=database) def avro_file( self, @@ -894,6 +906,9 @@ def insert( validate=validate, ) + @util.deprecated( + as_of="5.0", removed_in="6.0", instead="Use create_table(overwrite=True)" + ) def load_data( self, table_name, @@ -906,12 +921,14 @@ def load_data( table = self.table(table_name, database=database) return table.load_data(path, overwrite=overwrite, partition=partition) - def drop_table(self, table_name, database=None, force=False): + def drop_table( + self, name: str, *, database: str | None = None, force: bool = False + ) -> None: """Drop an Impala table. Parameters ---------- - table_name + name Table name database Database name @@ -924,23 +941,23 @@ def drop_table(self, table_name, database=None, force=False): >>> db = 'operations' >>> con.drop_table(table, database=db, force=True) # doctest: +SKIP """ - statement = DropTable(table_name, database=database, must_exist=not force) + statement = DropTable(name, database=database, must_exist=not force) self.raw_sql(statement) - def truncate_table(self, table_name, database=None): + def truncate_table(self, name: str, database: str | None = None) -> None: """Delete all rows from an existing table. Parameters ---------- - table_name + name Table name database Database name """ - statement = TruncateTable(table_name, database=database) + statement = TruncateTable(name, database=database) self.raw_sql(statement) - def drop_table_or_view(self, name, database=None, force=False): + def drop_table_or_view(self, name, *, database=None, force=False): """Drop view or table.""" try: self.drop_table(name, database=database) @@ -950,7 +967,7 @@ def drop_table_or_view(self, name, database=None, force=False): except Exception: # noqa: BLE001 raise e - def cache_table(self, table_name, database=None, pool='default'): + def cache_table(self, table_name, *, database=None, pool='default'): """Caches a table in cluster memory in the given pool. 
Parameters diff --git a/ibis/backends/impala/client.py b/ibis/backends/impala/client.py index 61656b4c9c3c..6ef94a4f4c3d 100644 --- a/ibis/backends/impala/client.py +++ b/ibis/backends/impala/client.py @@ -354,6 +354,7 @@ def load_data(self, path, overwrite=False, partition=None): path, partition=partition, partition_schema=partition_schema, + overwrite=overwrite, ) return self._client.raw_sql(stmt.compile()) diff --git a/ibis/backends/mssql/__init__.py b/ibis/backends/mssql/__init__.py index 1bc4e38f6690..6024163d882a 100644 --- a/ibis/backends/mssql/__init__.py +++ b/ibis/backends/mssql/__init__.py @@ -72,10 +72,6 @@ def _table_from_schema( database: str | None = None, temp: bool = False, ) -> sa.Table: - prefixes = [] - if temp: - raise ValueError( - 'MSSQL supports temporary table declaration through placing hash before the table name' - ) - columns = self._columns_from_schema(name, schema) - return sa.Table(name, self.meta, *columns, prefixes=prefixes) + return super()._table_from_schema( + temp * "#" + name, schema=schema, database=database, temp=False + ) diff --git a/ibis/backends/pandas/__init__.py b/ibis/backends/pandas/__init__.py index e2dc0ab823cc..06b2df0f9b05 100644 --- a/ibis/backends/pandas/__init__.py +++ b/ibis/backends/pandas/__init__.py @@ -11,6 +11,7 @@ import ibis.expr.operations as ops import ibis.expr.schema as sch import ibis.expr.types as ir +from ibis import util from ibis.backends.base import BaseBackend from ibis.backends.pandas.client import ( PandasDatabase, @@ -104,6 +105,9 @@ def table(self, name: str, schema: sch.Schema = None): def database(self, name=None): return self.database_class(name, self) + @util.deprecated( + as_of="5.0", removed_in="6.0", instead="Use create_table(overwrite=True)" + ) def load_data(self, table_name, obj, **kwargs): # kwargs is a catch all for any options required by other backends. self.dictionary[table_name] = obj @@ -120,11 +124,28 @@ def compile(self, expr, *args, **kwargs): return expr def create_table( - self, table_name: str, obj=None, schema: sch.Schema | None = None + self, + name: str, + obj: pd.DataFrame | ir.Table | None = None, + *, + schema: sch.Schema | None = None, + database: str | None = None, + temp: bool | None = None, + overwrite: bool = False, ) -> ir.Table: """Create a table.""" + if temp: + raise com.IbisError( + "Passing `temp=True` to the Pandas backend create_table method has no " + "effect: all tables are in memory and temporary." + ) + if database: + raise com.IbisError( + "Passing `database` to the Pandas backend create_table method has no " + "effect: Pandas cannot set a database." 
+ ) if obj is None and schema is None: - raise com.IbisError('Must pass expr or schema') + raise com.IbisError("The schema or obj parameter is required") if obj is not None: if not self._supports_conversion(obj): @@ -138,11 +159,36 @@ def create_table( dtypes = dict(pandas_schema) df = self._from_pandas(pd.DataFrame(columns=dtypes.keys()).astype(dtypes)) - self.dictionary[table_name] = df + if name in self.dictionary and not overwrite: + raise com.IbisError(f"Cannot overwrite existing table `{name}`") + + self.dictionary[name] = df if schema is not None: - self.schemas[table_name] = schema - return self.table(table_name) + self.schemas[name] = schema + return self.table(name) + + def create_view( + self, + name: str, + obj: ir.Table, + *, + database: str | None = None, + overwrite: bool = False, + ) -> ir.Table: + return self.create_table( + name, obj=obj, temp=None, database=database, overwrite=overwrite + ) + + def drop_view(self, name: str, *, force: bool = False) -> None: + self.drop_table(name, force=force) + + def drop_table(self, name: str, *, force: bool = False) -> None: + if not force and name in self.dictionary: + raise com.IbisError( + "Cannot drop existing table. Call drop_table with force=True to drop existing table." + ) + del self.dictionary[name] @classmethod def _supports_conversion(cls, obj: Any) -> bool: diff --git a/ibis/backends/pandas/tests/test_client.py b/ibis/backends/pandas/tests/test_client.py index 83f425d42495..bf40d2365272 100644 --- a/ibis/backends/pandas/tests/test_client.py +++ b/ibis/backends/pandas/tests/test_client.py @@ -45,7 +45,8 @@ def test_client_table_repr(table): def test_load_data(client, test_data): - client.load_data('testing', test_data) + with pytest.warns(FutureWarning): + client.load_data('testing', test_data) assert 'testing' in client.list_tables() assert client.get_schema('testing') diff --git a/ibis/backends/polars/__init__.py b/ibis/backends/polars/__init__.py index 381a36162aed..f18b00886874 100644 --- a/ibis/backends/polars/__init__.py +++ b/ibis/backends/polars/__init__.py @@ -15,7 +15,7 @@ import ibis.expr.types as ir from ibis.backends.base import BaseBackend from ibis.backends.polars.compiler import translate -from ibis.util import normalize_filename +from ibis.util import deprecated, normalize_filename if TYPE_CHECKING: import pandas as pd @@ -218,10 +218,53 @@ def read_parquet( def database(self, name=None): return self.database_class(name, self) + @deprecated( + as_of="5.0", removed_in="6.0", instead="Use create_table(overwrite=True)" + ) def load_data(self, table_name, obj, **kwargs): # kwargs is a catch all for any options required by other backends. self._tables[table_name] = obj + def create_table( + self, + name: str, + obj: pd.DataFrame | ir.Table | None = None, + *, + schema: ibis.Schema | None = None, + database: str | None = None, + temp: bool | None = None, + overwrite: bool = False, + ) -> ir.Table: + if schema is not None and obj is None: + raise NotImplementedError( + "Empty table creation is not yet supported in the Polars backend" + ) + + if database is not None: + raise com.IbisError( + "Passing `database` to the Polars backend create_table method has no " + "effect: Polars cannot set a database." + ) + + if temp is not None: + raise com.IbisError( + "Passing `temp=True` to the Polars backend create_table method has no " + "effect: all tables are in memory and temporary. " + ) + + if not overwrite and name in self._tables: + raise com.IntegrityError( + f"Table {name} already exists. 
Use overwrite=True to clobber existing tables" + ) + + if isinstance(obj, ir.Table): + obj = obj.to_pyarrow() + + if not isinstance(obj, (pl.DataFrame, pl.LazyFrame)): + obj = pl.LazyFrame(obj) + + self._tables[name] = obj + def get_schema(self, table_name, database=None): return self._tables[table_name].schema @@ -352,7 +395,16 @@ def to_pyarrow_batches( return table.to_reader(chunk_size) def _load_into_cache(self, name, expr): - self.load_data(name, self.compile(expr).cache()) + self.create_table(name, self.compile(expr).cache()) def _clean_up_cached_table(self, op): del self._tables[op.name] + + def create_view(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) + + def drop_table(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) + + def drop_view(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) diff --git a/ibis/backends/postgres/tests/test_client.py b/ibis/backends/postgres/tests/test_client.py index 98ff74cb084f..c1afd12a7b34 100644 --- a/ibis/backends/postgres/tests/test_client.py +++ b/ibis/backends/postgres/tests/test_client.py @@ -24,7 +24,6 @@ import ibis.expr.datatypes as dt import ibis.expr.types as ir from ibis.tests.util import assert_equal -from ibis.util import guid pytest.importorskip("psycopg2") sa = pytest.importorskip("sqlalchemy") @@ -82,28 +81,6 @@ def test_list_databases(con): assert POSTGRES_TEST_DB in con.list_databases() -def test_metadata_is_per_table(): - con = ibis.postgres.connect( - host=IBIS_POSTGRES_HOST, - database=POSTGRES_TEST_DB, - user=IBIS_POSTGRES_USER, - password=IBIS_POSTGRES_PASS, - port=IBIS_POSTGRES_PORT, - ) - - name = f"tmp_{guid()}" - with con.begin() as c: - c.exec_driver_sql(f"CREATE TABLE {name} (x BIGINT)") - - try: - assert name not in con.meta.tables - con.table(name) - assert name in con.meta.tables - finally: - with con.begin() as c: - c.exec_driver_sql(f"DROP TABLE {name}") - - def test_schema_type_conversion(): typespec = [ # name, type, nullable diff --git a/ibis/backends/postgres/tests/test_functions.py b/ibis/backends/postgres/tests/test_functions.py index 84114b6538b1..138c7c2966b8 100644 --- a/ibis/backends/postgres/tests/test_functions.py +++ b/ibis/backends/postgres/tests/test_functions.py @@ -1088,7 +1088,7 @@ def test_anti_join(t, s): def test_create_table_from_expr(con, trunc, guid2): - con.create_table(guid2, expr=trunc) + con.create_table(guid2, obj=trunc) t = con.table(guid2) assert list(t['name'].execute()) == list('abc') diff --git a/ibis/backends/pyspark/__init__.py b/ibis/backends/pyspark/__init__.py index 91f729f54e40..24e2210d2909 100644 --- a/ibis/backends/pyspark/__init__.py +++ b/ibis/backends/pyspark/__init__.py @@ -333,31 +333,33 @@ def get_schema( def create_table( self, - table_name: str, + name: str, obj: ir.Table | pd.DataFrame | None = None, + *, schema: sch.Schema | None = None, database: str | None = None, - force: bool = False, - # HDFS options - format: str = 'parquet', + temp: bool | None = None, + overwrite: bool = False, + format: str = "parquet", ) -> ir.Table: """Create a new table in Spark. Parameters ---------- - table_name + name Name of the new table. 
obj - If passed, creates table from select statement results + If passed, creates table from `SELECT` statement results schema - Mutually exclusive with obj, creates an empty table with a - schema + Mutually exclusive with `obj`, creates an empty table with a schema database Database name - force - If true, create table if table with indicated name already exists + temp + Whether the new table is temporary + overwrite + If `True`, overwrite existing data format - Table format + Format of the table on disk Returns ------- @@ -370,13 +372,17 @@ def create_table( """ import pandas as pd + if obj is None and schema is None: + raise com.IbisError("The schema or obj parameter is required") + if temp is True: + raise NotImplementedError( + "PySpark backend does not yet support temporary tables" + ) if obj is not None: if isinstance(obj, pd.DataFrame): spark_df = self._session.createDataFrame(obj) - mode = 'error' - if force: - mode = 'overwrite' - spark_df.write.saveAsTable(table_name, format=format, mode=mode) + mode = "overwrite" if overwrite else "error" + spark_df.write.saveAsTable(name, format=format, mode=mode) return None else: self._register_in_memory_tables(obj) @@ -385,25 +391,23 @@ def create_table( select = ast.queries[0] statement = ddl.CTAS( - table_name, + name, select, database=database, - can_exist=force, + can_exist=overwrite, format=format, ) - elif schema is not None: + else: statement = ddl.CreateTableWithSchema( - table_name, + name, schema, database=database, format=format, - can_exist=force, + can_exist=overwrite, ) - else: - raise com.IbisError('Must pass expr or schema') self.raw_sql(statement.compile()) - return self.table(table_name, database=database) + return self.table(name, database=database) def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: self.compile(op.to_expr()).createOrReplaceTempView(op.name) @@ -411,10 +415,10 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None: def create_view( self, name: str, - expr: ir.Table, + obj: ir.Table, + *, database: str | None = None, - can_exist: bool = False, - temporary: bool = False, + overwrite: bool = False, ) -> ir.Table: """Create a Spark view from a table expression. 
@@ -422,28 +426,22 @@ def create_view( ---------- name View name - expr + obj Expression to use for the view database Database name - can_exist + overwrite Replace an existing view of the same name if it exists - temporary - Whether the table is temporary Returns ------- Table The created view """ - ast = self.compiler.to_ast(expr) + ast = self.compiler.to_ast(obj) select = ast.queries[0] statement = ddl.CreateView( - name, - select, - database=database, - can_exist=can_exist, - temporary=temporary, + name, select, database=database, can_exist=overwrite, temporary=True ) self.raw_sql(statement.compile()) return self.table(name, database=database) @@ -451,24 +449,27 @@ def create_view( def drop_table( self, name: str, + *, database: str | None = None, force: bool = False, ) -> None: """Drop a table.""" - self.drop_table_or_view(name, database, force) + self.drop_table_or_view(name, database=database, force=force) def drop_view( self, name: str, + *, database: str | None = None, force: bool = False, ): """Drop a view.""" - self.drop_table_or_view(name, database, force) + self.drop_table_or_view(name, database=database, force=force) def drop_table_or_view( self, name: str, + *, database: str | None = None, force: bool = False, ) -> None: @@ -492,21 +493,17 @@ def drop_table_or_view( statement = DropTable(name, database=database, must_exist=not force) self.raw_sql(statement.compile()) - def truncate_table( - self, - table_name: str, - database: str | None = None, - ) -> None: + def truncate_table(self, name: str, database: str | None = None) -> None: """Delete all rows from an existing table. Parameters ---------- - table_name + name Table name database Database name """ - statement = TruncateTable(table_name, database=database) + statement = TruncateTable(name, database=database) self.raw_sql(statement.compile()) def insert( diff --git a/ibis/backends/pyspark/tests/test_ddl.py b/ibis/backends/pyspark/tests/test_ddl.py index 5598bbda5547..8d9804fb6f72 100644 --- a/ibis/backends/pyspark/tests/test_ddl.py +++ b/ibis/backends/pyspark/tests/test_ddl.py @@ -16,7 +16,7 @@ def test_create_exists_view(client, alltypes, temp_view): assert tmp_name not in client.list_tables() t1 = alltypes.group_by('string_col').size() - t2 = client.create_view(tmp_name, t1, temporary=True) + t2 = client.create_view(tmp_name, t1) assert tmp_name in client.list_tables() # just check it works for now @@ -169,7 +169,7 @@ def test_compute_stats(client, alltypes): def created_view(client, alltypes): name = util.guid() expr = alltypes.limit(10) - client.create_view(name, expr, temporary=True) + client.create_view(name, expr) return name diff --git a/ibis/backends/snowflake/__init__.py b/ibis/backends/snowflake/__init__.py index dc30f6632ab9..63fbdd3311b8 100644 --- a/ibis/backends/snowflake/__init__.py +++ b/ibis/backends/snowflake/__init__.py @@ -75,12 +75,11 @@ class SnowflakeExprTranslator(AlchemyExprTranslator): class SnowflakeTableSetFormatter(_AlchemyTableSetFormatter): - def _format_in_memory_table(self, op, ref_op, translator): + def _format_in_memory_table(self, _, ref_op, translator): columns = translator._schema_to_sqlalchemy_columns(ref_op.schema) rows = list(ref_op.data.to_frame().itertuples(index=False)) pos_columns = [ - sa.column(f"${idx}") - for idx, name in enumerate(ref_op.schema.names, start=1) + sa.column(f"${idx}") for idx in range(1, len(ref_op.schema.names) + 1) ] return sa.select(*pos_columns).select_from(sa.values(*columns).data(rows)) @@ -135,6 +134,11 @@ class Backend(BaseAlchemyBackend): compiler 
= SnowflakeCompiler quote_table_names = True + @property + def _current_schema(self) -> str: + with self.begin() as con: + return con.execute(sa.select(sa.func.current_schema())).scalar() + def _convert_kwargs(self, kwargs): with contextlib.suppress(KeyError): kwargs["account"] = kwargs.pop("host") @@ -223,9 +227,8 @@ def _get_sqla_table( name, schema=schema, autoload=autoload, database=db, **kwargs ) - path = ".".join(self.con.url.database.split("/", 1)) with self.begin() as con: - con.exec_driver_sql(f"USE {path}") + con.exec_driver_sql(f"USE {default_db}.{default_schema}") result.schema = ident return result diff --git a/ibis/backends/sqlite/__init__.py b/ibis/backends/sqlite/__init__.py index ce26985c769b..1554328d1075 100644 --- a/ibis/backends/sqlite/__init__.py +++ b/ibis/backends/sqlite/__init__.py @@ -161,11 +161,6 @@ def connect(dbapi_connection, connection_record): super().do_connect(engine) - @sa.event.listens_for(self.meta, "column_reflect") - def column_reflect(inspector, table, column_info): - if type(column_info["type"]) is TIMESTAMP: - column_info["type"] = ISODATETIME() - def attach(self, name: str, path: str | Path) -> None: """Connect another SQLite database file to the current connection. @@ -189,14 +184,21 @@ def attach(self, name: str, path: str | Path) -> None: def _get_sqla_table( self, name: str, schema: str | None = None, autoload: bool = True, **_: Any ) -> sa.Table: + meta = sa.MetaData() + + @sa.event.listens_for(meta, "column_reflect") + def column_reflect(inspector, table, column_info): + if type(column_info["type"]) is TIMESTAMP: + column_info["type"] = ISODATETIME() + return sa.Table( name, - self.meta, + meta, schema=schema or self.current_database, autoload_with=self.con if autoload else None, ) - def table(self, name: str, database: str | None = None) -> ir.Table: + def table(self, name: str, database: str | None = None, **_: Any) -> ir.Table: """Create a table expression from a table in the SQLite database. 
Parameters @@ -222,7 +224,9 @@ def _table_from_schema( if temp: prefixes.append('TEMPORARY') columns = self._columns_from_schema(name, schema) - return sa.Table(name, self.meta, *columns, schema=database, prefixes=prefixes) + return sa.Table( + name, sa.MetaData(), *columns, schema=database, prefixes=prefixes + ) @property def _current_schema(self) -> str | None: diff --git a/ibis/backends/sqlite/tests/test_functions.py b/ibis/backends/sqlite/tests/test_functions.py index ee05aa5bb540..3b74f62295e0 100644 --- a/ibis/backends/sqlite/tests/test_functions.py +++ b/ibis/backends/sqlite/tests/test_functions.py @@ -576,7 +576,7 @@ def mj1(con): con.create_table( "mj1", schema=ibis.schema(dict(id1="int32", val1="float64")), - force=True, + overwrite=True, ) try: con.insert( @@ -592,22 +592,16 @@ def mj1(con): @pytest.fixture def mj2(con): con.create_table( - "mj2", - schema=ibis.schema(dict(id2="int32", val2="float64")), - force=True, + "mj2", schema=ibis.schema(dict(id2="int32", val2="float64")), overwrite=True ) try: - con.insert( - "mj2", - pd.DataFrame(dict(id2=[1, 2], val2=[15, 25])), - overwrite=True, - ) + con.insert("mj2", pd.DataFrame(dict(id2=[1, 2], val2=[15, 25])), overwrite=True) yield con.table("mj2") finally: con.drop_table("mj2", force=True) -def test_simple_join(con, mj1, mj2): +def test_simple_join(mj1, mj2): joined = mj1.join(mj2, mj1.id1 == mj2.id2) result = joined.val2.execute() assert len(result) == 2 diff --git a/ibis/backends/tests/test_client.py b/ibis/backends/tests/test_client.py index ddf31e503e7f..6df3fd122d19 100644 --- a/ibis/backends/tests/test_client.py +++ b/ibis/backends/tests/test_client.py @@ -26,23 +26,23 @@ def new_schema(): def _create_temp_table_with_schema(con, temp_table_name, schema, data=None): - con.drop_table(temp_table_name, force=True) temporary = con.create_table(temp_table_name, schema=schema) assert temporary.to_pandas().empty if data is not None and isinstance(data, pd.DataFrame): - con.load_data(temp_table_name, data, if_exists="append") - result = temporary.to_pandas() + assert not data.empty + tmp = con.create_table(temp_table_name, data, overwrite=True) + result = tmp.to_pandas() assert len(result) == len(data.index) tm.assert_frame_equal( result.sort_values(result.columns[0]).reset_index(drop=True), data.sort_values(result.columns[0]).reset_index(drop=True), ) + return tmp return temporary -@pytest.mark.notimpl(["snowflake"]) def test_load_data_sqlalchemy(alchemy_backend, alchemy_con, alchemy_temp_table): sch = ibis.schema( [ @@ -61,8 +61,7 @@ def test_load_data_sqlalchemy(alchemy_backend, alchemy_con, alchemy_temp_table): 'salary': [100.0, 200.0, 300.0], } ) - alchemy_con.create_table(alchemy_temp_table, schema=sch) - alchemy_con.load_data(alchemy_temp_table, df, if_exists='append') + alchemy_con.create_table(alchemy_temp_table, df, schema=sch, overwrite=True) result = ( alchemy_con.table(alchemy_temp_table) .execute() @@ -215,25 +214,17 @@ def test_nullable_input_output(con, temp_table): assert t.schema().types[2].nullable -@mark.notimpl( - [ - "clickhouse", - "datafusion", - "druid", - "mysql", - "postgres", - "sqlite", - "snowflake", - "polars", - "mssql", - "trino", - ] -) -@mark.notyet(["pyspark"]) +@mark.notimpl(["datafusion", "druid", "polars"]) def test_create_drop_view(ddl_con, temp_view): # setup table_name = 'functional_alltypes' - expr = ddl_con.table(table_name).limit(1) + try: + expr = ddl_con.table(table_name) + except KeyError: + table_name = table_name.upper() + expr = ddl_con.table(table_name) + + expr = expr.limit(1) # 
create a new view ddl_con.create_view(temp_view, expr) @@ -246,7 +237,7 @@ def test_create_drop_view(ddl_con, temp_view): assert set(t_expr.schema().names) == set(v_expr.schema().names) -@mark.notimpl(["postgres", "mysql", "clickhouse", "datafusion", "polars"]) +@mark.notimpl(["postgres", "mysql", "datafusion", "polars"]) def test_separate_database(ddl_con, alternate_current_database, current_data_db): # using alternate_current_database switches "con" current # database to a temporary one until a test is over @@ -257,14 +248,8 @@ def test_separate_database(ddl_con, alternate_current_database, current_data_db) assert tmp_db.name == alternate_current_database -def _skip_snowflake(con, reason="snowflake can't drop tables"): - if con.name == "snowflake": - pytest.skip(reason) - - @pytest.fixture def employee_empty_temp_table(alchemy_con, test_employee_schema): - _skip_snowflake(alchemy_con) temp_table_name = f"temp_to_table_{guid()[:6]}" _create_temp_table_with_schema( alchemy_con, @@ -283,7 +268,6 @@ def employee_data_1_temp_table( test_employee_schema, test_employee_data_1, ): - _skip_snowflake(alchemy_con) temp_table_name = f"temp_to_table_{guid()[:6]}" _create_temp_table_with_schema( alchemy_con, @@ -291,6 +275,7 @@ def employee_data_1_temp_table( test_employee_schema, data=test_employee_data_1, ) + assert temp_table_name in alchemy_con.list_tables() try: yield temp_table_name finally: @@ -303,7 +288,6 @@ def employee_data_2_temp_table( test_employee_schema, test_employee_data_2, ): - _skip_snowflake(alchemy_con) temp_table_name = f"temp_to_table_{guid()[:6]}" _create_temp_table_with_schema( alchemy_con, @@ -336,6 +320,11 @@ def test_insert_no_overwrite_from_dataframe( ) +@pytest.mark.notyet( + ["trino"], + reason="Connector doesn't support deletion (required for overwrite=True)", + raises=sa.exc.ProgrammingError, +) def test_insert_overwrite_from_dataframe( alchemy_con, employee_data_1_temp_table, @@ -488,7 +477,7 @@ def test_unsigned_integer_type(alchemy_con): alchemy_con.create_table( tname, schema=ibis.schema(dict(a="uint8", b="uint16", c="uint32", d="uint64")), - force=True, + overwrite=True, ) try: assert tname in alchemy_con.list_tables() @@ -1077,7 +1066,7 @@ def test_create_table_timestamp(con): dict(zip(string.ascii_letters, map("timestamp({:d})".format, range(10)))) ) name = f"timestamp_scale_{guid()}" - con.create_table(name, schema=schema, force=True) + con.create_table(name, schema=schema, overwrite=True) try: rows = con.raw_sql(f"DESCRIBE {name}").fetchall() result = ibis.schema((name, typ) for name, typ, *_ in rows) diff --git a/ibis/backends/tests/test_map.py b/ibis/backends/tests/test_map.py index 5bdaf27e6e4d..931b93d9133e 100644 --- a/ibis/backends/tests/test_map.py +++ b/ibis/backends/tests/test_map.py @@ -210,7 +210,7 @@ def tmptable(con): # some backends don't implement drop with contextlib.suppress(NotImplementedError): - con.drop_table(name) + con.drop_table(name, force=True) @pytest.mark.notimpl(["clickhouse"], reason=".create_table not yet implemented in ibis") diff --git a/ibis/tests/expr/mocks.py b/ibis/tests/expr/mocks.py index 443d5347c743..000a66cfca9e 100644 --- a/ibis/tests/expr/mocks.py +++ b/ibis/tests/expr/mocks.py @@ -412,6 +412,18 @@ def compile( queries = [q.compile() for q in ast.queries] return queries[0] if len(queries) == 1 else queries + def create_table(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) + + def drop_table(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) + + def create_view(self, *_, **__) -> 
ir.Table: + raise NotImplementedError(self.name) + + def drop_view(self, *_, **__) -> ir.Table: + raise NotImplementedError(self.name) + def table_from_schema(name, meta, schema, *, database: str | None = None): # Convert Ibis schema to SQLA table @@ -431,8 +443,8 @@ class MockAlchemyBackend(MockBackend): def __init__(self): super().__init__() - sa = pytest.importorskip('sqlalchemy') - self.meta = sa.MetaData() + pytest.importorskip('sqlalchemy') + self.tables = {} def table(self, name, database=None): schema = self.get_schema(name) @@ -440,9 +452,11 @@ def table(self, name, database=None): def _inject_table(self, name, schema): try: - alchemy_table = self.meta.tables[name] + alchemy_table = self.tables[name] except KeyError: - alchemy_table = table_from_schema(name, self.meta, schema) + alchemy_table = self.tables[name] = table_from_schema( + name, sa.MetaData(), schema + ) return self.table_class( source=self, sqla_table=alchemy_table, schema=schema diff --git a/ibis/tests/sql/test_sqlalchemy.py b/ibis/tests/sql/test_sqlalchemy.py index f26fb4cc160d..c2647aeedf61 100644 --- a/ibis/tests/sql/test_sqlalchemy.py +++ b/ibis/tests/sql/test_sqlalchemy.py @@ -65,7 +65,7 @@ def alltypes(con): return con.table('alltypes') -def test_sqla_schema_conversion(con): +def test_sqla_schema_conversion(): typespec = [ # name, type, nullable ('smallint', sat.SmallInteger, False, dt.int16), @@ -83,7 +83,7 @@ def test_sqla_schema_conversion(con): sqla_types.append(sa.Column(name, t, nullable=nullable)) ibis_types.append((name, ibis_type(nullable=nullable))) - table = sa.Table('tname', con.meta, *sqla_types) + table = sa.Table('tname', sa.MetaData(), *sqla_types) schema = schema_from_table(table) expected = ibis.schema(ibis_types) diff --git a/poetry.lock b/poetry.lock index 0957a2e0957c..9c091e205885 100644 --- a/poetry.lock +++ b/poetry.lock @@ -5574,18 +5574,18 @@ druid = ["pydruid", "sqlalchemy"] duckdb = ["duckdb", "duckdb-engine", "packaging", "pyarrow", "sqlalchemy", "sqlalchemy-views"] geospatial = ["GeoAlchemy2", "geopandas", "shapely"] impala = ["fsspec", "impyla", "requests", "sqlalchemy"] -mssql = ["sqlalchemy", "pymssql"] -mysql = ["sqlalchemy", "pymysql"] +mssql = ["sqlalchemy", "pymssql", "sqlalchemy-views"] +mysql = ["sqlalchemy", "pymysql", "sqlalchemy-views"] pandas = ["regex"] polars = ["polars", "pyarrow"] -postgres = ["psycopg2", "sqlalchemy"] +postgres = ["psycopg2", "sqlalchemy", "sqlalchemy-views"] pyspark = ["pyarrow", "pyspark", "sqlalchemy"] -snowflake = ["snowflake-connector-python", "snowflake-sqlalchemy"] -sqlite = ["regex", "sqlalchemy"] -trino = ["trino", "sqlalchemy"] +snowflake = ["snowflake-connector-python", "snowflake-sqlalchemy", "sqlalchemy-views"] +sqlite = ["regex", "sqlalchemy", "sqlalchemy-views"] +trino = ["trino", "sqlalchemy", "sqlalchemy-views"] visualization = ["graphviz"] [metadata] lock-version = "2.0" python-versions = "^3.8" -content-hash = "f113527d242423168c146d59aca1fdc1084b171cb11c3ee66bf8c5ba5c62cae8" +content-hash = "6eaf9b594415cc03edd48afa3c88edfa7b3be36213320c801d87c88233bc19f7" diff --git a/pyproject.toml b/pyproject.toml index c5a4c9821c5b..3a93123a32d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -187,15 +187,19 @@ duckdb = [ ] geospatial = ["geoalchemy2", "geopandas", "shapely"] impala = ["fsspec", "impyla", "requests", "sqlalchemy"] -mssql = ["sqlalchemy", "pymssql"] -mysql = ["sqlalchemy", "pymysql"] +mssql = ["sqlalchemy", "pymssql", "sqlalchemy-views"] +mysql = ["sqlalchemy", "pymysql", "sqlalchemy-views"] pandas = ["regex"] polars = 
["polars", "pyarrow"] -postgres = ["psycopg2", "sqlalchemy"] -pyspark = ["pyarrow", "pyspark", "sqlalchemy"] # for make_url -snowflake = ["snowflake-connector-python", "snowflake-sqlalchemy"] -sqlite = ["regex", "sqlalchemy"] -trino = ["trino", "sqlalchemy"] +postgres = ["psycopg2", "sqlalchemy", "sqlalchemy-views"] +pyspark = ["pyarrow", "pyspark", "sqlalchemy"] +snowflake = [ + "snowflake-connector-python", + "snowflake-sqlalchemy", + "sqlalchemy-views" +] +sqlite = ["regex", "sqlalchemy", "sqlalchemy-views"] +trino = ["trino", "sqlalchemy", "sqlalchemy-views"] # non-backend extras visualization = ["graphviz"] decompiler = ["black"]
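
Usage sketch (illustrative, not part of the patch): with the unified signatures, the same DDL calls work across conforming backends. The DuckDB connection and table names below are assumptions for the example only; any backend implementing the interface behaves the same.

    import pandas as pd

    import ibis

    # hypothetical example connection; any conforming backend works the same
    con = ibis.duckdb.connect()

    df = pd.DataFrame({"a": [1, 2, 3], "b": list("xyz")})

    # schema/database/temp/overwrite are keyword-only in the unified API
    t = con.create_table("example_table", df, overwrite=True)

    # create_view takes `obj` (renamed from `expr`) and supports overwrite
    v = con.create_view("example_view", t.filter(t.a > 1), overwrite=True)

    # the drop_* methods mirror creation; force=True suppresses
    # "does not exist" errors
    con.drop_view("example_view", force=True)
    con.drop_table("example_table", force=True)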