Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Commit

Permalink
Retry watch if request expires.
Browse files Browse the repository at this point in the history
  • Loading branch information
mitar committed Jul 15, 2020
1 parent 3ea8003 commit 06e48c5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 2 deletions.
30 changes: 28 additions & 2 deletions watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import http
import json
import pydoc

Expand Down Expand Up @@ -86,7 +87,7 @@ def get_watch_argument_name(self, func):
def unmarshal_event(self, data, return_type):
js = json.loads(data)
js['raw_object'] = js['object']
if return_type:
if return_type and js['type'] != 'ERROR':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
Expand All @@ -102,6 +103,14 @@ def unmarshal_event(self, data, return_type):
def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
Note that watching an API resource can expire. The method tries to
resume automatically once from the last result, but if that last result
is too old as well, an `ApiException` exception will be thrown with
``code`` 410. In that case you have to recover yourself, probably
by listing the API resource to obtain the latest state and then
watching from that state on by setting ``resource_version`` to
one returned from listing.
:param func: The API function pointer. Any parameter to the function
can be passed after this parameter.
Expand Down Expand Up @@ -134,14 +143,31 @@ def stream(self, func, *args, **kwargs):
self.resource_version = kwargs['resource_version']

timeouts = ('timeout_seconds' in kwargs)
retry_after_410 = False
while True:
resp = func(*args, **kwargs)
try:
for line in iter_resp_lines(resp):
# unmarshal when we are receiving events from watch,
# return raw string when we are streaming log
if watch_arg == "watch":
yield self.unmarshal_event(line, return_type)
event = self.unmarshal_event(line, return_type)
if isinstance(event, dict) \
and event['type'] == 'ERROR':
obj = event['raw_object']
# Current request expired, let's retry,
# but only if we have not already retried.
if not retry_after_410 and \
obj['code'] == http.HTTPStatus.GONE:
retry_after_410 = True
break
else:
reason = "%s: %s" % (obj['reason'], obj['message'])
raise client.rest.ApiException(status=obj['code'],
reason=reason)
else:
retry_after_410 = False
yield event
else:
yield line
if self._stop:
Expand Down
27 changes: 27 additions & 0 deletions watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

from mock import Mock, call

from kubernetes import client

from .watch import Watch


Expand Down Expand Up @@ -273,6 +275,31 @@ def test_watch_with_exception(self):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_with_error_event(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])

fake_api = Mock()
fake_api.get_thing = Mock(return_value=fake_resp)

w = Watch()
try:
for _ in w.stream(fake_api.get_thing):
self.fail(self, "Should fail with ApiException.")
except client.rest.ApiException:
pass

fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()


if __name__ == '__main__':
unittest.main()

0 comments on commit 06e48c5

Please sign in to comment.