From 730df17ae1ab0b0bb2454f3c134c8f62665bc51b Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Tue, 24 Nov 2020 16:44:02 -0600
Subject: [PATCH] perf: don't fetch rows when waiting for query to finish
 (#400)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When there are large result sets, fetching rows while waiting for the
query to finish can cause the API to hang indefinitely. (This may be
due to an interaction between connection timeout and API timeout.)

This reverts commit 86f6a516d1c7c5dc204ab085ea2578793e6561ff (#374).

Fixes https://github.com/pydata/pandas-gbq/issues/343
Fixes #394 🦕
---
 google/cloud/bigquery/client.py     |  4 +-
 google/cloud/bigquery/job/query.py  |  5 ---
 tests/unit/job/test_query.py        | 55 +++++++---------------
 tests/unit/job/test_query_pandas.py | 44 ++++++++--------------
 tests/unit/test_client.py           |  4 +-
 tests/unit/test_table.py            | 57 +++++++++++++++++++++++++++++
 6 files changed, 89 insertions(+), 80 deletions(-)

diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index c67ef54e0..cd1474336 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -1534,7 +1534,7 @@ def _get_query_results(
             A new ``_QueryResults`` instance.
         """
 
-        extra_params = {}
+        extra_params = {"maxResults": 0}
 
         if project is None:
             project = self.project
@@ -3187,7 +3187,6 @@ def _list_rows_from_query_results(
         page_size=None,
         retry=DEFAULT_RETRY,
         timeout=None,
-        first_page_response=None,
    ):
        """List the rows of a completed query.
 
        See
@@ -3248,7 +3247,6 @@ def _list_rows_from_query_results(
            table=destination,
            extra_params=params,
            total_rows=total_rows,
-            first_page_response=first_page_response,
        )
        return row_iterator
 
diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py
index 7a1a74954..9e8908613 100644
--- a/google/cloud/bigquery/job/query.py
+++ b/google/cloud/bigquery/job/query.py
@@ -1177,10 +1177,6 @@ def result(
        if self._query_results.total_rows is None:
            return _EmptyRowIterator()
 
-        first_page_response = None
-        if max_results is None and page_size is None and start_index is None:
-            first_page_response = self._query_results._properties
-
        rows = self._client._list_rows_from_query_results(
            self.job_id,
            self.location,
@@ -1193,7 +1189,6 @@ def result(
            start_index=start_index,
            retry=retry,
            timeout=timeout,
-            first_page_response=first_page_response,
        )
        rows._preserve_order = _contains_order_by(self.query)
        return rows
diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py
index 41e31f469..daaf2e557 100644
--- a/tests/unit/job/test_query.py
+++ b/tests/unit/job/test_query.py
@@ -787,9 +787,7 @@ def test_result(self):
                "location": "EU",
            },
            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
-            "totalRows": "3",
-            "rows": [{"f": [{"v": "abc"}]}],
-            "pageToken": "next-page",
+            "totalRows": "2",
        }
        job_resource = self._make_resource(started=True, location="EU")
        job_resource_done = self._make_resource(started=True, ended=True, location="EU")
@@ -801,9 +799,9 @@ def test_result(self):
        query_page_resource = {
            # Explicitly set totalRows to be different from the initial
            # response to test update during iteration.
-            "totalRows": "2",
+            "totalRows": "1",
            "pageToken": None,
-            "rows": [{"f": [{"v": "def"}]}],
+            "rows": [{"f": [{"v": "abc"}]}],
        }
        conn = _make_connection(
            query_resource, query_resource_done, job_resource_done, query_page_resource
        )
        client = _make_client(self.PROJECT, connection=conn)
        job = self._get_target_class().from_api_repr(job_resource, client)
 
        result = job.result()
 
        self.assertIsInstance(result, RowIterator)
-        self.assertEqual(result.total_rows, 3)
+        self.assertEqual(result.total_rows, 2)
        rows = list(result)
-        self.assertEqual(len(rows), 2)
+        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0].col1, "abc")
-        self.assertEqual(rows[1].col1, "def")
        # Test that the total_rows property has changed during iteration, based
        # on the response from tabledata.list.
-        self.assertEqual(result.total_rows, 2)
+        self.assertEqual(result.total_rows, 1)
 
        query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
        query_results_call = mock.call(
            method="GET",
            path=query_results_path,
-            query_params={"location": "EU"},
+            query_params={"maxResults": 0, "location": "EU"},
            timeout=None,
        )
        reload_call = mock.call(
@@ -842,7 +839,6 @@ def test_result(self):
            query_params={
                "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
                "location": "EU",
-                "pageToken": "next-page",
            },
            timeout=None,
        )
@@ -855,9 +851,7 @@ def test_result_with_done_job_calls_get_query_results(self):
            "jobComplete": True,
            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
-            "totalRows": "2",
-            "rows": [{"f": [{"v": "abc"}]}],
-            "pageToken": "next-page",
+            "totalRows": "1",
        }
        job_resource = self._make_resource(started=True, ended=True, location="EU")
        job_resource["configuration"]["query"]["destinationTable"] = {
            "projectId": "dest-project",
            "datasetId": "dest_dataset",
            "tableId": "dest_table",
        }
        results_page_resource = {
-            "totalRows": "2",
+            "totalRows": "1",
            "pageToken": None,
-            "rows": [{"f": [{"v": "def"}]}],
+            "rows": [{"f": [{"v": "abc"}]}],
        }
        conn = _make_connection(query_resource_done, results_page_resource)
        client = _make_client(self.PROJECT, connection=conn)
        job = self._get_target_class().from_api_repr(job_resource, client)
 
        result = job.result()
 
        rows = list(result)
-        self.assertEqual(len(rows), 2)
+        self.assertEqual(len(rows), 1)
        self.assertEqual(rows[0].col1, "abc")
-        self.assertEqual(rows[1].col1, "def")
 
        query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
        query_results_call = mock.call(
            method="GET",
            path=query_results_path,
-            query_params={"location": "EU"},
+            query_params={"maxResults": 0, "location": "EU"},
            timeout=None,
        )
        query_results_page_call = mock.call(
            method="GET",
            path=query_results_path,
            query_params={
                "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
                "location": "EU",
-                "pageToken": "next-page",
            },
            timeout=None,
        )
@@ -908,12 +900,6 @@ def test_result_with_max_results(self):
            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
            "totalRows": "5",
-            # These rows are discarded because max_results is set.
-            "rows": [
-                {"f": [{"v": "xyz"}]},
-                {"f": [{"v": "uvw"}]},
-                {"f": [{"v": "rst"}]},
-            ],
        }
        query_page_resource = {
            "totalRows": "5",
@@ -939,7 +925,6 @@ def test_result_with_max_results(self):
 
        rows = list(result)
        self.assertEqual(len(rows), 3)
-        self.assertEqual(rows[0].col1, "abc")
        self.assertEqual(len(connection.api_request.call_args_list), 2)
        query_page_request = connection.api_request.call_args_list[1]
        self.assertEqual(
@@ -994,7 +979,7 @@ def test_result_w_retry(self):
        query_results_call = mock.call(
            method="GET",
            path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}",
-            query_params={"location": "asia-northeast1"},
+            query_params={"maxResults": 0, "location": "asia-northeast1"},
            timeout=None,
        )
        reload_call = mock.call(
@@ -1094,12 +1079,6 @@ def test_result_w_page_size(self):
            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
            "totalRows": "4",
-            # These rows are discarded because page_size is set.
- "rows": [ - {"f": [{"v": "xyz"}]}, - {"f": [{"v": "uvw"}]}, - {"f": [{"v": "rst"}]}, - ], } job_resource = self._make_resource(started=True, ended=True, location="US") q_config = job_resource["configuration"]["query"] @@ -1130,7 +1109,6 @@ def test_result_w_page_size(self): # Assert actual_rows = list(result) self.assertEqual(len(actual_rows), 4) - self.assertEqual(actual_rows[0].col1, "row1") query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}" query_page_1_call = mock.call( @@ -1164,12 +1142,6 @@ def test_result_with_start_index(self): "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, "totalRows": "5", - # These rows are discarded because start_index is set. - "rows": [ - {"f": [{"v": "xyz"}]}, - {"f": [{"v": "uvw"}]}, - {"f": [{"v": "rst"}]}, - ], } tabledata_resource = { "totalRows": "5", @@ -1196,7 +1168,6 @@ def test_result_with_start_index(self): rows = list(result) self.assertEqual(len(rows), 4) - self.assertEqual(rows[0].col1, "abc") self.assertEqual(len(connection.api_request.call_args_list), 2) tabledata_list_request = connection.api_request.call_args_list[1] self.assertEqual( diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index f9d823eb0..cdd6f2b3c 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -100,7 +100,6 @@ def test_to_dataframe_bqstorage_preserve_order(query): ] }, "totalRows": "4", - "pageToken": "next-page", } connection = _make_connection(get_query_results_resource, job_resource) client = _make_client(connection=connection) @@ -135,16 +134,7 @@ def test_to_dataframe_bqstorage_preserve_order(query): @pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") -@pytest.mark.parametrize( - "method_kwargs", - [ - {"create_bqstorage_client": False}, - # Since all rows are contained in the first page of results, the BigQuery - # Storage API won't actually be used. - {"create_bqstorage_client": True}, - ], -) -def test_to_arrow(method_kwargs): +def test_to_arrow(): from google.cloud.bigquery.job import QueryJob as target_class begun_resource = _make_job_resource(job_type="query") @@ -172,6 +162,8 @@ def test_to_arrow(method_kwargs): }, ] }, + } + tabledata_resource = { "rows": [ { "f": [ @@ -185,15 +177,17 @@ def test_to_arrow(method_kwargs): {"v": {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}}, ] }, - ], + ] } done_resource = copy.deepcopy(begun_resource) done_resource["status"] = {"state": "DONE"} - connection = _make_connection(begun_resource, query_resource, done_resource) + connection = _make_connection( + begun_resource, query_resource, done_resource, tabledata_resource + ) client = _make_client(connection=connection) job = target_class.from_api_repr(begun_resource, client) - tbl = job.to_arrow(**method_kwargs) + tbl = job.to_arrow(create_bqstorage_client=False) assert isinstance(tbl, pyarrow.Table) assert tbl.num_rows == 2 @@ -375,16 +369,7 @@ def test_to_arrow_w_tqdm_wo_query_plan(): @pytest.mark.skipif(pandas is None, reason="Requires `pandas`") -@pytest.mark.parametrize( - "method_kwargs", - [ - {"create_bqstorage_client": False}, - # Since all rows are contained in the first page of results, the BigQuery - # Storage API won't actually be used. 
- {"create_bqstorage_client": True}, - ], -) -def test_to_dataframe(method_kwargs): +def test_to_dataframe(): from google.cloud.bigquery.job import QueryJob as target_class begun_resource = _make_job_resource(job_type="query") @@ -398,20 +383,24 @@ def test_to_dataframe(method_kwargs): {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}, ] }, + } + tabledata_resource = { "rows": [ {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, {"f": [{"v": "Wylma Phlyntstone"}, {"v": "29"}]}, {"f": [{"v": "Bhettye Rhubble"}, {"v": "27"}]}, - ], + ] } done_resource = copy.deepcopy(begun_resource) done_resource["status"] = {"state": "DONE"} - connection = _make_connection(begun_resource, query_resource, done_resource) + connection = _make_connection( + begun_resource, query_resource, done_resource, tabledata_resource + ) client = _make_client(connection=connection) job = target_class.from_api_repr(begun_resource, client) - df = job.to_dataframe(**method_kwargs) + df = job.to_dataframe(create_bqstorage_client=False) assert isinstance(df, pandas.DataFrame) assert len(df) == 4 # verify the number of rows @@ -456,7 +445,6 @@ def test_to_dataframe_bqstorage(): {"name": "age", "type": "INTEGER", "mode": "NULLABLE"}, ] }, - "pageToken": "next-page", } connection = _make_connection(query_resource) client = _make_client(connection=connection) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 4fba1150c..c4bdea2f8 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -319,7 +319,7 @@ def test__get_query_results_miss_w_explicit_project_and_timeout(self): conn.api_request.assert_called_once_with( method="GET", path=path, - query_params={"timeoutMs": 500, "location": self.LOCATION}, + query_params={"maxResults": 0, "timeoutMs": 500, "location": self.LOCATION}, timeout=42, ) @@ -336,7 +336,7 @@ def test__get_query_results_miss_w_client_location(self): conn.api_request.assert_called_once_with( method="GET", path="/projects/PROJECT/queries/nothere", - query_params={"location": self.LOCATION}, + query_params={"maxResults": 0, "location": self.LOCATION}, timeout=None, ) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index be67eafcd..1dd5fab46 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -1630,6 +1630,40 @@ def test_iterate(self): api_request.assert_called_once_with(method="GET", path=path, query_params={}) + def test_iterate_with_cached_first_page(self): + from google.cloud.bigquery.schema import SchemaField + + first_page = { + "rows": [ + {"f": [{"v": "Whillma Phlyntstone"}, {"v": "27"}]}, + {"f": [{"v": "Bhetty Rhubble"}, {"v": "28"}]}, + ], + "pageToken": "next-page", + } + schema = [ + SchemaField("name", "STRING", mode="REQUIRED"), + SchemaField("age", "INTEGER", mode="REQUIRED"), + ] + rows = [ + {"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]}, + {"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]}, + ] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one( + _mock_client(), api_request, path, schema, first_page_response=first_page + ) + rows = list(row_iterator) + self.assertEqual(len(rows), 4) + self.assertEqual(rows[0].age, 27) + self.assertEqual(rows[1].age, 28) + self.assertEqual(rows[2].age, 32) + self.assertEqual(rows[3].age, 33) + + api_request.assert_called_once_with( + method="GET", path=path, query_params={"pageToken": "next-page"} + ) + def test_page_size(self): from google.cloud.bigquery.schema import SchemaField @@ -1655,6 
            query_params={"maxResults": row_iterator._page_size},
        )
 
+    def test__is_completely_cached_returns_false_without_first_page(self):
+        iterator = self._make_one(first_page_response=None)
+        self.assertFalse(iterator._is_completely_cached())
+
+    def test__is_completely_cached_returns_false_with_page_token(self):
+        first_page = {"pageToken": "next-page"}
+        iterator = self._make_one(first_page_response=first_page)
+        self.assertFalse(iterator._is_completely_cached())
+
+    def test__is_completely_cached_returns_true(self):
+        first_page = {"rows": []}
+        iterator = self._make_one(first_page_response=first_page)
+        self.assertTrue(iterator._is_completely_cached())
+
+    def test__validate_bqstorage_returns_false_when_completely_cached(self):
+        first_page = {"rows": []}
+        iterator = self._make_one(first_page_response=first_page)
+        self.assertFalse(
+            iterator._validate_bqstorage(
+                bqstorage_client=None, create_bqstorage_client=True
+            )
+        )
+
    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
    def test_to_arrow(self):
        from google.cloud.bigquery.schema import SchemaField
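
For reviewers: below is a minimal sketch of the call pattern this change
produces, using only the public client API. It is illustrative, not part of
the patch; the project, dataset, and table names are placeholders, and the
comments summarize the request behavior shown in the diff above.

    from google.cloud import bigquery

    client = bigquery.Client()
    job = client.query("SELECT name FROM `my-project.my_dataset.my_table`")

    # With this patch, the completion poll (jobs.getQueryResults) is issued
    # with maxResults=0, so it returns as soon as the job finishes instead of
    # also waiting for a first page of rows to be serialized, which is the
    # hang described above for large result sets.
    rows = job.result()

    # Row pages are now fetched only here, as the iterator is consumed; no
    # first page is cached from the completion poll anymore.
    for row in rows:
        print(row["name"])

The trade-off, visible in the updated tests, is one extra row-listing request
for small results that previously rode along with the completion response, in
exchange for completion polls that no longer block on row data.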