diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index cd1474336..c514a2ad4 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -2111,9 +2111,12 @@ def load_table_from_dataframe( .. note:: - Due to the way REPEATED fields are encoded in the ``parquet`` file - format, a mismatch with the existing table schema can occur, and - 100% compatibility cannot be guaranteed for REPEATED fields. + REPEATED fields are NOT supported when using the CSV source format. + They are supported when using the PARQUET source format, but + due to the way they are encoded in the ``parquet`` file, + a mismatch with the existing table schema can occur, so + 100% compatibility cannot be guaranteed for REPEATED fields when + using the parquet format. https://github.com/googleapis/python-bigquery/issues/17 @@ -2153,6 +2156,14 @@ def load_table_from_dataframe( column names matching those of the dataframe. The BigQuery schema is used to determine the correct data type conversion. Indexes are not loaded. Requires the :mod:`pyarrow` library. + + By default, this method uses the parquet source format. To + override this, supply a value for + :attr:`~google.cloud.bigquery.job.LoadJobConfig.source_format` + with the format name. Currently only + :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and + :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are + supported. parquet_compression (Optional[str]): [Beta] The compression method to use if intermittently serializing ``dataframe`` to a parquet file. @@ -2181,10 +2192,6 @@ def load_table_from_dataframe( If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig` class. """ - if pyarrow is None: - # pyarrow is now the only supported parquet engine. - raise ValueError("This method requires pyarrow to be installed") - job_id = _make_job_id(job_id, job_id_prefix) if job_config: @@ -2197,15 +2204,20 @@ def load_table_from_dataframe( else: job_config = job.LoadJobConfig() - if job_config.source_format: - if job_config.source_format != job.SourceFormat.PARQUET: - raise ValueError( - "Got unexpected source_format: '{}'. Currently, only PARQUET is supported".format( - job_config.source_format - ) - ) - else: + supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET} + if job_config.source_format is None: + # default value job_config.source_format = job.SourceFormat.PARQUET + if job_config.source_format not in supported_formats: + raise ValueError( + "Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format( + job_config.source_format + ) + ) + + if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET: + # pyarrow is now the only supported parquet engine. + raise ValueError("This method requires pyarrow to be installed") if location is None: location = self.location @@ -2245,27 +2257,43 @@ def load_table_from_dataframe( stacklevel=2, ) - tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8])) + tmpfd, tmppath = tempfile.mkstemp( + suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower()) + ) os.close(tmpfd) try: - if job_config.schema: - if parquet_compression == "snappy": # adjust the default value - parquet_compression = parquet_compression.upper() - _pandas_helpers.dataframe_to_parquet( - dataframe, - job_config.schema, + if job_config.source_format == job.SourceFormat.PARQUET: + + if job_config.schema: + if parquet_compression == "snappy": # adjust the default value + parquet_compression = parquet_compression.upper() + + _pandas_helpers.dataframe_to_parquet( + dataframe, + job_config.schema, + tmppath, + parquet_compression=parquet_compression, + ) + else: + dataframe.to_parquet(tmppath, compression=parquet_compression) + + else: + + dataframe.to_csv( tmppath, - parquet_compression=parquet_compression, + index=False, + header=False, + encoding="utf-8", + float_format="%.17g", + date_format="%Y-%m-%d %H:%M:%S.%f", ) - else: - dataframe.to_parquet(tmppath, compression=parquet_compression) - with open(tmppath, "rb") as parquet_file: + with open(tmppath, "rb") as tmpfile: file_size = os.path.getsize(tmppath) return self.load_table_from_file( - parquet_file, + tmpfile, destination, num_retries=num_retries, rewind=True, diff --git a/tests/system.py b/tests/system.py index e347c8a70..d481967d8 100644 --- a/tests/system.py +++ b/tests/system.py @@ -1165,6 +1165,140 @@ def test_load_table_from_json_basic_use(self): self.assertEqual(tuple(table.schema), table_schema) self.assertEqual(table.num_rows, 2) + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self): + from google.cloud.bigquery.job import SourceFormat + + table_schema = ( + bigquery.SchemaField("bool_col", "BOOLEAN"), + bigquery.SchemaField("bytes_col", "BYTES"), + bigquery.SchemaField("date_col", "DATE"), + bigquery.SchemaField("dt_col", "DATETIME"), + bigquery.SchemaField("float_col", "FLOAT"), + bigquery.SchemaField("geo_col", "GEOGRAPHY"), + bigquery.SchemaField("int_col", "INTEGER"), + bigquery.SchemaField("num_col", "NUMERIC"), + bigquery.SchemaField("str_col", "STRING"), + bigquery.SchemaField("time_col", "TIME"), + bigquery.SchemaField("ts_col", "TIMESTAMP"), + ) + df_data = collections.OrderedDict( + [ + ("bool_col", [True, None, False]), + ("bytes_col", ["abc", None, "def"]), + ( + "date_col", + [datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)], + ), + ( + "dt_col", + [ + datetime.datetime(1, 1, 1, 0, 0, 0), + None, + datetime.datetime(9999, 12, 31, 23, 59, 59, 999999), + ], + ), + ("float_col", [float("-inf"), float("nan"), float("inf")]), + ( + "geo_col", + [ + "POINT(30 10)", + None, + "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))", + ], + ), + ("int_col", [-9223372036854775808, None, 9223372036854775807]), + ( + "num_col", + [ + decimal.Decimal("-99999999999999999999999999999.999999999"), + None, + decimal.Decimal("99999999999999999999999999999.999999999"), + ], + ), + ("str_col", [u"abc", None, u"def"]), + ( + "time_col", + [datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)], + ), + ( + "ts_col", + [ + datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc), + None, + datetime.datetime( + 9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc + ), + ], + ), + ] + ) + dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) + + dataset_id = _make_dataset_id("bq_load_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format( + Config.CLIENT.project, dataset_id + ) + + job_config = bigquery.LoadJobConfig( + schema=table_schema, source_format=SourceFormat.CSV + ) + load_job = Config.CLIENT.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = Config.CLIENT.get_table(table_id) + self.assertEqual(tuple(table.schema), table_schema) + self.assertEqual(table.num_rows, 3) + + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_load_table_from_dataframe_w_explicit_schema_source_format_csv_floats(self): + from google.cloud.bigquery.job import SourceFormat + + table_schema = (bigquery.SchemaField("float_col", "FLOAT"),) + df_data = collections.OrderedDict( + [ + ( + "float_col", + [ + 0.14285714285714285, + 0.51428571485748, + 0.87128748, + 1.807960649, + 2.0679610649, + 2.4406779661016949, + 3.7148514257, + 3.8571428571428572, + 1.51251252e40, + ], + ), + ] + ) + dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys()) + + dataset_id = _make_dataset_id("bq_load_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format( + Config.CLIENT.project, dataset_id + ) + + job_config = bigquery.LoadJobConfig( + schema=table_schema, source_format=SourceFormat.CSV + ) + load_job = Config.CLIENT.load_table_from_dataframe( + dataframe, table_id, job_config=job_config + ) + load_job.result() + + table = Config.CLIENT.get_table(table_id) + rows = self._fetch_single_page(table) + floats = [r.values()[0] for r in rows] + self.assertEqual(tuple(table.schema), table_schema) + self.assertEqual(table.num_rows, 9) + self.assertEqual(floats, df_data["float_col"]) + def test_load_table_from_json_schema_autodetect(self): json_rows = [ {"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False}, diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index c4bdea2f8..a705f109d 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -8366,6 +8366,56 @@ def test_load_table_from_dataframe_w_invaild_job_config(self): err_msg = str(exc.value) assert "Expected an instance of LoadJobConfig" in err_msg + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_load_table_from_dataframe_with_csv_source_format(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + from google.cloud.bigquery import job + from google.cloud.bigquery.schema import SchemaField + + client = self._make_client() + records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}] + dataframe = pandas.DataFrame(records) + job_config = job.LoadJobConfig( + write_disposition=job.WriteDisposition.WRITE_TRUNCATE, + source_format=job.SourceFormat.CSV, + ) + + get_table_patch = mock.patch( + "google.cloud.bigquery.client.Client.get_table", + autospec=True, + return_value=mock.Mock( + schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")] + ), + ) + load_patch = mock.patch( + "google.cloud.bigquery.client.Client.load_table_from_file", autospec=True + ) + with load_patch as load_table_from_file, get_table_patch: + client.load_table_from_dataframe( + dataframe, self.TABLE_REF, job_config=job_config + ) + + load_table_from_file.assert_called_once_with( + client, + mock.ANY, + self.TABLE_REF, + num_retries=_DEFAULT_NUM_RETRIES, + rewind=True, + size=mock.ANY, + job_id=mock.ANY, + job_id_prefix=None, + location=None, + project=None, + job_config=mock.ANY, + timeout=None, + ) + + sent_file = load_table_from_file.mock_calls[0][1][1] + assert sent_file.closed + + sent_config = load_table_from_file.mock_calls[0][2]["job_config"] + assert sent_config.source_format == job.SourceFormat.CSV + def test_load_table_from_json_basic_use(self): from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES from google.cloud.bigquery import job