Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Making watch work with read_namespaced_pod_log #93

Merged
merged 3 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from kubernetes import client

PYDOC_RETURN_LABEL = ":return:"
PYDOC_FOLLOW_PARAM = ":param bool follow:"

# Removing this suffix from return type name should give us event's object
# type. e.g., if list_namespaces() returns "NamespaceList" type,
Expand Down Expand Up @@ -65,7 +66,7 @@ def __init__(self, return_type=None):
self._raw_return_type = return_type
self._stop = False
self._api_client = client.ApiClient()
self.resource_version = 0
self.resource_version = None

def stop(self):
self._stop = True
Expand All @@ -78,8 +79,17 @@ def get_return_type(self, func):
return return_type[:-len(TYPE_LIST_SUFFIX)]
return return_type

def get_watch_argument_name(self, func):
if PYDOC_FOLLOW_PARAM in pydoc.getdoc(func):
return 'follow'
else:
return 'watch'

def unmarshal_event(self, data, return_type):
js = json.loads(data)
try:
js = json.loads(data)
except ValueError:
return data
js['raw_object'] = js['object']
if return_type:
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
Expand Down Expand Up @@ -122,7 +132,7 @@ def stream(self, func, *args, **kwargs):

self._stop = False
return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs[self.get_watch_argument_name(func)] = True
kwargs['_preload_content'] = False
if 'resource_version' in kwargs:
self.resource_version = kwargs['resource_version']
Expand All @@ -136,9 +146,12 @@ def stream(self, func, *args, **kwargs):
if self._stop:
break
finally:
kwargs['resource_version'] = self.resource_version
resp.close()
resp.release_conn()
if self.resource_version is not None:
kwargs['resource_version'] = self.resource_version
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need the else: break here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The log streaming does not have resource_version. So once the response finishes, you have to stop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact that was not good because using break inside finally swallows up the exception. I change it to setting a flag.

self._stop = True

if timeouts or self._stop:
break
29 changes: 29 additions & 0 deletions watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,35 @@ def test_watch_with_decode(self):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_for_follow(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
return_value=[
'log_line_1\n',
'log_line_2\n'])

fake_api = Mock()
fake_api.read_namespaced_pod_log = Mock(return_value=fake_resp)
fake_api.read_namespaced_pod_log.__doc__ = ':param bool follow:\n:return: str'

w = Watch()
count = 1
for e in w.stream(fake_api.read_namespaced_pod_log):
self.assertEqual("log_line_1", e)
count += 1
# make sure we can stop the watch and the last event with won't be
# returned
if count == 2:
w.stop()

fake_api.read_namespaced_pod_log.assert_called_once_with(
_preload_content=False, follow=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_resource_version_set(self):
# https://github.com/kubernetes-client/python/issues/700
# ensure watching from a resource version does reset to resource
Expand Down