diff --git a/aiopg/connection.py b/aiopg/connection.py index 8718bec4..bdbb30ed 100755 --- a/aiopg/connection.py +++ b/aiopg/connection.py @@ -115,6 +115,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs): self._writing = False self._cancelling = False self._cancellation_waiter = None + self._last_free_time = None self._echo = echo self._notifies = asyncio.Queue(loop=loop) self._weakref = weakref.ref(self) @@ -122,6 +123,8 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs): if loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) + self.free() + @staticmethod def _ready(weak_self): self = weak_self() @@ -497,6 +500,15 @@ def echo(self): """Return echo mode status.""" return self._echo + @property + def last_free_time(self): + """Return the last time the `free` method was called.""" + return self._last_free_time + + def free(self): + """Update last free time to the current time.""" + self._last_free_time = self._loop.time() + if PY_341: # pragma: no branch def __del__(self): try: diff --git a/aiopg/pool.py b/aiopg/pool.py index e3912834..966179f0 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -19,12 +19,13 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10, loop=None, timeout=TIMEOUT, enable_json=True, enable_hstore=True, enable_uuid=True, - echo=False, on_connect=None, + echo=False, on_connect=None, pool_recycle=-1, **kwargs): coro = _create_pool(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop, timeout=timeout, enable_json=enable_json, enable_hstore=enable_hstore, enable_uuid=enable_uuid, - echo=echo, on_connect=on_connect, **kwargs) + echo=echo, on_connect=on_connect, + pool_recycle=pool_recycle, **kwargs) return _PoolContextManager(coro) @@ -32,7 +33,7 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10, def _create_pool(dsn=None, *, minsize=1, maxsize=10, loop=None, timeout=TIMEOUT, enable_json=True, enable_hstore=True, enable_uuid=True, - echo=False, on_connect=None, + echo=False, on_connect=None, pool_recycle=-1, **kwargs): if loop is None: loop = asyncio.get_event_loop() @@ -40,7 +41,7 @@ def _create_pool(dsn=None, *, minsize=1, maxsize=10, pool = Pool(dsn, minsize, maxsize, loop, timeout, enable_json=enable_json, enable_hstore=enable_hstore, enable_uuid=enable_uuid, echo=echo, on_connect=on_connect, - **kwargs) + pool_recycle=pool_recycle, **kwargs) if minsize > 0: with (yield from pool._cond): yield from pool._fill_free_pool(False) @@ -52,7 +53,7 @@ class Pool(asyncio.AbstractServer): def __init__(self, dsn, minsize, maxsize, loop, timeout, *, enable_json, enable_hstore, enable_uuid, echo, - on_connect, **kwargs): + on_connect, pool_recycle, **kwargs): if minsize < 0: raise ValueError("minsize should be zero or greater") if maxsize < minsize and maxsize != 0: @@ -66,6 +67,7 @@ def __init__(self, dsn, minsize, maxsize, loop, timeout, *, self._enable_uuid = enable_uuid self._echo = echo self._on_connect = on_connect + self._pool_recycle = pool_recycle self._conn_kwargs = kwargs self._acquiring = 0 self._free = collections.deque(maxlen=maxsize or None) @@ -183,8 +185,15 @@ def _acquire(self): def _fill_free_pool(self, override_min): # iterate over free connections and remove timeouted ones n, free = 0, len(self._free) + now = self._loop.time() while n < free: conn = self._free[-1] + + # Close connections that have lived longer than the recycle limit. + unused_duration = now - conn.last_free_time + if 0 < self._pool_recycle < unused_duration: + conn.close() + if conn.closed: self._free.pop() else: @@ -203,6 +212,7 @@ def _fill_free_pool(self, override_min): **self._conn_kwargs) # raise exception if pool is closing self._free.append(conn) + conn.free() self._cond.notify() finally: self._acquiring -= 1 @@ -221,6 +231,7 @@ def _fill_free_pool(self, override_min): **self._conn_kwargs) # raise exception if pool is closing self._free.append(conn) + conn.free() self._cond.notify() finally: self._acquiring -= 1 @@ -253,6 +264,7 @@ def release(self, conn): conn.close() else: self._free.append(conn) + conn.free() fut = ensure_future(self._wakeup(), loop=self._loop) return fut diff --git a/tests/test_pool.py b/tests/test_pool.py index 501e761b..036b31a7 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -588,3 +588,25 @@ def cb(connection): yield from cur.execute('SELECT 1') assert called + + +@asyncio.coroutine +def test_pool_recycle_noop_without_pool_recycle(create_pool, loop): + pool = yield from create_pool(minsize=3) + + conn = yield from pool.acquire() + yield from pool.release(conn) + yield from asyncio.sleep(0.02, loop=loop) + yield from pool.acquire() + assert all([not c.closed for c in pool._free if conn is c]) + + +@asyncio.coroutine +def test_pool_recycle_closes_timed_out_engines(create_pool, loop): + pool = yield from create_pool(minsize=3, pool_recycle=0.01) + + conn = yield from pool.acquire() + yield from pool.release(conn) + yield from asyncio.sleep(0.02, loop=loop) + yield from pool.acquire() + assert all([c.closed for c in pool._free if conn is c])