From d588cecba734737e888528ad28eda88823875506 Mon Sep 17 00:00:00 2001
From: Carlos de la Guardia
Date: Tue, 24 Nov 2020 03:39:14 -0600
Subject: [PATCH 1/3] WIP: support alternative serialization formats for
 load_table_from_dataframe

---
 google/cloud/bigquery/client.py | 101 +++++++++++++++++++++----------
 tests/system.py                 |  97 ++++++++++++++++++++++++++++++
 tests/unit/test_client.py       |  50 ++++++++++++++++
 3 files changed, 214 insertions(+), 34 deletions(-)

diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index c67ef54e0..8613f76e2 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -2197,15 +2197,16 @@ def load_table_from_dataframe(
         else:
             job_config = job.LoadJobConfig()
 
-        if job_config.source_format:
-            if job_config.source_format != job.SourceFormat.PARQUET:
-                raise ValueError(
-                    "Got unexpected source_format: '{}'. Currently, only PARQUET is supported".format(
-                        job_config.source_format
-                    )
-                )
-        else:
+        supported_formats = (job.SourceFormat.CSV, job.SourceFormat.PARQUET)
+        if job_config.source_format is None:
+            # default value
             job_config.source_format = job.SourceFormat.PARQUET
+        if job_config.source_format not in supported_formats:
+            raise ValueError(
+                "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
+                    job_config.source_format
+                )
+            )
 
         if location is None:
             location = self.location
@@ -2245,39 +2246,71 @@ def load_table_from_dataframe(
                 stacklevel=2,
             )
 
-        tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
+        tmpfd, tmppath = tempfile.mkstemp(
+            suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower(),)
+        )
         os.close(tmpfd)
 
         try:
-            if job_config.schema:
-                if parquet_compression == "snappy":  # adjust the default value
-                    parquet_compression = parquet_compression.upper()
-                _pandas_helpers.dataframe_to_parquet(
-                    dataframe,
-                    job_config.schema,
-                    tmppath,
-                    parquet_compression=parquet_compression,
-                )
+            if job_config.source_format == job.SourceFormat.PARQUET:
+
+                if job_config.schema:
+                    if parquet_compression == "snappy":  # adjust the default value
+                        parquet_compression = parquet_compression.upper()
+
+                    _pandas_helpers.dataframe_to_parquet(
+                        dataframe,
+                        job_config.schema,
+                        tmppath,
+                        parquet_compression=parquet_compression,
+                    )
+                else:
+                    dataframe.to_parquet(tmppath, compression=parquet_compression)
+
+                with open(tmppath, "rb") as parquet_file:
+                    file_size = os.path.getsize(tmppath)
+                    return self.load_table_from_file(
+                        parquet_file,
+                        destination,
+                        num_retries=num_retries,
+                        rewind=True,
+                        size=file_size,
+                        job_id=job_id,
+                        job_id_prefix=job_id_prefix,
+                        location=location,
+                        project=project,
+                        job_config=job_config,
+                        timeout=timeout,
+                    )
+
             else:
-                dataframe.to_parquet(tmppath, compression=parquet_compression)
-
-            with open(tmppath, "rb") as parquet_file:
-                file_size = os.path.getsize(tmppath)
-                return self.load_table_from_file(
-                    parquet_file,
-                    destination,
-                    num_retries=num_retries,
-                    rewind=True,
-                    size=file_size,
-                    job_id=job_id,
-                    job_id_prefix=job_id_prefix,
-                    location=location,
-                    project=project,
-                    job_config=job_config,
-                    timeout=timeout,
-                )
+
+                dataframe.to_csv(
+                    tmppath,
+                    index=False,
+                    header=False,
+                    encoding="utf-8",
+                    float_format="%.17g",
+                    date_format="%Y-%m-%d %H:%M:%S.%f",
+                )
+
+                with open(tmppath, "rb") as csv_file:
+                    file_size = os.path.getsize(tmppath)
+                    return self.load_table_from_file(
+                        csv_file,
+                        destination,
+                        num_retries=num_retries,
+                        rewind=True,
+                        size=file_size,
+                        job_id=job_id,
+                        job_id_prefix=job_id_prefix,
+                        location=location,
+                        project=project,
+                        job_config=job_config,
+                        timeout=timeout,
+                    )
+
         finally:
             os.remove(tmppath)
diff --git a/tests/system.py b/tests/system.py
index e347c8a70..e6a0e163d 100644
--- a/tests/system.py
+++ b/tests/system.py
@@ -1165,6 +1165,103 @@ def test_load_table_from_json_basic_use(self):
         self.assertEqual(tuple(table.schema), table_schema)
         self.assertEqual(table.num_rows, 2)
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self):
+        from google.cloud.bigquery.job import SourceFormat
+
+        scalars_schema = (
+            bigquery.SchemaField("bool_col", "BOOLEAN"),
+            bigquery.SchemaField("bytes_col", "BYTES"),
+            bigquery.SchemaField("date_col", "DATE"),
+            bigquery.SchemaField("dt_col", "DATETIME"),
+            bigquery.SchemaField("float_col", "FLOAT"),
+            bigquery.SchemaField("geo_col", "GEOGRAPHY"),
+            bigquery.SchemaField("int_col", "INTEGER"),
+            bigquery.SchemaField("num_col", "NUMERIC"),
+            bigquery.SchemaField("str_col", "STRING"),
+            bigquery.SchemaField("time_col", "TIME"),
+            bigquery.SchemaField("ts_col", "TIMESTAMP"),
+        )
+        table_schema = scalars_schema + (
+            # TODO: Array columns can't be read due to NULLABLE versus REPEATED
+            # mode mismatch. See:
+            # https://issuetracker.google.com/133415569#comment3
+            # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"),
+            # TODO: Support writing StructArrays to Parquet. See:
+            # https://jira.apache.org/jira/browse/ARROW-2587
+            # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
+        )
+        df_data = collections.OrderedDict(
+            [
+                ("bool_col", [True, None, False]),
+                ("bytes_col", ["abc", None, "def"]),
+                (
+                    "date_col",
+                    [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
+                ),
+                (
+                    "dt_col",
+                    [
+                        datetime.datetime(1, 1, 1, 0, 0, 0),
+                        None,
+                        datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
+                    ],
+                ),
+                ("float_col", [float("-inf"), float("nan"), float("inf")]),
+                (
+                    "geo_col",
+                    [
+                        "POINT(30 10)",
+                        None,
+                        "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
+                    ],
+                ),
+                ("int_col", [-9223372036854775808, None, 9223372036854775807]),
+                (
+                    "num_col",
+                    [
+                        decimal.Decimal("-99999999999999999999999999999.999999999"),
+                        None,
+                        decimal.Decimal("99999999999999999999999999999.999999999"),
+                    ],
+                ),
+                ("str_col", [u"abc", None, u"def"]),
+                (
+                    "time_col",
+                    [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
+                ),
+                (
+                    "ts_col",
+                    [
+                        datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+                        None,
+                        datetime.datetime(
+                            9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
+                        ),
+                    ],
+                ),
+            ]
+        )
+        dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())
+
+        dataset_id = _make_dataset_id("bq_load_test")
+        self.temp_dataset(dataset_id)
+        table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format(
+            Config.CLIENT.project, dataset_id
+        )
+
+        job_config = bigquery.LoadJobConfig(
+            schema=table_schema, source_format=SourceFormat.CSV
+        )
+        load_job = Config.CLIENT.load_table_from_dataframe(
+            dataframe, table_id, job_config=job_config
+        )
+        load_job.result()
+
+        table = Config.CLIENT.get_table(table_id)
+        self.assertEqual(tuple(table.schema), table_schema)
+        self.assertEqual(table.num_rows, 3)
+
     def test_load_table_from_json_schema_autodetect(self):
         json_rows = [
             {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False},
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 4fba1150c..92975b57c 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -8366,6 +8366,56 @@ def test_load_table_from_dataframe_w_invaild_job_config(self):
         err_msg = str(exc.value)
         assert "Expected an instance of LoadJobConfig" in err_msg
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_load_table_from_dataframe_with_csv_source_format(self):
+        from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
+        from google.cloud.bigquery import job
+        from google.cloud.bigquery.schema import SchemaField
+
+        client = self._make_client()
+        records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
+        dataframe = pandas.DataFrame(records)
+        job_config = job.LoadJobConfig(
+            write_disposition=job.WriteDisposition.WRITE_TRUNCATE,
+            source_format=job.SourceFormat.CSV,
+        )
+
+        get_table_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.get_table",
+            autospec=True,
+            return_value=mock.Mock(
+                schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")]
+            ),
+        )
+        load_patch = mock.patch(
+            "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
+        )
+        with load_patch as load_table_from_file, get_table_patch:
+            client.load_table_from_dataframe(
+                dataframe, self.TABLE_REF, job_config=job_config
+            )
+
+        load_table_from_file.assert_called_once_with(
+            client,
+            mock.ANY,
+            self.TABLE_REF,
+            num_retries=_DEFAULT_NUM_RETRIES,
+            rewind=True,
+            size=mock.ANY,
+            job_id=mock.ANY,
+            job_id_prefix=None,
+            location=None,
+            project=None,
+            job_config=mock.ANY,
+            timeout=None,
+        )
+
+        sent_file = load_table_from_file.mock_calls[0][1][1]
+        assert sent_file.closed
+
+        sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
+        assert sent_config.source_format == job.SourceFormat.CSV
+
     def test_load_table_from_json_basic_use(self):
         from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
         from google.cloud.bigquery import job

From 24aedef960ab08780e606ba26bad202b2d53b25a Mon Sep 17 00:00:00 2001
From: Carlos de la Guardia
Date: Wed, 2 Dec 2020 03:21:32 -0600
Subject: [PATCH 2/3] fix: address review comments

---
 google/cloud/bigquery/client.py | 69 +++++++++++++++------------
 tests/system.py                 | 57 ++++++++++++++++++++++-----
 2 files changed, 78 insertions(+), 48 deletions(-)

diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index 4bd0b6177..11cd53f91 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -2113,7 +2113,8 @@ def load_table_from_dataframe(
 
             Due to the way REPEATED fields are encoded in the ``parquet`` file
             format, a mismatch with the existing table schema can occur, and
-            100% compatibility cannot be guaranteed for REPEATED fields.
+            100% compatibility cannot be guaranteed for REPEATED fields when
+            using the parquet format.
 
             https://github.com/googleapis/python-bigquery/issues/17
 
@@ -2153,6 +2154,14 @@ def load_table_from_dataframe(
                 column names matching those of the dataframe. The BigQuery
                 schema is used to determine the correct data type conversion.
                 Indexes are not loaded. Requires the :mod:`pyarrow` library.
+
+                By default, this method uses the parquet source format. To
+                override this, supply a value for
+                :attr:`~google.cloud.bigquery.job.LoadJobConfig.source_format`
+                with the format name. Currently only
+                :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
+                :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
+                supported.
             parquet_compression (Optional[str]):
                 [Beta] The compression method to use if intermittently
                 serializing ``dataframe`` to a parquet file.
@@ -2181,10 +2190,6 @@ def load_table_from_dataframe(
             If ``job_config`` is not an instance of
             :class:`~google.cloud.bigquery.job.LoadJobConfig` class.
         """
-        if pyarrow is None:
-            # pyarrow is now the only supported parquet engine.
-            raise ValueError("This method requires pyarrow to be installed")
-
         job_id = _make_job_id(job_id, job_id_prefix)
 
         if job_config:
@@ -2197,7 +2202,7 @@ def load_table_from_dataframe(
         else:
             job_config = job.LoadJobConfig()
 
-        supported_formats = (job.SourceFormat.CSV, job.SourceFormat.PARQUET)
+        supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
         if job_config.source_format is None:
             # default value
             job_config.source_format = job.SourceFormat.PARQUET
@@ -2208,6 +2213,10 @@ def load_table_from_dataframe(
                 )
             )
 
+        if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET:
+            # pyarrow is now the only supported parquet engine.
+            raise ValueError("This method requires pyarrow to be installed")
+
         if location is None:
             location = self.location
 
@@ -2247,7 +2256,7 @@ def load_table_from_dataframe(
             )
 
         tmpfd, tmppath = tempfile.mkstemp(
-            suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower(),)
+            suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower())
         )
         os.close(tmpfd)
 
@@ -2268,22 +2277,6 @@ def load_table_from_dataframe(
                 else:
                     dataframe.to_parquet(tmppath, compression=parquet_compression)
 
-                with open(tmppath, "rb") as parquet_file:
-                    file_size = os.path.getsize(tmppath)
-                    return self.load_table_from_file(
-                        parquet_file,
-                        destination,
-                        num_retries=num_retries,
-                        rewind=True,
-                        size=file_size,
-                        job_id=job_id,
-                        job_id_prefix=job_id_prefix,
-                        location=location,
-                        project=project,
-                        job_config=job_config,
-                        timeout=timeout,
-                    )
-
             else:
 
                 dataframe.to_csv(
@@ -2295,21 +2288,21 @@ def load_table_from_dataframe(
                     date_format="%Y-%m-%d %H:%M:%S.%f",
                 )
 
-                with open(tmppath, "rb") as csv_file:
-                    file_size = os.path.getsize(tmppath)
-                    return self.load_table_from_file(
-                        csv_file,
-                        destination,
-                        num_retries=num_retries,
-                        rewind=True,
-                        size=file_size,
-                        job_id=job_id,
-                        job_id_prefix=job_id_prefix,
-                        location=location,
-                        project=project,
-                        job_config=job_config,
-                        timeout=timeout,
-                    )
+            with open(tmppath, "rb") as tmpfile:
+                file_size = os.path.getsize(tmppath)
+                return self.load_table_from_file(
+                    tmpfile,
+                    destination,
+                    num_retries=num_retries,
+                    rewind=True,
+                    size=file_size,
+                    job_id=job_id,
+                    job_id_prefix=job_id_prefix,
+                    location=location,
+                    project=project,
+                    job_config=job_config,
+                    timeout=timeout,
+                )
 
         finally:
             os.remove(tmppath)
diff --git a/tests/system.py b/tests/system.py
index e6a0e163d..d481967d8 100644
--- a/tests/system.py
+++ b/tests/system.py
@@ -1169,7 +1169,7 @@ def test_load_table_from_json_basic_use(self):
     def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self):
         from google.cloud.bigquery.job import SourceFormat
 
-        scalars_schema = (
+        table_schema = (
             bigquery.SchemaField("bool_col", "BOOLEAN"),
             bigquery.SchemaField("bytes_col", "BYTES"),
             bigquery.SchemaField("date_col", "DATE"),
@@ -1182,15 +1182,6 @@ def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self):
             bigquery.SchemaField("time_col", "TIME"),
             bigquery.SchemaField("ts_col", "TIMESTAMP"),
         )
-        table_schema = scalars_schema + (
-            # TODO: Array columns can't be read due to NULLABLE versus REPEATED
-            # mode mismatch. See:
-            # https://issuetracker.google.com/133415569#comment3
-            # bigquery.SchemaField("array_col", "INTEGER", mode="REPEATED"),
-            # TODO: Support writing StructArrays to Parquet. See:
-            # https://jira.apache.org/jira/browse/ARROW-2587
-            # bigquery.SchemaField("struct_col", "RECORD", fields=scalars_schema),
-        )
         df_data = collections.OrderedDict(
             [
                 ("bool_col", [True, None, False]),
@@ -1262,6 +1253,52 @@ def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self):
         self.assertEqual(tuple(table.schema), table_schema)
         self.assertEqual(table.num_rows, 3)
 
+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    def test_load_table_from_dataframe_w_explicit_schema_source_format_csv_floats(self):
+        from google.cloud.bigquery.job import SourceFormat
+
+        table_schema = (bigquery.SchemaField("float_col", "FLOAT"),)
+        df_data = collections.OrderedDict(
+            [
+                (
+                    "float_col",
+                    [
+                        0.14285714285714285,
+                        0.51428571485748,
+                        0.87128748,
+                        1.807960649,
+                        2.0679610649,
+                        2.4406779661016949,
+                        3.7148514257,
+                        3.8571428571428572,
+                        1.51251252e40,
+                    ],
+                ),
+            ]
+        )
+        dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())
+
+        dataset_id = _make_dataset_id("bq_load_test")
+        self.temp_dataset(dataset_id)
+        table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format(
+            Config.CLIENT.project, dataset_id
+        )
+
+        job_config = bigquery.LoadJobConfig(
+            schema=table_schema, source_format=SourceFormat.CSV
+        )
+        load_job = Config.CLIENT.load_table_from_dataframe(
+            dataframe, table_id, job_config=job_config
+        )
+        load_job.result()
+
+        table = Config.CLIENT.get_table(table_id)
+        rows = self._fetch_single_page(table)
+        floats = [r.values()[0] for r in rows]
+        self.assertEqual(tuple(table.schema), table_schema)
+        self.assertEqual(table.num_rows, 9)
+        self.assertEqual(floats, df_data["float_col"])
+
     def test_load_table_from_json_schema_autodetect(self):
         json_rows = [
             {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False},

From 9f5f08b1afb905e435a86a508d3ce9dcc30de287 Mon Sep 17 00:00:00 2001
From: Carlos de la Guardia
Date: Thu, 3 Dec 2020 11:45:43 -0600
Subject: [PATCH 3/3] docs: make clear repeated fields are not supported in
 csv

---
 google/cloud/bigquery/client.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index 11cd53f91..c514a2ad4 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -2111,8 +2111,10 @@ def load_table_from_dataframe(
 
         .. note::
 
-            Due to the way REPEATED fields are encoded in the ``parquet`` file
-            format, a mismatch with the existing table schema can occur, and
+            REPEATED fields are NOT supported when using the CSV source format.
+            They are supported when using the PARQUET source format, but
+            due to the way they are encoded in the ``parquet`` file,
+            a mismatch with the existing table schema can occur, so
             100% compatibility cannot be guaranteed for REPEATED fields when
             using the parquet format.
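
Usage sketch (not part of the patch series): a minimal example of how the CSV path added here would be called, assuming the patched client library and pandas are installed and the target dataset already exists. The project, dataset, and table names are placeholders.

import pandas
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pandas.DataFrame({"id": [1, 2], "age": [100, 60]})

# Passing SourceFormat.CSV overrides the PARQUET default; the explicit schema
# tells BigQuery how to interpret the columns serialized to the temporary CSV file.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("id", "INTEGER"),
        bigquery.SchemaField("age", "INTEGER"),
    ],
    source_format=bigquery.SourceFormat.CSV,
)

load_job = client.load_table_from_dataframe(
    dataframe, "my-project.my_dataset.my_table", job_config=job_config
)
load_job.result()  # wait for the load job to finish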