Skip to content

Commit

Permalink
refactor(table-api): unify exception type for all backends to `TableN…
Browse files Browse the repository at this point in the history
…otFound` when a table does not exist (#9695)

Co-authored-by: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Co-authored-by: Gil Forsyth <gil@forsyth.dev>
  • Loading branch information
3 people authored Sep 16, 2024
1 parent 44c4de8 commit 0c49e3b
Show file tree
Hide file tree
Showing 24 changed files with 160 additions and 67 deletions.
24 changes: 16 additions & 8 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ def _in_memory_table_exists(self, name: str) -> bool:
table_ref = bq.TableReference(self._session_dataset, name)

try:
self.client.get_table(table_ref)
except google.api_core.exceptions.NotFound:
self._get_table(table_ref)
except com.TableNotFound:
return False
else:
return True
Expand Down Expand Up @@ -619,12 +619,11 @@ def table(

project, dataset = self._parse_project_and_dataset(database)

bq_table = self.client.get_table(
bq.TableReference(
bq.DatasetReference(project=project, dataset_id=dataset),
table.name,
)
table_ref = bq.TableReference(
bq.DatasetReference(project=project, dataset_id=dataset),
table.name,
)
bq_table = self._get_table(table_ref)

node = ops.DatabaseTable(
table.name,
Expand All @@ -636,6 +635,12 @@ def table(
table_expr = node.to_expr()
return rename_partitioned_column(table_expr, bq_table, self.partition_column)

def _get_table(self, table_ref: bq.TableReference):
    """Fetch the BigQuery table object for ``table_ref``.

    Delegates to ``self.client.get_table`` and returns its result unchanged.

    Raises
    ------
    com.TableNotFound
        If the client raises ``google.api_core.exceptions.NotFound``. The
        message is ``str(table_ref)`` and the original error is chained as
        the cause.
    """
    try:
        bq_table = self.client.get_table(table_ref)
    except google.api_core.exceptions.NotFound as not_found:
        # Translate the driver-specific error into the shared ibis exception.
        raise com.TableNotFound(str(table_ref)) from not_found
    else:
        return bq_table

def _make_session(self) -> tuple[str, str]:
if (client := getattr(self, "client", None)) is not None:
job_config = bq.QueryJobConfig(use_query_cache=False)
Expand Down Expand Up @@ -867,8 +872,11 @@ def get_schema(
),
name,
)

table = self._get_table(table_ref)

return schema_from_bigquery_table(
self.client.get_table(table_ref),
table,
# https://cloud.google.com/bigquery/docs/querying-wildcard-tables#filtering_selected_tables_using_table_suffix
wildcard=name[-1] == "*",
)
Expand Down
11 changes: 9 additions & 2 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import ast
import contextlib
import glob
import re
from contextlib import closing
from functools import partial
from typing import TYPE_CHECKING, Any, Literal
Expand All @@ -14,6 +15,7 @@
import sqlglot as sg
import sqlglot.expressions as sge
import toolz
from clickhouse_connect.driver.exceptions import DatabaseError
from clickhouse_connect.driver.external import ExternalData

import ibis
Expand Down Expand Up @@ -510,8 +512,13 @@ def get_schema(
"`catalog` namespaces are not supported by clickhouse"
)
query = sge.Describe(this=sg.table(table_name, db=database))
with self._safe_raw_sql(query) as results:
names, types, *_ = results.result_columns
try:
with self._safe_raw_sql(query) as results:
names, types, *_ = results.result_columns
except DatabaseError as e:
if re.search(r"\bUNKNOWN_TABLE\b", str(e)):
raise com.TableNotFound(table_name) from e

return sch.Schema(
dict(zip(names, map(self.compiler.type_mapper.from_string, types)))
)
Expand Down
3 changes: 3 additions & 0 deletions ibis/backends/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ def get_schema(
else:
database = catalog.database()

if table_name not in database.names():
raise com.TableNotFound(table_name)

table = database.table(table_name)
return sch.schema(table.schema)

Expand Down
28 changes: 27 additions & 1 deletion ibis/backends/druid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@

import pydruid.db
import sqlglot as sg
import sqlglot.expressions as sge

import ibis.backends.sql.compilers as sc
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
from ibis import util
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers.base import STAR
from ibis.backends.sql.datatypes import DruidType
from ibis.backends.tests.errors import PyDruidProgrammingError

if TYPE_CHECKING:
from collections.abc import Iterable, Mapping
Expand Down Expand Up @@ -147,18 +150,41 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
schema[name] = dtype
return sch.Schema(schema)

def _table_exists(self, name: str):
    """Return whether ``INFORMATION_SCHEMA.TABLES`` lists a table called ``name``.

    Selects ``TABLE_NAME`` from ``INFORMATION_SCHEMA.TABLES`` where it equals
    ``name`` and reports whether any row came back. The filter is on the
    table name only.
    """
    quoted = self.compiler.quoted
    name_col = sg.column("TABLE_NAME", quoted=quoted)
    info_tables = sg.table("TABLES", db="INFORMATION_SCHEMA", quoted=quoted)

    # Build the lookup step by step, then render it in this backend's dialect.
    selection = sg.select(name_col).from_(info_tables)
    filtered = selection.where(name_col.eq(sge.convert(name)))
    sql = filtered.sql(self.dialect)

    with self._safe_raw_sql(sql) as cursor:
        rows = cursor.fetchall()
    return bool(rows)

def get_schema(
self,
table_name: str,
*,
catalog: str | None = None,
database: str | None = None,
) -> sch.Schema:
return self._get_schema_using_query(
query = (
sg.select(STAR)
.from_(sg.table(table_name, db=database, catalog=catalog))
.sql(self.dialect)
)
try:
schema = self._get_schema_using_query(query)
except PyDruidProgrammingError as e:
if not self._table_exists(table_name):
raise com.TableNotFound(table_name) from e
raise

return schema

def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
import pandas as pd
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def get_schema(
try:
result = self.con.sql(query)
except duckdb.CatalogException:
raise exc.IbisError(f"Table not found: {table_name!r}")
raise exc.TableNotFound(table_name)
else:
meta = result.fetch_arrow_table()

Expand Down
8 changes: 7 additions & 1 deletion ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def get_schema(
catalog: str | None = None,
database: str | None = None,
) -> sch.Schema:
return self._get_schema_using_query(
query = (
sg.select(STAR)
.from_(
sg.table(
Expand All @@ -236,6 +236,12 @@ def get_schema(
)
.sql(self.dialect)
)
try:
return self._get_schema_using_query(query)
except pyexasol.exceptions.ExaQueryError as e:
if not self.con.meta.table_exists(table_name):
raise com.TableNotFound(table_name) from e
raise

def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
import pandas as pd
Expand Down
17 changes: 16 additions & 1 deletion ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import itertools
import re
from typing import TYPE_CHECKING, Any

import sqlglot as sg
Expand Down Expand Up @@ -302,7 +303,21 @@ def get_schema(
qualified_name = sg.table(table_name, db=catalog, catalog=database).sql(
self.name
)
table = self._table_env.from_path(qualified_name)
try:
table = self._table_env.from_path(qualified_name)
except Py4JJavaError as e:
# This seems too message-specific, but it's not clear what a good workaround is
#
# Flink doesn't have a way to check whether a table exists other
# than to list all tables and check potentially every element in the list
if re.search(
"table .+ was not found",
str(e.java_exception.toString()),
flags=re.IGNORECASE,
):
raise exc.TableNotFound(table_name) from e
raise

pyflink_schema = table.get_schema()

return sch.Schema.from_pyarrow(
Expand Down
12 changes: 8 additions & 4 deletions ibis/backends/impala/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,18 @@ def get_schema(
Ibis schema
"""
query = sge.Describe(
this=sg.table(
table_name, db=database, catalog=catalog, quoted=self.compiler.quoted
)
table = sg.table(
table_name, db=database, catalog=catalog, quoted=self.compiler.quoted
)

with contextlib.closing(self.con.cursor()) as cur:
if not cur.table_exists(table_name, database_name=database or catalog):
raise com.TableNotFound(table.sql(self.dialect))

query = sge.Describe(this=table)
with self._safe_raw_sql(query) as cur:
meta = fetchall(cur)

return sch.Schema.from_tuples(
zip(meta["name"], meta["type"].map(self.compiler.type_mapper.from_string))
)
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def get_schema(

if not meta:
fqn = sg.table(name, db=database, catalog=catalog).sql(self.dialect)
raise com.IbisError(f"Table not found: {fqn}")
raise com.TableNotFound(fqn)

mapping = {}
for (
Expand Down
11 changes: 8 additions & 3 deletions ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import sqlglot as sg
import sqlglot.expressions as sge
from pymysql.constants import ER
from pymysql.err import ProgrammingError

import ibis
import ibis.backends.sql.compilers as sc
Expand Down Expand Up @@ -208,7 +209,6 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
.limit(0)
.sql(self.dialect)
)

return sch.Schema(
{
field.name: _type_from_cursor_info(descr, field)
Expand All @@ -224,8 +224,13 @@ def get_schema(
).sql(self.dialect)

with self.begin() as cur:
cur.execute(sge.Describe(this=table).sql(self.dialect))
result = cur.fetchall()
try:
cur.execute(sge.Describe(this=table).sql(self.dialect))
except ProgrammingError as e:
if e.args[0] == ER.NO_SUCH_TABLE:
raise com.TableNotFound(name) from e
else:
result = cur.fetchall()

type_mapper = self.compiler.type_mapper
fields = {
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def get_schema(
results = cur.fetchall()

if not results:
raise exc.IbisError(f"Table not found: {name!r}")
raise exc.TableNotFound(name)

type_mapper = self.compiler.type_mapper
fields = {
Expand Down
6 changes: 5 additions & 1 deletion ibis/backends/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ def list_tables(self, like=None, database=None):
return self._filter_with_like(list(self._tables.keys()), like)

def table(self, name: str) -> ir.Table:
schema = sch.infer(self._tables[name])
table = self._tables.get(name)
if table is None:
raise com.TableNotFound(name)

schema = sch.infer(table)
return ops.DatabaseTable(name, schema, self).to_expr()

def _in_memory_table_exists(self, name: str) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ def get_schema(
rows = cur.fetchall()

if not rows:
raise com.IbisError(f"Table not found: {name!r}")
raise com.TableNotFound(name)

return sch.Schema(
{
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/postgres/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def test_create_and_drop_table(con, temp_table, params):

con.drop_table(temp_table, **params)

with pytest.raises(com.IbisError):
with pytest.raises(com.TableNotFound, match=temp_table):
con.table(temp_table, **params)


Expand Down
15 changes: 14 additions & 1 deletion ibis/backends/pyspark/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
import sqlglot.expressions as sge
from packaging.version import parse as vparse
from pyspark import SparkConf

try:
from pyspark.errors.exceptions.base import AnalysisException # PySpark 3.5+
except ImportError:
from pyspark.sql.utils import AnalysisException # PySpark 3.3


from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType, DoubleType, LongType, StringType

Expand Down Expand Up @@ -542,7 +549,13 @@ def get_schema(
table_loc = self._to_sqlglot_table((catalog, database))
catalog, db = self._to_catalog_db_tuple(table_loc)
with self._active_catalog_database(catalog, db):
df = self._session.table(table_name)
try:
df = self._session.table(table_name)
except AnalysisException as e:
if not self._session.catalog.tableExists(table_name):
raise com.TableNotFound(table_name) from e
raise

struct = PySparkType.to_ibis(df.schema)

return sch.Schema(struct)
Expand Down
23 changes: 21 additions & 2 deletions ibis/backends/snowflake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,8 @@ def get_schema(
catalog: str | None = None,
database: str | None = None,
) -> Iterable[tuple[str, dt.DataType]]:
import snowflake.connector

# this will always show temp tables with the same name as a non-temp
# table first
#
Expand All @@ -548,8 +550,25 @@ def get_schema(
table = sg.table(
table_name, db=database, catalog=catalog, quoted=self.compiler.quoted
)
with self._safe_raw_sql(sge.Describe(kind="TABLE", this=table)) as cur:
result = cur.fetchall()
query = sge.Describe(kind="TABLE", this=table)

try:
with self._safe_raw_sql(query) as cur:
result = cur.fetchall()
except snowflake.connector.errors.ProgrammingError as e:
# apparently sqlstate codes are "standard", in the same way that
# SQL is standard, because sqlstate codes are part of the SQL
# standard
#
# Nowhere does this exist in Snowflake's documentation but this
# exists in MariaDB's docs and matches the SQLSTATE error code
#
# https://mariadb.com/kb/en/sqlstate/
# https://mariadb.com/kb/en/mariadb-error-code-reference/
# and the least helpful version: https://docs.snowflake.com/en/developer-guide/snowflake-scripting/exceptions#handling-an-exception
if e.sqlstate == "42S02":
raise com.TableNotFound(table.sql(self.dialect)) from e
raise

type_mapper = self.compiler.type_mapper
return sch.Schema(
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/sqlite/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def _inspect_schema(
cur.execute(sql)
rows = cur.fetchall()
if not rows:
raise com.IbisError(f"Table not found: {table_name!r}")
raise com.TableNotFound(table_name)

table_info = {name: (typ, not notnull) for name, typ, notnull in rows}

Expand Down
Loading

0 comments on commit 0c49e3b

Please sign in to comment.