Fix bug where load_table_from_dataframe could not append to REQUIRED fields. (#8230)

If a BigQuery schema is supplied as part of the `job_config`, it can be
used to set the `nullable` bit correctly on the serialized parquet file.
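
For context, a minimal sketch of the usage this fixes. The client setup and the fully-qualified table ID "my-project.my_dataset.my_table" are placeholders, not part of the commit:

import pandas
from google.cloud import bigquery

client = bigquery.Client()

# Both columns are REQUIRED; before this fix, the parquet file serialized
# from the dataframe marked every column nullable, so a load job appending
# to a table with REQUIRED fields failed a schema check.
table_schema = (
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
)
dataframe = pandas.DataFrame(
    [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
)

# Supplying the schema via job_config lets the serializer set nullable=False.
job_config = bigquery.LoadJobConfig(schema=table_schema)
load_job = client.load_table_from_dataframe(
    dataframe, "my-project.my_dataset.my_table", job_config=job_config
)
load_job.result()  # wait for the load job to complete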
tswast authored and plamut committed Jun 7, 2019
1 parent 879ef99 commit 5c85d51
Showing 3 changed files with 169 additions and 12 deletions.
50 changes: 39 additions & 11 deletions bigquery/google/cloud/bigquery/_pandas_helpers.py
@@ -14,6 +14,8 @@

 """Shared helper functions for connecting BigQuery and pandas."""

+import warnings
+
 try:
     import pyarrow
     import pyarrow.parquet
@@ -107,6 +109,8 @@ def bq_to_arrow_field(bq_field):
     if arrow_type:
         is_nullable = bq_field.mode.upper() == "NULLABLE"
         return pyarrow.field(bq_field.name, arrow_type, nullable=is_nullable)
+
+    warnings.warn("Unable to determine type for field '{}'.".format(bq_field.name))
     return None
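
In other words, bq_to_arrow_field now yields a non-nullable Arrow field for REQUIRED columns and falls back to None (with a warning) for types it cannot map. A rough illustration, assuming the private _pandas_helpers module is imported directly as the unit tests below do:

from google.cloud.bigquery import _pandas_helpers, schema

# REQUIRED mode flips the nullable bit off on the resulting Arrow field.
required = schema.SchemaField("age", "INTEGER", mode="REQUIRED")
print(_pandas_helpers.bq_to_arrow_field(required).nullable)  # False

# Unmappable types emit a UserWarning and return None.
unknown = schema.SchemaField("blob", "UNKNOWN_TYPE")
print(_pandas_helpers.bq_to_arrow_field(unknown))  # None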


@@ -119,34 +123,58 @@ def bq_to_arrow_array(series, bq_field):
     return pyarrow.array(series, type=arrow_type)


-def to_parquet(dataframe, bq_schema, filepath):
-    """Write dataframe as a Parquet file, according to the desired BQ schema.
-
-    This function requires the :mod:`pyarrow` package. Arrow is used as an
-    intermediate format.
+def to_arrow(dataframe, bq_schema):
+    """Convert pandas dataframe to Arrow table, using BigQuery schema.

     Args:
         dataframe (pandas.DataFrame):
-            DataFrame to convert to Parquet file.
+            DataFrame to convert to Arrow table.
         bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
             Desired BigQuery schema. Number of columns must match number of
             columns in the DataFrame.
-        filepath (str):
-            Path to write Parquet file to.
-    """
-    if pyarrow is None:
-        raise ValueError("pyarrow is required for BigQuery schema conversion.")

+    Returns:
+        pyarrow.Table:
+            Table containing dataframe data, with schema derived from
+            BigQuery schema.
+    """
     if len(bq_schema) != len(dataframe.columns):
         raise ValueError(
             "Number of columns in schema must match number of columns in dataframe."
         )

     arrow_arrays = []
     arrow_names = []
+    arrow_fields = []
     for bq_field in bq_schema:
+        arrow_fields.append(bq_to_arrow_field(bq_field))
         arrow_names.append(bq_field.name)
         arrow_arrays.append(bq_to_arrow_array(dataframe[bq_field.name], bq_field))

-    arrow_table = pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
+    if all((field is not None for field in arrow_fields)):
+        return pyarrow.Table.from_arrays(
+            arrow_arrays, schema=pyarrow.schema(arrow_fields)
+        )
+    return pyarrow.Table.from_arrays(arrow_arrays, names=arrow_names)
+
+
+def to_parquet(dataframe, bq_schema, filepath):
+    """Write dataframe as a Parquet file, according to the desired BQ schema.
+
+    This function requires the :mod:`pyarrow` package. Arrow is used as an
+    intermediate format.
+
+    Args:
+        dataframe (pandas.DataFrame):
+            DataFrame to convert to Parquet file.
+        bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
+            Desired BigQuery schema. Number of columns must match number of
+            columns in the DataFrame.
+        filepath (str):
+            Path to write Parquet file to.
+    """
+    if pyarrow is None:
+        raise ValueError("pyarrow is required for BigQuery schema conversion.")
+
+    arrow_table = to_arrow(dataframe, bq_schema)
     pyarrow.parquet.write_table(arrow_table, filepath)
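
Taken together, a hedged usage sketch of the new helpers (the file path and values are illustrative only; _pandas_helpers is a private module):

import pandas

from google.cloud.bigquery import _pandas_helpers, schema

bq_schema = (
    schema.SchemaField("name", "STRING", mode="REQUIRED"),
    schema.SchemaField("age", "INTEGER", mode="REQUIRED"),
)
dataframe = pandas.DataFrame({"name": ["Chip", "Dale"], "age": [2, 3]})

# to_arrow() attaches the BigQuery-derived schema, so REQUIRED columns come
# out non-nullable instead of defaulting to nullable.
arrow_table = _pandas_helpers.to_arrow(dataframe, bq_schema)
assert all(not field.nullable for field in arrow_table.schema)

# to_parquet() now routes through to_arrow() before writing the file.
_pandas_helpers.to_parquet(dataframe, bq_schema, "/tmp/chipmunks.parquet")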
39 changes: 39 additions & 0 deletions bigquery/tests/system.py
@@ -699,6 +699,45 @@ def test_load_table_from_dataframe_w_nulls(self):
         self.assertEqual(tuple(table.schema), table_schema)
         self.assertEqual(table.num_rows, num_rows)

+    @unittest.skipIf(pandas is None, "Requires `pandas`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
+    def test_load_table_from_dataframe_w_required(self):
+        """Test that a DataFrame with required columns can be uploaded if a
+        BigQuery schema is specified.
+
+        See: https://github.com/googleapis/google-cloud-python/issues/8093
+        """
+        table_schema = (
+            bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
+            bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
+        )
+
+        records = [{"name": "Chip", "age": 2}, {"name": "Dale", "age": 3}]
+        dataframe = pandas.DataFrame(records)
+        dataset_id = _make_dataset_id("bq_load_test")
+        self.temp_dataset(dataset_id)
+        table_id = "{}.{}.load_table_from_dataframe_w_required".format(
+            Config.CLIENT.project, dataset_id
+        )
+
+        # Create the table before loading so that schema mismatch errors are
+        # identified.
+        table = retry_403(Config.CLIENT.create_table)(
+            Table(table_id, schema=table_schema)
+        )
+        self.to_delete.insert(0, table)
+
+        job_config = bigquery.LoadJobConfig(schema=table_schema)
+        load_job = Config.CLIENT.load_table_from_dataframe(
+            dataframe, table_id, job_config=job_config
+        )
+        load_job.result()
+
+        table = Config.CLIENT.get_table(table)
+        self.assertEqual(tuple(table.schema), table_schema)
+        self.assertEqual(table.num_rows, 2)
+
     @unittest.skipIf(pandas is None, "Requires `pandas`")
     @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_load_table_from_dataframe_w_explicit_schema(self):
92 changes: 91 additions & 1 deletion bigquery/tests/unit/test__pandas_helpers.py
@@ -15,6 +15,7 @@
 import datetime
 import decimal
 import functools
+import warnings

 try:
     import pandas
@@ -26,6 +27,7 @@
 except ImportError:  # pragma: NO COVER
     pyarrow = None
 import pytest
+import pytz

 from google.cloud.bigquery import schema

@@ -373,7 +375,7 @@ def test_bq_to_arrow_data_type_w_struct_unknown_subfield(module_under_test):
     (
         "GEOGRAPHY",
         [
-            "POINT(30, 10)",
+            "POINT(30 10)",
             None,
             "LINESTRING (30 10, 10 30, 40 40)",
             "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
@@ -440,6 +442,94 @@ def test_bq_to_arrow_array_w_special_floats(module_under_test):
     assert roundtrip[3] is None


+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
+def test_to_arrow_w_required_fields(module_under_test):
+    bq_schema = (
+        schema.SchemaField("field01", "STRING", mode="REQUIRED"),
+        schema.SchemaField("field02", "BYTES", mode="REQUIRED"),
+        schema.SchemaField("field03", "INTEGER", mode="REQUIRED"),
+        schema.SchemaField("field04", "INT64", mode="REQUIRED"),
+        schema.SchemaField("field05", "FLOAT", mode="REQUIRED"),
+        schema.SchemaField("field06", "FLOAT64", mode="REQUIRED"),
+        schema.SchemaField("field07", "NUMERIC", mode="REQUIRED"),
+        schema.SchemaField("field08", "BOOLEAN", mode="REQUIRED"),
+        schema.SchemaField("field09", "BOOL", mode="REQUIRED"),
+        schema.SchemaField("field10", "TIMESTAMP", mode="REQUIRED"),
+        schema.SchemaField("field11", "DATE", mode="REQUIRED"),
+        schema.SchemaField("field12", "TIME", mode="REQUIRED"),
+        schema.SchemaField("field13", "DATETIME", mode="REQUIRED"),
+        schema.SchemaField("field14", "GEOGRAPHY", mode="REQUIRED"),
+    )
+    dataframe = pandas.DataFrame(
+        {
+            "field01": ["hello", "world"],
+            "field02": [b"abd", b"efg"],
+            "field03": [1, 2],
+            "field04": [3, 4],
+            "field05": [1.25, 9.75],
+            "field06": [-1.75, -3.5],
+            "field07": [decimal.Decimal("1.2345"), decimal.Decimal("6.7891")],
+            "field08": [True, False],
+            "field09": [False, True],
+            "field10": [
+                datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=pytz.utc),
+                datetime.datetime(2012, 12, 21, 9, 7, 42, tzinfo=pytz.utc),
+            ],
+            "field11": [datetime.date(9999, 12, 31), datetime.date(1970, 1, 1)],
+            "field12": [datetime.time(23, 59, 59, 999999), datetime.time(12, 0, 0)],
+            "field13": [
+                datetime.datetime(1970, 1, 1, 0, 0, 0),
+                datetime.datetime(2012, 12, 21, 9, 7, 42),
+            ],
+            "field14": [
+                "POINT(30 10)",
+                "POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))",
+            ],
+        }
+    )
+
+    arrow_table = module_under_test.to_arrow(dataframe, bq_schema)
+    arrow_schema = arrow_table.schema
+
+    assert len(arrow_schema) == len(bq_schema)
+    for arrow_field in arrow_schema:
+        assert not arrow_field.nullable
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
+def test_to_arrow_w_unknown_type(module_under_test):
+    bq_schema = (
+        schema.SchemaField("field00", "UNKNOWN_TYPE"),
+        schema.SchemaField("field01", "STRING"),
+        schema.SchemaField("field02", "BYTES"),
+        schema.SchemaField("field03", "INTEGER"),
+    )
+    dataframe = pandas.DataFrame(
+        {
+            "field00": ["whoami", "whatami"],
+            "field01": ["hello", "world"],
+            "field02": [b"abd", b"efg"],
+            "field03": [1, 2],
+        }
+    )
+
+    with warnings.catch_warnings(record=True) as warned:
+        arrow_table = module_under_test.to_arrow(dataframe, bq_schema)
+        arrow_schema = arrow_table.schema
+
+    assert len(warned) == 1
+    warning = warned[0]
+    assert "field00" in str(warning)
+
+    assert len(arrow_schema) == len(bq_schema)
+    assert arrow_schema[0].name == "field00"
+    assert arrow_schema[1].name == "field01"
+    assert arrow_schema[2].name == "field02"
+    assert arrow_schema[3].name == "field03"
+
+
 @pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
 def test_to_parquet_without_pyarrow(module_under_test, monkeypatch):
     monkeypatch.setattr(module_under_test, "pyarrow", None)
