diff --git a/ibis/backends/base/sql/compiler/query_builder.py b/ibis/backends/base/sql/compiler/query_builder.py
index 2a120eaae16b..50bd21eb62ae 100644
--- a/ibis/backends/base/sql/compiler/query_builder.py
+++ b/ibis/backends/base/sql/compiler/query_builder.py
@@ -77,6 +77,9 @@ def _quote_identifier(self, name):
         return quote_identifier(name)
 
     def _format_in_memory_table(self, op):
+        if self.context.compiler.cheap_in_memory_tables:
+            return op.name
+
         names = op.schema.names
         raw_rows = []
         for row in op.data.to_frame().itertuples(index=False):
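The `cheap_in_memory_tables` hook consulted above lets a compiler skip inlining memtable rows into the generated SQL when the backend registers those tables itself. A minimal sketch of the control flow, using hypothetical `FakeCompiler`/`FakeContext` stand-ins for the real compiler and query context:

class FakeCompiler:
    # mirrors the class attribute this diff adds to BigQueryCompiler
    cheap_in_memory_tables = True


class FakeContext:
    compiler = FakeCompiler


def format_in_memory_table(context, op_name, rows):
    # when memtables are loaded into the database out of band (e.g. into a
    # BigQuery session dataset), emitting just the table name is enough
    if context.compiler.cheap_in_memory_tables:
        return op_name
    # otherwise fall back to inlining the rows, roughly what the base
    # TableSetFormatter does with its VALUES/STRUCT construct
    return f"(VALUES {', '.join(map(repr, rows))})"


print(format_in_memory_table(FakeContext, "_ibis_pandas_memtable_x", [(1,), (2,)]))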
""" - self._define_udf_translation_rules(expr) sql = self.compiler.to_ast_ensure_limit(expr, limit, params=params).compile() return ";\n\n".join( - query.transform(_anonymous_unnest_to_explode).sql( - dialect=self.name, pretty=True + # convert unnest function calls to explode + query.transform(_anonymous_unnest_to_explode) + # add dataset and project to memtable references + .transform( + partial( + _qualify_memtable, + dataset=self._session_dataset, + project=getattr(self, "billing_project", None), + ) ) + .sql(dialect=self.name, pretty=True) for query in sg.parse(sql, read=self.name) ) @@ -510,6 +560,7 @@ def execute(self, expr, params=None, limit="default", **kwargs): # TODO: upstream needs to pass params to raw_sql, I think. kwargs.pop("timecontext", None) + self._register_in_memory_tables(expr) sql = self.compile(expr, limit=limit, params=params, **kwargs) self._log(sql) cursor = self.raw_sql(sql, params=params, **kwargs) @@ -557,6 +608,7 @@ def to_pyarrow( **kwargs: Any, ) -> pa.Table: self._import_pyarrow() + self._register_in_memory_tables(expr) sql = self.compile(expr, limit=limit, params=params, **kwargs) self._log(sql) cursor = self.raw_sql(sql, params=params, **kwargs) @@ -576,6 +628,7 @@ def to_pyarrow_batches( schema = expr.as_table().schema() + self._register_in_memory_tables(expr) sql = self.compile(expr, limit=limit, params=params, **kwargs) self._log(sql) cursor = self.raw_sql(sql, params=params, **kwargs) @@ -772,6 +825,8 @@ def create_table( if isinstance(obj, (pd.DataFrame, pa.Table)): obj = ibis.memtable(obj, schema=schema) + self._register_in_memory_tables(obj) + if temp: dataset = self._session_dataset else: @@ -798,7 +853,7 @@ def create_table( ), constraints=( None - if typ.nullable + if typ.nullable or typ.is_array() else [ sg.exp.ColumnConstraint(kind=sg.exp.NotNullColumnConstraint()) ] @@ -833,7 +888,7 @@ def drop_table( this=sg.table( name, db=schema or self.current_schema, - catalog=database or self.data_project, + catalog=database or self.billing_project, ), exists=force, ) @@ -853,11 +908,12 @@ def create_view( this=sg.table( name, db=schema or self.current_schema, - catalog=database or self.data_project, + catalog=database or self.billing_project, ), expression=self.compile(obj), replace=overwrite, ) + self._register_in_memory_tables(obj) self.raw_sql(stmt.sql(self.name)) return self.table(name, schema=schema, database=database) @@ -874,7 +930,7 @@ def drop_view( this=sg.table( name, db=schema or self.current_schema, - catalog=database or self.data_project, + catalog=database or self.billing_project, ), exists=force, ) diff --git a/ibis/backends/bigquery/compiler.py b/ibis/backends/bigquery/compiler.py index cb5491fe23f3..c5e101426f5b 100644 --- a/ibis/backends/bigquery/compiler.py +++ b/ibis/backends/bigquery/compiler.py @@ -9,12 +9,10 @@ import toolz import ibis.common.graph as lin -import ibis.expr.datatypes as dt import ibis.expr.operations as ops import ibis.expr.types as ir from ibis.backends.base.sql import compiler as sql_compiler from ibis.backends.bigquery import operations, registry, rewrites -from ibis.backends.bigquery.datatypes import BigQueryType class BigQueryUDFDefinition(sql_compiler.DDL): @@ -117,25 +115,6 @@ class BigQueryTableSetFormatter(sql_compiler.TableSetFormatter): def _quote_identifier(self, name): return sg.to_identifier(name).sql("bigquery") - def _format_in_memory_table(self, op): - import ibis - - schema = op.schema - names = schema.names - types = schema.types - - raw_rows = [] - for row in 
diff --git a/ibis/backends/bigquery/compiler.py b/ibis/backends/bigquery/compiler.py
index cb5491fe23f3..c5e101426f5b 100644
--- a/ibis/backends/bigquery/compiler.py
+++ b/ibis/backends/bigquery/compiler.py
@@ -9,12 +9,10 @@
 import toolz
 
 import ibis.common.graph as lin
-import ibis.expr.datatypes as dt
 import ibis.expr.operations as ops
 import ibis.expr.types as ir
 from ibis.backends.base.sql import compiler as sql_compiler
 from ibis.backends.bigquery import operations, registry, rewrites
-from ibis.backends.bigquery.datatypes import BigQueryType
 
 
 class BigQueryUDFDefinition(sql_compiler.DDL):
@@ -117,25 +115,6 @@ class BigQueryTableSetFormatter(sql_compiler.TableSetFormatter):
     def _quote_identifier(self, name):
         return sg.to_identifier(name).sql("bigquery")
 
-    def _format_in_memory_table(self, op):
-        import ibis
-
-        schema = op.schema
-        names = schema.names
-        types = schema.types
-
-        raw_rows = []
-        for row in op.data.to_frame().itertuples(index=False):
-            raw_row = ", ".join(
-                f"{self._translate(lit.op())} AS {name}"
-                for lit, name in zip(
-                    map(ibis.literal, row, types), map(self._quote_identifier, names)
-                )
-            )
-            raw_rows.append(f"STRUCT({raw_row})")
-        array_type = BigQueryType.from_ibis(dt.Array(op.schema.as_struct()))
-        return f"UNNEST({array_type}[{', '.join(raw_rows)}])"
-
 
 class BigQueryCompiler(sql_compiler.Compiler):
     translator_class = BigQueryExprTranslator
@@ -146,6 +125,7 @@ class BigQueryCompiler(sql_compiler.Compiler):
 
     support_values_syntax_in_select = False
     null_limit = None
+    cheap_in_memory_tables = True
 
     @staticmethod
     def _generate_setup_queries(expr, context):
diff --git a/ibis/backends/bigquery/datatypes.py b/ibis/backends/bigquery/datatypes.py
index 2639f21ffb67..f4dbd5478706 100644
--- a/ibis/backends/bigquery/datatypes.py
+++ b/ibis/backends/bigquery/datatypes.py
@@ -3,6 +3,7 @@
 import google.cloud.bigquery as bq
 import sqlglot as sg
 
+import ibis
 import ibis.expr.datatypes as dt
 import ibis.expr.schema as sch
 from ibis.formats import SchemaMapper, TypeMapper
@@ -91,6 +92,8 @@ def from_ibis(cls, dtype: dt.DataType) -> str:
                 "BigQuery geography uses points on WGS84 reference ellipsoid."
                 f"Current geotype: {dtype.geotype}, Current srid: {dtype.srid}"
             )
+        elif dtype.is_map():
+            raise NotImplementedError("Maps are not supported in BigQuery")
         else:
             return str(dtype).upper()
 
@@ -98,16 +101,34 @@ def from_ibis(cls, dtype: dt.DataType) -> str:
 class BigQuerySchema(SchemaMapper):
     @classmethod
     def from_ibis(cls, schema: sch.Schema) -> list[bq.SchemaField]:
-        result = []
-        for name, dtype in schema.items():
-            if isinstance(dtype, dt.Array):
+        schema_fields = []
+
+        for name, typ in ibis.schema(schema).items():
+            if typ.is_array():
+                value_type = typ.value_type
+                if value_type.is_array():
+                    raise TypeError("Nested arrays are not supported in BigQuery")
+
+                is_struct = value_type.is_struct()
+
+                field_type = (
+                    "RECORD" if is_struct else BigQueryType.from_ibis(typ.value_type)
+                )
                 mode = "REPEATED"
-                dtype = dtype.value_type
+                fields = cls.from_ibis(ibis.schema(getattr(value_type, "fields", {})))
+            elif typ.is_struct():
+                field_type = "RECORD"
+                mode = "NULLABLE" if typ.nullable else "REQUIRED"
+                fields = cls.from_ibis(ibis.schema(typ.fields))
             else:
-                mode = "REQUIRED" if not dtype.nullable else "NULLABLE"
-            field = bq.SchemaField(name, BigQueryType.from_ibis(dtype), mode=mode)
-            result.append(field)
-        return result
+                field_type = BigQueryType.from_ibis(typ)
+                mode = "NULLABLE" if typ.nullable else "REQUIRED"
+                fields = ()
+
+            schema_fields.append(
+                bq.SchemaField(name, field_type=field_type, mode=mode, fields=fields)
+            )
+        return schema_fields
 
     @classmethod
     def _dtype_from_bigquery_field(cls, field: bq.SchemaField) -> dt.DataType:
@@ -125,7 +146,8 @@ def _dtype_from_bigquery_field(cls, field: bq.SchemaField) -> dt.DataType:
         elif mode == "REQUIRED":
             return dtype.copy(nullable=False)
         elif mode == "REPEATED":
-            return dt.Array(dtype)
+            # arrays with NULL elements aren't supported
+            return dt.Array(dtype.copy(nullable=False))
         else:
             raise TypeError(f"Unknown BigQuery field.mode: {mode}")
@@ -148,6 +170,5 @@ def spread_type(dt: dt.DataType):
         for type_ in dt.types:
             yield from spread_type(type_)
     elif dt.is_map():
-        yield from spread_type(dt.key_type)
-        yield from spread_type(dt.value_type)
+        raise NotImplementedError("Maps are not supported in BigQuery")
     yield dt
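`BigQuerySchema.from_ibis` now recurses: arrays become REPEATED fields of their element type, structs become RECORD fields with subfields, and everything else is a NULLABLE or REQUIRED scalar. A rough restatement of those rules as a sketch (ignoring the nested-array and map errors; not the real implementation):

import ibis


def describe(schema, indent=""):
    # array<T>    -> REPEATED field of T (element nullability is ignored here)
    # struct<...> -> RECORD field with recursively described subfields
    # other       -> scalar field; REQUIRED when the ibis type is non-nullable
    for name, typ in ibis.schema(schema).items():
        if typ.is_array():
            print(f"{indent}{name}: REPEATED {typ.value_type}")
        elif typ.is_struct():
            print(f"{indent}{name}: RECORD")
            describe(ibis.schema(typ.fields), indent + "  ")
        else:
            print(f"{indent}{name}: {'NULLABLE' if typ.nullable else 'REQUIRED'} {typ}")


describe(dict(xs="array<int64>", s="struct<a: string, b: !float64>", n="!int64"))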
diff --git a/ibis/backends/bigquery/tests/conftest.py b/ibis/backends/bigquery/tests/conftest.py
index bd27a91ca13d..6f0f6f92046d 100644
--- a/ibis/backends/bigquery/tests/conftest.py
+++ b/ibis/backends/bigquery/tests/conftest.py
@@ -2,10 +2,9 @@
 
 import concurrent.futures
 import contextlib
-import functools
 import io
 import os
-from typing import TYPE_CHECKING, Any
+from typing import Any
 
 import google.api_core.exceptions as gexc
 import google.auth
@@ -13,16 +12,12 @@
 from google.cloud import bigquery as bq
 
 import ibis
-import ibis.expr.datatypes as dt
 from ibis.backends.bigquery import EXTERNAL_DATA_SCOPES, Backend
-from ibis.backends.bigquery.datatypes import BigQueryType
+from ibis.backends.bigquery.datatypes import BigQuerySchema
 from ibis.backends.conftest import TEST_TABLES
 from ibis.backends.tests.base import BackendTest, RoundAwayFromZero, UnorderedComparator
 from ibis.backends.tests.data import json_types, non_null_array_types, struct_types, win
 
-if TYPE_CHECKING:
-    from collections.abc import Mapping
-
 DATASET_ID = "ibis_gbq_testing"
 DATASET_ID_TOKYO = "ibis_gbq_testing_tokyo"
 REGION_TOKYO = "asia-northeast1"
@@ -30,43 +25,6 @@
 PROJECT_ID_ENV_VAR = "GOOGLE_BIGQUERY_PROJECT_ID"
 
 
-@functools.singledispatch
-def ibis_type_to_bq_field(typ: dt.DataType) -> Mapping[str, Any]:
-    raise NotImplementedError(typ)
-
-
-@ibis_type_to_bq_field.register(dt.DataType)
-def _(typ: dt.DataType) -> Mapping[str, Any]:
-    return {"field_type": BigQueryType.from_ibis(typ)}
-
-
-@ibis_type_to_bq_field.register(dt.Array)
-def _(typ: dt.Array) -> Mapping[str, Any]:
-    return {
-        "field_type": BigQueryType.from_ibis(typ.value_type),
-        "mode": "REPEATED",
-    }
-
-
-@ibis_type_to_bq_field.register(dt.Struct)
-def _(typ: dt.Struct) -> Mapping[str, Any]:
-    return {
-        "field_type": "RECORD",
-        "mode": "NULLABLE" if typ.nullable else "REQUIRED",
-        "fields": ibis_schema_to_bq_schema(ibis.schema(typ.fields)),
-    }
-
-
-def ibis_schema_to_bq_schema(schema):
-    return [
-        bq.SchemaField(
-            name.replace(":", "").replace(" ", "_"),
-            **ibis_type_to_bq_field(typ),
-        )
-        for name, typ in ibis.schema(schema).items()
-    ]
-
-
 class TestConf(UnorderedComparator, BackendTest, RoundAwayFromZero):
     """Backend-specific class with information for testing."""
@@ -129,9 +87,13 @@ def _load_data(self, **_: Any) -> None:
         timestamp_table = bq.Table(
             bq.TableReference(testing_dataset, "timestamp_column_parted")
         )
-        timestamp_table.schema = ibis_schema_to_bq_schema(
-            dict(
-                my_timestamp_parted_col="timestamp", string_col="string", int_col="int"
+        timestamp_table.schema = BigQuerySchema.from_ibis(
+            ibis.schema(
+                dict(
+                    my_timestamp_parted_col="timestamp",
+                    string_col="string",
+                    int_col="int",
+                )
             )
         )
         timestamp_table.time_partitioning = bq.TimePartitioning(
@@ -141,8 +103,10 @@ def _load_data(self, **_: Any) -> None:
 
         # ingestion date partitioning
         date_table = bq.Table(bq.TableReference(testing_dataset, "date_column_parted"))
-        date_table.schema = ibis_schema_to_bq_schema(
-            dict(my_date_parted_col="date", string_col="string", int_col="int")
+        date_table.schema = BigQuerySchema.from_ibis(
+            ibis.schema(
+                dict(my_date_parted_col="date", string_col="string", int_col="int")
+            )
         )
         date_table.time_partitioning = bq.TimePartitioning(field="my_date_parted_col")
         client.create_table(date_table, exists_ok=True)
@@ -161,8 +125,10 @@ def _load_data(self, **_: Any) -> None:
                 bq.TableReference(testing_dataset, "struct"),
                 job_config=bq.LoadJobConfig(
                     write_disposition=write_disposition,
-                    schema=ibis_schema_to_bq_schema(
-                        dict(abc="struct<a: float64, b: string, c: int64>")
+                    schema=BigQuerySchema.from_ibis(
+                        ibis.schema(
+                            dict(abc="struct<a: float64, b: string, c: int64>")
+                        )
                     ),
                 ),
             )
@@ -176,13 +142,15 @@ def _load_data(self, **_: Any) -> None:
                 bq.TableReference(testing_dataset, "array_types"),
                 job_config=bq.LoadJobConfig(
                     write_disposition=write_disposition,
-                    schema=ibis_schema_to_bq_schema(
-                        dict(
-                            x="array<int64>",
-                            y="array<string>",
-                            z="array<float64>",
-                            grouper="string",
-                            scalar_column="float64",
+                    schema=BigQuerySchema.from_ibis(
+                        ibis.schema(
+                            dict(
+                                x="array<int64>",
+                                y="array<string>",
+                                z="array<float64>",
+                                grouper="string",
+                                scalar_column="float64",
+                            )
                         )
                     ),
                 ),
@@ -219,8 +187,10 @@ def _load_data(self, **_: Any) -> None:
                 bq.TableReference(testing_dataset, "numeric_table"),
                 job_config=bq.LoadJobConfig(
                     write_disposition=write_disposition,
-                    schema=ibis_schema_to_bq_schema(
-                        dict(string_col="string", numeric_col="decimal(38, 9)")
+                    schema=BigQuerySchema.from_ibis(
+                        ibis.schema(
+                            dict(string_col="string", numeric_col="decimal(38, 9)")
+                        )
                     ),
                     source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON,
                 ),
@@ -235,8 +205,8 @@ def _load_data(self, **_: Any) -> None:
                 bq.TableReference(testing_dataset, "win"),
                 job_config=bq.LoadJobConfig(
                     write_disposition=write_disposition,
-                    schema=ibis_schema_to_bq_schema(
-                        dict(g="string", x="int64", y="int64")
+                    schema=BigQuerySchema.from_ibis(
+                        ibis.schema(dict(g="string", x="int64", y="int64"))
                     ),
                 ),
             )
@@ -250,7 +220,7 @@ def _load_data(self, **_: Any) -> None:
                 bq.TableReference(testing_dataset, "json_t"),
                 job_config=bq.LoadJobConfig(
                     write_disposition=write_disposition,
-                    schema=ibis_schema_to_bq_schema(dict(js="json")),
+                    schema=BigQuerySchema.from_ibis(ibis.schema(dict(js="json"))),
                     source_format=bq.SourceFormat.NEWLINE_DELIMITED_JSON,
                 ),
             )
@@ -267,7 +237,7 @@ def _load_data(self, **_: Any) -> None:
                 ),
                 bq.TableReference(testing_dataset, table),
                 job_config=bq.LoadJobConfig(
-                    schema=ibis_schema_to_bq_schema(schema),
+                    schema=BigQuerySchema.from_ibis(ibis.schema(schema)),
                     write_disposition=write_disposition,
                     source_format=bq.SourceFormat.PARQUET,
                 ),
@@ -288,7 +258,7 @@ def _load_data(self, **_: Any) -> None:
                 ),
                 bq.TableReference(testing_dataset_tokyo, table),
                 job_config=bq.LoadJobConfig(
-                    schema=ibis_schema_to_bq_schema(schema),
+                    schema=BigQuerySchema.from_ibis(ibis.schema(schema)),
                     write_disposition=write_disposition,
                     source_format=bq.SourceFormat.PARQUET,
                 ),
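These fixtures exercise the same path `_register_in_memory_table` takes: a load job with an explicit BigQuery schema derived from an ibis schema. A sketch of that flow with hypothetical project/dataset/table names (requires real credentials to run):

import pandas as pd
import google.cloud.bigquery as bq

import ibis
from ibis.backends.bigquery.datatypes import BigQuerySchema

client = bq.Client(project="my-project")  # hypothetical project
load_job = client.load_table_from_dataframe(
    pd.DataFrame({"g": ["a", "b"], "x": [1, 2], "y": [3, 4]}),
    "my-project.my_dataset.win_copy",  # hypothetical destination table
    job_config=bq.LoadJobConfig(
        # WRITE_EMPTY mirrors _register_in_memory_table: the job fails instead
        # of clobbering a destination table that already contains data
        write_disposition=bq.WriteDisposition.WRITE_EMPTY,
        schema=BigQuerySchema.from_ibis(
            ibis.schema(dict(g="string", x="int64", y="int64"))
        ),
    ),
)
load_job.result()  # block until the load job finishes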
diff --git a/ibis/backends/bigquery/tests/unit/snapshots/test_compiler/test_compile_in_memory_table/out.sql b/ibis/backends/bigquery/tests/unit/snapshots/test_compiler/test_compile_in_memory_table/out.sql
deleted file mode 100644
index a062c8b488da..000000000000
--- a/ibis/backends/bigquery/tests/unit/snapshots/test_compiler/test_compile_in_memory_table/out.sql
+++ /dev/null
@@ -1,3 +0,0 @@
-SELECT
-  t0.*
-FROM UNNEST(ARRAY<STRUCT<`Column One` INT64>>[STRUCT(1 AS `Column One`), STRUCT(2 AS `Column One`), STRUCT(3 AS `Column One`)]) AS t0
\ No newline at end of file
diff --git a/ibis/backends/bigquery/tests/unit/test_compiler.py b/ibis/backends/bigquery/tests/unit/test_compiler.py
index 963508a258f1..fafeddbdaec2 100644
--- a/ibis/backends/bigquery/tests/unit/test_compiler.py
+++ b/ibis/backends/bigquery/tests/unit/test_compiler.py
@@ -628,9 +628,3 @@ def test_unnest(snapshot):
         ).select(level_two=lambda t: t.level_one.unnest())
     )
     snapshot.assert_match(result, "out_two_unnests.sql")
-
-
-def test_compile_in_memory_table(snapshot):
-    t = ibis.memtable({"Column One": [1, 2, 3]})
-    result = ibis.bigquery.compile(t)
-    snapshot.assert_match(result, "out.sql")
diff --git a/ibis/backends/tests/test_array.py b/ibis/backends/tests/test_array.py
index 2cdf4babf202..a4f28c56ffe6 100644
--- a/ibis/backends/tests/test_array.py
+++ b/ibis/backends/tests/test_array.py
@@ -566,7 +566,18 @@ def test_array_map(backend, con, input, output):
 @pytest.mark.parametrize(
     ("input", "output"),
     [
-        param({"a": [[1, None, 2], [4]]}, {"a": [[2], [4]]}, id="nulls"),
+        param(
+            {"a": [[1, None, 2], [4]]},
+            {"a": [[2], [4]]},
+            id="nulls",
+            marks=[
+                pytest.mark.notyet(
+                    ["bigquery"],
+                    raises=BadRequest,
+                    reason="NULLs are not allowed as array elements",
+                )
+            ],
+        ),
         param({"a": [[1, 2], [4]]}, {"a": [[2], [4]]}, id="no_nulls"),
     ],
 )
@@ -711,10 +722,28 @@ def test_array_union(con):
 @pytest.mark.notimpl(
     ["sqlite"], raises=NotImplementedError, reason="Unsupported type: Array..."
 )
-def test_array_intersect(con):
-    t = ibis.memtable(
-        {"a": [[3, 2], [], []], "b": [[1, 3], [None], [5]], "c": range(3)}
-    )
+@pytest.mark.parametrize(
+    "data",
+    [
+        param(
+            {"a": [[3, 2], [], []], "b": [[1, 3], [None], [5]], "c": range(3)},
+            id="nulls",
+            marks=[
+                pytest.mark.notyet(
+                    ["bigquery"],
+                    raises=BadRequest,
+                    reason="BigQuery doesn't support arrays with null elements",
+                )
+            ],
+        ),
+        param(
+            {"a": [[3, 2], [], []], "b": [[1, 3], [], [5]], "c": range(3)},
+            id="no_nulls",
+        ),
+    ],
+)
+def test_array_intersect(con, data):
+    t = ibis.memtable(data)
     expr = t.select("c", d=t.a.intersect(t.b)).order_by("c").drop("c").d
     result = con.execute(expr).map(set, na_action="ignore")
     expected = pd.Series([{3}, set(), set()], dtype="object")
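Both "nulls" cases are marked notyet for BigQuery because REPEATED fields cannot contain NULL elements; that is the same constraint that makes `_dtype_from_bigquery_field` force array element types to be non-nullable. The asymmetry is visible with ibis types alone:

import ibis.expr.datatypes as dt

nullable_elem = dt.Int64()  # what a column like [[1, None, 2], [4]] implies
bq_elem = nullable_elem.copy(nullable=False)  # what BigQuery can actually store

print(dt.Array(nullable_elem))  # element type is nullable
print(dt.Array(bq_elem))        # element type is non-nullable, as BigQuery requires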