diff --git a/kubernetes/base/stream/ws_client.py b/kubernetes/base/stream/ws_client.py index 3c854ea74..75e1be6fb 100644 --- a/kubernetes/base/stream/ws_client.py +++ b/kubernetes/base/stream/ws_client.py @@ -89,10 +89,10 @@ def read_channel(self, channel, timeout=0): def readline_channel(self, channel, timeout=None): """Read a line from a channel.""" - if timeout is None: - timeout = float("inf") + if timeout is not None and timeout < 0: + timeout = None start = time.time() - while self.is_open() and time.time() - start < timeout: + while timeout is None or time.time() - start < timeout: if channel in self._channels: data = self._channels[channel] if self.newline in data: @@ -104,7 +104,15 @@ def readline_channel(self, channel, timeout=None): else: del self._channels[channel] return ret - self.update(timeout=(timeout - time.time() + start)) + + if not self.is_open(): + return + + if timeout is not None: + # the timeout here should never be negative, because otherwise this method could block indefinitly + self.update(timeout=max(timeout - time.time() + start, 0)) + else: + self.update(timeout=None) def write_channel(self, channel, data): """Write data to a channel.""" @@ -190,6 +198,13 @@ def update(self, timeout=0): r = poll.poll(timeout) poll.unregister(self.sock.sock) else: + # select.select() does not work with negative timeouts, when a negative value is + # given select.epoll() and select.poll() are blocking until there is an event for + # the poll object, therefore set the timeout to None in order to have the same + # behaviour when select.select() is used + if timeout is not None and timeout < 0: + timeout = None + r, _, _ = select.select( (self.sock.sock, ), (), (), timeout) @@ -220,10 +235,11 @@ def update(self, timeout=0): def run_forever(self, timeout=None): """Wait till connection is closed or timeout reached. Buffer any input received during this time.""" - if timeout: + if timeout is not None: start = time.time() while self.is_open() and time.time() - start < timeout: - self.update(timeout=(timeout - time.time() + start)) + # the timeout here should never be negative, because otherwise this method could block indefinitly + self.update(timeout=max(timeout - time.time() + start, 0)) else: while self.is_open(): self.update(timeout=None) diff --git a/kubernetes/e2e_test/test_client.py b/kubernetes/e2e_test/test_client.py index 15689291e..aa9808624 100644 --- a/kubernetes/e2e_test/test_client.py +++ b/kubernetes/e2e_test/test_client.py @@ -157,6 +157,14 @@ def test_pod_apis(self): line = resp.readline_stderr(timeout=5) self.assertFalse(resp.peek_stdout()) self.assertEqual("test string 2", line) + resp.write_stdin("sleep 2 && echo test string 3\n") + line = resp.readline_stdout(timeout=5) + self.assertEqual("test string 3", line) + resp.write_stdin("sleep 5 && echo test string 4\n") + line = resp.readline_stdout(timeout=0) + self.assertIsNone(line) + line = resp.readline_stdout(timeout=-1) + self.assertEqual("test string 4", line) resp.write_stdin("exit\n") resp.update(timeout=5) while True: