From 5c7779e84ecd8e7b4f1e5b97e08dd1bd7b53386b Mon Sep 17 00:00:00 2001
From: Erik Nilsson
Date: Sun, 12 May 2019 19:34:33 -0400
Subject: [PATCH 1/2] added tests confirming the bug

---
 bigquery/tests/unit/test_table.py | 67 +++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)

diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py
index 07a625b98825..816ba4ec48da 100644
--- a/bigquery/tests/unit/test_table.py
+++ b/bigquery/tests/unit/test_table.py
@@ -1943,6 +1943,51 @@ def blocking_to_dataframe(*args, **kwargs):
         # Make sure that this test pushed to the progress queue.
         self.assertEqual(mock_queue().put_nowait.call_count, total_pages)
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
+        from google.cloud.bigquery import schema
+        from google.cloud.bigquery import table as mut
+        from google.cloud.bigquery_storage_v1beta1 import reader
+
+        streams = [
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
+            {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
+        ]
+        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
+        session.avro_schema.schema = json.dumps({"fields": [{"name": "colA"}]})
+
+        bqstorage_client = mock.create_autospec(
+            bigquery_storage_v1beta1.BigQueryStorageClient)
+        bqstorage_client.create_read_session.return_value = session
+
+        mock_rowstream = mock.create_autospec(reader.ReadRowsStream)
+        bqstorage_client.read_rows.return_value = mock_rowstream
+
+        mock_rows = mock.create_autospec(reader.ReadRowsIterable)
+        mock_rowstream.rows.return_value = mock_rows
+
+        page_data_frame = pandas.DataFrame(
+            [{"colA": 1}, {"colA": -1}], columns=["colA"])
+        mock_page = mock.create_autospec(reader.ReadRowsPage)
+        mock_page.to_dataframe.return_value = page_data_frame
+        mock_pages = (mock_page, mock_page, mock_page)
+        type(mock_rows).pages = mock.PropertyMock(return_value=mock_pages)
+
+        row_iterator = self._make_one(
+            schema=[schema.SchemaField("colA", "IGNORED")],
+            table=mut.TableReference.from_string("proj.dset.tbl"),
+        )
+        got = row_iterator.to_dataframe(bqstorage_client=bqstorage_client)
+
+        self.assertEqual(list(got), ["colA"])
+        total_pages = len(streams) * len(mock_pages)
+        total_rows = len(page_data_frame) * total_pages
+        self.assertEqual(len(got.index), total_rows)
+        self.assertTrue(got.index.is_unique)
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
@@ -2138,6 +2183,28 @@ def test_to_dataframe_w_bqstorage_fallback_to_tabledata_list(self):
         self.assertEqual(df.name.dtype.name, "object")
         self.assertEqual(df.age.dtype.name, "int64")
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @mock.patch("google.cloud.bigquery.table.RowIterator.pages",
+                new_callable=mock.PropertyMock)
+    def test_to_dataframe_tabledata_list_w_multiple_pages_return_unique_index(self, mock_pages):
+        from google.cloud.bigquery import schema
+
+        iterator_schema = [
+            schema.SchemaField("name", "STRING", mode="REQUIRED"),
+        ]
+        pages = [[{"name": "Bengt"}], [{"name": "Sven"}]]
+
+        mock_pages.return_value = pages
+        row_iterator = self._make_one(schema=iterator_schema)
+
+        df = row_iterator.to_dataframe(bqstorage_client=None)
+
+        self.assertIsInstance(df, pandas.DataFrame)
+        self.assertEqual(len(df), 2)
+        self.assertEqual(list(df), ["name"])
+        self.assertEqual(df.name.dtype.name, "object")
+        self.assertTrue(df.index.is_unique)
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"

From 54d7c4369e5165b5270e5899079083f9063ede62 Mon Sep 17 00:00:00 2001
From: Erik Nilsson
Date: Sun, 12 May 2019 19:38:04 -0400
Subject: [PATCH 2/2] reset index when concatenating data frames

---
 bigquery/google/cloud/bigquery/table.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py
index 864fff4458b1..1f9bb5eee3d4 100644
--- a/bigquery/google/cloud/bigquery/table.py
+++ b/bigquery/google/cloud/bigquery/table.py
@@ -1408,7 +1408,7 @@ def _to_dataframe_tabledata_list(self, dtypes, progress_bar=None):
             # Indicate that the download has finished.
             progress_bar.close()
 
-        return pandas.concat(frames)
+        return pandas.concat(frames, ignore_index=True)
 
     def _to_dataframe_bqstorage_stream(
         self, bqstorage_client, dtypes, columns, session, stream, worker_queue
@@ -1585,7 +1585,7 @@ def get_frames(pool):
         # Update the progress bar one last time to close it.
         self._process_progress_updates(progress_queue, progress_bar)
 
-        return pandas.concat(frames)
+        return pandas.concat(frames, ignore_index=True)
 
     def _get_progress_bar(self, progress_bar_type):
         """Construct a tqdm progress bar object, if tqdm is installed."""
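
Note on why the fix works: pandas.concat preserves each input frame's index by
default, and every per-page (or per-stream) frame arrives with its own
zero-based RangeIndex, so the combined result repeats index labels across
pages. Passing ignore_index=True discards the inputs' indexes and builds one
fresh RangeIndex over the result, which is the property the new tests assert.
A minimal standalone sketch of the behavior (plain pandas, no BigQuery
objects; the frame contents are illustrative):

    import pandas as pd

    # Two "pages" of results, each carrying its own default RangeIndex.
    page1 = pd.DataFrame({"colA": [1, -1]})
    page2 = pd.DataFrame({"colA": [2, -2]})

    # Default concat keeps the per-frame indexes: labels are 0, 1, 0, 1.
    combined = pd.concat([page1, page2])
    assert not combined.index.is_unique

    # ignore_index=True rebuilds a single RangeIndex: 0, 1, 2, 3.
    fixed = pd.concat([page1, page2], ignore_index=True)
    assert fixed.index.is_unique
    assert list(fixed.index) == [0, 1, 2, 3]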