diff --git a/docs/usage/pandas.rst b/docs/usage/pandas.rst index 109259711..550a67792 100644 --- a/docs/usage/pandas.rst +++ b/docs/usage/pandas.rst @@ -50,13 +50,25 @@ The following data types are used when creating a pandas DataFrame. - * - DATETIME - datetime64[ns], object - - object is used when there are values not representable in pandas + - The object dtype is used when there are values not representable in a + pandas nanosecond-precision timestamp. + * - DATE + - dbdate, object + - The object dtype is used when there are values not representable in a + pandas nanosecond-precision timestamp. + + Requires the ``db-dtypes`` package. See the `db-dtypes usage guide + `_ * - FLOAT64 - float64 - * - INT64 - Int64 - + * - TIME + - dbtime + - Requires the ``db-dtypes`` package. See the `db-dtypes usage guide + `_ Retrieve BigQuery GEOGRAPHY data as a GeoPandas GeoDataFrame ------------------------------------------------------------ diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 07b288236..39fa74dea 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -18,16 +18,21 @@ import functools import logging import queue -from typing import Dict, Sequence import warnings try: import pandas # type: ignore except ImportError: # pragma: NO COVER pandas = None + date_dtype_name = time_dtype_name = "" # Use '' rather than None because pytype else: import numpy + from db_dtypes import DateDtype, TimeDtype # type: ignore + + date_dtype_name = DateDtype.name + time_dtype_name = TimeDtype.name + import pyarrow # type: ignore import pyarrow.parquet # type: ignore @@ -77,15 +82,6 @@ def _to_wkb(v): _MAX_QUEUE_SIZE_DEFAULT = object() # max queue size sentinel for BQ Storage downloads -# If you update the default dtypes, also update the docs at docs/usage/pandas.rst. -_BQ_TO_PANDAS_DTYPE_NULLSAFE = { - "BOOL": "boolean", - "BOOLEAN": "boolean", - "FLOAT": "float64", - "FLOAT64": "float64", - "INT64": "Int64", - "INTEGER": "Int64", -} _PANDAS_DTYPE_TO_BQ = { "bool": "BOOLEAN", "datetime64[ns, UTC]": "TIMESTAMP", @@ -102,6 +98,8 @@ def _to_wkb(v): "uint16": "INTEGER", "uint32": "INTEGER", "geometry": "GEOGRAPHY", + date_dtype_name: "DATE", + time_dtype_name: "TIME", } @@ -267,26 +265,40 @@ def bq_to_arrow_schema(bq_schema): return pyarrow.schema(arrow_fields) -def bq_schema_to_nullsafe_pandas_dtypes( - bq_schema: Sequence[schema.SchemaField], -) -> Dict[str, str]: - """Return the default dtypes to use for columns in a BigQuery schema. +def default_types_mapper(date_as_object: bool = False): + """Create a mapping from pyarrow types to pandas types. - Only returns default dtypes which are safe to have NULL values. This - includes Int64, which has pandas.NA values and does not result in - loss-of-precision. + This overrides the pandas defaults to use null-safe extension types where + available. - Returns: - A mapping from column names to pandas dtypes. + See: https://arrow.apache.org/docs/python/api/datatypes.html for a list of + data types. See: + tests/unit/test__pandas_helpers.py::test_bq_to_arrow_data_type for + BigQuery to Arrow type mapping. + + Note to google-cloud-bigquery developers: If you update the default dtypes, + also update the docs at docs/usage/pandas.rst. 
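+ + Returns: + A ``types_mapper`` callable suitable for :meth:`pyarrow.RecordBatch.to_pandas`; given a pyarrow data type, it returns the pandas extension dtype to use, or ``None`` to keep pyarrow's default conversion for that type.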
""" - dtypes = {} - for bq_field in bq_schema: - if bq_field.mode.upper() not in {"NULLABLE", "REQUIRED"}: - continue - field_type = bq_field.field_type.upper() - if field_type in _BQ_TO_PANDAS_DTYPE_NULLSAFE: - dtypes[bq_field.name] = _BQ_TO_PANDAS_DTYPE_NULLSAFE[field_type] - return dtypes + + def types_mapper(arrow_data_type): + if pyarrow.types.is_boolean(arrow_data_type): + return pandas.BooleanDtype() + + elif ( + # If date_as_object is True, we know some DATE columns are + # out-of-bounds of what is supported by pandas. + not date_as_object + and pyarrow.types.is_date(arrow_data_type) + ): + return DateDtype() + + elif pyarrow.types.is_integer(arrow_data_type): + return pandas.Int64Dtype() + + elif pyarrow.types.is_time(arrow_data_type): + return TimeDtype() + + return types_mapper def bq_to_arrow_array(series, bq_field): diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 7ff752da1..6b8b5ce12 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1556,7 +1556,6 @@ def to_dataframe( dtypes: Dict[str, Any] = None, progress_bar_type: str = None, create_bqstorage_client: bool = True, - date_as_object: bool = True, max_results: Optional[int] = None, geography_as_object: bool = False, ) -> "pandas.DataFrame": @@ -1599,12 +1598,6 @@ def to_dataframe( .. versionadded:: 1.24.0 - date_as_object (Optional[bool]): - If ``True`` (default), cast dates to objects. If ``False``, convert - to datetime64[ns] dtype. - - .. versionadded:: 1.26.0 - max_results (Optional[int]): Maximum number of rows to include in the result. No limit by default. @@ -1638,7 +1631,6 @@ def to_dataframe( dtypes=dtypes, progress_bar_type=progress_bar_type, create_bqstorage_client=create_bqstorage_client, - date_as_object=date_as_object, geography_as_object=geography_as_object, ) @@ -1651,7 +1643,6 @@ def to_geodataframe( dtypes: Dict[str, Any] = None, progress_bar_type: str = None, create_bqstorage_client: bool = True, - date_as_object: bool = True, max_results: Optional[int] = None, geography_column: Optional[str] = None, ) -> "geopandas.GeoDataFrame": @@ -1694,12 +1685,6 @@ def to_geodataframe( .. versionadded:: 1.24.0 - date_as_object (Optional[bool]): - If ``True`` (default), cast dates to objects. If ``False``, convert - to datetime64[ns] dtype. - - .. versionadded:: 1.26.0 - max_results (Optional[int]): Maximum number of rows to include in the result. No limit by default. @@ -1732,7 +1717,6 @@ def to_geodataframe( dtypes=dtypes, progress_bar_type=progress_bar_type, create_bqstorage_client=create_bqstorage_client, - date_as_object=date_as_object, geography_column=geography_column, ) diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 140fa13ae..f434688e7 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -28,6 +28,8 @@ import pandas # type: ignore except ImportError: # pragma: NO COVER pandas = None +else: + import db_dtypes # type: ignore # noqa import pyarrow # type: ignore @@ -1815,7 +1817,6 @@ def to_dataframe( dtypes: Dict[str, Any] = None, progress_bar_type: str = None, create_bqstorage_client: bool = True, - date_as_object: bool = True, geography_as_object: bool = False, ) -> "pandas.DataFrame": """Create a pandas DataFrame by loading all pages of a query. @@ -1865,12 +1866,6 @@ def to_dataframe( .. versionadded:: 1.24.0 - date_as_object (Optional[bool]): - If ``True`` (default), cast dates to objects. If ``False``, convert - to datetime64[ns] dtype. - - .. 
versionadded:: 1.26.0 - geography_as_object (Optional[bool]): If ``True``, convert GEOGRAPHY data to :mod:`shapely` geometry objects. If ``False`` (default), don't cast @@ -1912,40 +1907,44 @@ def to_dataframe( bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, ) - default_dtypes = _pandas_helpers.bq_schema_to_nullsafe_pandas_dtypes( - self.schema - ) - # Let the user-defined dtypes override the default ones. - # https://stackoverflow.com/a/26853961/101923 - dtypes = {**default_dtypes, **dtypes} - - # When converting timestamp values to nanosecond precision, the result + # When converting date or timestamp values to nanosecond precision, the result # can be out of pyarrow bounds. To avoid the error when converting to - # Pandas, we set the timestamp_as_object parameter to True, if necessary. - types_to_check = { - pyarrow.timestamp("us"), - pyarrow.timestamp("us", tz=datetime.timezone.utc), - } - - for column in record_batch: - if column.type in types_to_check: - try: - column.cast("timestamp[ns]") - except pyarrow.lib.ArrowInvalid: - timestamp_as_object = True - break - else: - timestamp_as_object = False - - extra_kwargs = {"timestamp_as_object": timestamp_as_object} + # Pandas, we set the date_as_object or timestamp_as_object parameter to True, + # if necessary. + date_as_object = not all( + self.__can_cast_timestamp_ns(col) + for col in record_batch + # Type can be date32 or date64 (plus units). + # See: https://arrow.apache.org/docs/python/api/datatypes.html + if str(col.type).startswith("date") + ) - df = record_batch.to_pandas( - date_as_object=date_as_object, integer_object_nulls=True, **extra_kwargs + timestamp_as_object = not all( + self.__can_cast_timestamp_ns(col) + for col in record_batch + # Type can be timestamp (plus units and time zone). + # See: https://arrow.apache.org/docs/python/api/datatypes.html + if str(col.type).startswith("timestamp") ) + if len(record_batch) > 0: + df = record_batch.to_pandas( + date_as_object=date_as_object, + timestamp_as_object=timestamp_as_object, + integer_object_nulls=True, + types_mapper=_pandas_helpers.default_types_mapper( + date_as_object=date_as_object + ), + ) + else: + # Avoid "ValueError: need at least one array to concatenate" on + # older versions of pandas when converting empty RecordBatch to + # DataFrame. See: https://github.com/pandas-dev/pandas/issues/41241 + df = pandas.DataFrame([], columns=record_batch.schema.names) + for column in dtypes: - df[column] = pandas.Series(df[column], dtype=dtypes[column]) + df[column] = pandas.Series(df[column], dtype=dtypes[column], copy=False) if geography_as_object: for field in self.schema: @@ -1954,6 +1953,15 @@ def to_dataframe( return df + @staticmethod + def __can_cast_timestamp_ns(column): + try: + column.cast("timestamp[ns]") + except pyarrow.lib.ArrowInvalid: + return False + else: + return True + # If changing the signature of this method, make sure to apply the same # changes to job.QueryJob.to_geodataframe() def to_geodataframe( @@ -1962,7 +1970,6 @@ def to_geodataframe( dtypes: Dict[str, Any] = None, progress_bar_type: str = None, create_bqstorage_client: bool = True, - date_as_object: bool = True, geography_column: Optional[str] = None, ) -> "geopandas.GeoDataFrame": """Create a GeoPandas GeoDataFrame by loading all pages of a query. @@ -2010,10 +2017,6 @@ def to_geodataframe( This argument does nothing if ``bqstorage_client`` is supplied. - date_as_object (Optional[bool]): - If ``True`` (default), cast dates to objects. 
If ``False``, convert - to datetime64[ns] dtype. - geography_column (Optional[str]): If there are more than one GEOGRAPHY column, identifies which one to use to construct a geopandas @@ -2069,7 +2072,6 @@ def to_geodataframe( dtypes, progress_bar_type, create_bqstorage_client, - date_as_object, geography_as_object=True, ) @@ -2126,7 +2128,6 @@ def to_dataframe( dtypes=None, progress_bar_type=None, create_bqstorage_client=True, - date_as_object=True, geography_as_object=False, ) -> "pandas.DataFrame": """Create an empty dataframe. @@ -2136,7 +2137,6 @@ def to_dataframe( dtypes (Any): Ignored. Added for compatibility with RowIterator. progress_bar_type (Any): Ignored. Added for compatibility with RowIterator. create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. - date_as_object (bool): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. @@ -2151,7 +2151,6 @@ def to_geodataframe( dtypes=None, progress_bar_type=None, create_bqstorage_client=True, - date_as_object=True, geography_column: Optional[str] = None, ) -> "pandas.DataFrame": """Create an empty dataframe. @@ -2161,7 +2160,6 @@ def to_geodataframe( dtypes (Any): Ignored. Added for compatibility with RowIterator. progress_bar_type (Any): Ignored. Added for compatibility with RowIterator. create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. - date_as_object (bool): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. diff --git a/samples/geography/requirements.txt b/samples/geography/requirements.txt index e2de86673..d33da667c 100644 --- a/samples/geography/requirements.txt +++ b/samples/geography/requirements.txt @@ -7,6 +7,7 @@ click==8.0.1 click-plugins==1.1.1 cligj==0.7.2 dataclasses==0.6; python_version < '3.7' +db-dtypes==0.3.0 Fiona==1.8.20 geojson==2.5.0 geopandas==0.9.0 diff --git a/samples/magics/requirements.txt b/samples/magics/requirements.txt index 5cc7ec33f..0d36904c4 100644 --- a/samples/magics/requirements.txt +++ b/samples/magics/requirements.txt @@ -1,3 +1,4 @@ +db-dtypes==0.3.0 google-cloud-bigquery-storage==2.9.0 google-auth-oauthlib==0.4.6 grpcio==1.41.0 diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index f79552392..4f04611ba 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,3 +1,4 @@ +db-dtypes==0.3.0 google-cloud-bigquery-storage==2.9.0 google-auth-oauthlib==0.4.6 grpcio==1.41.0 diff --git a/setup.py b/setup.py index 7ff571b05..09a374303 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,7 @@ # Keep the no-op bqstorage extra for backward compatibility. 
# See: https://github.com/googleapis/python-bigquery/issues/757 "bqstorage": [], - "pandas": ["pandas>=1.0.0"], + "pandas": ["pandas>=1.0.0", "db-dtypes>=0.3.0,<2.0.0dev"], "geopandas": ["geopandas>=0.9.0, <1.0dev", "Shapely>=1.6.0, <2.0dev"], "tqdm": ["tqdm >= 4.7.4, <5.0.0dev"], "opentelemetry": [ diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index b9b93d4f1..a2fc429a3 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -5,6 +5,7 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 +db-dtypes==0.3.0 geopandas==0.9.0 google-api-core==1.29.0 google-cloud-bigquery-storage==2.0.0 diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index e69de29bb..684864f2b 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -0,0 +1 @@ +pandas==1.1.0 diff --git a/testing/constraints-3.8.txt b/testing/constraints-3.8.txt index e69de29bb..3fd8886e6 100644 --- a/testing/constraints-3.8.txt +++ b/testing/constraints-3.8.txt @@ -0,0 +1 @@ +pandas==1.2.0 diff --git a/tests/system/test_pandas.py b/tests/system/test_pandas.py index 39ea3e878..bc8e43370 100644 --- a/tests/system/test_pandas.py +++ b/tests/system/test_pandas.py @@ -24,7 +24,6 @@ import google.api_core.retry import pkg_resources import pytest -import numpy from google.cloud import bigquery from google.cloud import bigquery_storage @@ -34,6 +33,7 @@ pandas = pytest.importorskip("pandas", minversion="0.23.0") +numpy = pytest.importorskip("numpy") PANDAS_INSTALLED_VERSION = pkg_resources.get_distribution("pandas").parsed_version @@ -84,6 +84,28 @@ def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_i ("uint8_col", pandas.Series([0, 1, 2], dtype="uint8")), ("uint16_col", pandas.Series([3, 4, 5], dtype="uint16")), ("uint32_col", pandas.Series([6, 7, 8], dtype="uint32")), + ( + "date_col", + pandas.Series( + [ + datetime.date(2010, 1, 2), + datetime.date(2011, 2, 3), + datetime.date(2012, 3, 14), + ], + dtype="dbdate", + ), + ), + ( + "time_col", + pandas.Series( + [ + datetime.time(3, 44, 50), + datetime.time(14, 50, 59), + datetime.time(15, 16), + ], + dtype="dbtime", + ), + ), ("array_bool_col", pandas.Series([[True], [False], [True]])), ( "array_ts_col", @@ -186,6 +208,8 @@ def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_i bigquery.SchemaField("uint8_col", "INTEGER"), bigquery.SchemaField("uint16_col", "INTEGER"), bigquery.SchemaField("uint32_col", "INTEGER"), + bigquery.SchemaField("date_col", "DATE"), + bigquery.SchemaField("time_col", "TIME"), bigquery.SchemaField("array_bool_col", "BOOLEAN", mode="REPEATED"), bigquery.SchemaField("array_ts_col", "TIMESTAMP", mode="REPEATED"), # TODO: Update to DATETIME in V3 @@ -201,7 +225,87 @@ def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_i bigquery.SchemaField("array_uint16_col", "INTEGER", mode="REPEATED"), bigquery.SchemaField("array_uint32_col", "INTEGER", mode="REPEATED"), ) - assert table.num_rows == 3 + assert numpy.array( + sorted(map(list, bigquery_client.list_rows(table)), key=lambda r: r[5]), + dtype="object", + ).transpose().tolist() == [ + # bool_col + [True, False, True], + # ts_col + [ + datetime.datetime(2010, 1, 2, 3, 44, 50, tzinfo=datetime.timezone.utc), + datetime.datetime(2011, 2, 3, 14, 50, 59, tzinfo=datetime.timezone.utc), + datetime.datetime(2012, 3, 14, 15, 16, tzinfo=datetime.timezone.utc), + ], + # dt_col + # TODO: Remove tzinfo in V3. 
+ # https://github.com/googleapis/python-bigquery/issues/985 + [ + datetime.datetime(2010, 1, 2, 3, 44, 50, tzinfo=datetime.timezone.utc), + datetime.datetime(2011, 2, 3, 14, 50, 59, tzinfo=datetime.timezone.utc), + datetime.datetime(2012, 3, 14, 15, 16, tzinfo=datetime.timezone.utc), + ], + # float32_col + [1.0, 2.0, 3.0], + # float64_col + [4.0, 5.0, 6.0], + # int8_col + [-12, -11, -10], + # int16_col + [-9, -8, -7], + # int32_col + [-6, -5, -4], + # int64_col + [-3, -2, -1], + # uint8_col + [0, 1, 2], + # uint16_col + [3, 4, 5], + # uint32_col + [6, 7, 8], + # date_col + [ + datetime.date(2010, 1, 2), + datetime.date(2011, 2, 3), + datetime.date(2012, 3, 14), + ], + # time_col + [datetime.time(3, 44, 50), datetime.time(14, 50, 59), datetime.time(15, 16)], + # array_bool_col + [[True], [False], [True]], + # array_ts_col + [ + [datetime.datetime(2010, 1, 2, 3, 44, 50, tzinfo=datetime.timezone.utc)], + [datetime.datetime(2011, 2, 3, 14, 50, 59, tzinfo=datetime.timezone.utc)], + [datetime.datetime(2012, 3, 14, 15, 16, tzinfo=datetime.timezone.utc)], + ], + # array_dt_col + # TODO: Remove tzinfo in V3. + # https://github.com/googleapis/python-bigquery/issues/985 + [ + [datetime.datetime(2010, 1, 2, 3, 44, 50, tzinfo=datetime.timezone.utc)], + [datetime.datetime(2011, 2, 3, 14, 50, 59, tzinfo=datetime.timezone.utc)], + [datetime.datetime(2012, 3, 14, 15, 16, tzinfo=datetime.timezone.utc)], + ], + # array_float32_col + [[1.0], [2.0], [3.0]], + # array_float64_col + [[4.0], [5.0], [6.0]], + # array_int8_col + [[-12], [-11], [-10]], + # array_int16_col + [[-9], [-8], [-7]], + # array_int32_col + [[-6], [-5], [-4]], + # array_int64_col + [[-3], [-2], [-1]], + # array_uint8_col + [[0], [1], [2]], + # array_uint16_col + [[3], [4], [5]], + # array_uint32_col + [[6], [7], [8]], + ] @pytest.mark.skipif( @@ -697,6 +801,8 @@ def test_insert_rows_from_dataframe(bigquery_client, dataset_id): SF("int_col", "INTEGER", mode="REQUIRED"), SF("bool_col", "BOOLEAN", mode="REQUIRED"), SF("string_col", "STRING", mode="NULLABLE"), + SF("date_col", "DATE", mode="NULLABLE"), + SF("time_col", "TIME", mode="NULLABLE"), ] dataframe = pandas.DataFrame( @@ -706,30 +812,40 @@ def test_insert_rows_from_dataframe(bigquery_client, dataset_id): "bool_col": True, "string_col": "my string", "int_col": 10, + "date_col": datetime.date(2021, 1, 1), + "time_col": datetime.time(21, 1, 1), }, { "float_col": 2.22, "bool_col": False, "string_col": "another string", "int_col": 20, + "date_col": datetime.date(2021, 1, 2), + "time_col": datetime.time(21, 1, 2), }, { "float_col": 3.33, "bool_col": False, "string_col": "another string", "int_col": 30, + "date_col": datetime.date(2021, 1, 3), + "time_col": datetime.time(21, 1, 3), }, { "float_col": 4.44, "bool_col": True, "string_col": "another string", "int_col": 40, + "date_col": datetime.date(2021, 1, 4), + "time_col": datetime.time(21, 1, 4), }, { "float_col": 5.55, "bool_col": False, "string_col": "another string", "int_col": 50, + "date_col": datetime.date(2021, 1, 5), + "time_col": datetime.time(21, 1, 5), }, { "float_col": 6.66, @@ -738,9 +854,13 @@ def test_insert_rows_from_dataframe(bigquery_client, dataset_id): # NULL value indicator. 
"string_col": float("NaN"), "int_col": 60, + "date_col": datetime.date(2021, 1, 6), + "time_col": datetime.time(21, 1, 6), }, ] ) + dataframe["date_col"] = dataframe["date_col"].astype("dbdate") + dataframe["time_col"] = dataframe["time_col"].astype("dbtime") table_id = f"{bigquery_client.project}.{dataset_id}.test_insert_rows_from_dataframe" table_arg = bigquery.Table(table_id, schema=schema) @@ -916,15 +1036,8 @@ def test_list_rows_nullable_scalars_dtypes(bigquery_client, scalars_table, max_r assert df.dtypes["float64_col"].name == "float64" assert df.dtypes["int64_col"].name == "Int64" assert df.dtypes["timestamp_col"].name == "datetime64[ns, UTC]" - - # object is used by default, but we can use "datetime64[ns]" automatically - # when data is within the supported range. - # https://github.com/googleapis/python-bigquery/issues/861 - assert df.dtypes["date_col"].name == "object" - - # object is used by default, but we can use "timedelta64[ns]" automatically - # https://github.com/googleapis/python-bigquery/issues/862 - assert df.dtypes["time_col"].name == "object" + assert df.dtypes["date_col"].name == "dbdate" + assert df.dtypes["time_col"].name == "dbtime" # decimal.Decimal is used to avoid loss of precision. assert df.dtypes["bignumeric_col"].name == "object" @@ -974,10 +1087,7 @@ def test_list_rows_nullable_scalars_extreme_dtypes( assert df.dtypes["bool_col"].name == "boolean" assert df.dtypes["float64_col"].name == "float64" assert df.dtypes["int64_col"].name == "Int64" - - # object is used by default, but we can use "timedelta64[ns]" automatically - # https://github.com/googleapis/python-bigquery/issues/862 - assert df.dtypes["time_col"].name == "object" + assert df.dtypes["time_col"].name == "dbtime" # decimal.Decimal is used to avoid loss of precision. 
assert df.dtypes["numeric_col"].name == "object" diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index 8e4fba770..044ca6e9a 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -21,6 +21,8 @@ import pytest from google.cloud import bigquery_storage +import google.cloud.bigquery_storage_v1.reader +import google.cloud.bigquery_storage_v1.services.big_query_read.client try: import pandas @@ -39,8 +41,8 @@ except (ImportError, AttributeError): # pragma: NO COVER tqdm = None +from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS from ..helpers import make_connection - from .helpers import _make_client from .helpers import _make_job_resource @@ -108,7 +110,7 @@ def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg): ) job_resource["configuration"]["query"]["query"] = query job_resource["status"] = {"state": "DONE"} - get_query_results_resource = { + query_resource = { "jobComplete": True, "jobReference": {"projectId": "test-project", "jobId": "test-job"}, "schema": { @@ -119,25 +121,44 @@ def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg): }, "totalRows": "4", } - connection = make_connection(get_query_results_resource, job_resource) + stream_id = "projects/1/locations/2/sessions/3/streams/4" + name_array = pyarrow.array( + ["John", "Paul", "George", "Ringo"], type=pyarrow.string() + ) + age_array = pyarrow.array([17, 24, 21, 15], type=pyarrow.int64()) + arrow_schema = pyarrow.schema( + [ + pyarrow.field("name", pyarrow.string(), True), + pyarrow.field("age", pyarrow.int64(), True), + ] + ) + record_batch = pyarrow.RecordBatch.from_arrays( + [name_array, age_array], schema=arrow_schema + ) + connection = make_connection(query_resource) client = _make_client(connection=connection) job = target_class.from_api_repr(job_resource, client) bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient) session = bigquery_storage.types.ReadSession() - session.avro_schema.schema = json.dumps( - { - "type": "record", - "name": "__root__", - "fields": [ - {"name": "name", "type": ["null", "string"]}, - {"name": "age", "type": ["null", "long"]}, - ], - } - ) + session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes() + session.streams = [bigquery_storage.types.ReadStream(name=stream_id)] bqstorage_client.create_read_session.return_value = session + bqstorage_base_client = mock.create_autospec(bigquery_storage.BigQueryReadClient) + page = bigquery_storage.types.ReadRowsResponse() + if BQ_STORAGE_VERSIONS.is_read_session_optional: + page.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes() + page.arrow_record_batch.serialized_record_batch = ( + record_batch.serialize().to_pybytes() + ) + bqstorage_base_client.read_rows.return_value = [page] + reader = google.cloud.bigquery_storage_v1.reader.ReadRowsStream( + [page], bqstorage_base_client, stream_id, 0, {} + ) + bqstorage_client.read_rows.return_value = reader - job.to_dataframe(bqstorage_client=bqstorage_client) + dataframe = job.to_dataframe(bqstorage_client=bqstorage_client) + assert len(dataframe) == 4 destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format( **job_resource["configuration"]["query"]["destinationTable"] ) @@ -498,25 +519,44 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg): ] }, } + stream_id = "projects/1/locations/2/sessions/3/streams/4" + name_array = pyarrow.array( + ["John", "Paul", "George", "Ringo"], 
type=pyarrow.string() + ) + age_array = pyarrow.array([17, 24, 21, 15], type=pyarrow.int64()) + arrow_schema = pyarrow.schema( + [ + pyarrow.field("name", pyarrow.string(), True), + pyarrow.field("age", pyarrow.int64(), True), + ] + ) + record_batch = pyarrow.RecordBatch.from_arrays( + [name_array, age_array], schema=arrow_schema + ) connection = make_connection(query_resource) client = _make_client(connection=connection) job = target_class.from_api_repr(resource, client) bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient) session = bigquery_storage.types.ReadSession() - session.avro_schema.schema = json.dumps( - { - "type": "record", - "name": "__root__", - "fields": [ - {"name": "name", "type": ["null", "string"]}, - {"name": "age", "type": ["null", "long"]}, - ], - } - ) + session.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes() + session.streams = [bigquery_storage.types.ReadStream(name=stream_id)] bqstorage_client.create_read_session.return_value = session + bqstorage_base_client = mock.create_autospec(bigquery_storage.BigQueryReadClient) + page = bigquery_storage.types.ReadRowsResponse() + if BQ_STORAGE_VERSIONS.is_read_session_optional: + page.arrow_schema.serialized_schema = arrow_schema.serialize().to_pybytes() + page.arrow_record_batch.serialized_record_batch = ( + record_batch.serialize().to_pybytes() + ) + bqstorage_base_client.read_rows.return_value = [page] + reader = google.cloud.bigquery_storage_v1.reader.ReadRowsStream( + [page], bqstorage_base_client, stream_id, 0, {} + ) + bqstorage_client.read_rows.return_value = reader - job.to_dataframe(bqstorage_client=bqstorage_client) + dataframe = job.to_dataframe(bqstorage_client=bqstorage_client) + assert len(dataframe) == 4 destination_table = "projects/{projectId}/datasets/{datasetId}/tables/{tableId}".format( **resource["configuration"]["query"]["destinationTable"] ) @@ -530,6 +570,7 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg): read_session=expected_session, max_stream_count=0, # Use default number of streams for best performance. 
) + bqstorage_client.read_rows.assert_called_once_with(stream_id) def test_to_dataframe_bqstorage_no_pyarrow_compression(): @@ -630,7 +671,7 @@ def test_to_dataframe_column_dtypes(): assert df.km.dtype.name == "float16" assert df.payment_type.dtype.name == "object" assert df.complete.dtype.name == "boolean" - assert df.date.dtype.name == "object" + assert df.date.dtype.name == "dbdate" def test_to_dataframe_column_date_dtypes(): @@ -655,13 +696,13 @@ ) client = _make_client(connection=connection) job = target_class.from_api_repr(begun_resource, client) - df = job.to_dataframe(date_as_object=False, create_bqstorage_client=False) + df = job.to_dataframe(create_bqstorage_client=False) assert isinstance(df, pandas.DataFrame) assert len(df) == 1 # verify the number of rows exp_columns = [field["name"] for field in query_resource["schema"]["fields"]] assert list(df) == exp_columns # verify the column names - assert df.date.dtype.name == "datetime64[ns]" + assert df.date.dtype.name == "dbdate" @pytest.mark.skipif(tqdm is None, reason="Requires `tqdm`") @@ -916,7 +957,6 @@ def test_query_job_to_geodataframe_delegation(wait_for_query): dtypes = dict(xxx=numpy.dtype("int64")) progress_bar_type = "normal" create_bqstorage_client = False - date_as_object = False max_results = 42 geography_column = "g" @@ -925,7 +965,6 @@ dtypes=dtypes, progress_bar_type=progress_bar_type, create_bqstorage_client=create_bqstorage_client, - date_as_object=date_as_object, max_results=max_results, geography_column=geography_column, ) @@ -939,7 +978,6 @@ dtypes=dtypes, progress_bar_type=progress_bar_type, create_bqstorage_client=create_bqstorage_client, - date_as_object=date_as_object, geography_column=geography_column, ) assert df is row_iterator.to_geodataframe.return_value diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 04b5f2b85..8bd1fe1df 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -5206,14 +5206,39 @@ def test_insert_rows_from_dataframe(self): self.PROJECT, self.DS_ID, self.TABLE_REF.table_id ) - dataframe = pandas.DataFrame( - [ - {"name": "Little One", "age": 10, "adult": False}, - {"name": "Young Gun", "age": 20, "adult": True}, - {"name": "Dad", "age": 30, "adult": True}, - {"name": "Stranger", "age": 40, "adult": True}, - ] - ) + data = [ + { + "name": "Little One", + "age": 10, + "adult": False, + "bdate": datetime.date(2011, 1, 2), + "btime": datetime.time(19, 1, 10), + }, + { + "name": "Young Gun", + "age": 20, + "adult": True, + "bdate": datetime.date(2001, 1, 2), + "btime": datetime.time(19, 1, 20), + }, + { + "name": "Dad", + "age": 30, + "adult": True, + "bdate": datetime.date(1991, 1, 2), + "btime": datetime.time(19, 1, 30), + }, + { + "name": "Stranger", + "age": 40, + "adult": True, + "bdate": datetime.date(1981, 1, 2), + "btime": datetime.time(19, 1, 40), + }, + ] + dataframe = pandas.DataFrame(data) + dataframe["bdate"] = dataframe["bdate"].astype("dbdate") + dataframe["btime"] = dataframe["btime"].astype("dbtime") # create client creds = _make_credentials() @@ -5226,6 +5251,8 @@ def test_insert_rows_from_dataframe(self): SchemaField("name", "STRING", mode="REQUIRED"), SchemaField("age", "INTEGER", mode="REQUIRED"), SchemaField("adult", "BOOLEAN", mode="REQUIRED"), + SchemaField("bdate", "DATE", mode="REQUIRED"), + SchemaField("btime", "TIME", mode="REQUIRED"), ] table =
Table(self.TABLE_REF, schema=schema) @@ -5238,32 +5265,14 @@ def test_insert_rows_from_dataframe(self): for chunk_errors in error_info: assert chunk_errors == [] - EXPECTED_SENT_DATA = [ - { - "rows": [ - { - "insertId": "0", - "json": {"name": "Little One", "age": "10", "adult": "false"}, - }, - { - "insertId": "1", - "json": {"name": "Young Gun", "age": "20", "adult": "true"}, - }, - { - "insertId": "2", - "json": {"name": "Dad", "age": "30", "adult": "true"}, - }, - ] - }, - { - "rows": [ - { - "insertId": "3", - "json": {"name": "Stranger", "age": "40", "adult": "true"}, - } - ] - }, - ] + for row in data: + row["age"] = str(row["age"]) + row["adult"] = str(row["adult"]).lower() + row["bdate"] = row["bdate"].isoformat() + row["btime"] = row["btime"].isoformat() + + rows = [dict(insertId=str(i), json=row) for i, row in enumerate(data)] + EXPECTED_SENT_DATA = [dict(rows=rows[:3]), dict(rows=rows[3:])] actual_calls = conn.api_request.call_args_list @@ -7084,6 +7093,28 @@ def test_load_table_from_dataframe_w_automatic_schema(self): dtype="datetime64[ns]", ).dt.tz_localize(datetime.timezone.utc), ), + ( + "date_col", + pandas.Series( + [ + datetime.date(2010, 1, 2), + datetime.date(2011, 2, 3), + datetime.date(2012, 3, 14), + ], + dtype="dbdate", + ), + ), + ( + "time_col", + pandas.Series( + [ + datetime.time(3, 44, 50), + datetime.time(14, 50, 59), + datetime.time(15, 16), + ], + dtype="dbtime", + ), + ), ] ) dataframe = pandas.DataFrame(df_data, columns=df_data.keys()) @@ -7124,6 +7155,8 @@ def test_load_table_from_dataframe_w_automatic_schema(self): SchemaField("bool_col", "BOOLEAN"), SchemaField("dt_col", "TIMESTAMP"), SchemaField("ts_col", "TIMESTAMP"), + SchemaField("date_col", "DATE"), + SchemaField("time_col", "TIME"), ) @unittest.skipIf(pandas is None, "Requires `pandas`") diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index a34b0d56b..c9a3d2815 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -3115,7 +3115,37 @@ def test_to_dataframe_column_dtypes(self): self.assertEqual(df.km.dtype.name, "float16") self.assertEqual(df.payment_type.dtype.name, "object") self.assertEqual(df.complete.dtype.name, "boolean") - self.assertEqual(df.date.dtype.name, "object") + self.assertEqual(df.date.dtype.name, "dbdate") + + @unittest.skipIf(pandas is None, "Requires `pandas`") + def test_to_dataframe_datetime_objects(self): + # When converting date or timestamp values to nanosecond + # precision, the result can be out of pyarrow bounds. To avoid + # the error when converting to Pandas, we use object type if + # necessary. 
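+ # pandas.Timestamp covers roughly 1677-09-21 through 2262-04-11 at + # nanosecond precision, so both columns below fall back to object dtype.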
+ + from google.cloud.bigquery.schema import SchemaField + + schema = [ + SchemaField("ts", "TIMESTAMP"), + SchemaField("date", "DATE"), + ] + row_data = [ + ["-20000000000000000", "1111-01-01"], + ] + rows = [{"f": [{"v": field} for field in row]} for row in row_data] + path = "/foo" + api_request = mock.Mock(return_value={"rows": rows}) + row_iterator = self._make_one(_mock_client(), api_request, path, schema) + + df = row_iterator.to_dataframe(create_bqstorage_client=False) + + self.assertIsInstance(df, pandas.DataFrame) + self.assertEqual(len(df), 1) # verify the number of rows + self.assertEqual(df["ts"].dtype.name, "object") + self.assertEqual(df["date"].dtype.name, "object") + self.assertEqual(df["ts"][0].date(), datetime.date(1336, 3, 23)) + self.assertEqual(df["date"][0], datetime.date(1111, 1, 1)) @mock.patch("google.cloud.bigquery.table.pandas", new=None) def test_to_dataframe_error_if_pandas_is_none(self): @@ -4036,7 +4066,6 @@ def test_rowiterator_to_geodataframe_delegation(self, to_dataframe): dtypes = dict(xxx=numpy.dtype("int64")) progress_bar_type = "normal" create_bqstorage_client = False - date_as_object = False geography_column = "g" to_dataframe.return_value = pandas.DataFrame( @@ -4048,7 +4077,6 @@ def test_rowiterator_to_geodataframe_delegation(self, to_dataframe): dtypes=dtypes, progress_bar_type=progress_bar_type, create_bqstorage_client=create_bqstorage_client, - date_as_object=date_as_object, geography_column=geography_column, ) @@ -4057,7 +4085,6 @@ def test_rowiterator_to_geodataframe_delegation(self, to_dataframe): dtypes, progress_bar_type, create_bqstorage_client, - date_as_object, geography_as_object=True, ) diff --git a/tests/unit/test_table_pandas.py b/tests/unit/test_table_pandas.py index a223e6652..8e37ed504 100644 --- a/tests/unit/test_table_pandas.py +++ b/tests/unit/test_table_pandas.py @@ -101,13 +101,13 @@ def test_to_dataframe_nullable_scalars(monkeypatch, class_under_test): assert df.dtypes["bignumeric_col"].name == "object" assert df.dtypes["bool_col"].name == "boolean" assert df.dtypes["bytes_col"].name == "object" - assert df.dtypes["date_col"].name == "object" + assert df.dtypes["date_col"].name == "dbdate" assert df.dtypes["datetime_col"].name == "datetime64[ns]" assert df.dtypes["float64_col"].name == "float64" assert df.dtypes["int64_col"].name == "Int64" assert df.dtypes["numeric_col"].name == "object" assert df.dtypes["string_col"].name == "object" - assert df.dtypes["time_col"].name == "object" + assert df.dtypes["time_col"].name == "dbtime" assert df.dtypes["timestamp_col"].name == "datetime64[ns, UTC]" # Check for expected values.
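Net effect of this change, as a usage sketch (assumes the new ``db-dtypes`` dependency is installed, e.g. via ``pip install 'google-cloud-bigquery[pandas]'``, plus application default credentials; the queries are illustrative)::

    from google.cloud import bigquery

    client = bigquery.Client()

    # DATE and TIME columns now arrive as db-dtypes extension types by
    # default; the removed date_as_object argument is no longer needed.
    df = client.query(
        "SELECT DATE '2021-10-21' AS d, TIME '11:30:00' AS t"
    ).to_dataframe()
    print(df.dtypes)  # d -> dbdate, t -> dbtime

    # DATE values outside the pandas nanosecond-precision range (roughly
    # years 1677-2262) make the column fall back to object dtype, mirroring
    # the existing timestamp_as_object fallback for TIMESTAMP columns.
    df = client.query("SELECT DATE '1111-01-01' AS d").to_dataframe()
    print(df.dtypes)  # d -> object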