diff --git a/watch/watch.py b/watch/watch.py index fe7a9247..a86bc2cc 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -86,7 +86,7 @@ def get_watch_argument_name(self, func): def unmarshal_event(self, data, return_type): js = json.loads(data) js['raw_object'] = js['object'] - if return_type: + if return_type and js['type'] != 'ERROR': obj = SimpleNamespace(data=json.dumps(js['raw_object'])) js['object'] = self._api_client.deserialize(obj, return_type) if hasattr(js['object'], 'metadata'): @@ -102,6 +102,14 @@ def unmarshal_event(self, data, return_type): def stream(self, func, *args, **kwargs): """Watch an API resource and stream the result back via a generator. + Note that watching an API resource can expire. The method tries to + resume automatically from the last result, but if that last result + is too old as well, an `ApiException` exception will be thrown with + ``code`` 410. In that case you have to recover yourself, probably + by listing the API resource to obtain the latest state and then + watching from that state on by setting ``resource_version`` to + one returned from listing. + :param func: The API function pointer. Any parameter to the function can be passed after this parameter. @@ -134,6 +142,7 @@ def stream(self, func, *args, **kwargs): self.resource_version = kwargs['resource_version'] timeouts = ('timeout_seconds' in kwargs) + retry_after_410 = False while True: resp = func(*args, **kwargs) try: @@ -141,7 +150,22 @@ def stream(self, func, *args, **kwargs): # unmarshal when we are receiving events from watch, # return raw string when we are streaming log if watch_arg == "watch": - yield self.unmarshal_event(line, return_type) + event = self.unmarshal_event(line, return_type) + if isinstance(event, dict) \ + and event['type'] == 'ERROR': + obj = event['raw_object'] + # Current request expired, let's retry, + # but only if we have not already retried. + if not retry_after_410 and obj['code'] == 410: + retry_after_410 = True + break + else: + reason = "%s: %s" % (obj['reason'], obj['message']) + raise client.rest.ApiException(status=obj['code'], + reason=reason) + else: + retry_after_410 = False + yield event else: yield line if self._stop: diff --git a/watch/watch_test.py b/watch/watch_test.py index 6fec23ec..b8cefd20 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -16,6 +16,8 @@ from mock import Mock, call +from kubernetes import client + from .watch import Watch @@ -273,6 +275,31 @@ def test_watch_with_exception(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_with_error_event(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.read_chunked = Mock( + return_value=[ + '{"type": "ERROR", "object": {"code": 410, ' + '"reason": "Gone", "message": "error message"}}\n']) + + fake_api = Mock() + fake_api.get_thing = Mock(return_value=fake_resp) + + w = Watch() + try: + for _ in w.stream(fake_api.get_thing): + self.fail(self, "Should fail with ApiException.") + except client.rest.ApiException: + pass + + fake_api.get_thing.assert_called_once_with( + _preload_content=False, watch=True) + fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.close.assert_called_once() + fake_resp.release_conn.assert_called_once() + if __name__ == '__main__': unittest.main()