fix(bigquery): handle column name mismatches and _TABLE_SUFFIX everywhere
jcrist committed Sep 6, 2024
1 parent 440f51d commit 82a70e4
Showing 2 changed files with 93 additions and 61 deletions.
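Context: BigQuery wildcard tables expose a `_TABLE_SUFFIX` pseudo-column, and referencing it (e.g. in a filter) could leak it into query results, where it then clashed with the expression's declared schema. A minimal sketch of the scenario this commit targets, taken from the test below (the connection string is the project used by ibis's own test suite):

import ibis

con = ibis.connect("bigquery://ibis-gbq")
t = con.table("gsod*", database="bigquery-public-data.noaa_gsod")
expr = t.filter(t._TABLE_SUFFIX == "1929", t.max != 9999.9).head(1)
df = expr.to_pandas()  # after this fix: no _TABLE_SUFFIX column in df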
138 changes: 80 additions & 58 deletions ibis/backends/bigquery/__init__.py
@@ -147,6 +147,15 @@ def _force_quote_table(table: sge.Table) -> sge.Table:
    return table


def _postprocess_arrow(
    table_or_batch: pa.Table | pa.RecordBatch, names: list[str]
) -> pa.Table | pa.RecordBatch:
    """Drop `_TABLE_SUFFIX` if present in the results, then rename columns."""
    if "_TABLE_SUFFIX" in table_or_batch.column_names:
        table_or_batch = table_or_batch.drop_columns(["_TABLE_SUFFIX"])
    return table_or_batch.rename_columns(names)
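
# Illustration (an assumed example, not part of the diff): a result that still
# carries the wildcard pseudo-column is stripped, then renamed to the schema's
# names:
#
#   tbl = pa.table({"a": [1, 2], "_TABLE_SUFFIX": ["1929", "1929"]})
#   _postprocess_arrow(tbl, ["a_renamed"]).column_names  # -> ["a_renamed"]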


class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema):
name = "bigquery"
compiler = sc.bigquery.compiler
@@ -702,50 +711,6 @@ def compile(
        self._log(sql)
        return sql

    def execute(self, expr, params=None, limit="default", **kwargs):
        """Compile and execute the given Ibis expression.

        Compile and execute Ibis expression using this backend client
        interface, returning results in-memory in the appropriate object type.

        Parameters
        ----------
        expr
            Ibis expression to execute
        limit
            Retrieve at most this number of values/rows. Overrides any limit
            already set on the expression.
        params
            Query parameters
        kwargs
            Extra arguments specific to the backend

        Returns
        -------
        pd.DataFrame | pd.Series | scalar
            Output from execution
        """
        from ibis.backends.bigquery.converter import BigQueryPandasData

        self._run_pre_execute_hooks(expr)

        schema = expr.as_table().schema() - ibis.schema({"_TABLE_SUFFIX": "string"})

        sql = self.compile(expr, limit=limit, params=params, **kwargs)
        self._log(sql)
        query = self.raw_sql(sql, params=params, **kwargs)

        arrow_t = query.to_arrow(
            progress_bar_type=None, bqstorage_client=self.storage_client
        )

        result = BigQueryPandasData.convert_table(
            arrow_t.to_pandas(timestamp_as_object=True), schema
        )

        return expr.__pandas_result__(result, schema=schema)

    def insert(
        self,
        table_name: str,
Expand Down Expand Up @@ -784,6 +749,21 @@ def insert(
overwrite=overwrite,
)

    def _to_query(
        self,
        table_expr: ir.Table,
        *,
        params: Mapping[ir.Scalar, Any] | None = None,
        limit: int | str | None = None,
        page_size: int | None = None,
        **kwargs: Any,
    ):
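        # Shared helper for execute/to_pyarrow/to_pyarrow_batches: run
        # pre-execute hooks, compile to SQL, log it, and submit the query;
        # `page_size` is forwarded so batch readers can control chunking.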
        self._run_pre_execute_hooks(table_expr)
        sql = self.compile(table_expr, limit=limit, params=params, **kwargs)
        self._log(sql)

        return self.raw_sql(sql, params=params, page_size=page_size)

    def to_pyarrow(
        self,
        expr: ir.Expr,
@@ -793,15 +773,16 @@ def to_pyarrow(
        **kwargs: Any,
    ) -> pa.Table:
        self._import_pyarrow()
        self._register_in_memory_tables(expr)
        sql = self.compile(expr, limit=limit, params=params, **kwargs)
        self._log(sql)
        query = self.raw_sql(sql, params=params, **kwargs)

        table_expr = expr.as_table()
        schema = table_expr.schema() - ibis.schema({"_TABLE_SUFFIX": "string"})
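        # Schema subtraction drops the `_TABLE_SUFFIX` entry (if present) from
        # the expected schema, so the renames below line up with the
        # user-visible columns.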

        query = self._to_query(table_expr, params=params, limit=limit, **kwargs)
        table = query.to_arrow(
            progress_bar_type=None, bqstorage_client=self.storage_client
        )
        table = table.rename_columns(list(expr.as_table().schema().names))
        return expr.__pyarrow_result__(table)
        table = _postprocess_arrow(table, list(schema.names))
        return expr.__pyarrow_result__(table, schema=schema)

    def to_pyarrow_batches(
        self,
@@ -814,14 +795,55 @@
    ):
        pa = self._import_pyarrow()

        schema = expr.as_table().schema()
        table_expr = expr.as_table()
        schema = table_expr.schema() - ibis.schema({"_TABLE_SUFFIX": "string"})
        colnames = list(schema.names)

        self._register_in_memory_tables(expr)
        sql = self.compile(expr, limit=limit, params=params, **kwargs)
        self._log(sql)
        query = self.raw_sql(sql, params=params, page_size=chunk_size, **kwargs)
        query = self._to_query(
            table_expr, params=params, limit=limit, page_size=chunk_size, **kwargs
        )
        batch_iter = query.to_arrow_iterable(bqstorage_client=self.storage_client)
        return pa.ipc.RecordBatchReader.from_batches(schema.to_pyarrow(), batch_iter)
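        # The drop/rename is applied lazily, one RecordBatch at a time, via
        # the generator below, so the stream stays memory-bounded.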
        return pa.ipc.RecordBatchReader.from_batches(
            schema.to_pyarrow(),
            (_postprocess_arrow(b, colnames) for b in batch_iter),
        )

    def execute(self, expr, params=None, limit="default", **kwargs):
        """Compile and execute the given Ibis expression.

        Compile and execute Ibis expression using this backend client
        interface, returning results in-memory in the appropriate object type.

        Parameters
        ----------
        expr
            Ibis expression to execute
        limit
            Retrieve at most this number of values/rows. Overrides any limit
            already set on the expression.
        params
            Query parameters
        kwargs
            Extra arguments specific to the backend

        Returns
        -------
        pd.DataFrame | pd.Series | scalar
            Output from execution
        """
        from ibis.backends.bigquery.converter import BigQueryPandasData

        table_expr = expr.as_table()
        schema = table_expr.schema() - ibis.schema({"_TABLE_SUFFIX": "string"})
        query = self._to_query(table_expr, params=params, limit=limit, **kwargs)
        df = query.to_arrow(
            progress_bar_type=None, bqstorage_client=self.storage_client
        ).to_pandas(timestamp_as_object=True)
        # Drop _TABLE_SUFFIX if present in the results, then rename columns
        df = df.drop(columns="_TABLE_SUFFIX", errors="ignore")
        df.columns = schema.names
        return expr.__pandas_result__(df, schema=schema, data_mapper=BigQueryPandasData)

    def _gen_udf_name(self, name: str, schema: Optional[str]) -> str:
        func = ".".join(filter(None, (schema, name)))
@@ -1010,7 +1032,7 @@ def create_table(
            obj = ibis.memtable(obj, schema=schema)

        if obj is not None:
            self._register_in_memory_tables(obj)
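            # _run_pre_execute_hooks subsumes in-memory table registration
            # along with any other pre-execution setup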
            self._run_pre_execute_hooks(obj)

        if temp:
            dataset = self._session_dataset.dataset_id
Expand Down Expand Up @@ -1107,7 +1129,7 @@ def create_view(
expression=self.compile(obj),
replace=overwrite,
)
self._register_in_memory_tables(obj)
self._run_pre_execute_hooks(obj)
self.raw_sql(stmt.sql(self.name))
return self.table(name, database=(catalog, database))

16 changes: 13 additions & 3 deletions ibis/backends/bigquery/tests/system/test_client.py
@@ -421,12 +421,22 @@ def test_create_table_from_scratch_with_spaces(project_id, dataset_id):
    con.drop_table(name)


def test_table_suffix():
@pytest.mark.parametrize("ret_type", ["pandas", "pyarrow", "pyarrow_batches"])
def test_table_suffix(ret_type):
    con = ibis.connect("bigquery://ibis-gbq")
    t = con.table("gsod*", database="bigquery-public-data.noaa_gsod")
    expr = t.filter(t._TABLE_SUFFIX == "1929", t.max != 9999.9).head(1)
    result = expr.execute()
    assert not result.empty
    if ret_type == "pandas":
        result = expr.to_pandas()
        cols = list(result.columns)
    elif ret_type == "pyarrow":
        result = expr.to_pyarrow()
        cols = result.column_names
    elif ret_type == "pyarrow_batches":
        result = pa.Table.from_batches(expr.to_pyarrow_batches())
        cols = result.column_names
    assert len(result)
    assert "_TABLE_SUFFIX" not in cols


def test_parameters_in_url_connect(mocker):
