diff --git a/watch/watch.py b/watch/watch.py index 21899dd8..a9c315cd 100644 --- a/watch/watch.py +++ b/watch/watch.py @@ -122,6 +122,8 @@ def stream(self, func, *args, **kwargs): return_type = self.get_return_type(func) kwargs['watch'] = True kwargs['_preload_content'] = False + if 'resource_version' in kwargs: + self.resource_version = kwargs['resource_version'] timeouts = ('timeout_seconds' in kwargs) while True: diff --git a/watch/watch_test.py b/watch/watch_test.py index d1ec80a1..969bb5a6 100644 --- a/watch/watch_test.py +++ b/watch/watch_test.py @@ -14,12 +14,15 @@ import unittest -from mock import Mock +from mock import Mock, call from .watch import Watch class WatchTests(unittest.TestCase): + def setUp(self): + # counter for a test that needs test global state + self.callcount = 0 def test_watch_with_decode(self): fake_resp = Mock() @@ -62,6 +65,68 @@ def test_watch_with_decode(self): fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + def test_watch_resource_version_set(self): + # https://github.com/kubernetes-client/python/issues/700 + # ensure watching from a resource version does reset to resource + # version 0 after k8s resets the watch connection + fake_resp = Mock() + fake_resp.close = Mock() + fake_resp.release_conn = Mock() + values = [ + '{"type": "ADDED", "object": {"metadata": {"name": "test1",' + '"resourceVersion": "1"}, "spec": {}, "status": {}}}\n', + '{"type": "ADDED", "object": {"metadata": {"name": "test2",' + '"resourceVersion": "2"}, "spec": {}, "sta', + 'tus": {}}}\n' + '{"type": "ADDED", "object": {"metadata": {"name": "test3",' + '"resourceVersion": "3"}, "spec": {}, "status": {}}}\n' + ] + # return nothing on the first call and values on the second + # this emulates a watch from a rv that returns nothing in the first k8s + # watch reset and values later + def get_values(*args, **kwargs): + self.callcount += 1 + if self.callcount == 1: + return [] + else: + return values + + fake_resp.read_chunked = Mock( + side_effect=get_values) + + fake_api = Mock() + fake_api.get_namespaces = Mock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + w = Watch() + count = 1 + # ensure we keep our requested resource version or the version latest + # returned version when the existing versions are older than the + # requested version + # needed for the list existing objects, then watch from there use case + calls = [] + first = True + for e in w.stream(fake_api.get_namespaces, resource_version="5"): + count += 1 + # first call must use the passed rv + # following calls the last rv of the returned values + if count % 3 == 0: + if first: + rv = "5" + first = False + else: + # ideally we want 5 here but as rv must be treated as an + # opaque value we cannot interpret it and order it so rely + # on k8s returning the events completely and in order + rv = "3" + calls.append(call(_preload_content=False, watch=True, + resource_version=rv)) + # returned + if count == len(values) * 3: + w.stop() + + fake_api.get_namespaces.assert_has_calls(calls) + def test_watch_stream_twice(self): w = Watch(float) for step in ['first', 'second']: