From 8e6f0435a38e24aac700d9ebac700bdf6138ba8c Mon Sep 17 00:00:00 2001 From: Mitar Date: Mon, 15 Oct 2018 23:57:46 -0700 Subject: [PATCH 1/3] Making watch work with read_namespaced_pod_log. Fixes https://github.com/kubernetes-client/python/issues/199. --- watch/watch.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/watch/watch.py b/watch/watch.py index bdf24f1a..79b2358d 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -20,6 +20,7 @@ from kubernetes import client PYDOC_RETURN_LABEL = ":return:" +PYDOC_FOLLOW_PARAM = ":param bool follow:" # Removing this suffix from return type name should give us event's object # type. e.g., if list_namespaces() returns "NamespaceList" type, @@ -65,7 +66,7 @@ def __init__(self, return_type=None): self._raw_return_type = return_type self._stop = False self._api_client = client.ApiClient() - self.resource_version = 0 + self.resource_version = None def stop(self): self._stop = True @@ -78,8 +79,17 @@ def get_return_type(self, func): return return_type[:-len(TYPE_LIST_SUFFIX)] return return_type + def get_watch_argument_name(self, func): + if PYDOC_FOLLOW_PARAM in pydoc.getdoc(func): + return 'follow' + else: + return 'watch' + def unmarshal_event(self, data, return_type): - js = json.loads(data) + try: + js = json.loads(data) + except ValueError: + return data js['raw_object'] = js['object'] if return_type: obj = SimpleNamespace(data=json.dumps(js['raw_object'])) @@ -122,7 +132,7 @@ def stream(self, func, *args, **kwargs): self._stop = False return_type = self.get_return_type(func) - kwargs['watch'] = True + kwargs[self.get_watch_argument_name(func)] = True kwargs['_preload_content'] = False if 'resource_version' in kwargs: self.resource_version = kwargs['resource_version'] @@ -136,9 +146,12 @@ def stream(self, func, *args, **kwargs): if self._stop: break finally: - kwargs['resource_version'] = self.resource_version resp.close() resp.release_conn() + if self.resource_version is not None: + kwargs['resource_version'] = self.resource_version + else: + break if timeouts or self._stop: break From ad06e5c923b2d4e5db86f7e91deddb95a6dc9a43 Mon Sep 17 00:00:00 2001 From: Mitar Date: Mon, 18 Feb 2019 16:43:50 -0800 Subject: [PATCH 2/3] Added tests. --- watch/watch_test.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/watch/watch_test.py b/watch/watch_test.py index 08eb36c2..ebc400af 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -67,6 +67,35 @@ def test_watch_with_decode(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_for_follow(self): + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + fake_resp.read_chunked = Mock( + return_value=[ + 'log_line_1\n', + 'log_line_2\n']) + + fake_api = Mock() + fake_api.read_namespaced_pod_log = Mock(return_value=fake_resp) + fake_api.read_namespaced_pod_log.__doc__ = ':param bool follow:\n:return: str' + + w = Watch() + count = 1 + for e in w.stream(fake_api.read_namespaced_pod_log): + self.assertEqual("log_line_1", e) + count += 1 + # make sure we can stop the watch and the last event with won't be + # returned + if count == 2: + w.stop() + + fake_api.read_namespaced_pod_log.assert_called_once_with( + _preload_content=False, follow=True) + fake_resp.read_chunked.assert_called_once_with(decode_content=False) + fake_resp.close.assert_called_once() + fake_resp.release_conn.assert_called_once() + def test_watch_resource_version_set(self): # https://github.com/kubernetes-client/python/issues/700 # ensure watching from a resource version does reset to resource From 972a76a83d0133b45db03495b0f9fd05ed2b94a3 Mon Sep 17 00:00:00 2001 From: Mitar Date: Wed, 20 Feb 2019 23:56:38 -0800 Subject: [PATCH 3/3] Don't use break inside finally. It swallows exceptions. --- watch/watch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/watch/watch.py b/watch/watch.py index 79b2358d..5966eace 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -151,7 +151,7 @@ def stream(self, func, *args, **kwargs): if self.resource_version is not None: kwargs['resource_version'] = self.resource_version else: - break + self._stop = True if timeouts or self._stop: break