diff --git a/aiohttp/connector.py b/aiohttp/connector.py index 98583b52b71..99f319824e6 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -8,6 +8,7 @@ import traceback import warnings +from math import ceil from . import hdrs from .client import ClientRequest @@ -160,31 +161,38 @@ def _cleanup(self): now = self._loop.time() connections = {} + timeout = self._keepalive_timeout + for key, conns in self._conns.items(): alive = [] for transport, proto, t0 in conns: if transport is not None: if proto and not proto.is_connected(): transport = None - elif (now - t0) > self._keepalive_timeout: - transport.close() - transport = None + else: + delta = t0 + self._keepalive_timeout - now + if delta < 0: + transport.close() + transport = None + elif delta < timeout: + timeout = delta - if transport: + if transport is not None: alive.append((transport, proto, t0)) if alive: connections[key] = alive if connections: - self._cleanup_handle = self._loop.call_later( - self._keepalive_timeout, self._cleanup) + self._cleanup_handle = self._loop.call_at( + ceil(now + timeout), self._cleanup) self._conns = connections def _start_cleanup_task(self): if self._cleanup_handle is None: - self._cleanup_handle = self._loop.call_later( - self._keepalive_timeout, self._cleanup) + now = self._loop.time() + self._cleanup_handle = self._loop.call_at( + ceil(now + self._keepalive_timeout), self._cleanup) def close(self): """Close all opened transports.""" diff --git a/tests/test_connector.py b/tests/test_connector.py index 21eb740b21e..061bbeafafa 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -374,13 +374,14 @@ def test_connect_oserr(self): def test_start_cleanup_task(self): loop = unittest.mock.Mock() - conn = aiohttp.BaseConnector(loop=loop) + loop.time.return_value = 1.5 + conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10) self.assertIsNone(conn._cleanup_handle) conn._start_cleanup_task() self.assertIsNotNone(conn._cleanup_handle) - loop.call_later.assert_called_with( - conn._keepalive_timeout, conn._cleanup) + loop.call_at.assert_called_with( + 12, conn._cleanup) def test_cleanup(self): testset = { @@ -402,15 +403,40 @@ def test_cleanup(self): self.assertEqual(conn._conns, {}) self.assertIsNone(conn._cleanup_handle) + def test_cleanup2(self): testset = {1: [(unittest.mock.Mock(), unittest.mock.Mock(), 300)]} testset[1][0][1].is_connected.return_value = True - conn = aiohttp.BaseConnector(loop=loop) + loop = unittest.mock.Mock() + loop.time.return_value = 300.1 + + conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10) conn._conns = testset conn._cleanup() self.assertEqual(conn._conns, testset) self.assertIsNotNone(conn._cleanup_handle) + loop.call_at.assert_called_with( + 310, conn._cleanup) + conn.close() + + def test_cleanup3(self): + testset = {1: [(unittest.mock.Mock(), unittest.mock.Mock(), 290.1), + (unittest.mock.Mock(), unittest.mock.Mock(), 305.1)]} + testset[1][0][1].is_connected.return_value = True + + loop = unittest.mock.Mock() + loop.time.return_value = 308.5 + + conn = aiohttp.BaseConnector(loop=loop, keepalive_timeout=10) + conn._conns = testset + + conn._cleanup() + self.assertEqual(conn._conns, {1: [testset[1][1]]}) + + self.assertIsNotNone(conn._cleanup_handle) + loop.call_at.assert_called_with( + 316, conn._cleanup) conn.close() def test_tcp_connector_ctor(self):