fix: retry RESOURCE_EXHAUSTED errors in read_rows #366
Conversation
Force-pushed from 167e3fc to 96caee6, then from 96caee6 to f6cbd60.
@@ -81,14 +83,12 @@ class ReadRowsStream(object):
         method to parse all messages into a :class:`pandas.DataFrame`.
     """

-    def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
+    def __init__(
+        self, client, name, offset, read_rows_kwargs, retry_delay_callback=None
Reviewer:
Technically a breaking change, though I don't expect anyone to be constructing a ReadRowsStream directly, except perhaps in unit tests.
We should have had this before, but please add a comment to this class's docstring like the following:
This object should not be created directly, but is returned by other
methods in this library.
(Pulled from https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/futures.py)
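Roughly, the suggestion would land like this (the first docstring line here is illustrative, not the class's actual summary):

```python
class ReadRowsStream(object):
    """A stream of rows from a ReadRows request.

    This object should not be created directly, but is returned by other
    methods in this library.
    """
```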
Reviewer:
Also, it's not clear to me why we need to make this breaking change to begin with.
Author:
Added the comment.
I removed the wrapped argument because ReadRowsStream assumed it would always be handed a valid stream. However, a RESOURCE_EXHAUSTED error can be the very first thing the BigQuery Storage API server returns. If we kept the old API, we would need to handle RESOURCE_EXHAUSTED errors in two different places; with this change we only need to handle it in one place.
@@ -106,6 +106,12 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
             read_rows_kwargs (dict):
                 Keyword arguments to use when reconnecting to a ReadRows
                 stream.
+            retry_delay_callback (Optional[Callable[[float], None]]):
Reviewer:
At first glance, I'm a bit confused why this parameter is necessary. Who needs a notification that we're going to sleep?
Reviewer:
Is it just for unit testing? In that case, please remove this parameter and use the freezegun library instead.
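For context, a minimal freezegun sketch (only illustrating the library the reviewer mentions; not taken from this repo's tests):

```python
import datetime

from freezegun import freeze_time

# Freeze the clock, then advance it manually instead of sleeping for real.
with freeze_time("2022-01-01 00:00:00") as frozen:
    start = datetime.datetime.now()
    frozen.tick(delta=datetime.timedelta(seconds=30))  # simulate a 30s wait
    elapsed = datetime.datetime.now() - start
    assert elapsed.total_seconds() == 30
```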
Author:
Users of the library may choose to provide this callback so they are aware of delayed retries. When users know about delayed retry attempts, they can adjust their autoscaling algorithms. Apache Beam already does this with the Java SDK; my plan is to do the same with its Python SDK.
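A sketch of that usage, assuming retry_delay_callback is plumbed through BigQueryReadClient.read_rows as this PR proposes (the stream name and callback body are placeholders):

```python
from google.cloud import bigquery_storage


def on_retry_delay(delay_seconds: float) -> None:
    # Hypothetical hook: feed the upcoming sleep into an autoscaling signal.
    print(f"read_rows backing off for {delay_seconds:.1f}s")


client = bigquery_storage.BigQueryReadClient()
reader = client.read_rows(
    "projects/my-project/locations/us/sessions/my-session/streams/0",
    retry_delay_callback=on_retry_delay,
)
for row in reader.rows():
    pass  # process rows
```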
     def _reconnect(self):
         """Reconnect to the ReadRows stream using the most recent offset."""
-        self._wrapped = self._client.read_rows(
+        return self._client.read_rows(
Reviewer:
Won't this create a new ReadRowsStream?
Author:
Yes, this function doesn't do anything different than before; it just returns the stream instead of assigning it to a member variable.
Force-pushed from d2dddeb to 869914a, then from 41169fb to b207dc1.
@@ -123,19 +130,12 @@ def read_rows(
             ValueError: If the parameters are invalid.
         """
         gapic_client = super(BigQueryReadClient, self)
-        stream = gapic_client.read_rows(
Reviewer:
Note: A side-effect of this change is that some non-retryable errors won't happen until the user starts iterating through rows.
I would consider this a breaking change even more so than the ReadRowsStream constructor change, since users who were expecting an exception will now get it later on.
Author:
I changed the code so read_rows is called during ReadRowsStream construction, and any exception besides a retryable RESOURCE_EXHAUSTED error is propagated.
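A simplified sketch of that behavior (names are illustrative; the helper is hypothetical and sketched later in this thread):

```python
import time

from google.api_core import exceptions


def _open_stream(client, name, offset, read_rows_kwargs):
    """Open a ReadRows stream, retrying only retryable RESOURCE_EXHAUSTED."""
    while True:
        try:
            return client.read_rows(name, offset=offset, **read_rows_kwargs)
        except exceptions.ResourceExhausted as exc:
            delay = _retry_delay_from(exc)  # hypothetical helper; see the
            if delay is None:               # RetryInfo sketch further down
                raise  # no RetryInfo attached: propagate immediately
            time.sleep(delay)
```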
tests/unit/test_reader_v1.py (outdated)
-    # Don't reconnect on DeadlineException. This allows user-specified timeouts
-    # to be respected.
-    mock_gapic_client.read_rows.assert_not_called()
+    mock_gapic_client.read_rows.assert_called_once()
Reviewer:
Comment is out-of-date now. I believe this is the breaking-change side effect I mentioned earlier.
I'd prefer we don't break this behavior and find a way to implement the reconnect logic in BigQueryReadClient.read_rows and ReadRowsStream. Perhaps a helper function could be created to keep it DRY?
Author:
Removed the comment. As mentioned above, the behavior is the same as before.
Force-pushed from a69b1ee to 3b97341.
    # Don't reconnect on DeadlineException. This allows user-specified timeouts
    # to be respected.
    mock_gapic_client.read_rows.assert_not_called()
Reviewer:
This was a rather important comment. I'd like to make sure we're still testing this behavior somehow (no reconnect on DeadlineException)
Reviewer:
https://docs.python.org/3/library/unittest.mock.html#unittest.mock.Mock.reset_mock before line 211 may be helpful here, though possibly unnecessary if we move _reconnect out of the constructor.
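A self-contained sketch of that reset_mock pattern (mock_gapic_client stands in for the mocked GAPIC client in the real test):

```python
from unittest import mock

mock_gapic_client = mock.Mock()

# The stream is opened once during setup, recording a read_rows call.
mock_gapic_client.read_rows("projects/p/locations/us/sessions/s/streams/0")
mock_gapic_client.read_rows.assert_called_once()

# Forget the setup call so the next assertion only sees reconnect attempts.
mock_gapic_client.read_rows.reset_mock()

# ... iterate the stream and hit a DeadlineExceeded here ...
mock_gapic_client.read_rows.assert_not_called()  # i.e. no reconnect happened
```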
Author:
read_rows will always be called once, because that call is what throws the DeadlineException. I updated the comment, so I hope the intention is clear now.
        self._client = client
        self._name = name
        self._offset = offset
        self._read_rows_kwargs = read_rows_kwargs
        self._retry_delay_callback = retry_delay_callback
        self._reconnect()
Reviewer:
Doing heavy work during construction/initialization time is a bit of an OO anti-pattern. I'd prefer we find another way to do this.
Author:
Removed this from construction and added it as a separate statement wherever ReadRowsStream was previously constructed.
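A sketch of the resulting call-site pattern (argument order and names follow the diffs above but may not match the merged code exactly):

```python
from google.cloud.bigquery_storage_v1 import reader


def read_rows(gapic_client, name, offset, read_rows_kwargs, retry_delay_callback=None):
    # Build the stream without touching the network, then open it explicitly,
    # keeping the heavy work out of __init__ as the reviewer asked.
    stream = reader.ReadRowsStream(
        gapic_client, name, offset, read_rows_kwargs,
        retry_delay_callback=retry_delay_callback,
    )
    stream._reconnect()
    return stream
```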
            # ResourceExhausted errors are only retried if a valid
            # RetryInfo is provided with the error.
            # ResourceExhausted doesn't seem to have details/_details
            # fields by default when it is generated by Python 3.6 unit
            # tests, so we have to work around that.
            # TODO: to remove this logic when we require
            # google-api-core >= 2.2.0
Reviewer:
Suggested change:
-            # ResourceExhausted errors are only retried if a valid
-            # RetryInfo is provided with the error.
-            # ResourceExhausted doesn't seem to have details/_details
-            # fields by default when it is generated by Python 3.6 unit
-            # tests, so we have to work around that.
-            # TODO: to remove this logic when we require
-            # google-api-core >= 2.2.0
+            # ResourceExhausted errors are only retried if a valid
+            # RetryInfo is provided with the error.
+            #
+            # TODO: Remove hasattr logic when we require google-api-core >= 2.2.0.
+            # ResourceExhausted added details/_details in google-api-core 2.2.0.
Author:
Updated the comment as suggested.
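For readers following along, a hedged sketch of what that RetryInfo check might look like (not the library's exact code; the helper name is hypothetical):

```python
from google.rpc import error_details_pb2


def _retry_delay_from(exc):
    """Return the server-requested delay in seconds, or None if not retryable."""
    # google-api-core >= 2.2.0 exposes parsed detail protos on .details;
    # older versions may only carry the private ._details field, hence hasattr.
    if hasattr(exc, "details"):
        details = exc.details
    elif hasattr(exc, "_details"):
        details = exc._details
    else:
        return None
    for detail in details or []:
        if isinstance(detail, error_details_pb2.RetryInfo):
            delay = detail.retry_delay
            return delay.seconds + delay.nanos / 1e9
    return None
```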
Force-pushed from 3b97341 to 214d80d.
The BigQuery Storage Read API will start returning retryable RESOURCE_EXHAUSTED errors in 2022 when certain concurrency limits are hit, so this PR adds code to handle them.
Tested with unit tests and system tests. The system tests ran successfully on a test project that intentionally returns retryable RESOURCE_EXHAUSTED errors.