From 5699f8dec1f7d33ef9aec540a92c8b75bb8f9166 Mon Sep 17 00:00:00 2001 From: "Aleksey @soar Smyrnov" Date: Sun, 10 Sep 2017 22:22:20 +0300 Subject: [PATCH] Pool connections recycling (#373) * Added optional parameter `recycle` for `create_engine` in aiopg.sa * Fix code style * Added test for connection recycling * Use `loop.time()` instead of `time.time()` * One more `time.time()` to `loop.time()` change * Remove unused import * Rename parameter `recycle` to `pool_recycle` according to SQLAlchemy engine parameter See: https://github.com/aio-libs/aiopg/pull/373 --- aiopg/connection.py | 7 +++++++ aiopg/pool.py | 20 +++++++++++++------- aiopg/sa/engine.py | 11 +++++++---- tests/test_pool.py | 21 +++++++++++++++++++++ 4 files changed, 48 insertions(+), 11 deletions(-) diff --git a/aiopg/connection.py b/aiopg/connection.py index 8718bec4..5fbbd26d 100755 --- a/aiopg/connection.py +++ b/aiopg/connection.py @@ -111,6 +111,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs): assert self._conn.isexecuting(), "Is conn an async at all???" self._fileno = self._conn.fileno() self._timeout = timeout + self._last_usage = self._loop.time() self._waiter = waiter self._writing = False self._cancelling = False @@ -264,6 +265,7 @@ def cursor(self, name=None, cursor_factory=None, psycopg in asynchronous mode. """ + self._last_usage = self._loop.time() coro = self._cursor(name=name, cursor_factory=cursor_factory, scrollable=scrollable, withhold=withhold, timeout=timeout) @@ -492,6 +494,11 @@ def timeout(self): """Return default timeout for connection operations.""" return self._timeout + @property + def last_usage(self): + """Return time() when connection was used.""" + return self._last_usage + @property def echo(self): """Return echo mode status.""" diff --git a/aiopg/pool.py b/aiopg/pool.py index e3912834..f5df9d2d 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -17,20 +17,21 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10, - loop=None, timeout=TIMEOUT, + loop=None, timeout=TIMEOUT, pool_recycle=-1, enable_json=True, enable_hstore=True, enable_uuid=True, echo=False, on_connect=None, **kwargs): coro = _create_pool(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop, - timeout=timeout, enable_json=enable_json, - enable_hstore=enable_hstore, enable_uuid=enable_uuid, - echo=echo, on_connect=on_connect, **kwargs) + timeout=timeout, pool_recycle=pool_recycle, + enable_json=enable_json, enable_hstore=enable_hstore, + enable_uuid=enable_uuid, echo=echo, + on_connect=on_connect, **kwargs) return _PoolContextManager(coro) @asyncio.coroutine def _create_pool(dsn=None, *, minsize=1, maxsize=10, - loop=None, timeout=TIMEOUT, + loop=None, timeout=TIMEOUT, pool_recycle=-1, enable_json=True, enable_hstore=True, enable_uuid=True, echo=False, on_connect=None, **kwargs): @@ -40,7 +41,7 @@ def _create_pool(dsn=None, *, minsize=1, maxsize=10, pool = Pool(dsn, minsize, maxsize, loop, timeout, enable_json=enable_json, enable_hstore=enable_hstore, enable_uuid=enable_uuid, echo=echo, on_connect=on_connect, - **kwargs) + pool_recycle=pool_recycle, **kwargs) if minsize > 0: with (yield from pool._cond): yield from pool._fill_free_pool(False) @@ -52,7 +53,7 @@ class Pool(asyncio.AbstractServer): def __init__(self, dsn, minsize, maxsize, loop, timeout, *, enable_json, enable_hstore, enable_uuid, echo, - on_connect, **kwargs): + on_connect, pool_recycle, **kwargs): if minsize < 0: raise ValueError("minsize should be zero or greater") if maxsize < minsize and maxsize != 0: @@ -61,6 +62,7 @@ def __init__(self, dsn, minsize, maxsize, loop, timeout, *, self._minsize = minsize self._loop = loop self._timeout = timeout + self._recycle = pool_recycle self._enable_json = enable_json self._enable_hstore = enable_hstore self._enable_uuid = enable_uuid @@ -187,6 +189,10 @@ def _fill_free_pool(self, override_min): conn = self._free[-1] if conn.closed: self._free.pop() + elif self._recycle > -1 \ + and self._loop.time() - conn.last_usage > self._recycle: + conn.close() + self._free.pop() else: self._free.rotate() n += 1 diff --git a/aiopg/sa/engine.py b/aiopg/sa/engine.py index 6b1cbd49..94620fd5 100644 --- a/aiopg/sa/engine.py +++ b/aiopg/sa/engine.py @@ -44,7 +44,8 @@ def _exec_default(self, default): def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, - dialect=_dialect, timeout=TIMEOUT, **kwargs): + dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1, + **kwargs): """A coroutine for Engine creation. Returns Engine instance with embedded connection pool. @@ -54,17 +55,19 @@ def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, coro = _create_engine(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop, dialect=dialect, timeout=timeout, - **kwargs) + pool_recycle=pool_recycle, **kwargs) return _EngineContextManager(coro) @asyncio.coroutine def _create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, - dialect=_dialect, timeout=TIMEOUT, **kwargs): + dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1, + **kwargs): if loop is None: loop = asyncio.get_event_loop() pool = yield from aiopg.create_pool(dsn, minsize=minsize, maxsize=maxsize, - loop=loop, timeout=timeout, **kwargs) + loop=loop, timeout=timeout, + pool_recycle=pool_recycle, **kwargs) conn = yield from pool.acquire() try: real_dsn = conn.dsn diff --git a/tests/test_pool.py b/tests/test_pool.py index 501e761b..84e36b6f 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -505,6 +505,27 @@ def sleep(conn): assert (1,) == val +@asyncio.coroutine +def test_pool_with_connection_recycling(create_pool, loop): + pool = yield from create_pool(minsize=1, + maxsize=1, + pool_recycle=3) + with (yield from pool) as conn: + cur = yield from conn.cursor() + yield from cur.execute('SELECT 1;') + val = yield from cur.fetchone() + assert (1,) == val + + yield from asyncio.sleep(5, loop=loop) + + assert 1 == pool.freesize + with (yield from pool) as conn: + cur = yield from conn.cursor() + yield from cur.execute('SELECT 1;') + val = yield from cur.fetchone() + assert (1,) == val + + @asyncio.coroutine def test_connection_in_good_state_after_timeout_in_transaction(create_pool): @asyncio.coroutine