Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Retry expired watches #133

Merged
merged 1 commit into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import http
import json
import pydoc

Expand Down Expand Up @@ -86,7 +87,7 @@ def get_watch_argument_name(self, func):
def unmarshal_event(self, data, return_type):
js = json.loads(data)
js['raw_object'] = js['object']
if return_type:
if return_type and js['type'] != 'ERROR':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is where we fail to deserialize the response. Good catch

obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
Expand All @@ -102,6 +103,14 @@ def unmarshal_event(self, data, return_type):
def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.

Note that watching an API resource can expire. The method tries to
resume automatically once from the last result, but if that last result
is too old as well, an `ApiException` exception will be thrown with
``code`` 410. In that case you have to recover yourself, probably
by listing the API resource to obtain the latest state and then
watching from that state on by setting ``resource_version`` to
one returned from listing.

:param func: The API function pointer. Any parameter to the function
can be passed after this parameter.

Expand Down Expand Up @@ -134,14 +143,31 @@ def stream(self, func, *args, **kwargs):
self.resource_version = kwargs['resource_version']

timeouts = ('timeout_seconds' in kwargs)
retry_after_410 = False
while True:
resp = func(*args, **kwargs)
try:
for line in iter_resp_lines(resp):
# unmarshal when we are receiving events from watch,
# return raw string when we are streaming log
if watch_arg == "watch":
yield self.unmarshal_event(line, return_type)
event = self.unmarshal_event(line, return_type)
if isinstance(event, dict) \
and event['type'] == 'ERROR':
obj = event['raw_object']
# Current request expired, let's retry,
# but only if we have not already retried.
if not retry_after_410 and \
obj['code'] == http.HTTPStatus.GONE:
retry_after_410 = True
break
else:
reason = "%s: %s" % (obj['reason'], obj['message'])
raise client.rest.ApiException(status=obj['code'],
reason=reason)
else:
retry_after_410 = False
yield event
else:
yield line
if self._stop:
Expand Down
27 changes: 27 additions & 0 deletions watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from mock import Mock, call

from kubernetes import client

from .watch import Watch


Expand Down Expand Up @@ -273,6 +275,31 @@ def test_watch_with_exception(self):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_with_error_event(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])

fake_api = Mock()
fake_api.get_thing = Mock(return_value=fake_resp)

w = Watch()
try:
for _ in w.stream(fake_api.get_thing):
self.fail(self, "Should fail with ApiException.")
except client.rest.ApiException:
pass

fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()


if __name__ == '__main__':
unittest.main()