Skip to content

Commit

Permalink
refactor(bigquery): remove session use
Browse files Browse the repository at this point in the history
  • Loading branch information
cpcloud committed Nov 15, 2023
1 parent e3a7eac commit 60e7900
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 38 deletions.
39 changes: 15 additions & 24 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def _anonymous_unnest_to_explode(node: sg.exp.Expression):


def _qualify_memtable(
node: sg.exp.Expression, *, dataset: str, project: str
node: sg.exp.Expression, *, dataset: str | None, project: str | None
) -> sg.exp.Expression:
"""Add a BigQuery dataset and project to memtable references."""
if (
Expand Down Expand Up @@ -125,8 +125,8 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:

raw_name = op.name

project = self.billing_project
dataset = self._session_dataset
project = self._session_dataset.project
dataset = self._session_dataset.dataset_id

if raw_name not in self.list_tables(schema=dataset, database=project):
table_id = sg.table(
Expand Down Expand Up @@ -272,6 +272,10 @@ def do_connect(
client_info=_create_client_info(application_name),
)

self.client.default_query_job_config = bq.QueryJobConfig(
use_legacy_sql=False, allow_large_results=True
)

if storage_client is not None:
self.storage_client = storage_client
else:
Expand Down Expand Up @@ -411,41 +415,28 @@ def table(

def _make_session(self) -> tuple[str, str]:
if self._session_dataset is None:
job_config = bq.job.QueryJobConfig(
create_session=True, use_legacy_sql=False, use_query_cache=False
)
job_config = bq.QueryJobConfig(create_session=True, use_query_cache=False)
query = self.client.query(
"SELECT 1", job_config=job_config, project=self.billing_project
)
connection_properties = [
bq.ConnectionProperty("session_id", query.session_info.session_id)
]

# default load/query job configs are merged with further
# configuration in load/query jobs, e.g., when executing a query
self.client.default_load_job_config = bq.LoadJobConfig(
connection_properties=connection_properties
)
query.result()

self.client.default_query_job_config = bq.QueryJobConfig(
allow_large_results=True, connection_properties=connection_properties
)
self._session_dataset = query.destination.dataset_id

def _get_schema_using_query(self, query: str) -> sch.Schema:
self._make_session()

job = self.client.query(
query, job_config=bq.QueryJobConfig(dry_run=True, use_query_cache=False)
query,
job_config=bq.QueryJobConfig(dry_run=True, use_query_cache=False),
project=self.billing_project,
)
return BigQuerySchema.to_ibis(job.schema)

def _execute(self, stmt, results=True, query_parameters=None):
self._make_session()

job_config = bq.job.QueryJobConfig(
query_parameters=query_parameters or [], use_legacy_sql=False
)
job_config = bq.job.QueryJobConfig(query_parameters=query_parameters or [])
query = self.client.query(
stmt, job_config=job_config, project=self.billing_project
)
Expand Down Expand Up @@ -487,8 +478,8 @@ def compile(
.transform(
partial(
_qualify_memtable,
dataset=self._session_dataset,
project=getattr(self, "billing_project", None),
dataset=getattr(self._session_dataset, "dataset_id", None),
project=getattr(self._session_dataset, "project", None),
)
)
.sql(dialect=self.name, pretty=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE TEMPORARY FUNCTION my_struct_thing_0(a FLOAT64, b FLOAT64)
CREATE TEMPORARY FUNCTION my_struct_thing_0(a FLOAT64, b FLOAT64)
RETURNS STRUCT<width FLOAT64, height FLOAT64>
LANGUAGE js AS """
'use strict';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
CREATE OR REPLACE TEMPORARY FUNCTION my_len_0(
CREATE TEMPORARY FUNCTION my_len_0(
s STRING
)
RETURNS FLOAT64
LANGUAGE js AS
'\n\'use strict\';\nfunction my_len(s) {\n return s.length;\n}\nreturn my_len(s);\n';

CREATE OR REPLACE TEMPORARY FUNCTION my_len_1(
CREATE TEMPORARY FUNCTION my_len_1(
s STRING
)
RETURNS FLOAT64
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE TEMPORARY FUNCTION my_len_0(
CREATE TEMPORARY FUNCTION my_len_0(
s STRING
)
RETURNS FLOAT64
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE TEMPORARY FUNCTION my_len_0(
CREATE TEMPORARY FUNCTION my_len_0(
s STRING
)
RETURNS FLOAT64
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE TEMPORARY FUNCTION my_len_0(
CREATE TEMPORARY FUNCTION my_len_0(
s STRING
)
RETURNS FLOAT64
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
CREATE OR REPLACE TEMPORARY FUNCTION format_t_0(
CREATE TEMPORARY FUNCTION format_t_0(
input STRING
)
RETURNS FLOAT64 AS
Expand Down
14 changes: 7 additions & 7 deletions ibis/backends/bigquery/udf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def python(
... return x + 1
...
>>> print(add_one.sql)
CREATE OR REPLACE TEMPORARY FUNCTION add_one_0(x FLOAT64)
CREATE TEMPORARY FUNCTION add_one_0(x FLOAT64)
RETURNS FLOAT64
LANGUAGE js AS """
'use strict';
Expand All @@ -99,7 +99,7 @@ def python(
... result.append(value)
... return result
>>> print(my_range.sql)
CREATE OR REPLACE TEMPORARY FUNCTION my_range_0(start FLOAT64, stop FLOAT64)
CREATE TEMPORARY FUNCTION my_range_0(start FLOAT64, stop FLOAT64)
RETURNS ARRAY<FLOAT64>
LANGUAGE js AS """
'use strict';
Expand Down Expand Up @@ -140,7 +140,7 @@ def python(
...
... return Rectangle(width, height)
>>> print(my_rectangle.sql)
CREATE OR REPLACE TEMPORARY FUNCTION my_rectangle_0(width FLOAT64, height FLOAT64)
CREATE TEMPORARY FUNCTION my_rectangle_0(width FLOAT64, height FLOAT64)
RETURNS STRUCT<width FLOAT64, height FLOAT64>
LANGUAGE js AS """
'use strict';
Expand Down Expand Up @@ -239,7 +239,7 @@ def js(
... body="return x + 1",
... )
>>> print(add_one.sql)
CREATE OR REPLACE TEMPORARY FUNCTION add_one_0(x FLOAT64)
CREATE TEMPORARY FUNCTION add_one_0(x FLOAT64)
RETURNS FLOAT64
LANGUAGE js AS """
return x + 1
Expand Down Expand Up @@ -276,7 +276,7 @@ def js(

name = _make_udf_name(name)
sql_code = f'''\
CREATE OR REPLACE TEMPORARY FUNCTION {name}({bigquery_signature})
CREATE TEMPORARY FUNCTION {name}({bigquery_signature})
RETURNS {return_type}
{determinism_formatted}LANGUAGE js AS """
{body}
Expand Down Expand Up @@ -352,7 +352,7 @@ def sql(
... sql_expression="x + 1",
... )
>>> print(add_one.sql)
CREATE OR REPLACE TEMPORARY FUNCTION add_one_0(x FLOAT64)
CREATE TEMPORARY FUNCTION add_one_0(x FLOAT64)
RETURNS FLOAT64
AS (x + 1)
"""
Expand All @@ -374,7 +374,7 @@ def sql(
)
name = _make_udf_name(name)
sql_code = f"""\
CREATE OR REPLACE TEMPORARY FUNCTION {name}({bigquery_signature})
CREATE TEMPORARY FUNCTION {name}({bigquery_signature})
RETURNS {return_type}
AS ({sql_expression});"""

Expand Down

0 comments on commit 60e7900

Please sign in to comment.