fix(bigquery): to_dataframe uses 2x faster to_arrow + to_pandas when pyarrow is available
tswast committed Dec 30, 2019
1 parent 154c8ec commit 12654c9
Showing 2 changed files with 123 additions and 61 deletions.
63 changes: 40 additions & 23 deletions bigquery/google/cloud/bigquery/table.py
@@ -1519,6 +1519,17 @@ def to_arrow(
if pyarrow is None:
raise ValueError(_NO_PYARROW_ERROR)

if (
bqstorage_client or create_bqstorage_client
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
bqstorage_client = None

owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
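
A quick usage sketch of the fallback above (my own, not part of this commit; assumes a configured `google.cloud.bigquery.Client` and an existing table): once `max_results` is set, requesting a BigQuery Storage client only triggers the new `UserWarning`, and the download falls back to `tabledata.list`.

```python
import warnings

from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials/project are configured
# Placeholder table path; substitute a real table.
table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")

rows = client.list_rows(table_ref, max_results=100)
with warnings.catch_warnings(record=True) as warned:
    warnings.simplefilter("always")
    arrow_table = rows.to_arrow(create_bqstorage_client=True)

# The warning fires and no BigQuery Storage client is created.
assert any("tabledata.list" in str(w.message) for w in warned)
```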
@@ -1674,33 +1685,39 @@ def to_dataframe(
create_bqstorage_client = False
bqstorage_client = None

owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
bqstorage_client = self.client._create_bqstorage_client()

try:
progress_bar = self._get_progress_bar(progress_bar_type)
if pyarrow is not None:
# If pyarrow is available, calling to_arrow, then converting to a
# pandas dataframe is about 2x faster. This is because pandas.concat is
# rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
# usually no-copy.
record_batch = self.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)
df = record_batch.to_pandas()
for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])
return df

frames = []
for frame in self._to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)
# The bqstorage_client is only used if pyarrow is available, so the
# rest of this method only needs to account for tabledata.list.
progress_bar = self._get_progress_bar(progress_bar_type)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))
frames = []
for frame in self._to_dataframe_iterable(dtypes=dtypes):
frames.append(frame)

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client.transport.channel.close()
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()

# Avoid concatting an empty list.
if not frames:
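The "about 2x faster" comment in the hunk above is the heart of this change. A standalone micro-benchmark sketch (my own, not from this commit; timings vary by machine and data shape) contrasting the two concatenation strategies:

```python
import time

import pandas
import pyarrow

# Fifty pages of 100k rows each, mimicking paged download results.
batches = [
    pyarrow.RecordBatch.from_arrays(
        [pyarrow.array(range(100000)), pyarrow.array([1.5] * 100000)],
        names=["colA", "colB"],
    )
    for _ in range(50)
]

start = time.perf_counter()
# Old path: one DataFrame per page, then pandas.concat (usually copies).
df_concat = pandas.concat([b.to_pandas() for b in batches], ignore_index=True)
t_concat = time.perf_counter() - start

start = time.perf_counter()
# New path: assemble one Arrow table, convert once (usually zero-copy).
df_arrow = pyarrow.Table.from_batches(batches).to_pandas()
t_arrow = time.perf_counter() - start

print("pandas.concat: %.3fs, from_batches + to_pandas: %.3fs" % (t_concat, t_arrow))
```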
121 changes: 83 additions & 38 deletions bigquery/tests/unit/test_table.py
@@ -1809,6 +1809,43 @@ def test_to_arrow_w_empty_table(self):
self.assertEqual(child_field.type.value_type[0].name, "name")
self.assertEqual(child_field.type.value_type[1].name, "age")

@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_to_arrow_max_results_w_create_bqstorage_warning(self):
from google.cloud.bigquery.schema import SchemaField

schema = [
SchemaField("name", "STRING", mode="REQUIRED"),
SchemaField("age", "INTEGER", mode="REQUIRED"),
]
rows = [
{"f": [{"v": "Phred Phlyntstone"}, {"v": "32"}]},
{"f": [{"v": "Bharney Rhubble"}, {"v": "33"}]},
]
path = "/foo"
api_request = mock.Mock(return_value={"rows": rows})
mock_client = _mock_client()

row_iterator = self._make_one(
client=mock_client,
api_request=api_request,
path=path,
schema=schema,
max_results=42,
)

with warnings.catch_warnings(record=True) as warned:
row_iterator.to_arrow(create_bqstorage_client=True)

matches = [
warning
for warning in warned
if warning.category is UserWarning
and "cannot use bqstorage_client" in str(warning).lower()
and "tabledata.list" in str(warning)
]
self.assertEqual(len(matches), 1, msg="User warning was not emitted.")
mock_client._create_bqstorage_client.assert_not_called()

@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
@unittest.skipIf(
bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
@@ -1856,7 +1893,7 @@ def test_to_arrow_w_bqstorage(self):

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays(
page_items, arrow_schema
page_items, schema=arrow_schema
)
mock_pages = (mock_page, mock_page, mock_page)
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
@@ -2216,9 +2253,9 @@ def test_to_dataframe_w_various_types_nullable(self):
]
row_data = [
[None, None, None, None, None, None],
["1.4338368E9", "420", "1.1", "Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", "Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "Credit", "true", "1981-11-04"],
["1.4338368E9", "420", "1.1", u"Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", u"Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", u"Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
path = "/foo"
@@ -2238,7 +2275,7 @@ def test_to_dataframe_w_various_types_nullable(self):
else:
self.assertIsInstance(row.start_timestamp, pandas.Timestamp)
self.assertIsInstance(row.seconds, float)
self.assertIsInstance(row.payment_type, str)
self.assertIsInstance(row.payment_type, six.string_types)
self.assertIsInstance(row.complete, bool)
self.assertIsInstance(row.date, datetime.date)

@@ -2256,9 +2293,9 @@ def test_to_dataframe_column_dtypes(self):
SchemaField("date", "DATE"),
]
row_data = [
["1.4338368E9", "420", "1.1", "1.77", "Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", "28.5", "Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "7.1", "Credit", "true", "1981-11-04"],
["1.4338368E9", "420", "1.1", "1.77", u"Cash", "true", "1999-12-01"],
["1.3878117E9", "2580", "17.7", "28.5", u"Cash", "false", "1953-06-14"],
["1.3855653E9", "2280", "4.4", "7.1", u"Credit", "true", "1981-11-04"],
]
rows = [{"f": [{"v": field} for field in row]} for row in row_data]
path = "/foo"
@@ -2424,9 +2461,9 @@ def test_to_dataframe_w_bqstorage_no_streams(self):
api_request=None,
path=None,
schema=[
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
schema.SchemaField("colB", "IGNORED"),
schema.SchemaField("colA", "INTEGER"),
schema.SchemaField("colC", "FLOAT"),
schema.SchemaField("colB", "STRING"),
],
table=mut.TableReference.from_string("proj.dset.tbl"),
)
@@ -2498,10 +2535,11 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
mock_pages = mock.PropertyMock(return_value=())
type(mock_rows).pages = mock_pages

# Schema is required when there are no record batches in the stream.
schema = [
schema.SchemaField("colA", "IGNORED"),
schema.SchemaField("colC", "IGNORED"),
schema.SchemaField("colB", "IGNORED"),
schema.SchemaField("colA", "INTEGER"),
schema.SchemaField("colC", "FLOAT"),
schema.SchemaField("colB", "STRING"),
]

row_iterator = mut.RowIterator(
@@ -2560,14 +2598,15 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows
page_items = [
{"colA": 1, "colB": "abc", "colC": 2.0},
{"colA": -1, "colB": "def", "colC": 4.0},
pyarrow.array([1, -1]),
pyarrow.array([2.0, 4.0]),
pyarrow.array(["abc", "def"]),
]

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.return_value = pandas.DataFrame(
page_items, columns=["colA", "colB", "colC"]
page_record_batch = pyarrow.RecordBatch.from_arrays(
page_items, schema=arrow_schema
)
mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_arrow.return_value = page_record_batch
mock_pages = (mock_page, mock_page, mock_page)
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

@@ -2594,7 +2633,7 @@ def test_to_dataframe_w_bqstorage_nonempty(self):

# Have expected number of rows?
total_pages = len(streams) * len(mock_pages)
total_rows = len(page_items) * total_pages
total_rows = len(page_items[0]) * total_pages
self.assertEqual(len(got.index), total_rows)

# Don't close the client if it was passed in.
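
These bqstorage tests all reference an `arrow_schema` defined in the collapsed context above. A hypothetical reconstruction consistent with the column names and array types used in `page_items` (an assumption, not the verbatim fixture):

```python
import pyarrow

# Hypothetical stand-in for the test fixture's arrow_schema; the field types
# mirror the pyarrow.array(...) calls in page_items above.
arrow_schema = pyarrow.schema(
    [
        pyarrow.field("colA", pyarrow.int64()),
        pyarrow.field("colC", pyarrow.float64()),
        pyarrow.field("colB", pyarrow.utf8()),
    ]
)
```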
@@ -2633,11 +2672,14 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_rowstream.rows.return_value = mock_rows

page_data_frame = pandas.DataFrame(
[{"colA": 1}, {"colA": -1}], columns=["colA"]
page_items = [
pyarrow.array([1, -1]),
]
page_record_batch = pyarrow.RecordBatch.from_arrays(
page_items, schema=arrow_schema
)
mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.return_value = page_data_frame
mock_page.to_arrow.return_value = page_record_batch
mock_pages = (mock_page, mock_page, mock_page)
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

@@ -2649,7 +2691,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):

self.assertEqual(list(got), ["colA"])
total_pages = len(streams) * len(mock_pages)
total_rows = len(page_data_frame) * total_pages
total_rows = len(page_items[0]) * total_pages
self.assertEqual(len(got.index), total_rows)
self.assertTrue(got.index.is_unique)
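
The `is_unique` assertion above guards the concatenation step: each stream's DataFrame carries its own 0-based index, so naive concatenation repeats labels. A minimal illustration (mine, not from the test) of the behavior being asserted:

```python
import pandas

# Three per-stream frames, each indexed 0..1.
frames = [pandas.DataFrame({"colA": [1, -1]}) for _ in range(3)]

naive = pandas.concat(frames)
assert not naive.index.is_unique  # index repeats: 0, 1, 0, 1, 0, 1

unique = pandas.concat(frames, ignore_index=True)
assert unique.index.is_unique  # reindexed 0..5
```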

@@ -2695,14 +2737,15 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
page_items = [-1, 0, 1]
type(mock_page).num_items = mock.PropertyMock(return_value=len(page_items))

def blocking_to_dataframe(*args, **kwargs):
# Sleep for longer than the waiting interval. This ensures the
# progress_queue gets written to more than once because it gives
# the worker->progress updater time to sum intermediate updates.
def blocking_to_arrow(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame({"testcol": page_items})
return pyarrow.RecordBatch.from_arrays(
[pyarrow.array(page_items)], schema=arrow_schema
)

mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_page.to_arrow.side_effect = blocking_to_arrow
mock_pages = (mock_page, mock_page, mock_page, mock_page, mock_page)
type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)

@@ -2728,7 +2771,7 @@ def blocking_to_dataframe(*args, **kwargs):
progress_updates = [
args[0] for args, kwargs in tqdm_mock().update.call_args_list
]
# Should have sent >1 update due to delay in blocking_to_dataframe.
# Should have sent >1 update due to delay in blocking_to_arrow.
self.assertGreater(len(progress_updates), 1)
self.assertEqual(sum(progress_updates), expected_total_rows)
tqdm_mock().close.assert_called_once()
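
A minimal sketch of the lazy progress-bar pattern these assertions exercise (assumptions: `tqdm` is installed, and `total_rows` stands in for `RowIterator.total_rows`): the row total may be unknown before the first page arrives, so the bar's total is backfilled on each update, exactly as in the `to_dataframe` hunk above.

```python
import pandas
import tqdm

pages = [pandas.DataFrame({"colA": range(3)}) for _ in range(4)]
total_rows = 12  # stand-in for RowIterator.total_rows

bar = tqdm.tqdm(total=None)
for frame in pages:
    # Total may not be populated until the first page is fetched.
    bar.total = bar.total or total_rows
    bar.update(len(frame))
bar.close()  # indicate the download has finished
```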
@@ -2768,18 +2811,20 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
)
bqstorage_client.create_read_session.return_value = session
page_items = [
pyarrow.array([1, -1]),
pyarrow.array([2.0, 4.0]),
pyarrow.array(["abc", "def"]),
]

def blocking_to_dataframe(*args, **kwargs):
def blocking_to_arrow(*args, **kwargs):
# Sleep for longer than the waiting interval so that we know we're
# only reading one page per loop at most.
time.sleep(2 * mut._PROGRESS_INTERVAL)
return pandas.DataFrame(
{"colA": [1, -1], "colB": ["abc", "def"], "colC": [2.0, 4.0]},
columns=["colA", "colB", "colC"],
)
return pyarrow.RecordBatch.from_arrays(page_items, schema=arrow_schema)

mock_page = mock.create_autospec(reader.ReadRowsPage)
mock_page.to_dataframe.side_effect = blocking_to_dataframe
mock_page.to_arrow.side_effect = blocking_to_arrow
mock_rows = mock.create_autospec(reader.ReadRowsIterable)
mock_pages = mock.PropertyMock(return_value=(mock_page, mock_page, mock_page))
type(mock_rows).pages = mock_pages
