Skip to content

Commit

Permalink
BigQuery: Raise helpful error when loading table from dataframe with …
Browse files Browse the repository at this point in the history
…STRUCT columns (googleapis#9053)

* Issue warning if no schema when loading from DF

* Raise error if serializing DF with struct fields

* Rewrite test assertion to make coverage happy

* Make the unsupported type message more general

* Remove warning on missing schema

The warning will be added once the support for partial schemas and
automatic schema detection is implemented.
  • Loading branch information
plamut authored Aug 21, 2019
1 parent 42150d0 commit a7bbfa2
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
11 changes: 11 additions & 0 deletions bigquery/google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import _STRUCT_TYPES
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _table_arg_to_table
from google.cloud.bigquery.table import _table_arg_to_table_ref
Expand Down Expand Up @@ -1529,6 +1530,15 @@ def load_table_from_dataframe(
os.close(tmpfd)

try:
if job_config.schema:
for field in job_config.schema:
if field.field_type in _STRUCT_TYPES:
raise ValueError(
"Uploading dataframes with struct (record) column types "
"is not supported. See: "
"https://github.com/googleapis/google-cloud-python/issues/8191"
)

if pyarrow and job_config.schema:
if parquet_compression == "snappy": # adjust the default value
parquet_compression = parquet_compression.upper()
Expand All @@ -1548,6 +1558,7 @@ def load_table_from_dataframe(
PendingDeprecationWarning,
stacklevel=2,
)

dataframe.to_parquet(tmppath, compression=parquet_compression)

with open(tmppath, "rb") as parquet_file:
Expand Down
34 changes: 34 additions & 0 deletions bigquery/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5328,6 +5328,40 @@ def test_load_table_from_dataframe_w_custom_job_config(self):
assert sent_config is job_config
assert sent_config.source_format == job.SourceFormat.PARQUET

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_struct_fields_error(self):
from google.cloud.bigquery import job
from google.cloud.bigquery.schema import SchemaField

client = self._make_client()

records = [{"float_column": 3.14, "struct_column": [{"foo": 1}, {"bar": -1}]}]
dataframe = pandas.DataFrame(data=records)

schema = [
SchemaField("float_column", "FLOAT"),
SchemaField(
"agg_col",
"RECORD",
fields=[SchemaField("foo", "INTEGER"), SchemaField("bar", "INTEGER")],
),
]
job_config = job.LoadJobConfig(schema=schema)

load_patch = mock.patch(
"google.cloud.bigquery.client.Client.load_table_from_file", autospec=True
)

with pytest.raises(ValueError) as exc_info, load_patch:
client.load_table_from_dataframe(
dataframe, self.TABLE_REF, job_config=job_config, location=self.LOCATION
)

err_msg = str(exc_info.value)
assert "struct" in err_msg
assert "not support" in err_msg

@unittest.skipIf(pandas is None, "Requires `pandas`")
@unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
def test_load_table_from_dataframe_w_schema_wo_pyarrow(self):
Expand Down

0 comments on commit a7bbfa2

Please sign in to comment.