From 7aae914ba77a1515aff4f2da09e4237590baec5b Mon Sep 17 00:00:00 2001 From: "Aleksey @soar Smyrnov" Date: Wed, 30 Aug 2017 12:38:28 +0300 Subject: [PATCH 1/7] Added optional parameter `recycle` for `create_engine` in aiopg.sa --- aiopg/connection.py | 8 ++++++++ aiopg/pool.py | 15 ++++++++++----- aiopg/sa/engine.py | 9 +++++---- 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/aiopg/connection.py b/aiopg/connection.py index 8718bec4..06e6d5eb 100755 --- a/aiopg/connection.py +++ b/aiopg/connection.py @@ -3,6 +3,7 @@ import errno import select import sys +import time import traceback import warnings import weakref @@ -111,6 +112,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs): assert self._conn.isexecuting(), "Is conn an async at all???" self._fileno = self._conn.fileno() self._timeout = timeout + self._last_usage = time.time() self._waiter = waiter self._writing = False self._cancelling = False @@ -264,6 +266,7 @@ def cursor(self, name=None, cursor_factory=None, psycopg in asynchronous mode. """ + self._last_usage = time.time() coro = self._cursor(name=name, cursor_factory=cursor_factory, scrollable=scrollable, withhold=withhold, timeout=timeout) @@ -492,6 +495,11 @@ def timeout(self): """Return default timeout for connection operations.""" return self._timeout + @property + def last_usage(self): + """Return time() when connection was used.""" + return self._last_usage + @property def echo(self): """Return echo mode status.""" diff --git a/aiopg/pool.py b/aiopg/pool.py index e3912834..d640c6b1 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -1,6 +1,7 @@ import asyncio import collections import sys +import time import warnings @@ -17,12 +18,12 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10, - loop=None, timeout=TIMEOUT, + loop=None, timeout=TIMEOUT, recycle=-1, enable_json=True, enable_hstore=True, enable_uuid=True, echo=False, on_connect=None, **kwargs): coro = _create_pool(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop, - timeout=timeout, enable_json=enable_json, + timeout=timeout, recycle=recycle, enable_json=enable_json, enable_hstore=enable_hstore, enable_uuid=enable_uuid, echo=echo, on_connect=on_connect, **kwargs) return _PoolContextManager(coro) @@ -30,7 +31,7 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10, @asyncio.coroutine def _create_pool(dsn=None, *, minsize=1, maxsize=10, - loop=None, timeout=TIMEOUT, + loop=None, timeout=TIMEOUT, recycle=-1, enable_json=True, enable_hstore=True, enable_uuid=True, echo=False, on_connect=None, **kwargs): @@ -40,7 +41,7 @@ def _create_pool(dsn=None, *, minsize=1, maxsize=10, pool = Pool(dsn, minsize, maxsize, loop, timeout, enable_json=enable_json, enable_hstore=enable_hstore, enable_uuid=enable_uuid, echo=echo, on_connect=on_connect, - **kwargs) + recycle=recycle, **kwargs) if minsize > 0: with (yield from pool._cond): yield from pool._fill_free_pool(False) @@ -52,7 +53,7 @@ class Pool(asyncio.AbstractServer): def __init__(self, dsn, minsize, maxsize, loop, timeout, *, enable_json, enable_hstore, enable_uuid, echo, - on_connect, **kwargs): + on_connect, recycle, **kwargs): if minsize < 0: raise ValueError("minsize should be zero or greater") if maxsize < minsize and maxsize != 0: @@ -61,6 +62,7 @@ def __init__(self, dsn, minsize, maxsize, loop, timeout, *, self._minsize = minsize self._loop = loop self._timeout = timeout + self._recycle = recycle self._enable_json = enable_json self._enable_hstore = enable_hstore self._enable_uuid = enable_uuid @@ -187,6 +189,9 @@ def _fill_free_pool(self, override_min): conn = self._free[-1] if conn.closed: self._free.pop() + elif self._recycle > -1 and time.time() - conn.last_usage > self._recycle: + conn.close() + self._free.pop() else: self._free.rotate() n += 1 diff --git a/aiopg/sa/engine.py b/aiopg/sa/engine.py index 6b1cbd49..7065bee2 100644 --- a/aiopg/sa/engine.py +++ b/aiopg/sa/engine.py @@ -44,7 +44,7 @@ def _exec_default(self, default): def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, - dialect=_dialect, timeout=TIMEOUT, **kwargs): + dialect=_dialect, timeout=TIMEOUT, recycle=-1, **kwargs): """A coroutine for Engine creation. Returns Engine instance with embedded connection pool. @@ -54,17 +54,18 @@ def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, coro = _create_engine(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop, dialect=dialect, timeout=timeout, - **kwargs) + recycle=recycle, **kwargs) return _EngineContextManager(coro) @asyncio.coroutine def _create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, - dialect=_dialect, timeout=TIMEOUT, **kwargs): + dialect=_dialect, timeout=TIMEOUT, recycle=-1, **kwargs): if loop is None: loop = asyncio.get_event_loop() pool = yield from aiopg.create_pool(dsn, minsize=minsize, maxsize=maxsize, - loop=loop, timeout=timeout, **kwargs) + loop=loop, timeout=timeout, recycle=recycle, + **kwargs) conn = yield from pool.acquire() try: real_dsn = conn.dsn From c62244362a11eac6b2ebb8d16771047fb651741a Mon Sep 17 00:00:00 2001 From: "Aleksey @soar Smyrnov" Date: Wed, 30 Aug 2017 12:58:41 +0300 Subject: [PATCH 2/7] Fix code style --- aiopg/pool.py | 10 ++++++---- aiopg/sa/engine.py | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/aiopg/pool.py b/aiopg/pool.py index d640c6b1..b5992d63 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -23,9 +23,10 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10, echo=False, on_connect=None, **kwargs): coro = _create_pool(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop, - timeout=timeout, recycle=recycle, enable_json=enable_json, - enable_hstore=enable_hstore, enable_uuid=enable_uuid, - echo=echo, on_connect=on_connect, **kwargs) + timeout=timeout, recycle=recycle, + enable_json=enable_json, enable_hstore=enable_hstore, + enable_uuid=enable_uuid, echo=echo, + on_connect=on_connect, **kwargs) return _PoolContextManager(coro) @@ -189,7 +190,8 @@ def _fill_free_pool(self, override_min): conn = self._free[-1] if conn.closed: self._free.pop() - elif self._recycle > -1 and time.time() - conn.last_usage > self._recycle: + elif self._recycle > -1 \ + and time.time() - conn.last_usage > self._recycle: conn.close() self._free.pop() else: diff --git a/aiopg/sa/engine.py b/aiopg/sa/engine.py index 7065bee2..53b013f9 100644 --- a/aiopg/sa/engine.py +++ b/aiopg/sa/engine.py @@ -64,8 +64,8 @@ def _create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, if loop is None: loop = asyncio.get_event_loop() pool = yield from aiopg.create_pool(dsn, minsize=minsize, maxsize=maxsize, - loop=loop, timeout=timeout, recycle=recycle, - **kwargs) + loop=loop, timeout=timeout, + recycle=recycle, **kwargs) conn = yield from pool.acquire() try: real_dsn = conn.dsn From 54eea1f36c5c279ab591eaffc34b4aef4e698a31 Mon Sep 17 00:00:00 2001 From: "Aleksey @soar Smyrnov" Date: Wed, 30 Aug 2017 15:57:25 +0300 Subject: [PATCH 3/7] Added test for connection recycling --- tests/test_pool.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/test_pool.py b/tests/test_pool.py index 501e761b..cb8d0cb3 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -505,6 +505,27 @@ def sleep(conn): assert (1,) == val +@asyncio.coroutine +def test_pool_with_connection_recycling(create_pool, loop): + pool = yield from create_pool(minsize=1, + maxsize=1, + recycle=3) + with (yield from pool) as conn: + cur = yield from conn.cursor() + yield from cur.execute('SELECT 1;') + val = yield from cur.fetchone() + assert (1,) == val + + yield from asyncio.sleep(5, loop=loop) + + assert 1 == pool.freesize + with (yield from pool) as conn: + cur = yield from conn.cursor() + yield from cur.execute('SELECT 1;') + val = yield from cur.fetchone() + assert (1,) == val + + @asyncio.coroutine def test_connection_in_good_state_after_timeout_in_transaction(create_pool): @asyncio.coroutine From 0d006db341603cda9bda447318179b92db77bbfe Mon Sep 17 00:00:00 2001 From: "Aleksey @soar Smyrnov" Date: Wed, 30 Aug 2017 16:16:17 +0300 Subject: [PATCH 4/7] Use `loop.time()` instead of `time.time()` --- aiopg/connection.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/aiopg/connection.py b/aiopg/connection.py index 06e6d5eb..5fbbd26d 100755 --- a/aiopg/connection.py +++ b/aiopg/connection.py @@ -3,7 +3,6 @@ import errno import select import sys -import time import traceback import warnings import weakref @@ -112,7 +111,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs): assert self._conn.isexecuting(), "Is conn an async at all???" self._fileno = self._conn.fileno() self._timeout = timeout - self._last_usage = time.time() + self._last_usage = self._loop.time() self._waiter = waiter self._writing = False self._cancelling = False @@ -266,7 +265,7 @@ def cursor(self, name=None, cursor_factory=None, psycopg in asynchronous mode. """ - self._last_usage = time.time() + self._last_usage = self._loop.time() coro = self._cursor(name=name, cursor_factory=cursor_factory, scrollable=scrollable, withhold=withhold, timeout=timeout) From cfc3b7e3446964a6359a26566fcf5a9844487c04 Mon Sep 17 00:00:00 2001 From: "Aleksey @soar Smyrnov" Date: Wed, 30 Aug 2017 21:18:08 +0300 Subject: [PATCH 5/7] One more `time.time()` to `loop.time()` change --- aiopg/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiopg/pool.py b/aiopg/pool.py index b5992d63..7822e35c 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -191,7 +191,7 @@ def _fill_free_pool(self, override_min): if conn.closed: self._free.pop() elif self._recycle > -1 \ - and time.time() - conn.last_usage > self._recycle: + and self._loop.time() - conn.last_usage > self._recycle: conn.close() self._free.pop() else: From ca363277985d3bb859f45d517e7e962f7a11a66f Mon Sep 17 00:00:00 2001 From: "Aleksey @soar Smyrnov" Date: Wed, 30 Aug 2017 21:19:24 +0300 Subject: [PATCH 6/7] Remove unused import --- aiopg/pool.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aiopg/pool.py b/aiopg/pool.py index 7822e35c..10341d0b 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -1,7 +1,6 @@ import asyncio import collections import sys -import time import warnings From 8f5deab5e79fd1268d49acc2a62e476916d0e80a Mon Sep 17 00:00:00 2001 From: "Aleksey @soar Smyrnov" Date: Fri, 1 Sep 2017 20:35:14 +0300 Subject: [PATCH 7/7] Rename parameter `recycle` to `pool_recycle` according to SQLAlchemy engine parameter See: https://github.com/aio-libs/aiopg/pull/373 --- aiopg/pool.py | 12 ++++++------ aiopg/sa/engine.py | 10 ++++++---- tests/test_pool.py | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/aiopg/pool.py b/aiopg/pool.py index 10341d0b..f5df9d2d 100644 --- a/aiopg/pool.py +++ b/aiopg/pool.py @@ -17,12 +17,12 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10, - loop=None, timeout=TIMEOUT, recycle=-1, + loop=None, timeout=TIMEOUT, pool_recycle=-1, enable_json=True, enable_hstore=True, enable_uuid=True, echo=False, on_connect=None, **kwargs): coro = _create_pool(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop, - timeout=timeout, recycle=recycle, + timeout=timeout, pool_recycle=pool_recycle, enable_json=enable_json, enable_hstore=enable_hstore, enable_uuid=enable_uuid, echo=echo, on_connect=on_connect, **kwargs) @@ -31,7 +31,7 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10, @asyncio.coroutine def _create_pool(dsn=None, *, minsize=1, maxsize=10, - loop=None, timeout=TIMEOUT, recycle=-1, + loop=None, timeout=TIMEOUT, pool_recycle=-1, enable_json=True, enable_hstore=True, enable_uuid=True, echo=False, on_connect=None, **kwargs): @@ -41,7 +41,7 @@ def _create_pool(dsn=None, *, minsize=1, maxsize=10, pool = Pool(dsn, minsize, maxsize, loop, timeout, enable_json=enable_json, enable_hstore=enable_hstore, enable_uuid=enable_uuid, echo=echo, on_connect=on_connect, - recycle=recycle, **kwargs) + pool_recycle=pool_recycle, **kwargs) if minsize > 0: with (yield from pool._cond): yield from pool._fill_free_pool(False) @@ -53,7 +53,7 @@ class Pool(asyncio.AbstractServer): def __init__(self, dsn, minsize, maxsize, loop, timeout, *, enable_json, enable_hstore, enable_uuid, echo, - on_connect, recycle, **kwargs): + on_connect, pool_recycle, **kwargs): if minsize < 0: raise ValueError("minsize should be zero or greater") if maxsize < minsize and maxsize != 0: @@ -62,7 +62,7 @@ def __init__(self, dsn, minsize, maxsize, loop, timeout, *, self._minsize = minsize self._loop = loop self._timeout = timeout - self._recycle = recycle + self._recycle = pool_recycle self._enable_json = enable_json self._enable_hstore = enable_hstore self._enable_uuid = enable_uuid diff --git a/aiopg/sa/engine.py b/aiopg/sa/engine.py index 53b013f9..94620fd5 100644 --- a/aiopg/sa/engine.py +++ b/aiopg/sa/engine.py @@ -44,7 +44,8 @@ def _exec_default(self, default): def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, - dialect=_dialect, timeout=TIMEOUT, recycle=-1, **kwargs): + dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1, + **kwargs): """A coroutine for Engine creation. Returns Engine instance with embedded connection pool. @@ -54,18 +55,19 @@ def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, coro = _create_engine(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop, dialect=dialect, timeout=timeout, - recycle=recycle, **kwargs) + pool_recycle=pool_recycle, **kwargs) return _EngineContextManager(coro) @asyncio.coroutine def _create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None, - dialect=_dialect, timeout=TIMEOUT, recycle=-1, **kwargs): + dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1, + **kwargs): if loop is None: loop = asyncio.get_event_loop() pool = yield from aiopg.create_pool(dsn, minsize=minsize, maxsize=maxsize, loop=loop, timeout=timeout, - recycle=recycle, **kwargs) + pool_recycle=pool_recycle, **kwargs) conn = yield from pool.acquire() try: real_dsn = conn.dsn diff --git a/tests/test_pool.py b/tests/test_pool.py index cb8d0cb3..84e36b6f 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -509,7 +509,7 @@ def sleep(conn): def test_pool_with_connection_recycling(create_pool, loop): pool = yield from create_pool(minsize=1, maxsize=1, - recycle=3) + pool_recycle=3) with (yield from pool) as conn: cur = yield from conn.cursor() yield from cur.execute('SELECT 1;')