Skip to content

Commit

Permalink
[feat] Watch() retries 410 errors (#327)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomplus authored Aug 9, 2024
1 parent a37f664 commit 7222c98
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 6 deletions.
21 changes: 16 additions & 5 deletions kubernetes_asyncio/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ def _reconnect(self):

async def next(self):

watch_forever = 'timeout_seconds' not in self.func.keywords
retry_410 = watch_forever

while 1:

# Set the response object to the user supplied function (eg
Expand All @@ -163,9 +166,7 @@ async def next(self):
except asyncio.TimeoutError:
# This exception can be raised by aiohttp (client timeout)
# but we don't retry if server side timeout is applied.
# The base scenario would be to restart watching with timeout_seconds
# reduced by time spent in previous iterations.
if 'timeout_seconds' not in self.func.keywords:
if watch_forever:
self._reconnect()
continue
else:
Expand All @@ -176,15 +177,25 @@ async def next(self):
# Stop the iterator if K8s sends an empty response. This happens when
# eg the supplied timeout has expired.
if line == '':
if 'timeout_seconds' not in self.func.keywords:
if watch_forever:
self._reconnect()
continue

# Special case for faster log streaming
if self.return_type == 'str':
return line

return self.unmarshal_event(line, self.return_type)
# retry 410 error only once
try:
event = self.unmarshal_event(line, self.return_type)
except client.exceptions.ApiException as ex:
if ex.status == 410 and retry_410:
retry_410 = False # retry only once
self._reconnect()
continue
raise
retry_410 = watch_forever
return event

def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
Expand Down
104 changes: 103 additions & 1 deletion kubernetes_asyncio/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ async def test_watch_with_exception(self):
async for e in watch.stream(fake_api.get_namespaces, timeout_seconds=10): # noqa
pass

async def test_watch_timeout(self):
async def test_watch_retry_timeout(self):
fake_resp = AsyncMock()
fake_resp.content.readline = AsyncMock()
fake_resp.release = Mock()
Expand Down Expand Up @@ -256,6 +256,108 @@ async def test_watch_timeout(self):
call(_preload_content=False, watch=True, resource_version='1555')])
fake_resp.release.assert_called_once_with()

async def test_watch_retry_410(self):
fake_resp = AsyncMock()
fake_resp.content.readline = AsyncMock()
fake_resp.release = Mock()

mock_event1 = {
"type": "ADDED",
"object": {
"metadata":
{
"name": "test1555",
"resourceVersion": "1555"
},
"spec": {},
"status": {}
}
}

mock_event2 = {
"type": "ADDED",
"object": {
"metadata":
{
"name": "test1555",
"resourceVersion": "1555"
},
"spec": {},
"status": {}
}
}

mock_410 = {
'type': 'ERROR',
'object': {
'kind': 'Status',
'apiVersion': 'v1',
'metadata': {},
'status': 'Failure',
'message': 'too old resource version: 1 (8146471)',
'reason': 'Gone',
'code': 410
}
}

# retry 410
fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'),
json.dumps(mock_410).encode('utf8'),
json.dumps(mock_event2).encode('utf8'),
json.dumps(mock_410).encode('utf8'),
b""]

fake_api = Mock()
fake_api.get_namespaces = AsyncMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList'

watch = kubernetes_asyncio.watch.Watch()
async with watch.stream(fake_api.get_namespaces) as stream:
async for e in stream: # noqa
pass

fake_api.get_namespaces.assert_has_calls(
[call(_preload_content=False, watch=True),
call(_preload_content=False, watch=True, resource_version='1555')])
fake_resp.release.assert_called_once_with()

# retry 410 only once
fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'),
json.dumps(mock_410).encode('utf8'),
json.dumps(mock_event2).encode('utf8'),
json.dumps(mock_410).encode('utf8'),
json.dumps(mock_410).encode('utf8'),
b""]

fake_api = Mock()
fake_api.get_namespaces = AsyncMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList'

with self.assertRaisesRegex(
kubernetes_asyncio.client.exceptions.ApiException,
r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'):
watch = kubernetes_asyncio.watch.Watch()
async with watch.stream(fake_api.get_namespaces) as stream:
async for e in stream: # noqa
pass

# no retry 410 if timeout is passed
fake_resp.content.readline.side_effect = [json.dumps(mock_event1).encode('utf8'),
json.dumps(mock_410).encode('utf8'),
b""]

fake_api = Mock()
fake_api.get_namespaces = AsyncMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':rtype: V1NamespaceList'

with self.assertRaisesRegex(
kubernetes_asyncio.client.exceptions.ApiException,
r'\(410\)\nReason: Gone: too old resource version: 1 \(8146471\)'):
watch = kubernetes_asyncio.watch.Watch()
async with watch.stream(fake_api.get_namespaces, timeout_seconds=10) as stream:
async for e in stream: # noqa
pass

async def test_watch_timeout_with_resource_version(self):
fake_resp = AsyncMock()
fake_resp.content.readline = AsyncMock()
Expand Down

0 comments on commit 7222c98

Please sign in to comment.