Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an OverflowError in readline_channel and enhance timeout handling #1989

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions kubernetes/base/stream/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ def read_channel(self, channel, timeout=0):

def readline_channel(self, channel, timeout=None):
"""Read a line from a channel."""
if timeout is None:
timeout = float("inf")
if timeout is not None and timeout < 0:
timeout = None
start = time.time()
while self.is_open() and time.time() - start < timeout:
while timeout is None or time.time() - start < timeout:
if channel in self._channels:
data = self._channels[channel]
if self.newline in data:
Expand All @@ -104,7 +104,15 @@ def readline_channel(self, channel, timeout=None):
else:
del self._channels[channel]
return ret
self.update(timeout=(timeout - time.time() + start))

if not self.is_open():
return

if timeout is not None:
# the timeout here should never be negative, because otherwise this method could block indefinitly
self.update(timeout=max(timeout - time.time() + start, 0))
else:
self.update(timeout=None)

def write_channel(self, channel, data):
"""Write data to a channel."""
Expand Down Expand Up @@ -190,6 +198,13 @@ def update(self, timeout=0):
r = poll.poll(timeout)
poll.unregister(self.sock.sock)
else:
# select.select() does not work with negative timeouts, when a negative value is
# given select.epoll() and select.poll() are blocking until there is an event for
# the poll object, therefore set the timeout to None in order to have the same
# behaviour when select.select() is used
if timeout is not None and timeout < 0:
timeout = None

r, _, _ = select.select(
(self.sock.sock, ), (), (), timeout)

Expand Down Expand Up @@ -220,10 +235,11 @@ def update(self, timeout=0):
def run_forever(self, timeout=None):
"""Wait till connection is closed or timeout reached. Buffer any input
received during this time."""
if timeout:
if timeout is not None:
start = time.time()
while self.is_open() and time.time() - start < timeout:
self.update(timeout=(timeout - time.time() + start))
# the timeout here should never be negative, because otherwise this method could block indefinitly
self.update(timeout=max(timeout - time.time() + start, 0))
else:
while self.is_open():
self.update(timeout=None)
Expand Down
8 changes: 8 additions & 0 deletions kubernetes/e2e_test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ def test_pod_apis(self):
line = resp.readline_stderr(timeout=5)
self.assertFalse(resp.peek_stdout())
self.assertEqual("test string 2", line)
resp.write_stdin("sleep 2 && echo test string 3\n")
line = resp.readline_stdout(timeout=5)
self.assertEqual("test string 3", line)
resp.write_stdin("sleep 5 && echo test string 4\n")
line = resp.readline_stdout(timeout=0)
self.assertIsNone(line)
line = resp.readline_stdout(timeout=-1)
self.assertEqual("test string 4", line)
resp.write_stdin("exit\n")
resp.update(timeout=5)
while True:
Expand Down