Skip to content

Commit

Permalink
chore: refactor BigQueryStorageVersions (#1699)
Browse files Browse the repository at this point in the history
* chore: refactor BigQueryStorageVersions

* address comments in #1680

* add unit test
  • Loading branch information
Linchin authored Oct 26, 2023
1 parent 76d0e5a commit e8da978
Show file tree
Hide file tree
Showing 13 changed files with 286 additions and 196 deletions.
65 changes: 0 additions & 65 deletions google/cloud/bigquery/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _to_bytes

import packaging.version

from google.cloud.bigquery import exceptions

_RFC3339_MICROS_NO_ZULU = "%Y-%m-%dT%H:%M:%S.%f"
_TIMEONLY_WO_MICROS = "%H:%M:%S"
_TIMEONLY_W_MICROS = "%H:%M:%S.%f"
Expand All @@ -52,10 +48,6 @@
r"(?P<time_sign>-?)(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)\.?(?P<fraction>\d*)?$"
)

_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0")

_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0")

BIGQUERY_EMULATOR_HOST = "BIGQUERY_EMULATOR_HOST"
"""Environment variable defining host for emulator."""

Expand All @@ -67,63 +59,6 @@ def _get_bigquery_host():
return os.environ.get(BIGQUERY_EMULATOR_HOST, _DEFAULT_HOST)


class BQStorageVersions:
"""Version comparisons for google-cloud-bigqueyr-storage package."""

def __init__(self):
self._installed_version = None

@property
def installed_version(self) -> packaging.version.Version:
"""Return the parsed version of google-cloud-bigquery-storage."""
if self._installed_version is None:
from google.cloud import bigquery_storage

self._installed_version = packaging.version.parse(
# Use 0.0.0, since it is earlier than any released version.
# Legacy versions also have the same property, but
# creating a LegacyVersion has been deprecated.
# https://github.com/pypa/packaging/issues/321
getattr(bigquery_storage, "__version__", "0.0.0")
)

return self._installed_version # type: ignore

@property
def is_read_session_optional(self) -> bool:
"""True if read_session is optional to rows().
See: https://github.com/googleapis/python-bigquery-storage/pull/228
"""
return self.installed_version >= _BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION

def verify_version(self):
"""Verify that a recent enough version of BigQuery Storage extra is
installed.
The function assumes that google-cloud-bigquery-storage extra is
installed, and should thus be used in places where this assumption
holds.
Because `pip` can install an outdated version of this extra despite the
constraints in `setup.py`, the calling code can use this helper to
verify the version compatibility at runtime.
Raises:
exceptions.LegacyBigQueryStorageError:
If the google-cloud-bigquery-storage package is outdated.
"""
if self.installed_version < _MIN_BQ_STORAGE_VERSION:
msg = (
"Dependency google-cloud-bigquery-storage is outdated, please upgrade "
f"it to version >= {_MIN_BQ_STORAGE_VERSION} (version found: {self.installed_version})."
)
raise exceptions.LegacyBigQueryStorageError(msg)


BQ_STORAGE_VERSIONS = BQStorageVersions()


def _not_null(value, field):
"""Check whether 'value' should be coerced to 'field' type."""
return value is not None or (field is not None and field.mode != "NULLABLE")
Expand Down
3 changes: 1 addition & 2 deletions google/cloud/bigquery/_pandas_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import warnings
from typing import Any, Union

