Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pool_recycle keyword argument to Pool. #366

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions aiopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,16 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs):
self._writing = False
self._cancelling = False
self._cancellation_waiter = None
self._last_free_time = None
self._echo = echo
self._notifies = asyncio.Queue(loop=loop)
self._weakref = weakref.ref(self)
self._loop.add_reader(self._fileno, self._ready, self._weakref)
if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))

self.free()

@staticmethod
def _ready(weak_self):
self = weak_self()
Expand Down Expand Up @@ -497,6 +500,15 @@ def echo(self):
"""Return echo mode status."""
return self._echo

@property
def last_free_time(self):
"""Return the last time the `free` method was called."""
return self._last_free_time

def free(self):
"""Update last free time to the current time."""
self._last_free_time = self._loop.time()

if PY_341: # pragma: no branch
def __del__(self):
try:
Expand Down
22 changes: 17 additions & 5 deletions aiopg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,29 @@
def create_pool(dsn=None, *, minsize=1, maxsize=10,
loop=None, timeout=TIMEOUT,
enable_json=True, enable_hstore=True, enable_uuid=True,
echo=False, on_connect=None,
echo=False, on_connect=None, pool_recycle=-1,
**kwargs):
coro = _create_pool(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop,
timeout=timeout, enable_json=enable_json,
enable_hstore=enable_hstore, enable_uuid=enable_uuid,
echo=echo, on_connect=on_connect, **kwargs)
echo=echo, on_connect=on_connect,
pool_recycle=pool_recycle, **kwargs)
return _PoolContextManager(coro)


@asyncio.coroutine
def _create_pool(dsn=None, *, minsize=1, maxsize=10,
loop=None, timeout=TIMEOUT,
enable_json=True, enable_hstore=True, enable_uuid=True,
echo=False, on_connect=None,
echo=False, on_connect=None, pool_recycle=-1,
**kwargs):
if loop is None:
loop = asyncio.get_event_loop()

pool = Pool(dsn, minsize, maxsize, loop, timeout,
enable_json=enable_json, enable_hstore=enable_hstore,
enable_uuid=enable_uuid, echo=echo, on_connect=on_connect,
**kwargs)
pool_recycle=pool_recycle, **kwargs)
if minsize > 0:
with (yield from pool._cond):
yield from pool._fill_free_pool(False)
Expand All @@ -52,7 +53,7 @@ class Pool(asyncio.AbstractServer):

def __init__(self, dsn, minsize, maxsize, loop, timeout, *,
enable_json, enable_hstore, enable_uuid, echo,
on_connect, **kwargs):
on_connect, pool_recycle, **kwargs):
if minsize < 0:
raise ValueError("minsize should be zero or greater")
if maxsize < minsize and maxsize != 0:
Expand All @@ -66,6 +67,7 @@ def __init__(self, dsn, minsize, maxsize, loop, timeout, *,
self._enable_uuid = enable_uuid
self._echo = echo
self._on_connect = on_connect
self._pool_recycle = pool_recycle
self._conn_kwargs = kwargs
self._acquiring = 0
self._free = collections.deque(maxlen=maxsize or None)
Expand Down Expand Up @@ -183,8 +185,15 @@ def _acquire(self):
def _fill_free_pool(self, override_min):
# iterate over free connections and remove timeouted ones
n, free = 0, len(self._free)
now = self._loop.time()
while n < free:
conn = self._free[-1]

# Close connections that have lived longer than the recycle limit.
unused_duration = now - conn.last_free_time
if 0 < self._pool_recycle < unused_duration:
conn.close()

if conn.closed:
self._free.pop()
else:
Expand All @@ -203,6 +212,7 @@ def _fill_free_pool(self, override_min):
**self._conn_kwargs)
# raise exception if pool is closing
self._free.append(conn)
conn.free()
self._cond.notify()
finally:
self._acquiring -= 1
Expand All @@ -221,6 +231,7 @@ def _fill_free_pool(self, override_min):
**self._conn_kwargs)
# raise exception if pool is closing
self._free.append(conn)
conn.free()
self._cond.notify()
finally:
self._acquiring -= 1
Expand Down Expand Up @@ -253,6 +264,7 @@ def release(self, conn):
conn.close()
else:
self._free.append(conn)
conn.free()
fut = ensure_future(self._wakeup(), loop=self._loop)
return fut

Expand Down
22 changes: 22 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,3 +588,25 @@ def cb(connection):
yield from cur.execute('SELECT 1')

assert called


@asyncio.coroutine
def test_pool_recycle_noop_without_pool_recycle(create_pool, loop):
pool = yield from create_pool(minsize=3)

conn = yield from pool.acquire()
yield from pool.release(conn)
yield from asyncio.sleep(0.02, loop=loop)
yield from pool.acquire()
assert all([not c.closed for c in pool._free if conn is c])


@asyncio.coroutine
def test_pool_recycle_closes_timed_out_engines(create_pool, loop):
pool = yield from create_pool(minsize=3, pool_recycle=0.01)

conn = yield from pool.acquire()
yield from pool.release(conn)
yield from asyncio.sleep(0.02, loop=loop)
yield from pool.acquire()
assert all([c.closed for c in pool._free if conn is c])