Skip to content

Commit

Permalink
Pool connections recycling (#373)
Browse files Browse the repository at this point in the history
* Added optional parameter `recycle` for `create_engine` in aiopg.sa

* Fix code style

* Added test for connection recycling

* Use `loop.time()` instead of `time.time()`

* One more `time.time()` to `loop.time()` change

* Remove unused import

* Rename parameter `recycle` to `pool_recycle` according to SQLAlchemy engine parameter
See: #373
  • Loading branch information
soar authored and jettify committed Sep 10, 2017
1 parent 7db36f4 commit 5699f8d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 11 deletions.
7 changes: 7 additions & 0 deletions aiopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs):
assert self._conn.isexecuting(), "Is conn an async at all???"
self._fileno = self._conn.fileno()
self._timeout = timeout
self._last_usage = self._loop.time()
self._waiter = waiter
self._writing = False
self._cancelling = False
Expand Down Expand Up @@ -264,6 +265,7 @@ def cursor(self, name=None, cursor_factory=None,
psycopg in asynchronous mode.
"""
self._last_usage = self._loop.time()
coro = self._cursor(name=name, cursor_factory=cursor_factory,
scrollable=scrollable, withhold=withhold,
timeout=timeout)
Expand Down Expand Up @@ -492,6 +494,11 @@ def timeout(self):
"""Return default timeout for connection operations."""
return self._timeout

@property
def last_usage(self):
"""Return time() when connection was used."""
return self._last_usage

@property
def echo(self):
"""Return echo mode status."""
Expand Down
20 changes: 13 additions & 7 deletions aiopg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@


def create_pool(dsn=None, *, minsize=1, maxsize=10,
loop=None, timeout=TIMEOUT,
loop=None, timeout=TIMEOUT, pool_recycle=-1,
enable_json=True, enable_hstore=True, enable_uuid=True,
echo=False, on_connect=None,
**kwargs):
coro = _create_pool(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop,
timeout=timeout, enable_json=enable_json,
enable_hstore=enable_hstore, enable_uuid=enable_uuid,
echo=echo, on_connect=on_connect, **kwargs)
timeout=timeout, pool_recycle=pool_recycle,
enable_json=enable_json, enable_hstore=enable_hstore,
enable_uuid=enable_uuid, echo=echo,
on_connect=on_connect, **kwargs)
return _PoolContextManager(coro)


@asyncio.coroutine
def _create_pool(dsn=None, *, minsize=1, maxsize=10,
loop=None, timeout=TIMEOUT,
loop=None, timeout=TIMEOUT, pool_recycle=-1,
enable_json=True, enable_hstore=True, enable_uuid=True,
echo=False, on_connect=None,
**kwargs):
Expand All @@ -40,7 +41,7 @@ def _create_pool(dsn=None, *, minsize=1, maxsize=10,
pool = Pool(dsn, minsize, maxsize, loop, timeout,
enable_json=enable_json, enable_hstore=enable_hstore,
enable_uuid=enable_uuid, echo=echo, on_connect=on_connect,
**kwargs)
pool_recycle=pool_recycle, **kwargs)
if minsize > 0:
with (yield from pool._cond):
yield from pool._fill_free_pool(False)
Expand All @@ -52,7 +53,7 @@ class Pool(asyncio.AbstractServer):

def __init__(self, dsn, minsize, maxsize, loop, timeout, *,
enable_json, enable_hstore, enable_uuid, echo,
on_connect, **kwargs):
on_connect, pool_recycle, **kwargs):
if minsize < 0:
raise ValueError("minsize should be zero or greater")
if maxsize < minsize and maxsize != 0:
Expand All @@ -61,6 +62,7 @@ def __init__(self, dsn, minsize, maxsize, loop, timeout, *,
self._minsize = minsize
self._loop = loop
self._timeout = timeout
self._recycle = pool_recycle
self._enable_json = enable_json
self._enable_hstore = enable_hstore
self._enable_uuid = enable_uuid
Expand Down Expand Up @@ -187,6 +189,10 @@ def _fill_free_pool(self, override_min):
conn = self._free[-1]
if conn.closed:
self._free.pop()
elif self._recycle > -1 \
and self._loop.time() - conn.last_usage > self._recycle:
conn.close()
self._free.pop()
else:
self._free.rotate()
n += 1
Expand Down
11 changes: 7 additions & 4 deletions aiopg/sa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def _exec_default(self, default):


def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None,
dialect=_dialect, timeout=TIMEOUT, **kwargs):
dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1,
**kwargs):
"""A coroutine for Engine creation.
Returns Engine instance with embedded connection pool.
Expand All @@ -54,17 +55,19 @@ def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None,

coro = _create_engine(dsn=dsn, minsize=minsize, maxsize=maxsize,
loop=loop, dialect=dialect, timeout=timeout,
**kwargs)
pool_recycle=pool_recycle, **kwargs)
return _EngineContextManager(coro)


@asyncio.coroutine
def _create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None,
dialect=_dialect, timeout=TIMEOUT, **kwargs):
dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1,
**kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pool = yield from aiopg.create_pool(dsn, minsize=minsize, maxsize=maxsize,
loop=loop, timeout=timeout, **kwargs)
loop=loop, timeout=timeout,
pool_recycle=pool_recycle, **kwargs)
conn = yield from pool.acquire()
try:
real_dsn = conn.dsn
Expand Down
21 changes: 21 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,27 @@ def sleep(conn):
assert (1,) == val


@asyncio.coroutine
def test_pool_with_connection_recycling(create_pool, loop):
pool = yield from create_pool(minsize=1,
maxsize=1,
pool_recycle=3)
with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
val = yield from cur.fetchone()
assert (1,) == val

yield from asyncio.sleep(5, loop=loop)

assert 1 == pool.freesize
with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
val = yield from cur.fetchone()
assert (1,) == val


@asyncio.coroutine
def test_connection_in_good_state_after_timeout_in_transaction(create_pool):
@asyncio.coroutine
Expand Down

0 comments on commit 5699f8d

Please sign in to comment.