diff --git a/aiomysql/connection.py b/aiomysql/connection.py index cfea0718..2771e532 100644 --- a/aiomysql/connection.py +++ b/aiomysql/connection.py @@ -163,6 +163,7 @@ def __init__(self, host="localhost", user=None, password="", self._db = db self._no_delay = no_delay self._echo = echo + self._last_usage = self._loop.time() self._unix_socket = unix_socket if charset: @@ -240,6 +241,11 @@ def echo(self): """Return echo mode status.""" return self._echo + @property + def last_usage(self): + """Return time() when connection was used.""" + return self._last_usage + @property def loop(self): return self._loop @@ -375,6 +381,7 @@ def cursor(self, cursor=None): :raises TypeError: cursor_class is not a subclass of Cursor. """ self._ensure_alive() + self._last_usage = self._loop.time() if cursor is not None and not issubclass(cursor, Cursor): raise TypeError('Custom cursor must be subclass of Cursor') diff --git a/aiomysql/pool.py b/aiomysql/pool.py index 3dc74768..45f896ee 100644 --- a/aiomysql/pool.py +++ b/aiomysql/pool.py @@ -10,19 +10,21 @@ _PoolAcquireContextManager, create_future, create_task) -def create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs): +def create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1, + loop=None, **kwargs): coro = _create_pool(minsize=minsize, maxsize=maxsize, echo=echo, - loop=loop, **kwargs) + pool_recycle=pool_recycle, loop=loop, **kwargs) return _PoolContextManager(coro) @asyncio.coroutine -def _create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs): +def _create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1, + loop=None, **kwargs): if loop is None: loop = asyncio.get_event_loop() - pool = Pool(minsize=minsize, maxsize=maxsize, echo=echo, loop=loop, - **kwargs) + pool = Pool(minsize=minsize, maxsize=maxsize, echo=echo, + pool_recycle=pool_recycle, loop=loop, **kwargs) if minsize > 0: with (yield from pool._cond): yield from pool._fill_free_pool(False) @@ -32,7 +34,7 @@ def _create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs): class Pool(asyncio.AbstractServer): """Connection pool""" - def __init__(self, minsize, maxsize, echo, loop, **kwargs): + def __init__(self, minsize, maxsize, echo, pool_recycle, loop, **kwargs): if minsize < 0: raise ValueError("minsize should be zero or greater") if maxsize < minsize: @@ -48,6 +50,7 @@ def __init__(self, minsize, maxsize, echo, loop, **kwargs): self._closing = False self._closed = False self._echo = echo + self._recycle = pool_recycle @property def echo(self): @@ -153,6 +156,12 @@ def _fill_free_pool(self, override_min): if conn._reader.at_eof(): self._free.pop() conn.close() + + elif (self._recycle > -1 and + self._loop.time() - conn.last_usage > self._recycle): + self._free.pop() + conn.close() + else: self._free.rotate() n += 1 diff --git a/aiomysql/sa/engine.py b/aiomysql/sa/engine.py index a5dc1ff3..416e8960 100644 --- a/aiomysql/sa/engine.py +++ b/aiomysql/sa/engine.py @@ -19,7 +19,7 @@ def create_engine(minsize=1, maxsize=10, loop=None, - dialect=_dialect, **kwargs): + dialect=_dialect, pool_recycle=-1, **kwargs): """A coroutine for Engine creation. Returns Engine instance with embedded connection pool. @@ -27,18 +27,19 @@ def create_engine(minsize=1, maxsize=10, loop=None, The pool has *minsize* opened connections to PostgreSQL server. """ coro = _create_engine(minsize=minsize, maxsize=maxsize, loop=loop, - dialect=dialect, **kwargs) + dialect=dialect, pool_recycle=pool_recycle, **kwargs) return _EngineContextManager(coro) @asyncio.coroutine def _create_engine(minsize=1, maxsize=10, loop=None, - dialect=_dialect, **kwargs): + dialect=_dialect, pool_recycle=-1, **kwargs): if loop is None: loop = asyncio.get_event_loop() pool = yield from aiomysql.create_pool(minsize=minsize, maxsize=maxsize, - loop=loop, **kwargs) + loop=loop, + pool_recycle=pool_recycle, **kwargs) conn = yield from pool.acquire() try: return Engine(dialect, pool, **kwargs) diff --git a/tests/test_pool.py b/tests/test_pool.py index 5fb204b5..712e7789 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -497,3 +497,24 @@ def test_cancelled_connection(pool_creator, loop): res = yield from cur2.fetchall() # If we receive [(1, 0)] - we retrieved old cursor's values assert list(res) == [(2, 0)] + + +@asyncio.coroutine +def test_pool_with_connection_recycling(pool_creator, loop): + pool = yield from pool_creator(minsize=1, + maxsize=1, + pool_recycle=3) + with (yield from pool) as conn: + cur = yield from conn.cursor() + yield from cur.execute('SELECT 1;') + val = yield from cur.fetchone() + assert (1,) == val + + yield from asyncio.sleep(5, loop=loop) + + assert 1 == pool.freesize + with (yield from pool) as conn: + cur = yield from conn.cursor() + yield from cur.execute('SELECT 1;') + val = yield from cur.fetchone() + assert (1,) == val