Skip to content

Commit

Permalink
refactor(bigquery): to_dataframe uses faster to_arrow + `to_pandas` when `pyarrow` is available (#10027)
Browse files Browse the repository at this point in the history

* fix(bigquery): to_dataframe uses 2x faster to_arrow + to_pandas when pyarrow is available

* fix: skip to_arrow tests when pyarrow is missing

* test: update test to work around numpy array encoding of nested arrays

* test: add test for tabledata.list with no rows

* test: boost test coverage

* chore: fix lint
  • Loading branch information
tswast authored and plamut committed Jan 15, 2020
1 parent 121394c commit c71d5f8
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 62 deletions.
63 changes: 40 additions & 23 deletions bigquery/google/cloud/bigquery/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1519,6 +1519,17 @@ def to_arrow(
if pyarrow is None:
raise ValueError(_NO_PYARROW_ERROR)

if (
bqstorage_client or create_bqstorage_client
) and self.max_results is not None:
warnings.warn(
"Cannot use bqstorage_client if max_results is set, "
"reverting to fetching data with the tabledata.list endpoint.",
stacklevel=2,
)
create_bqstorage_client = False
bqstorage_client = None

owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
Expand Down Expand Up @@ -1707,33 +1718,39 @@ def to_dataframe(
create_bqstorage_client = False
bqstorage_client = None

owns_bqstorage_client = False
if not bqstorage_client and create_bqstorage_client:
owns_bqstorage_client = True
bqstorage_client = self.client._create_bqstorage_client()

try:
progress_bar = self._get_progress_bar(progress_bar_type)
if pyarrow is not None:
# If pyarrow is available, calling to_arrow, then converting to a
# pandas dataframe is about 2x faster. This is because pandas.concat is
# rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
# usually no-copy.
record_batch = self.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)
df = record_batch.to_pandas()
for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])
return df

frames = []
for frame in self.to_dataframe_iterable(
bqstorage_client=bqstorage_client, dtypes=dtypes
):
frames.append(frame)
# The bqstorage_client is only used if pyarrow is available, so the
# rest of this method only needs to account for tabledata.list.
progress_bar = self._get_progress_bar(progress_bar_type)

if progress_bar is not None:
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))
frames = []
for frame in self.to_dataframe_iterable(dtypes=dtypes):
frames.append(frame)

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()
finally:
if owns_bqstorage_client:
bqstorage_client.transport.channel.close()
# In some cases, the number of total rows is not populated
# until the first page of rows is fetched. Update the
# progress bar's total to keep an accurate count.
progress_bar.total = progress_bar.total or self.total_rows
progress_bar.update(len(frame))

if progress_bar is not None:
# Indicate that the download has finished.
progress_bar.close()

# Avoid concatting an empty list.
if not frames:
Expand Down
7 changes: 6 additions & 1 deletion bigquery/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -2372,7 +2372,12 @@ def test_nested_table_to_dataframe(self):
row = df.iloc[0]
# verify the row content
self.assertEqual(row["string_col"], "Some value")
self.assertEqual(row["record_col"], record)
expected_keys = tuple(sorted(record.keys()))
row_keys = tuple(sorted(row["record_col"].keys()))
self.assertEqual(row_keys, expected_keys)
# Can't compare numpy arrays, which pyarrow encodes the embedded
# repeated column to, so convert to list.
self.assertEqual(list(row["record_col"]["nested_repeated"]), [0, 1, 2])
# verify that nested data can be accessed with indices/keys
self.assertEqual(row["record_col"]["nested_repeated"][0], 0)
self.assertEqual(
Expand Down
Loading

0 comments on commit c71d5f8

Please sign in to comment.