Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

Commit

Permalink
Merge pull request #109 from juliantaylor/fix-watch-reset
Browse files Browse the repository at this point in the history
fix watching with a specified resource version
  • Loading branch information
k8s-ci-robot committed Jan 23, 2019
2 parents 8497dfb + 3c30a30 commit 2d69e89
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 1 deletion.
2 changes: 2 additions & 0 deletions watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ def stream(self, func, *args, **kwargs):
return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs['_preload_content'] = False
if 'resource_version' in kwargs:
self.resource_version = kwargs['resource_version']

timeouts = ('timeout_seconds' in kwargs)
while True:
Expand Down
73 changes: 72 additions & 1 deletion watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

import unittest

from mock import Mock
from mock import Mock, call

from .watch import Watch


class WatchTests(unittest.TestCase):
def setUp(self):
# counter for a test that needs test global state
self.callcount = 0

def test_watch_with_decode(self):
fake_resp = Mock()
Expand Down Expand Up @@ -64,6 +67,74 @@ def test_watch_with_decode(self):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_resource_version_set(self):
# https://github.com/kubernetes-client/python/issues/700
# ensure watching from a resource version does reset to resource
# version 0 after k8s resets the watch connection
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
values = [
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
'{"type": "ADDED", "object": {"metadata": {"name": "test2",'
'"resourceVersion": "2"}, "spec": {}, "sta',
'tus": {}}}\n'
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
]
# return nothing on the first call and values on the second
# this emulates a watch from a rv that returns nothing in the first k8s
# watch reset and values later

def get_values(*args, **kwargs):
self.callcount += 1
if self.callcount == 1:
return []
else:
return values

fake_resp.read_chunked = Mock(
side_effect=get_values)

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
# ensure we keep our requested resource version or the version latest
# returned version when the existing versions are older than the
# requested version
# needed for the list existing objects, then watch from there use case
calls = []

iterations = 2
# first two calls must use the passed rv, the first call is a
# "reset" and does not actually return anything
# the second call must use the same rv but will return values
# (with a wrong rv but a real cluster would behave correctly)
# calls following that will use the rv from those returned values
calls.append(call(_preload_content=False, watch=True,
resource_version="5"))
calls.append(call(_preload_content=False, watch=True,
resource_version="5"))
for i in range(iterations):
# ideally we want 5 here but as rv must be treated as an
# opaque value we cannot interpret it and order it so rely
# on k8s returning the events completely and in order
calls.append(call(_preload_content=False, watch=True,
resource_version="3"))

for c, e in enumerate(w.stream(fake_api.get_namespaces,
resource_version="5")):
if c == len(values) * iterations:
w.stop()

# check calls are in the list, gives good error output
fake_api.get_namespaces.assert_has_calls(calls)
# more strict test with worse error message
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)

def test_watch_stream_twice(self):
w = Watch(float)
for step in ['first', 'second']:
Expand Down

0 comments on commit 2d69e89

Please sign in to comment.