
feat: support CSV format in load_table_from_dataframe pandas connector #399

Merged
82 changes: 55 additions & 27 deletions google/cloud/bigquery/client.py
@@ -2111,9 +2111,12 @@ def load_table_from_dataframe(

.. note::

Due to the way REPEATED fields are encoded in the ``parquet`` file
format, a mismatch with the existing table schema can occur, and
100% compatibility cannot be guaranteed for REPEATED fields.
REPEATED fields are NOT supported when using the CSV source format.
They are supported when using the PARQUET source format, but
due to the way they are encoded in the ``parquet`` file,
a mismatch with the existing table schema can occur, so
100% compatibility cannot be guaranteed for REPEATED fields when
using the parquet format.

https://github.com/googleapis/python-bigquery/issues/17

@@ -2153,6 +2156,14 @@ def load_table_from_dataframe(
column names matching those of the dataframe. The BigQuery
schema is used to determine the correct data type conversion.
Indexes are not loaded. Requires the :mod:`pyarrow` library.

By default, this method uses the parquet source format. To
override this, supply a value for
:attr:`~google.cloud.bigquery.job.LoadJobConfig.source_format`
with the format name. Currently only
:attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
:attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
supported.
parquet_compression (Optional[str]):
[Beta] The compression method to use if intermittently
serializing ``dataframe`` to a parquet file.
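For reference, the docstring addition above can be exercised roughly as follows. This is a sketch, not part of the change: the client construction, project, dataset, and table names are placeholders.

import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
dataframe = pd.DataFrame({"id": [1, 2], "age": [100, 60]})

# Opt in to the CSV path; omitting source_format keeps the parquet default.
job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("id", "INTEGER"),
        bigquery.SchemaField("age", "INTEGER"),
    ],
    source_format=bigquery.SourceFormat.CSV,
)
load_job = client.load_table_from_dataframe(
    dataframe, "my-project.my_dataset.my_table", job_config=job_config
)
load_job.result()  # wait for the load job to finish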
@@ -2181,10 +2192,6 @@ def load_table_from_dataframe(
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.LoadJobConfig`
class.
"""
if pyarrow is None:
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

job_id = _make_job_id(job_id, job_id_prefix)

if job_config:
@@ -2197,15 +2204,20 @@ def load_table_from_dataframe(
else:
job_config = job.LoadJobConfig()

if job_config.source_format:
if job_config.source_format != job.SourceFormat.PARQUET:
raise ValueError(
"Got unexpected source_format: '{}'. Currently, only PARQUET is supported".format(
job_config.source_format
)
)
supported_formats = {job.SourceFormat.CSV, job.SourceFormat.PARQUET}
if job_config.source_format is None:
# default value
job_config.source_format = job.SourceFormat.PARQUET
if job_config.source_format not in supported_formats:
raise ValueError(
"Got unexpected source_format: '{}'. Currently, only PARQUET and CSV are supported".format(
job_config.source_format
)
)

if pyarrow is None and job_config.source_format == job.SourceFormat.PARQUET:
# pyarrow is now the only supported parquet engine.
raise ValueError("This method requires pyarrow to be installed")

if location is None:
location = self.location
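With the validation above, an unsupported format now fails fast with a ValueError, and the pyarrow requirement only applies to the parquet path. A rough, self-contained illustration (ORC is used purely to trigger the error; the client and table name are placeholders):

import pandas as pd
from google.cloud import bigquery

client = bigquery.Client()
dataframe = pd.DataFrame({"id": [1, 2]})

job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.ORC)
try:
    client.load_table_from_dataframe(
        dataframe, "my-project.my_dataset.my_table", job_config=job_config
    )
except ValueError as exc:
    # Got unexpected source_format: 'ORC'. Currently, only PARQUET and CSV are supported
    print(exc)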
@@ -2245,27 +2257,43 @@ def load_table_from_dataframe(
stacklevel=2,
)

tmpfd, tmppath = tempfile.mkstemp(suffix="_job_{}.parquet".format(job_id[:8]))
tmpfd, tmppath = tempfile.mkstemp(
suffix="_job_{}.{}".format(job_id[:8], job_config.source_format.lower())
)
os.close(tmpfd)

try:
if job_config.source_format == job.SourceFormat.PARQUET:

if job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()

_pandas_helpers.dataframe_to_parquet(
dataframe,
job_config.schema,
tmppath,
parquet_compression=parquet_compression,
)
else:
dataframe.to_parquet(tmppath, compression=parquet_compression)

else:

dataframe.to_csv(
tmppath,
index=False,
header=False,
encoding="utf-8",
float_format="%.17g",
date_format="%Y-%m-%d %H:%M:%S.%f",
)

with open(tmppath, "rb") as parquet_file:
with open(tmppath, "rb") as tmpfile:
file_size = os.path.getsize(tmppath)
return self.load_table_from_file(
tmpfile,
destination,
num_retries=num_retries,
rewind=True,
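The to_csv settings above control what ends up in the temporary file handed to load_table_from_file. A quick local sketch of the effect of index=False and header=False, using a made-up dataframe:

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "age": [100, 60]})
print(df.to_csv(index=False, header=False), end="")
# 1,100
# 2,60

Because no header row is written, the load relies on the schema carried in job_config (or fetched from the destination table), so the dataframe's column order has to line up with that schema on the CSV path.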
134 changes: 134 additions & 0 deletions tests/system.py
@@ -1165,6 +1165,140 @@ def test_load_table_from_json_basic_use(self):
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 2)

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_load_table_from_dataframe_w_explicit_schema_source_format_csv(self):
from google.cloud.bigquery.job import SourceFormat

table_schema = (
bigquery.SchemaField("bool_col", "BOOLEAN"),
bigquery.SchemaField("bytes_col", "BYTES"),
bigquery.SchemaField("date_col", "DATE"),
bigquery.SchemaField("dt_col", "DATETIME"),
bigquery.SchemaField("float_col", "FLOAT"),
bigquery.SchemaField("geo_col", "GEOGRAPHY"),
bigquery.SchemaField("int_col", "INTEGER"),
bigquery.SchemaField("num_col", "NUMERIC"),
bigquery.SchemaField("str_col", "STRING"),
bigquery.SchemaField("time_col", "TIME"),
bigquery.SchemaField("ts_col", "TIMESTAMP"),
)
df_data = collections.OrderedDict(
[
("bool_col", [True, None, False]),
("bytes_col", ["abc", None, "def"]),
(
"date_col",
[datetime.date(1, 1, 1), None, datetime.date(9999, 12, 31)],
),
(
"dt_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0),
None,
datetime.datetime(9999, 12, 31, 23, 59, 59, 999999),
],
),
("float_col", [float("-inf"), float("nan"), float("inf")]),
(
"geo_col",
[
"POINT(30 10)",
None,
"POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
],
),
("int_col", [-9223372036854775808, None, 9223372036854775807]),
(
"num_col",
[
decimal.Decimal("-99999999999999999999999999999.999999999"),
None,
decimal.Decimal("99999999999999999999999999999.999999999"),
],
),
("str_col", [u"abc", None, u"def"]),
(
"time_col",
[datetime.time(0, 0, 0), None, datetime.time(23, 59, 59, 999999)],
),
(
"ts_col",
[
datetime.datetime(1, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
None,
datetime.datetime(
9999, 12, 31, 23, 59, 59, 999999, tzinfo=pytz.utc
),
],
),
]
)
dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format(
Config.CLIENT.project, dataset_id
)

job_config = bigquery.LoadJobConfig(
schema=table_schema, source_format=SourceFormat.CSV
)
load_job = Config.CLIENT.load_table_from_dataframe(
dataframe, table_id, job_config=job_config
)
load_job.result()

table = Config.CLIENT.get_table(table_id)
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 3)
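A note on the dtype="object" in this test: it stops pandas from applying its usual type promotion when a column contains None, which would otherwise break the exact match against the declared schema. A minimal illustration, independent of the test:

import pandas as pd

print(pd.Series([1, None]).dtype)                  # float64 -- ints get upcast
print(pd.Series([1, None], dtype="object").dtype)  # object  -- values kept as-is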

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_load_table_from_dataframe_w_explicit_schema_source_format_csv_floats(self):
from google.cloud.bigquery.job import SourceFormat

table_schema = (bigquery.SchemaField("float_col", "FLOAT"),)
df_data = collections.OrderedDict(
[
(
"float_col",
[
0.14285714285714285,
0.51428571485748,
0.87128748,
1.807960649,
2.0679610649,
2.4406779661016949,
3.7148514257,
3.8571428571428572,
1.51251252e40,
],
),
]
)
dataframe = pandas.DataFrame(df_data, dtype="object", columns=df_data.keys())

dataset_id = _make_dataset_id("bq_load_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_dataframe_w_explicit_schema_csv".format(
Config.CLIENT.project, dataset_id
)

job_config = bigquery.LoadJobConfig(
schema=table_schema, source_format=SourceFormat.CSV
)
load_job = Config.CLIENT.load_table_from_dataframe(
dataframe, table_id, job_config=job_config
)
load_job.result()

table = Config.CLIENT.get_table(table_id)
rows = self._fetch_single_page(table)
floats = [r.values()[0] for r in rows]
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 9)
self.assertEqual(floats, df_data["float_col"])
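The exact-equality assertion on the floats only works because the CSV writer uses float_format="%.17g": 17 significant digits are enough to round-trip any IEEE 754 double through text. A tiny self-check, independent of BigQuery:

x = 0.1 + 0.2
assert float("%.17g" % x) == x   # full precision survives the text round trip
assert float("%.15g" % x) != x   # 15 digits would silently drop the last bits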

def test_load_table_from_json_schema_autodetect(self):
json_rows = [
{"name": "John", "age": 18, "birthday": "2001-10-15", "is_awesome": False},
50 changes: 50 additions & 0 deletions tests/unit/test_client.py
@@ -8366,6 +8366,56 @@ def test_load_table_from_dataframe_w_invaild_job_config(self):
err_msg = str(exc.value)
assert "Expected an instance of LoadJobConfig" in err_msg

@unittest.skipIf(pandas is None, "Requires `pandas`")
def test_load_table_from_dataframe_with_csv_source_format(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()
records = [{"id": 1, "age": 100}, {"id": 2, "age": 60}]
dataframe = pandas.DataFrame(records)
job_config = job.LoadJobConfig(
write_disposition=job.WriteDisposition.WRITE_TRUNCATE,
source_format=job.SourceFormat.CSV,
)

get_table_patch = mock.patch(
"google.cloud.bigquery.client.Client.get_table",
autospec=True,
return_value=mock.Mock(
schema=[SchemaField("id", "INTEGER"), SchemaField("age", "INTEGER")]
),
)
load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)
with load_patch as load_table_from_file, get_table_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, job_config=job_config
)

load_table_from_file.assert_called_once_with(
client,
mock.ANY,
self.TABLE_REF,
num_retries=_DEFAULT_NUM_RETRIES,
rewind=True,
size=mock.ANY,
job_id=mock.ANY,
job_id_prefix=None,
location=None,
project=None,
job_config=mock.ANY,
timeout=None,
)

sent_file = load_table_from_file.mock_calls[0][1][1]
assert sent_file.closed

sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
assert sent_config.source_format == job.SourceFormat.CSV
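For readers puzzling over the indexing: mock_calls[0] is a (name, args, kwargs) triple for the first call, and because the patch uses autospec=True the client instance occupies args[0], so args[1] is the temporary file object. An equivalent, arguably clearer spelling (not used in the PR) would be:

args, kwargs = load_table_from_file.call_args
sent_file = args[1]                  # args[0] is the client instance (autospec)
sent_config = kwargs["job_config"]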

def test_load_table_from_json_basic_use(self):
from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES
from google.cloud.bigquery import job