Skip to content
This repository has been archived by the owner on Mar 13, 2022. It is now read-only.

fix watching with a specified resource version #109

Merged
merged 1 commit into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def stream(self, func, *args, **kwargs):
return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs['_preload_content'] = False
if 'resource_version' in kwargs:
self.resource_version = kwargs['resource_version']

timeouts = ('timeout_seconds' in kwargs)
while True:
Expand Down
73 changes: 72 additions & 1 deletion watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

import unittest

from mock import Mock
from mock import Mock, call

from .watch import Watch


class WatchTests(unittest.TestCase):
def setUp(self):
# counter for a test that needs test global state
self.callcount = 0

def test_watch_with_decode(self):
fake_resp = Mock()
Expand Down Expand Up @@ -62,6 +65,74 @@ def test_watch_with_decode(self):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()

def test_watch_resource_version_set(self):
# https://github.com/kubernetes-client/python/issues/700
# ensure watching from a resource version does reset to resource
# version 0 after k8s resets the watch connection
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
values = [
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you use a real k8s object, like a pod? that could make the test easier to understand

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one could though it does not matter for the test
none of the existing tests use real objects, should all be updated?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine to leave it as is

'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
'{"type": "ADDED", "object": {"metadata": {"name": "test2",'
'"resourceVersion": "2"}, "spec": {}, "sta',
'tus": {}}}\n'
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
]
# return nothing on the first call and values on the second
# this emulates a watch from a rv that returns nothing in the first k8s
# watch reset and values later

def get_values(*args, **kwargs):
self.callcount += 1
if self.callcount == 1:
return []
else:
return values

fake_resp.read_chunked = Mock(
side_effect=get_values)

fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

w = Watch()
# ensure we keep our requested resource version or the version latest
# returned version when the existing versions are older than the
# requested version
# needed for the list existing objects, then watch from there use case
calls = []

iterations = 2
# first two calls must use the passed rv, the first call is a
# "reset" and does not actually return anything
# the second call must use the same rv but will return values
# (with a wrong rv but a real cluster would behave correctly)
# calls following that will use the rv from those returned values
calls.append(call(_preload_content=False, watch=True,
resource_version="5"))
calls.append(call(_preload_content=False, watch=True,
resource_version="5"))
for i in range(iterations):
# ideally we want 5 here but as rv must be treated as an
# opaque value we cannot interpret it and order it so rely
# on k8s returning the events completely and in order
calls.append(call(_preload_content=False, watch=True,
resource_version="3"))

for c, e in enumerate(w.stream(fake_api.get_namespaces,
resource_version="5")):
if c == len(values) * iterations:
w.stop()

# check calls are in the list, gives good error output
fake_api.get_namespaces.assert_has_calls(calls)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you simulate the case the 'connection got reset'?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

via the global variable changing the return value of the mock read_chunked

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    if 'resource_version' in kwargs:
        self.resource_version = kwargs['resource_version']

===================================================
In the above test, the code in the function stream() from its beginning to "while True" is executed only once, i.e. the two lines of fix code you added taken above is executed only once.
when the above two lines are executed, kwargs is None, hence essentially the two lines you added did not make any effect. I'm wondering how the test code tests the fix? am I missing anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the testcode does run with kwargs['resource_version] set which sets the rv member of the watch object.
The function then calls the get_namespaces mock which would direct k8s to provide a watch from the provided resource version, in our mock it is just ignored and returns nothing simulating the k8s api not returning anything and resetting the connection.
Now instead of resetting the resource version member variable back to zero and issuing a watch from zero in the next iteration it will reuse the input resource version which is what we want.
The mock in the second call returns something but what doesn't really matter we just want to register that the second mocked call was with the input resource version and not with zero which causes the linked issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here the failure when you remove the two lines I added, it matches what happens in the real testcase I posted in the linked issue:

E   AssertionError: Calls not found.
E   Expected: [call(_preload_content=False, resource_version='5', watch=True),
E    call(_preload_content=False, resource_version='3', watch=True),
E    call(_preload_content=False, resource_version='3', watch=True)]
E   Actual: [call(_preload_content=False, resource_version='5', watch=True),
E    call(_preload_content=False, resource_version=0, watch=True), <<<<<<<<< ERROR HERE
E    call(_preload_content=False, resource_version='3', watch=True),
E    call(_preload_content=False, resource_version='3', watch=True)]

# more strict test with worse error message
self.assertEqual(fake_api.get_namespaces.mock_calls, calls)

def test_watch_stream_twice(self):
w = Watch(float)
for step in ['first', 'second']:
Expand Down