diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index e6eaf5fcb3ba..f3b7aab40789 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -353,6 +353,19 @@ def dataset(self, dataset_id, project=None):
 
         return DatasetReference(project, dataset_id)
 
+    def _create_bqstorage_client(self):
+        """Create a BigQuery Storage API client using this client's credentials.
+
+        Returns:
+            google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient:
+                A BigQuery Storage API client.
+        """
+        from google.cloud import bigquery_storage_v1beta1
+
+        return bigquery_storage_v1beta1.BigQueryStorageClient(
+            credentials=self._credentials
+        )
+
     def create_dataset(self, dataset, exists_ok=False, retry=DEFAULT_RETRY):
         """API call: create the dataset via a POST request.
 
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index f0312b0d4219..19e4aaf185e4 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -3152,7 +3152,12 @@ def result(
 
     # If changing the signature of this method, make sure to apply the same
     # changes to table.RowIterator.to_arrow()
-    def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
+    def to_arrow(
+        self,
+        progress_bar_type=None,
+        bqstorage_client=None,
+        create_bqstorage_client=False,
+    ):
         """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
         table or query.
 
@@ -3185,6 +3190,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
 
                 Reading from a specific partition or snapshot is not
                 currently supported by this method.
+            create_bqstorage_client (bool):
+                **Beta Feature** Optional. If ``True``, create a BigQuery
+                Storage API client using the default API settings. The
+                BigQuery Storage API is a faster way to fetch rows from
+                BigQuery. See the ``bqstorage_client`` parameter for more
+                information.
+
+                This argument does nothing if ``bqstorage_client`` is supplied.
+
+                ..versionadded:: 1.24.0
 
         Returns:
             pyarrow.Table
@@ -3199,12 +3214,20 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
             ..versionadded:: 1.17.0
         """
         return self.result().to_arrow(
-            progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client
+            progress_bar_type=progress_bar_type,
+            bqstorage_client=bqstorage_client,
+            create_bqstorage_client=create_bqstorage_client,
         )
 
     # If changing the signature of this method, make sure to apply the same
     # changes to table.RowIterator.to_dataframe()
-    def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
+    def to_dataframe(
+        self,
+        bqstorage_client=None,
+        dtypes=None,
+        progress_bar_type=None,
+        create_bqstorage_client=False,
+    ):
         """Return a pandas DataFrame from a QueryJob
 
         Args:
@@ -3237,6 +3260,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
                 for details.
 
                 ..versionadded:: 1.11.0
+            create_bqstorage_client (bool):
+                **Beta Feature** Optional. If ``True``, create a BigQuery
+                Storage API client using the default API settings. The
+                BigQuery Storage API is a faster way to fetch rows from
+                BigQuery. See the ``bqstorage_client`` parameter for more
+                information.
+
+                This argument does nothing if ``bqstorage_client`` is supplied.
+
+                ..versionadded:: 1.24.0
 
         Returns:
             A :class:`~pandas.DataFrame` populated with row data and column
@@ -3250,6 +3283,7 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
             bqstorage_client=bqstorage_client,
             dtypes=dtypes,
             progress_bar_type=progress_bar_type,
+            create_bqstorage_client=create_bqstorage_client,
         )
 
     def __iter__(self):
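For orientation, a minimal usage sketch of the new flag on ``QueryJob`` (an illustration only, not part of the patch; it assumes default application credentials and that the ``pandas``, ``pyarrow``, and ``google-cloud-bigquery-storage`` extras are installed; the table is the public dataset used in the samples further down):

    from google.cloud import bigquery

    client = bigquery.Client()
    query_job = client.query(
        "SELECT name, SUM(number) AS total "
        "FROM `bigquery-public-data.usa_names.usa_1910_current` "
        "GROUP BY name"
    )

    # Let the library create a BigQuery Storage API client with default settings
    # and clean it up once the download finishes.
    dataframe = query_job.to_dataframe(create_bqstorage_client=True)

    # Alternatively, pass an explicitly constructed client; the caller then keeps
    # ownership of it (see the table.py changes below).
    # from google.cloud import bigquery_storage_v1beta1
    # bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()
    # dataframe = query_job.to_dataframe(bqstorage_client=bqstorage_client)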
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 77cb67bfd0fe..a71acf8ecc8a 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -1456,7 +1456,12 @@ def _to_arrow_iterable(self, bqstorage_client=None):
 
     # If changing the signature of this method, make sure to apply the same
     # changes to job.QueryJob.to_arrow()
-    def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
+    def to_arrow(
+        self,
+        progress_bar_type=None,
+        bqstorage_client=None,
+        create_bqstorage_client=False,
+    ):
         """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
         table or query.
 
@@ -1489,6 +1494,16 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
 
                 Reading from a specific partition or snapshot is not
                 currently supported by this method.
+            create_bqstorage_client (bool):
+                **Beta Feature** Optional. If ``True``, create a BigQuery
+                Storage API client using the default API settings. The
+                BigQuery Storage API is a faster way to fetch rows from
+                BigQuery. See the ``bqstorage_client`` parameter for more
+                information.
+
+                This argument does nothing if ``bqstorage_client`` is supplied.
+
+                ..versionadded:: 1.24.0
 
         Returns:
             pyarrow.Table
@@ -1504,22 +1519,33 @@ def to_arrow(self, progress_bar_type=None, bqstorage_client=None):
         if pyarrow is None:
             raise ValueError(_NO_PYARROW_ERROR)
 
-        progress_bar = self._get_progress_bar(progress_bar_type)
+        owns_bqstorage_client = False
+        if not bqstorage_client and create_bqstorage_client:
+            owns_bqstorage_client = True
+            bqstorage_client = self.client._create_bqstorage_client()
 
-        record_batches = []
-        for record_batch in self._to_arrow_iterable(bqstorage_client=bqstorage_client):
-            record_batches.append(record_batch)
+        try:
+            progress_bar = self._get_progress_bar(progress_bar_type)
 
-            if progress_bar is not None:
-                # In some cases, the number of total rows is not populated
-                # until the first page of rows is fetched. Update the
-                # progress bar's total to keep an accurate count.
-                progress_bar.total = progress_bar.total or self.total_rows
-                progress_bar.update(record_batch.num_rows)
+            record_batches = []
+            for record_batch in self._to_arrow_iterable(
+                bqstorage_client=bqstorage_client
+            ):
+                record_batches.append(record_batch)
 
-        if progress_bar is not None:
-            # Indicate that the download has finished.
-            progress_bar.close()
+                if progress_bar is not None:
+                    # In some cases, the number of total rows is not populated
+                    # until the first page of rows is fetched. Update the
+                    # progress bar's total to keep an accurate count.
+                    progress_bar.total = progress_bar.total or self.total_rows
+                    progress_bar.update(record_batch.num_rows)
+
+            if progress_bar is not None:
+                # Indicate that the download has finished.
+                progress_bar.close()
+        finally:
+            if owns_bqstorage_client:
+                bqstorage_client.transport.channel.close()
 
         if record_batches:
             return pyarrow.Table.from_batches(record_batches)
@@ -1558,14 +1584,20 @@ def _to_dataframe_iterable(self, bqstorage_client=None, dtypes=None):
 
     # If changing the signature of this method, make sure to apply the same
     # changes to job.QueryJob.to_dataframe()
-    def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
+    def to_dataframe(
+        self,
+        bqstorage_client=None,
+        dtypes=None,
+        progress_bar_type=None,
+        create_bqstorage_client=False,
+    ):
         """Create a pandas DataFrame by loading all pages of a query.
 
         Args:
             bqstorage_client (google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient):
                 **Beta Feature** Optional. A BigQuery Storage API client. If
                 supplied, use the faster BigQuery Storage API to fetch rows
-                from BigQuery. This API is a billable API.
+                from BigQuery.
 
                 This method requires the ``pyarrow`` and
                 ``google-cloud-bigquery-storage`` libraries.
@@ -1602,6 +1634,16 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
                 progress bar as a graphical dialog box.
 
                 ..versionadded:: 1.11.0
+            create_bqstorage_client (bool):
+                **Beta Feature** Optional. If ``True``, create a BigQuery
+                Storage API client using the default API settings. The
+                BigQuery Storage API is a faster way to fetch rows from
+                BigQuery. See the ``bqstorage_client`` parameter for more
+                information.
+
+                This argument does nothing if ``bqstorage_client`` is supplied.
+
+                ..versionadded:: 1.24.0
 
         Returns:
             pandas.DataFrame:
@@ -1621,32 +1663,44 @@ def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=Non
         if dtypes is None:
             dtypes = {}
 
-        if bqstorage_client and self.max_results is not None:
+        if (
+            bqstorage_client or create_bqstorage_client
+        ) and self.max_results is not None:
             warnings.warn(
                 "Cannot use bqstorage_client if max_results is set, "
                 "reverting to fetching data with the tabledata.list endpoint.",
                 stacklevel=2,
             )
+            create_bqstorage_client = False
             bqstorage_client = None
 
-        progress_bar = self._get_progress_bar(progress_bar_type)
+        owns_bqstorage_client = False
+        if not bqstorage_client and create_bqstorage_client:
+            owns_bqstorage_client = True
+            bqstorage_client = self.client._create_bqstorage_client()
 
-        frames = []
-        for frame in self._to_dataframe_iterable(
-            bqstorage_client=bqstorage_client, dtypes=dtypes
-        ):
-            frames.append(frame)
+        try:
+            progress_bar = self._get_progress_bar(progress_bar_type)
 
-            if progress_bar is not None:
-                # In some cases, the number of total rows is not populated
-                # until the first page of rows is fetched. Update the
-                # progress bar's total to keep an accurate count.
-                progress_bar.total = progress_bar.total or self.total_rows
-                progress_bar.update(len(frame))
+            frames = []
+            for frame in self._to_dataframe_iterable(
+                bqstorage_client=bqstorage_client, dtypes=dtypes
+            ):
+                frames.append(frame)
+
+                if progress_bar is not None:
+                    # In some cases, the number of total rows is not populated
+                    # until the first page of rows is fetched. Update the
+                    # progress bar's total to keep an accurate count.
+                    progress_bar.total = progress_bar.total or self.total_rows
+                    progress_bar.update(len(frame))
 
-        if progress_bar is not None:
-            # Indicate that the download has finished.
-            progress_bar.close()
+            if progress_bar is not None:
+                # Indicate that the download has finished.
+                progress_bar.close()
+        finally:
+            if owns_bqstorage_client:
+                bqstorage_client.transport.channel.close()
 
         # Avoid concatting an empty list.
         if not frames:
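A short sketch of the ``max_results`` interaction handled above (illustrative only; ``client`` is assumed to be a ``bigquery.Client``): when ``max_results`` is set, the BigQuery Storage API cannot be used, so the new flag is ignored, a ``UserWarning`` is emitted, and rows are fetched with the ``tabledata.list`` endpoint instead.

    rows = client.list_rows(
        "bigquery-public-data.usa_names.usa_1910_current", max_results=1000
    )
    # Warns and falls back to tabledata.list; no BigQuery Storage client is created.
    dataframe = rows.to_dataframe(create_bqstorage_client=True)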
@@ -1667,11 +1721,18 @@ class _EmptyRowIterator(object):
     pages = ()
     total_rows = 0
 
-    def to_arrow(self, progress_bar_type=None):
+    def to_arrow(
+        self,
+        progress_bar_type=None,
+        bqstorage_client=None,
+        create_bqstorage_client=False,
+    ):
         """[Beta] Create an empty class:`pyarrow.Table`.
 
         Args:
             progress_bar_type (Optional[str]): Ignored. Added for compatibility with RowIterator.
+            bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
+            create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
 
         Returns:
             pyarrow.Table: An empty :class:`pyarrow.Table`.
@@ -1680,13 +1741,20 @@ def to_arrow(self, progress_bar_type=None):
             raise ValueError(_NO_PYARROW_ERROR)
         return pyarrow.Table.from_arrays(())
 
-    def to_dataframe(self, bqstorage_client=None, dtypes=None, progress_bar_type=None):
+    def to_dataframe(
+        self,
+        bqstorage_client=None,
+        dtypes=None,
+        progress_bar_type=None,
+        create_bqstorage_client=False,
+    ):
         """Create an empty dataframe.
 
         Args:
             bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
             dtypes (Any): Ignored. Added for compatibility with RowIterator.
             progress_bar_type (Any): Ignored. Added for compatibility with RowIterator.
+            create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
 
         Returns:
             pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
diff --git a/bigquery/samples/download_public_data.py b/bigquery/samples/download_public_data.py
new file mode 100644
index 000000000000..815d140fc6f1
--- /dev/null
+++ b/bigquery/samples/download_public_data.py
@@ -0,0 +1,33 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def download_public_data(client):
+
+    # [START bigquery_pandas_public_data]
+    # TODO(developer): Import the client library.
+    # from google.cloud import bigquery
+
+    # TODO(developer): Construct a BigQuery client object.
+    # client = bigquery.Client()
+
+    # TODO(developer): Set table_id to the fully-qualified table ID in standard
+    # SQL format, including the project ID and dataset ID.
+    table_id = "bigquery-public-data.usa_names.usa_1910_current"
+
+    # Use the BigQuery Storage API to speed up downloads of large tables.
+    dataframe = client.list_rows(table_id).to_dataframe(create_bqstorage_client=True)
+
+    print(dataframe.info())
+    # [END bigquery_pandas_public_data]
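To check that a run of the sample above really used the BigQuery Storage API, the tests added below watch for a debug-level log record; a rough equivalent by hand (the message text is taken from those tests, not guaranteed by any public API):

    import logging

    logging.basicConfig(level=logging.DEBUG)
    # While the DataFrame downloads, a record like
    # "Started reading table 'bigquery-public-data.usa_names.usa_1910_current'
    # with BQ Storage API session ..." should appear in the debug output.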
diff --git a/bigquery/samples/download_public_data_sandbox.py b/bigquery/samples/download_public_data_sandbox.py
new file mode 100644
index 000000000000..edb1466e4bd7
--- /dev/null
+++ b/bigquery/samples/download_public_data_sandbox.py
@@ -0,0 +1,34 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def download_public_data_sandbox(client):
+
+    # [START bigquery_pandas_public_data_sandbox]
+    # TODO(developer): Import the client library.
+    # from google.cloud import bigquery
+
+    # TODO(developer): Construct a BigQuery client object.
+    # client = bigquery.Client()
+
+    # `SELECT *` is an anti-pattern in BigQuery because it is cheaper and
+    # faster to use the BigQuery Storage API directly, but BigQuery Sandbox
+    # users can only use the BigQuery Storage API to download query results.
+    query_string = "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_current`"
+
+    # Use the BigQuery Storage API to speed up downloads of large query results.
+    dataframe = client.query(query_string).to_dataframe(create_bqstorage_client=True)
+
+    print(dataframe.info())
+    # [END bigquery_pandas_public_data_sandbox]
diff --git a/bigquery/samples/tests/test_download_public_data.py b/bigquery/samples/tests/test_download_public_data.py
new file mode 100644
index 000000000000..8ee0e6a68c17
--- /dev/null
+++ b/bigquery/samples/tests/test_download_public_data.py
@@ -0,0 +1,34 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from .. import download_public_data
+
+
+def test_download_public_data(caplog, capsys, client):
+    # Enable debug-level logging to verify the BigQuery Storage API is used.
+    caplog.set_level(logging.DEBUG)
+
+    download_public_data.download_public_data(client)
+    out, _ = capsys.readouterr()
+    assert "year" in out
+    assert "gender" in out
+    assert "name" in out
+
+    assert any(
+        "Started reading table 'bigquery-public-data.usa_names.usa_1910_current' with BQ Storage API session"
+        in message
+        for message in caplog.messages
+    )
diff --git a/bigquery/samples/tests/test_download_public_data_sandbox.py b/bigquery/samples/tests/test_download_public_data_sandbox.py
new file mode 100644
index 000000000000..74dadc1db3fb
--- /dev/null
+++ b/bigquery/samples/tests/test_download_public_data_sandbox.py
@@ -0,0 +1,34 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from .. import download_public_data_sandbox
+
+
+def test_download_public_data_sandbox(caplog, capsys, client):
+    # Enable debug-level logging to verify the BigQuery Storage API is used.
+    caplog.set_level(logging.DEBUG)
+
+    download_public_data_sandbox.download_public_data_sandbox(client)
+    out, err = capsys.readouterr()
+    assert "year" in out
+    assert "gender" in out
+    assert "name" in out
+
+    assert any(
+        # An anonymous table is used because this sample reads from query results.
+        ("Started reading table" in message and "BQ Storage API session" in message)
+        for message in caplog.messages
+    )
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index e661c86970db..c4cdb7fdfd2f 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -49,6 +49,11 @@
 import google.cloud._helpers
 from google.cloud import bigquery_v2
 from google.cloud.bigquery.dataset import DatasetReference
+
+try:
+    from google.cloud import bigquery_storage_v1beta1
+except (ImportError, AttributeError):  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
 from tests.unit.helpers import make_connection
 
 
@@ -535,6 +540,26 @@ def test_get_dataset(self):
         )
         self.assertEqual(dataset.dataset_id, self.DS_ID)
 
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_create_bqstorage_client(self):
+        mock_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        mock_client_instance = object()
+        mock_client.return_value = mock_client_instance
+        creds = _make_credentials()
+        client = self._make_one(project=self.PROJECT, credentials=creds)
+
+        with mock.patch(
+            "google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", mock_client
+        ):
+            bqstorage_client = client._create_bqstorage_client()
+
+        self.assertIs(bqstorage_client, mock_client_instance)
+        mock_client.assert_called_once_with(credentials=creds)
+
     def test_create_dataset_minimal(self):
         from google.cloud.bigquery.dataset import Dataset
 
diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index 97a7b4ae745e..1043df45f9a3 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -26,8 +26,12 @@
 
 try:
     from google.cloud import bigquery_storage_v1beta1
+    from google.cloud.bigquery_storage_v1beta1.gapic.transports import (
+        big_query_storage_grpc_transport,
+    )
 except ImportError:  # pragma: NO COVER
     bigquery_storage_v1beta1 = None
+    big_query_storage_grpc_transport = None
 
 try:
     import pandas
@@ -1817,6 +1821,9 @@ def test_to_arrow_w_bqstorage(self):
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
+        bqstorage_client.transport = mock.create_autospec(
+            big_query_storage_grpc_transport.BigQueryStorageGrpcTransport
+        )
         streams = [
             # Use two streams we want to check frames are read from each stream.
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
@@ -1882,6 +1889,42 @@ def test_to_arrow_w_bqstorage(self):
         total_rows = expected_num_rows * total_pages
         self.assertEqual(actual_tbl.num_rows, total_rows)
 
+        # Don't close the client if it was passed in.
+        bqstorage_client.transport.channel.close.assert_not_called()
+
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_to_arrow_w_bqstorage_creates_client(self):
+        from google.cloud.bigquery import schema
+        from google.cloud.bigquery import table as mut
+
+        mock_client = _mock_client()
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        bqstorage_client.transport = mock.create_autospec(
+            big_query_storage_grpc_transport.BigQueryStorageGrpcTransport
+        )
+        mock_client._create_bqstorage_client.return_value = bqstorage_client
+        session = bigquery_storage_v1beta1.types.ReadSession()
+        bqstorage_client.create_read_session.return_value = session
+        row_iterator = mut.RowIterator(
+            mock_client,
+            None,  # api_request: ignored
+            None,  # path: ignored
+            [
+                schema.SchemaField("colA", "STRING"),
+                schema.SchemaField("colC", "STRING"),
+                schema.SchemaField("colB", "STRING"),
+            ],
+            table=mut.TableReference.from_string("proj.dset.tbl"),
+        )
+        row_iterator.to_arrow(create_bqstorage_client=True)
+        mock_client._create_bqstorage_client.assert_called_once()
+        bqstorage_client.transport.channel.close.assert_called_once()
+
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
@@ -2292,6 +2335,76 @@ def test_to_dataframe_max_results_w_bqstorage_warning(self):
         ]
         self.assertEqual(len(matches), 1, msg="User warning was not emitted.")
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_to_dataframe_max_results_w_create_bqstorage_warning(self):
+        from google.cloud.bigquery.schema import SchemaField
+
+        schema = [
+            SchemaField("name", "STRING", mode="REQUIRED"),
+            SchemaField("age", "INTEGER", mode="REQUIRED"),
+        ]
+        rows = [
+            {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
+            {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
+        ]
+        path = "/foo"
+        api_request = mock.Mock(return_value={"rows": rows})
+        mock_client = _mock_client()
+
+        row_iterator = self._make_one(
+            client=mock_client,
+            api_request=api_request,
+            path=path,
+            schema=schema,
+            max_results=42,
+        )
+
+        with warnings.catch_warnings(record=True) as warned:
+            row_iterator.to_dataframe(create_bqstorage_client=True)
+
+        matches = [
+            warning
+            for warning in warned
+            if warning.category is UserWarning
+            and "cannot use bqstorage_client" in str(warning).lower()
+            and "tabledata.list" in str(warning)
+        ]
+        self.assertEqual(len(matches), 1, msg="User warning was not emitted.")
+        mock_client._create_bqstorage_client.assert_not_called()
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_to_dataframe_w_bqstorage_creates_client(self):
+        from google.cloud.bigquery import schema
+        from google.cloud.bigquery import table as mut
+
+        mock_client = _mock_client()
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        bqstorage_client.transport = mock.create_autospec(
+            big_query_storage_grpc_transport.BigQueryStorageGrpcTransport
+        )
+        mock_client._create_bqstorage_client.return_value = bqstorage_client
+        session = bigquery_storage_v1beta1.types.ReadSession()
+        bqstorage_client.create_read_session.return_value = session
+        row_iterator = mut.RowIterator(
+            mock_client,
+            None,  # api_request: ignored
+            None,  # path: ignored
+            [
schema.SchemaField("colA", "STRING"), + schema.SchemaField("colC", "STRING"), + schema.SchemaField("colB", "STRING"), + ], + table=mut.TableReference.from_string("proj.dset.tbl"), + ) + row_iterator.to_dataframe(create_bqstorage_client=True) + mock_client._create_bqstorage_client.assert_called_once() + bqstorage_client.transport.channel.close.assert_called_once() + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`" @@ -2427,6 +2540,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self): bqstorage_client = mock.create_autospec( bigquery_storage_v1beta1.BigQueryStorageClient ) + bqstorage_client.transport = mock.create_autospec( + big_query_storage_grpc_transport.BigQueryStorageGrpcTransport + ) streams = [ # Use two streams we want to check frames are read from each stream. {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}, @@ -2481,6 +2597,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self): total_rows = len(page_items) * total_pages self.assertEqual(len(got.index), total_rows) + # Don't close the client if it was passed in. + bqstorage_client.transport.channel.close.assert_not_called() + @unittest.skipIf(pandas is None, "Requires `pandas`") @unittest.skipIf( bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"