diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index f72bb320c80f..4df7a92ba63c 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -2778,9 +2778,34 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
             dest_table = Table(dest_table_ref, schema=schema)
         return self._client.list_rows(dest_table, retry=retry)
 
-    def to_dataframe(self):
+    def to_dataframe(self, bqstorage_client=None, dtypes=None):
         """Return a pandas DataFrame from a QueryJob
 
+        Args:
+            bqstorage_client ( \
+                google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
+            ):
+                **Alpha Feature** Optional. A BigQuery Storage API client. If
+                supplied, use the faster BigQuery Storage API to fetch rows
+                from BigQuery. This API is a billable API.
+
+                This method requires the ``fastavro`` and
+                ``google-cloud-bigquery-storage`` libraries.
+
+                Reading from a specific partition or snapshot is not
+                currently supported by this method.
+
+                **Caution**: There is a known issue reading small anonymous
+                query result tables with the BQ Storage API. Write your query
+                results to a destination table to work around this issue.
+            dtypes ( \
+                Map[str, Union[str, pandas.Series.dtype]] \
+            ):
+                Optional. A dictionary of column names to pandas ``dtype``s.
+                The provided ``dtype`` is used when constructing the series
+                for the column specified. Otherwise, the default pandas
+                behavior is used.
+
         Returns:
             A :class:`~pandas.DataFrame` populated with row data and column
             headers from the query results. The column headers are derived
@@ -2789,7 +2814,9 @@ def to_dataframe(self):
         Raises:
             ValueError: If the `pandas` library cannot be imported.
         """
-        return self.result().to_dataframe()
+        return self.result().to_dataframe(
+            bqstorage_client=bqstorage_client, dtypes=dtypes
+        )
 
     def __iter__(self):
         return iter(self.result())
diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index b7668fd28ed1..7f3392631adb 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -1441,7 +1441,19 @@ class _EmptyRowIterator(object):
     pages = ()
     total_rows = 0
 
-    def to_dataframe(self):
+    def to_dataframe(self, bqstorage_client=None, dtypes=None):
+        """Create an empty dataframe.
+
+        Args:
+            bqstorage_client (Any):
+                Ignored. Added for compatibility with RowIterator.
+            dtypes (Any):
+                Ignored. Added for compatibility with RowIterator.
+
+        Returns:
+            pandas.DataFrame:
+                An empty :class:`~pandas.DataFrame`.
+        """
         if pandas is None:
             raise ValueError(_NO_PANDAS_ERROR)
         return pandas.DataFrame()
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index 75a84d77e2c7..5023c41e6374 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import copy
+import json
 import unittest
 
 import mock
@@ -22,6 +23,10 @@
     import pandas
 except (ImportError, AttributeError):  # pragma: NO COVER
     pandas = None
+try:
+    from google.cloud import bigquery_storage_v1beta1
+except (ImportError, AttributeError):  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
 
 
 def _make_credentials():
@@ -4543,6 +4548,114 @@ def test_to_dataframe(self):
         self.assertEqual(len(df), 4)  # verify the number of rows
         self.assertEqual(list(df), ["name", "age"])  # verify the column names
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_to_dataframe_ddl_query(self):
+        # Destination table may have no schema for some DDL and DML queries.
+        query_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "schema": {"fields": []},
+        }
+        connection = _make_connection(query_resource)
+        client = _make_client(self.PROJECT, connection=connection)
+        resource = self._make_resource(ended=True)
+        job = self._get_target_class().from_api_repr(resource, client)
+
+        df = job.to_dataframe()
+
+        self.assertEqual(len(df), 0)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_to_dataframe_bqstorage(self):
+        query_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "totalRows": "4",
+            "schema": {
+                "fields": [
+                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
+                ]
+            },
+        }
+        connection = _make_connection(query_resource)
+        client = _make_client(self.PROJECT, connection=connection)
+        resource = self._make_resource(ended=True)
+        job = self._get_target_class().from_api_repr(resource, client)
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        session = bigquery_storage_v1beta1.types.ReadSession()
+        session.avro_schema.schema = json.dumps(
+            {
+                "type": "record",
+                "name": "__root__",
+                "fields": [
+                    {"name": "name", "type": ["null", "string"]},
+                    {"name": "age", "type": ["null", "long"]},
+                ],
+            }
+        )
+        bqstorage_client.create_read_session.return_value = session
+
+        job.to_dataframe(bqstorage_client=bqstorage_client)
+
+        bqstorage_client.create_read_session.assert_called_once_with(
+            mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
+        )
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_to_dataframe_column_dtypes(self):
+        begun_resource = self._make_resource()
+        query_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "totalRows": "4",
+            "schema": {
+                "fields": [
+                    {"name": "start_timestamp", "type": "TIMESTAMP"},
+                    {"name": "seconds", "type": "INT64"},
+                    {"name": "miles", "type": "FLOAT64"},
+                    {"name": "km", "type": "FLOAT64"},
+                    {"name": "payment_type", "type": "STRING"},
+                    {"name": "complete", "type": "BOOL"},
+                    {"name": "date", "type": "DATE"},
+                ]
+            },
+        }
+        row_data = [
+            ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
+        ]
+        rows = [{"f": [{"v": field} for field in row]} for row in row_data]
+        query_resource["rows"] = rows
+        done_resource = copy.deepcopy(begun_resource)
+        done_resource["status"] = {"state": "DONE"}
+        connection = _make_connection(
+            begun_resource, query_resource, done_resource, query_resource
+        )
+        client = _make_client(project=self.PROJECT, connection=connection)
+        job = self._make_one(self.JOB_ID, self.QUERY, client)
+
+        df = job.to_dataframe(dtypes={"km": "float16"})
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 3)  # verify the number of rows
+        exp_columns = [field["name"] for field in query_resource["schema"]["fields"]]
+        self.assertEqual(list(df), exp_columns)  # verify the column names
+
+        self.assertEqual(df.start_timestamp.dtype.name, "datetime64[ns, UTC]")
+        self.assertEqual(df.seconds.dtype.name, "int64")
+        self.assertEqual(df.miles.dtype.name, "float64")
+        self.assertEqual(df.km.dtype.name, "float16")
+        self.assertEqual(df.payment_type.dtype.name, "object")
+        self.assertEqual(df.complete.dtype.name, "bool")
+        self.assertEqual(df.date.dtype.name, "object")
+
     def test_iter(self):
         import types