from google.cloud.bigquery import _helpers
from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import schema
Expand Down Expand Up @@ -745,7 +744,7 @@ def _download_table_bqstorage_stream(

# Avoid deprecation warnings for passing in unnecessary read session.
# https://github.com/googleapis/python-bigquery-storage/issues/229
if _helpers.BQ_STORAGE_VERSIONS.is_read_session_optional:
if _versions_helpers.BQ_STORAGE_VERSIONS.is_read_session_optional:
rowstream = reader.rows()
else:
rowstream = reader.rows(session)
Expand Down
81 changes: 80 additions & 1 deletion google/cloud/bigquery/_versions_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@


_MIN_PYARROW_VERSION = packaging.version.Version("3.0.0")
_MIN_BQ_STORAGE_VERSION = packaging.version.Version("2.0.0")
_BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION = packaging.version.Version("2.6.0")


class PyarrowVersions:
Expand Down Expand Up @@ -51,7 +53,7 @@ def use_compliant_nested_type(self) -> bool:
return self.installed_version.major >= 4

def try_import(self, raise_if_error: bool = False) -> Any:
"""Verify that a recent enough version of pyarrow extra is installed.
"""Verifies that a recent enough version of pyarrow extra is installed.
The function assumes that pyarrow extra is installed, and should thus
be used in places where this assumption holds.
Expand Down Expand Up @@ -92,3 +94,80 @@ def try_import(self, raise_if_error: bool = False) -> Any:


PYARROW_VERSIONS = PyarrowVersions()


class BQStorageVersions:
"""Version comparisons for google-cloud-bigqueyr-storage package."""

def __init__(self):
self._installed_version = None

@property
def installed_version(self) -> packaging.version.Version:
"""Return the parsed version of google-cloud-bigquery-storage."""
if self._installed_version is None:
from google.cloud import bigquery_storage

self._installed_version = packaging.version.parse(
# Use 0.0.0, since it is earlier than any released version.
# Legacy versions also have the same property, but
# creating a LegacyVersion has been deprecated.
# https://github.com/pypa/packaging/issues/321
getattr(bigquery_storage, "__version__", "0.0.0")
)

return self._installed_version # type: ignore

@property
def is_read_session_optional(self) -> bool:
"""True if read_session is optional to rows().
See: https://github.com/googleapis/python-bigquery-storage/pull/228
"""
return self.installed_version >= _BQ_STORAGE_OPTIONAL_READ_SESSION_VERSION

def try_import(self, raise_if_error: bool = False) -> Any:
"""Tries to import the bigquery_storage module, and returns results
accordingly. It also verifies the module version is recent enough.
If the import succeeds, returns the ``bigquery_storage`` module.
If the import fails,
returns ``None`` when ``raise_if_error == False``,
raises Error when ``raise_if_error == True``.
Returns:
The ``bigquery_storage`` module or ``None``.
Raises:
exceptions.BigQueryStorageNotFoundError:
If google-cloud-bigquery-storage is not installed
exceptions.LegacyBigQueryStorageError:
If google-cloud-bigquery-storage package is outdated
"""
try:
from google.cloud import bigquery_storage # type: ignore
except ImportError:
if raise_if_error:
msg = (
"Package google-cloud-bigquery-storage not found. "
"Install google-cloud-bigquery-storage version >= "
f"{_MIN_BQ_STORAGE_VERSION}."
)
raise exceptions.BigQueryStorageNotFoundError(msg)
return None

if self.installed_version < _MIN_BQ_STORAGE_VERSION:
if raise_if_error:
msg = (
"Dependency google-cloud-bigquery-storage is outdated, "
f"please upgrade it to version >= {_MIN_BQ_STORAGE_VERSION} "
f"(version found: {self.installed_version})."
)
raise exceptions.LegacyBigQueryStorageError(msg)
return None

return bigquery_storage


BQ_STORAGE_VERSIONS = BQStorageVersions()
42 changes: 22 additions & 20 deletions google/cloud/bigquery/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,25 @@
DEFAULT_BQSTORAGE_CLIENT_INFO = None # type: ignore


from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _job_helpers
from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import enums
from google.cloud.bigquery import exceptions as bq_exceptions
from google.cloud.bigquery import job
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._helpers import _get_bigquery_host
from google.cloud.bigquery._helpers import BQ_STORAGE_VERSIONS
from google.cloud.bigquery._helpers import _DEFAULT_HOST
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery._job_helpers import make_job_id as _make_job_id
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import enums
from google.cloud.bigquery.enums import AutoRowIDs
from google.cloud.bigquery import exceptions as bq_exceptions
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery import job
from google.cloud.bigquery.format_options import ParquetOptions
from google.cloud.bigquery.job import (
CopyJob,
CopyJobConfig,
Expand All @@ -98,6 +97,7 @@
from google.cloud.bigquery.model import Model
from google.cloud.bigquery.model import ModelReference
from google.cloud.bigquery.model import _model_arg_to_model_ref
from google.cloud.bigquery.opentelemetry_tracing import create_span
from google.cloud.bigquery.query import _QueryResults
from google.cloud.bigquery.retry import (
DEFAULT_JOB_RETRY,
Expand All @@ -113,7 +113,6 @@
from google.cloud.bigquery.table import TableListItem
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.format_options import ParquetOptions

pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()

Expand Down Expand Up @@ -545,29 +544,32 @@ def _ensure_bqstorage_client(
An existing BigQuery Storage client instance. If ``None``, a new
instance is created and returned.
client_options:
Custom options used with a new BigQuery Storage client instance if one
is created.
Custom options used with a new BigQuery Storage client instance
if one is created.
client_info:
The client info used with a new BigQuery Storage client instance if one
is created.
The client info used with a new BigQuery Storage client
instance if one is created.
Returns:
A BigQuery Storage API client.
"""

try:
from google.cloud import bigquery_storage # type: ignore
except ImportError:
bigquery_storage = _versions_helpers.BQ_STORAGE_VERSIONS.try_import(
raise_if_error=True
)
except bq_exceptions.BigQueryStorageNotFoundError:
warnings.warn(
"Cannot create BigQuery Storage client, the dependency "
"google-cloud-bigquery-storage is not installed."
)
return None

try:
BQ_STORAGE_VERSIONS.verify_version()
except bq_exceptions.LegacyBigQueryStorageError as exc:
warnings.warn(str(exc))
warnings.warn(
"Dependency google-cloud-bigquery-storage is outdated: " + str(exc)
)
return None

if bqstorage_client is None:
bqstorage_client = bigquery_storage.BigQueryReadClient(
credentials=self._credentials,
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/bigquery/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ class LegacyBigQueryStorageError(BigQueryError):

class LegacyPyarrowError(BigQueryError):
"""Raised when too old a version of pyarrow package is detected at runtime."""


class BigQueryStorageNotFoundError(BigQueryError):
"""Raised when BigQuery Storage extra is not installed when trying to
import it.
"""
27 changes: 25 additions & 2 deletions google/cloud/bigquery/magics/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@
import google.auth # type: ignore
from google.cloud import bigquery
import google.cloud.bigquery.dataset
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import exceptions
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.magics import line_arg_parser as lap

Expand Down Expand Up @@ -744,19 +746,40 @@ def _split_args_line(line):


def _make_bqstorage_client(client, use_bqstorage_api, client_options):
"""Creates a BigQuery Storage client.
Args:
client (:class:`~google.cloud.bigquery.client.Client`): BigQuery client.
use_bqstorage_api (bool): whether BigQuery Storage API is used or not.
client_options (:class:`google.api_core.client_options.ClientOptions`):
Custom options used with a new BigQuery Storage client instance
if one is created.
Raises:
ImportError: if google-cloud-bigquery-storage is not installed, or
grpcio package is not installed.
Returns:
None: if ``use_bqstorage_api == False``, or google-cloud-bigquery-storage
is outdated.
BigQuery Storage Client:
"""
if not use_bqstorage_api:
return None

try:
from google.cloud import bigquery_storage # type: ignore # noqa: F401
except ImportError as err:
_versions_helpers.BQ_STORAGE_VERSIONS.try_import(raise_if_error=True)
except exceptions.BigQueryStorageNotFoundError as err:
customized_error = ImportError(
"The default BigQuery Storage API client cannot be used, install "
"the missing google-cloud-bigquery-storage and pyarrow packages "
"to use it. Alternatively, use the classic REST API by specifying "
"the --use_rest_api magic option."
)
raise customized_error from err
except exceptions.LegacyBigQueryStorageError:
pass

try:
from google.api_core.gapic_v1 import client_info as gapic_client_info
Expand Down
Loading

0 comments on commit e8da978

Please sign in to comment.