Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle AttributeError in bigquery_storage writer #414

Merged
merged 8 commits into from
Jun 24, 2022
40 changes: 27 additions & 13 deletions google/cloud/bigquery_storage_v1/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,35 @@ def _open(
# ValueError: Can not send() on an RPC that has never been open()ed.
#
# when they try to send a request.
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
if timeout is None:
continue
current_time = time.monotonic()
if current_time - start_time > timeout:
break
try:
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
if timeout is None:
continue
current_time = time.monotonic()
if current_time - start_time > timeout:
break
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._rpc or self._consumer
# may be None.
pass

try:
is_consumer_active = self._consumer.is_active
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._consumer
# may be None.
is_consumer_active = False

# Something went wrong when opening the RPC.
if not self._consumer.is_active:
if not is_consumer_active:
# TODO: Share the exception from _rpc.open(). Blocked by
# https://github.com/googleapis/python-api-core/issues/268
request_exception = exceptions.Unknown(
Expand Down
40 changes: 27 additions & 13 deletions google/cloud/bigquery_storage_v1beta2/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,35 @@ def _open(
# ValueError: Can not send() on an RPC that has never been open()ed.
#
# when they try to send a request.
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
if timeout is None:
continue
current_time = time.monotonic()
if current_time - start_time > timeout:
break
try:
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
if timeout is None:
continue
current_time = time.monotonic()
if current_time - start_time > timeout:
break
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._rpc or self._consumer
# may be None.
pass

try:
is_consumer_active = self._consumer.is_active
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._consumer
# may be None.
is_consumer_active = False

# Something went wrong when opening the RPC.
if not self._consumer.is_active:
if not is_consumer_active:
# TODO: Share the exception from _rpc.open(). Blocked by
# https://github.com/googleapis/python-api-core/issues/268
request_exception = exceptions.Unknown(
Expand Down