Skip to content

Commit

Permalink
refactor(table ddl): remove hierarchical schema from *_table methods
Browse files Browse the repository at this point in the history
The `schema` kwarg here was introduced during the-epic-split, so not deprecating.
  • Loading branch information
gforsyth committed Mar 25, 2024
1 parent 21f57d4 commit 14b0944
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 66 deletions.
69 changes: 37 additions & 32 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ def _read_file(

table_ref = self._session_dataset.table(table_name)

schema = self._session_dataset.dataset_id
database = self._session_dataset.project
database = self._session_dataset.dataset_id
catalog = self._session_dataset.project

# drop the table if it exists
#
Expand All @@ -188,7 +188,7 @@ def _read_file(
#
# dropping the table first means all write_dispositions can be
# WRITE_APPEND
self.drop_table(table_name, schema=schema, database=database, force=True)
self.drop_table(table_name, database=(catalog, database), force=True)

if os.path.isdir(path):
raise NotImplementedError("Reading from a directory is not supported.")
Expand All @@ -214,7 +214,7 @@ def load(file: str) -> None:
):
fut.result()

return self.table(table_name, schema=schema, database=database)
return self.table(table_name, database=(catalog, database))

def read_parquet(
self, path: str | Path, table_name: str | None = None, **kwargs: Any
Expand Down Expand Up @@ -517,31 +517,7 @@ def drop_database(
def table(
self, name: str, database: str | None = None, schema: str | None = None
) -> ir.Table:
if schema is not None:
# TODO: remove _warn_schema when the schema kwarg is removed
from ibis.util import _warn_schema

_warn_schema()
if database is not None and schema is not None:
if isinstance(database, str):
table_loc = f"{database}.{schema}"
elif isinstance(database, tuple):
table_loc = database + schema
elif schema is not None:
table_loc = schema
elif database is not None:
table_loc = database
else:
table_loc = None

table_loc = self._to_sqlglot_table(table_loc)

if table_loc is not None:
if (sg_cat := table_loc.args["catalog"]) is not None:
sg_cat.args["quoted"] = False
if (sg_db := table_loc.args["db"]) is not None:
sg_db.args["quoted"] = False
table_loc = table_loc.sql(dialect=self.name)
table_loc = self._warn_and_create_table_loc(database, schema)

project, dataset = self._parse_project_and_dataset(table_loc)

Expand Down Expand Up @@ -1053,15 +1029,17 @@ def drop_table(
name: str,
*,
schema: str | None = None,
database: str | None = None,
database: tuple[str | str] | str | None = None,
force: bool = False,
) -> None:
table_loc = self._warn_and_create_table_loc(database, schema)
catalog, db = self._to_catalog_db_tuple(table_loc)
stmt = sge.Drop(
kind="TABLE",
this=sg.table(
name,
db=schema or self.current_database,
catalog=database or self.billing_project,
db=db or self.current_database,
catalog=catalog or self.billing_project,
),
exists=force,
)
Expand Down Expand Up @@ -1188,6 +1166,33 @@ def _register_udfs(self, expr: ir.Expr) -> None:
def _safe_raw_sql(self, *args, **kwargs):
yield self.raw_sql(*args, **kwargs)

# TODO: remove when the schema kwarg is removed
def _warn_and_create_table_loc(self, database=None, schema=None):
if schema is not None:
self._warn_schema()
if database is not None and schema is not None:
if isinstance(database, str):
table_loc = f"{database}.{schema}"
elif isinstance(database, tuple):
table_loc = database + schema
elif schema is not None:
table_loc = schema
elif database is not None:
table_loc = database
else:
table_loc = None

table_loc = self._to_sqlglot_table(table_loc)

if table_loc is not None:
if (sg_cat := table_loc.args["catalog"]) is not None:
sg_cat.args["quoted"] = False
if (sg_db := table_loc.args["db"]) is not None:
sg_db.args["quoted"] = False
table_loc = table_loc.sql(dialect=self.name)

return table_loc


def compile(expr, params=None, **kwargs):
"""Compile an expression for BigQuery."""
Expand Down
20 changes: 10 additions & 10 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,7 @@ def create_table(
else:
temp_name = name

initial_table = sg.table(
temp_name, catalog=database, quoted=self.compiler.quoted
)
initial_table = sg.table(temp_name, db=database, quoted=self.compiler.quoted)
target = sge.Schema(this=initial_table, expressions=column_defs)

create_stmt = sge.Create(
Expand All @@ -414,7 +412,7 @@ def create_table(
)

# This is the same table as initial_table unless overwrite == True
final_table = sg.table(name, catalog=database, quoted=self.compiler.quoted)
final_table = sg.table(name, db=database, quoted=self.compiler.quoted)
with self._safe_raw_sql(create_stmt) as cur:
if query is not None:
insert_stmt = sge.Insert(this=initial_table, expression=query).sql(
Expand All @@ -424,14 +422,14 @@ def create_table(

if overwrite:
self.drop_table(
final_table.name, final_table.catalog, final_table.db, force=True
name=final_table.name, database=final_table.db, force=True
)
cur.execute(
f"ALTER TABLE IF EXISTS {initial_table.sql(self.name)} RENAME TO {final_table.sql(self.name)}"
)

if schema is None:
return self.table(name, schema=database)
return self.table(name, database=database)

# preserve the input schema if it was provided
return ops.DatabaseTable(
Expand All @@ -441,11 +439,13 @@ def create_table(
def drop_table(
self,
name: str,
database: str | None = None,
schema: str | None = None,
database: tuple[str, str] | str | None = None,
force: bool = False,
) -> None:
table = sg.table(name, db=schema, catalog=database, quoted=self.compiler.quoted)
table_loc = self._to_sqlglot_table(database or None)
catalog, db = self._to_catalog_db_tuple(table_loc)

table = sg.table(name, db=db, catalog=catalog, quoted=self.compiler.quoted)

with self.begin() as bind:
# global temporary tables cannot be dropped without first truncating them
Expand All @@ -457,7 +457,7 @@ def drop_table(
with contextlib.suppress(oracledb.DatabaseError):
bind.execute(f"TRUNCATE TABLE {table.sql(self.name)}")

super().drop_table(name, database=database, schema=schema, force=force)
super().drop_table(name, database=(catalog, db), force=force)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
Expand Down
20 changes: 2 additions & 18 deletions ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,13 +678,6 @@ def create_table(
if obj is None and schema is None:
raise ValueError("Either `obj` or `schema` must be specified")

if database is not None and database != self.current_catalog:
raise com.UnsupportedOperationError(
f"Creating tables in other databases is not supported by {self.name}"
)
else:
database = None

properties = []

if temp:
Expand Down Expand Up @@ -720,7 +713,7 @@ def create_table(
else:
temp_name = name

table = sg.table(temp_name, catalog=database, quoted=self.compiler.quoted)
table = sg.table(temp_name, db=database, quoted=self.compiler.quoted)
target = sge.Schema(this=table, expressions=column_defs)

create_stmt = sge.Create(
Expand Down Expand Up @@ -755,20 +748,11 @@ def drop_table(
self,
name: str,
database: str | None = None,
schema: str | None = None,
force: bool = False,
) -> None:
if database is not None and database != self.current_catalog:
raise com.UnsupportedOperationError(
f"Dropping tables in other databases is not supported by {self.name}"
)
else:
database = None
drop_stmt = sg.exp.Drop(
kind="TABLE",
this=sg.table(
name, db=schema, catalog=database, quoted=self.compiler.quoted
),
this=sg.table(name, db=database, quoted=self.compiler.quoted),
exists=force,
)
with self._safe_raw_sql(drop_stmt):
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/postgres/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def test_unsupported_intervals(con):
assert t["g"].type() == dt.Interval("M")


@pytest.mark.parametrize("params", [{}, {"database": POSTGRES_TEST_DB}])
@pytest.mark.parametrize("params", [{}, {"database": "public"}])
def test_create_and_drop_table(con, temp_table, params):
sch = ibis.schema(
[
Expand Down
10 changes: 5 additions & 5 deletions ibis/backends/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,15 +303,15 @@ def execute(
def drop_table(
self,
name: str,
database: str | None = None,
schema: str | None = None,
database: tuple[str, str] | str | None = None,
force: bool = False,
) -> None:
table_loc = self._warn_and_create_table_loc(database, None)
catalog, db = self._to_catalog_db_tuple(table_loc)

drop_stmt = sg.exp.Drop(
kind="TABLE",
this=sg.table(
name, db=schema, catalog=database, quoted=self.compiler.quoted
),
this=sg.table(name, db=db, catalog=catalog, quoted=self.compiler.quoted),
exists=force,
)
with self._safe_raw_sql(drop_stmt):
Expand Down

0 comments on commit 14b0944

Please sign in to comment.