Add args from RowIterator.to_dataframe() to QueryJob.to_dataframe() #7241

Merged
merged 2 commits into from
Feb 1, 2019
31 changes: 29 additions & 2 deletions bigquery/google/cloud/bigquery/job.py
@@ -2778,9 +2778,34 @@ def result(self, timeout=None, retry=DEFAULT_RETRY):
dest_table = Table(dest_table_ref, schema=schema)
return self._client.list_rows(dest_table, retry=retry)

def to_dataframe(self):
def to_dataframe(self, bqstorage_client=None, dtypes=None):
"""Return a pandas DataFrame from a QueryJob

Args:
bqstorage_client ( \
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
**Alpha Feature** Optional. A BigQuery Storage API client. If
supplied, use the faster BigQuery Storage API to fetch rows
from BigQuery. This API is a billable API.

This method requires the ``fastavro`` and
``google-cloud-bigquery-storage`` libraries.

Reading from a specific partition or snapshot is not
currently supported by this method.

**Caution**: There is a known issue reading small anonymous
query result tables with the BQ Storage API. Write your query
results to a destination table to work around this issue.
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.

Returns:
A :class:`~pandas.DataFrame` populated with row data and column
headers from the query results. The column headers are derived
@@ -2789,7 +2814,9 @@ def to_dataframe(self):
Raises:
ValueError: If the `pandas` library cannot be imported.
"""
return self.result().to_dataframe()
return self.result().to_dataframe(
bqstorage_client=bqstorage_client, dtypes=dtypes
)

def __iter__(self):
return iter(self.result())
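For context, a minimal usage sketch of the two new keyword arguments. The client setup, query text, and dataset/table names below are placeholders and not part of this change; it assumes default credentials and that the ``google-cloud-bigquery-storage`` and ``fastavro`` libraries are installed.

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Hypothetical table name, for illustration only.
job = client.query("SELECT name, age FROM `my_dataset.my_table`")

# Fetch the results via the BigQuery Storage API and override the dtype
# used for the "age" column; other columns keep the default pandas dtypes.
df = job.to_dataframe(
    bqstorage_client=bqstorage_client,
    dtypes={"age": "int32"},
)
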
14 changes: 13 additions & 1 deletion bigquery/google/cloud/bigquery/table.py
@@ -1441,7 +1441,19 @@ class _EmptyRowIterator(object):
pages = ()
total_rows = 0

def to_dataframe(self):
def to_dataframe(self, bqstorage_client=None, dtypes=None):
"""Create an empty dataframe.

Args:
bqstorage_client (Any):
Ignored. Added for compatibility with RowIterator.
dtypes (Any):
Ignored. Added for compatibility with RowIterator.

Returns:
pandas.DataFrame:
An empty :class:`~pandas.DataFrame`.
"""
if pandas is None:
raise ValueError(_NO_PANDAS_ERROR)
return pandas.DataFrame()
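The point of giving _EmptyRowIterator the same signature is duck typing: QueryJob.to_dataframe can forward bqstorage_client and dtypes to whichever iterator result() returned, without an isinstance check. A minimal sketch of the calling pattern (the ``rows`` variable and arguments here are hypothetical):

# `rows` may be a RowIterator or, e.g. for a DDL query with no destination
# schema, an _EmptyRowIterator; both accept the same keyword arguments.
df = rows.to_dataframe(bqstorage_client=bqstorage_client, dtypes=dtypes)
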
113 changes: 113 additions & 0 deletions bigquery/tests/unit/test_job.py
@@ -13,6 +13,7 @@
# limitations under the License.

import copy
import json
import unittest

import mock
@@ -22,6 +23,10 @@
import pandas
except (ImportError, AttributeError): # pragma: NO COVER
pandas = None
try:
from google.cloud import bigquery_storage_v1beta1
except (ImportError, AttributeError): # pragma: NO COVER
bigquery_storage_v1beta1 = None


def _make_credentials():
@@ -4543,6 +4548,114 @@ def test_to_dataframe(self):
self.assertEqual(len(df), 4) # verify the number of rows
self.assertEqual(list(df), ["name", "age"]) # verify the column names

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_ddl_query(self):
# Destination table may have no schema for some DDL and DML queries.
query_resource = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"schema": {"fields": []},
}
connection = _make_connection(query_resource)
client = _make_client(self.PROJECT, connection=connection)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)

df = job.to_dataframe()

self.assertEqual(len(df), 0)

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
)
def test_to_dataframe_bqstorage(self):
query_resource = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"totalRows": "4",
"schema": {
"fields": [
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
]
},
}
connection = _make_connection(query_resource)
client = _make_client(self.PROJECT, connection=connection)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
bqstorage_client = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
session = bigquery_storage_v1beta1.types.ReadSession()
session.avro_schema.schema = json.dumps(
{
"type": "record",
"name": "__root__",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "age", "type": ["null", "long"]},
],
}
)
bqstorage_client.create_read_session.return_value = session

job.to_dataframe(bqstorage_client=bqstorage_client)

bqstorage_client.create_read_session.assert_called_once_with(
mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
)

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_to_dataframe_column_dtypes(self):
begun_resource = self._make_resource()
query_resource = {
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"totalRows": "4",
"schema": {
"fields": [
{"name": "start_timestamp", "type": "TIMESTAMP"},
{"name": "seconds", "type": "INT64"},
{"name": "miles", "type": "FLOAT64"},
{"name": "km", "type": "FLOAT64"},
{"name": "payment_type", "type": "STRING"},
{"name": "complete", "type": "BOOL"},
{"name": "date", "type": "DATE"},
]
},
}
row_data = [
["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
query_resource["rows"] = rows
done_resource = copy.deepcopy(begun_resource)
done_resource["status"] = {"state": "DONE"}
connection = _make_connection(
begun_resource, query_resource, done_resource, query_resource
)
client = _make_client(project=self.PROJECT, connection=connection)
job = self._make_one(self.JOB_ID, self.QUERY, client)

df = job.to_dataframe(dtypes={"km": "float16"})

self.assertIsInstance(df, pandas.DataFrame)
self.assertEqual(len(df), 3) # verify the number of rows
exp_columns = [field["name"] for field in query_resource["schema"]["fields"]]
self.assertEqual(list(df), exp_columns) # verify the column names

self.assertEqual(df.start_timestamp.dtype.name, "datetime64[ns, UTC]")
self.assertEqual(df.seconds.dtype.name, "int64")
self.assertEqual(df.miles.dtype.name, "float64")
self.assertEqual(df.km.dtype.name, "float16")
self.assertEqual(df.payment_type.dtype.name, "object")
self.assertEqual(df.complete.dtype.name, "bool")
self.assertEqual(df.date.dtype.name, "object")
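
The dtype override exercised above maps directly onto plain pandas behavior; a standalone illustration, independent of BigQuery and not part of the test suite:

import pandas

# Without an explicit dtype, pandas stores these values as float64.
default = pandas.Series([1.77, 28.5, 7.1])
assert default.dtype.name == "float64"

# dtypes={"km": "float16"} requests this narrower dtype when the "km"
# column's Series is constructed.
narrow = pandas.Series([1.77, 28.5, 7.1], dtype="float16")
assert narrow.dtype.name == "float16"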

def test_iter(self):
import types
