Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pool recylce #216

Merged
merged 3 commits into from
Oct 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions aiomysql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ def __init__(self, host="localhost", user=None, password="",
self._db = db
self._no_delay = no_delay
self._echo = echo
self._last_usage = self._loop.time()

self._unix_socket = unix_socket
if charset:
Expand Down Expand Up @@ -240,6 +241,11 @@ def echo(self):
"""Return echo mode status."""
return self._echo

@property
def last_usage(self):
"""Return time() when connection was used."""
return self._last_usage

@property
def loop(self):
return self._loop
Expand Down Expand Up @@ -375,6 +381,7 @@ def cursor(self, cursor=None):
:raises TypeError: cursor_class is not a subclass of Cursor.
"""
self._ensure_alive()
self._last_usage = self._loop.time()
if cursor is not None and not issubclass(cursor, Cursor):
raise TypeError('Custom cursor must be subclass of Cursor')

Expand Down
21 changes: 15 additions & 6 deletions aiomysql/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,21 @@
_PoolAcquireContextManager, create_future, create_task)


def create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs):
def create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1,
loop=None, **kwargs):
coro = _create_pool(minsize=minsize, maxsize=maxsize, echo=echo,
loop=loop, **kwargs)
pool_recycle=pool_recycle, loop=loop, **kwargs)
return _PoolContextManager(coro)


@asyncio.coroutine
def _create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs):
def _create_pool(minsize=1, maxsize=10, echo=False, pool_recycle=-1,
loop=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()

pool = Pool(minsize=minsize, maxsize=maxsize, echo=echo, loop=loop,
**kwargs)
pool = Pool(minsize=minsize, maxsize=maxsize, echo=echo,
pool_recycle=pool_recycle, loop=loop, **kwargs)
if minsize > 0:
with (yield from pool._cond):
yield from pool._fill_free_pool(False)
Expand All @@ -32,7 +34,7 @@ def _create_pool(minsize=1, maxsize=10, echo=False, loop=None, **kwargs):
class Pool(asyncio.AbstractServer):
"""Connection pool"""

def __init__(self, minsize, maxsize, echo, loop, **kwargs):
def __init__(self, minsize, maxsize, echo, pool_recycle, loop, **kwargs):
if minsize < 0:
raise ValueError("minsize should be zero or greater")
if maxsize < minsize:
Expand All @@ -48,6 +50,7 @@ def __init__(self, minsize, maxsize, echo, loop, **kwargs):
self._closing = False
self._closed = False
self._echo = echo
self._recycle = pool_recycle

@property
def echo(self):
Expand Down Expand Up @@ -153,6 +156,12 @@ def _fill_free_pool(self, override_min):
if conn._reader.at_eof():
self._free.pop()
conn.close()

elif (self._recycle > -1 and
self._loop.time() - conn.last_usage > self._recycle):
self._free.pop()
conn.close()

else:
self._free.rotate()
n += 1
Expand Down
9 changes: 5 additions & 4 deletions aiomysql/sa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@


def create_engine(minsize=1, maxsize=10, loop=None,
dialect=_dialect, **kwargs):
dialect=_dialect, pool_recycle=-1, **kwargs):
"""A coroutine for Engine creation.

Returns Engine instance with embedded connection pool.

The pool has *minsize* opened connections to PostgreSQL server.
"""
coro = _create_engine(minsize=minsize, maxsize=maxsize, loop=loop,
dialect=dialect, **kwargs)
dialect=dialect, pool_recycle=pool_recycle, **kwargs)
return _EngineContextManager(coro)


@asyncio.coroutine
def _create_engine(minsize=1, maxsize=10, loop=None,
dialect=_dialect, **kwargs):
dialect=_dialect, pool_recycle=-1, **kwargs):

if loop is None:
loop = asyncio.get_event_loop()
pool = yield from aiomysql.create_pool(minsize=minsize, maxsize=maxsize,
loop=loop, **kwargs)
loop=loop,
pool_recycle=pool_recycle, **kwargs)
conn = yield from pool.acquire()
try:
return Engine(dialect, pool, **kwargs)
Expand Down
21 changes: 21 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,3 +497,24 @@ def test_cancelled_connection(pool_creator, loop):
res = yield from cur2.fetchall()
# If we receive [(1, 0)] - we retrieved old cursor's values
assert list(res) == [(2, 0)]


@asyncio.coroutine
def test_pool_with_connection_recycling(pool_creator, loop):
pool = yield from pool_creator(minsize=1,
maxsize=1,
pool_recycle=3)
with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
val = yield from cur.fetchone()
assert (1,) == val

yield from asyncio.sleep(5, loop=loop)

assert 1 == pool.freesize
with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
val = yield from cur.fetchone()
assert (1,) == val