Add unit tests for new QueryJob.to_dataframe() arguments.
tswast committed Feb 1, 2019
1 parent 2b0c3d1 commit 276d08d
Showing 2 changed files with 126 additions and 1 deletion.
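For context, the two new QueryJob.to_dataframe() keyword arguments that these tests exercise can be used along the following lines. This is an illustrative sketch, not code from the commit; the project, dataset, and dtype choices are placeholders.

# Hypothetical usage of the arguments under test; names are placeholders.
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1

client = bigquery.Client(project="my-project")
job = client.query("SELECT name, age FROM `my-project.my_dataset.people`")

# bqstorage_client: download result rows via the BigQuery Storage API
# rather than the REST API's tabledata.list endpoint.
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient()

# dtypes: map column names to pandas dtypes in the returned DataFrame.
df = job.to_dataframe(
    bqstorage_client=bqstorage_client,
    dtypes={"age": "int32"},
)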
14 changes: 13 additions & 1 deletion bigquery/google/cloud/bigquery/table.py
@@ -1441,7 +1441,19 @@ class _EmptyRowIterator(object):
     pages = ()
     total_rows = 0
 
-    def to_dataframe(self):
+    def to_dataframe(self, bqstorage_client=None, dtypes=None):
+        """Create an empty dataframe.
+
+        Args:
+            bqstorage_client (Any):
+                Ignored. Added for compatibility with RowIterator.
+            dtypes (Any):
+                Ignored. Added for compatibility with RowIterator.
+
+        Returns:
+            pandas.DataFrame:
+                An empty :class:`~pandas.DataFrame`.
+        """
         if pandas is None:
             raise ValueError(_NO_PANDAS_ERROR)
         return pandas.DataFrame()
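The new parameters are accepted but ignored so that _EmptyRowIterator remains call-compatible with RowIterator. A minimal sketch of the contract this preserves (the helper below is hypothetical, not part of the library):

def results_to_dataframe(row_iterator, bqstorage_client=None, dtypes=None):
    # Works for both RowIterator and _EmptyRowIterator: each accepts the
    # same keyword arguments, so callers need no isinstance checks.
    return row_iterator.to_dataframe(
        bqstorage_client=bqstorage_client, dtypes=dtypes
    )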
113 changes: 113 additions & 0 deletions bigquery/tests/unit/test_job.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import copy
+import json
 import unittest
 
 import mock
@@ -22,6 +23,10 @@
     import pandas
 except (ImportError, AttributeError):  # pragma: NO COVER
     pandas = None
+try:
+    from google.cloud import bigquery_storage_v1beta1
+except (ImportError, AttributeError):  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
 
 
 def _make_credentials():
@@ -4543,6 +4548,114 @@ def test_to_dataframe(self):
         self.assertEqual(len(df), 4)  # verify the number of rows
         self.assertEqual(list(df), ["name", "age"])  # verify the column names
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_to_dataframe_ddl_query(self):
+        # Destination table may have no schema for some DDL and DML queries.
+        query_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "schema": {"fields": []},
+        }
+        connection = _make_connection(query_resource)
+        client = _make_client(self.PROJECT, connection=connection)
+        resource = self._make_resource(ended=True)
+        job = self._get_target_class().from_api_repr(resource, client)
+
+        df = job.to_dataframe()
+
+        self.assertEqual(len(df), 0)
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_to_dataframe_bqstorage(self):
+        query_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "totalRows": "4",
+            "schema": {
+                "fields": [
+                    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
+                    {"name": "age", "type": "INTEGER", "mode": "NULLABLE"},
+                ]
+            },
+        }
+        connection = _make_connection(query_resource)
+        client = _make_client(self.PROJECT, connection=connection)
+        resource = self._make_resource(ended=True)
+        job = self._get_target_class().from_api_repr(resource, client)
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient
+        )
+        session = bigquery_storage_v1beta1.types.ReadSession()
+        session.avro_schema.schema = json.dumps(
+            {
+                "type": "record",
+                "name": "__root__",
+                "fields": [
+                    {"name": "name", "type": ["null", "string"]},
+                    {"name": "age", "type": ["null", "long"]},
+                ],
+            }
+        )
+        bqstorage_client.create_read_session.return_value = session
+
+        job.to_dataframe(bqstorage_client=bqstorage_client)
+
+        bqstorage_client.create_read_session.assert_called_once_with(
+            mock.ANY, "projects/{}".format(self.PROJECT), read_options=mock.ANY
+        )
+
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_to_dataframe_column_dtypes(self):
+        begun_resource = self._make_resource()
+        query_resource = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "totalRows": "4",
+            "schema": {
+                "fields": [
+                    {"name": "start_timestamp", "type": "TIMESTAMP"},
+                    {"name": "seconds", "type": "INT64"},
+                    {"name": "miles", "type": "FLOAT64"},
+                    {"name": "km", "type": "FLOAT64"},
+                    {"name": "payment_type", "type": "STRING"},
+                    {"name": "complete", "type": "BOOL"},
+                    {"name": "date", "type": "DATE"},
+                ]
+            },
+        }
+        row_data = [
+            ["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
+            ["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
+            ["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
+        ]
+        rows = [{"f": [{"v": field} for field in row]} for row in row_data]
+        query_resource["rows"] = rows
+        done_resource = copy.deepcopy(begun_resource)
+        done_resource["status"] = {"state": "DONE"}
+        connection = _make_connection(
+            begun_resource, query_resource, done_resource, query_resource
+        )
+        client = _make_client(project=self.PROJECT, connection=connection)
+        job = self._make_one(self.JOB_ID, self.QUERY, client)
+
+        df = job.to_dataframe(dtypes={"km": "float16"})
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 3)  # verify the number of rows
+        exp_columns = [field["name"] for field in query_resource["schema"]["fields"]]
+        self.assertEqual(list(df), exp_columns)  # verify the column names
+
+        self.assertEqual(df.start_timestamp.dtype.name, "datetime64[ns, UTC]")
+        self.assertEqual(df.seconds.dtype.name, "int64")
+        self.assertEqual(df.miles.dtype.name, "float64")
+        self.assertEqual(df.km.dtype.name, "float16")
+        self.assertEqual(df.payment_type.dtype.name, "object")
+        self.assertEqual(df.complete.dtype.name, "bool")
+        self.assertEqual(df.date.dtype.name, "object")
+
     def test_iter(self):
         import types
 
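Assuming the repository's standard pytest-based unit tests, the new cases can be run selectively with a keyword filter (illustrative invocation; exact paths and tooling may differ):

pytest bigquery/tests/unit/test_job.py -k to_dataframe

Note that test_to_dataframe_bqstorage is skipped unless the optional google-cloud-bigquery-storage dependency is installed, mirroring the import guard added at the top of the file.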
