Skip to content

Commit

Permalink
Fix an OverflowError in readline_channel and enhance timeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Masterchen09 committed May 1, 2024
1 parent 94e4211 commit 8a65ebb
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
28 changes: 22 additions & 6 deletions kubernetes/base/stream/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ def read_channel(self, channel, timeout=0):

def readline_channel(self, channel, timeout=None):
"""Read a line from a channel."""
if timeout is None:
timeout = float("inf")
if timeout is not None and timeout < 0:
timeout = None
start = time.time()
while self.is_open() and time.time() - start < timeout:
while timeout is None or time.time() - start < timeout:
if channel in self._channels:
data = self._channels[channel]
if self.newline in data:
Expand All @@ -104,7 +104,15 @@ def readline_channel(self, channel, timeout=None):
else:
del self._channels[channel]
return ret
self.update(timeout=(timeout - time.time() + start))

if not self.is_open():
return

if timeout is not None:
# the timeout here should never be negative, because otherwise this method could block indefinitly
self.update(timeout=max(timeout - time.time() + start, 0))
else:
self.update(timeout=None)

def write_channel(self, channel, data):
"""Write data to a channel."""
Expand Down Expand Up @@ -190,6 +198,13 @@ def update(self, timeout=0):
r = poll.poll(timeout)
poll.unregister(self.sock.sock)
else:
# select.select() does not work with negative timeouts, when a negative value is
# given select.epoll() and select.poll() are blocking until there is an event for
# the poll object, therefore set the timeout to None in order to have the same
# behaviour when select.select() is used
if timeout is not None and timeout < 0:
timeout = None

r, _, _ = select.select(
(self.sock.sock, ), (), (), timeout)

Expand Down Expand Up @@ -220,10 +235,11 @@ def update(self, timeout=0):
def run_forever(self, timeout=None):
"""Wait till connection is closed or timeout reached. Buffer any input
received during this time."""
if timeout:
if timeout is not None:
start = time.time()
while self.is_open() and time.time() - start < timeout:
self.update(timeout=(timeout - time.time() + start))
# the timeout here should never be negative, because otherwise this method could block indefinitly
self.update(timeout=max(timeout - time.time() + start, 0))
else:
while self.is_open():
self.update(timeout=None)
Expand Down
8 changes: 8 additions & 0 deletions kubernetes/e2e_test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ def test_pod_apis(self):
line = resp.readline_stderr(timeout=5)
self.assertFalse(resp.peek_stdout())
self.assertEqual("test string 2", line)
resp.write_stdin("sleep 2 && echo test string 3\n")
line = resp.readline_stdout(timeout=5)
self.assertEqual("test string 3", line)
resp.write_stdin("sleep 5 && echo test string 4\n")
line = resp.readline_stdout(timeout=0)
self.assertIsNone(line)
line = resp.readline_stdout(timeout=-1)
self.assertEqual("test string 4", line)
resp.write_stdin("exit\n")
resp.update(timeout=5)
while True:
Expand Down

0 comments on commit 8a65ebb

Please sign in to comment.