Skip to content

Commit

Permalink
Add max_inactive_connection_lifetime parameter to Pool.
Browse files — browse the repository at this point in the history
  • Loading branch information
1st1 committed Mar 31, 2017
1 parent 12cce92 commit a2935ae
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 6 deletions.
2 changes: 2 additions & 0 deletions asyncpg/_testbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def create_pool(dsn=None, *,
min_size=10,
max_size=10,
max_queries=50000,
max_inactive_connection_lifetime=60.0,
setup=None,
init=None,
loop=None,
Expand All @@ -166,6 +167,7 @@ def create_pool(dsn=None, *,
dsn,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
**connect_kwargs)


Expand Down
55 changes: 49 additions & 6 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,24 @@ class PoolConnectionHolder:

__slots__ = ('_con', '_pool', '_loop',
'_connect_args', '_connect_kwargs',
'_max_queries', '_setup', '_init')
'_max_queries', '_setup', '_init',
'_max_inactive_time', '_in_use',
'_inactive_callback')

def __init__(self, pool, *, connect_args, connect_kwargs,
             max_queries, setup, init, max_inactive_time):
    """Initialize one pool slot that lazily holds a single connection.

    :param pool: the owning Pool instance.
    :param connect_args: positional args forwarded to the connect call.
    :param connect_kwargs: keyword args forwarded to the connect call.
    :param max_queries: queries after which the connection is recycled.
    :param setup: optional coroutine run on every acquire.
    :param init: optional coroutine run once after connect.
    :param max_inactive_time: seconds an idle connection may live before
        being terminated; falsy (0) disables the mechanism.
    """
    # NOTE(review): the scraped diff showed both the old and the new
    # signature line; this is the post-change signature reconstructed.
    self._pool = pool
    self._con = None  # lazily created on first acquire()

    self._connect_args = connect_args
    self._connect_kwargs = connect_kwargs
    self._max_queries = max_queries
    self._max_inactive_time = max_inactive_time
    self._setup = setup
    self._init = init
    # Timer handle scheduled on release(); cancelled on acquire/close.
    self._inactive_callback = None
    self._in_use = False

async def connect(self):
assert self._con is None
Expand Down Expand Up @@ -134,6 +139,8 @@ async def acquire(self) -> PoolConnectionProxy:
if self._con is None:
await self.connect()

self._maybe_cancel_inactive_callback()

proxy = PoolConnectionProxy(self, self._con)

if self._setup is not None:
Expand All @@ -154,9 +161,13 @@ async def acquire(self) -> PoolConnectionProxy:
self._con = None
raise ex

self._in_use = True
return proxy

async def release(self):
assert self._in_use
self._in_use = False

if self._con.is_closed():
self._con = None

Expand All @@ -181,7 +192,13 @@ async def release(self):
self._con = None
raise ex

assert self._inactive_callback is None
if self._max_inactive_time and self._con is not None:
self._inactive_callback = self._pool._loop.call_later(
self._max_inactive_time, self._deactivate_connection)

async def close(self):
self._maybe_cancel_inactive_callback()
if self._con is None:
return
if self._con.is_closed():
Expand All @@ -194,6 +211,7 @@ async def close(self):
self._con = None

def terminate(self):
self._maybe_cancel_inactive_callback()
if self._con is None:
return
if self._con.is_closed():
Expand All @@ -205,6 +223,18 @@ def terminate(self):
finally:
self._con = None

def _maybe_cancel_inactive_callback(self):
    """Cancel the pending deactivation timer, if one is scheduled."""
    callback = self._inactive_callback
    if callback is None:
        return
    self._inactive_callback = None
    callback.cancel()

def _deactivate_connection(self):
    """Timer callback: terminate a connection that idled past its lifetime.

    Must never fire while the connection is checked out — the timer is
    cancelled on acquire, hence the assertion.
    """
    assert not self._in_use
    con = self._con
    if con is None or con.is_closed():
        # Nothing to do: never connected, or already gone.
        return
    con.terminate()
    self._con = None


class Pool:
"""A connection pool.
Expand All @@ -225,6 +255,7 @@ def __init__(self, *connect_args,
min_size,
max_size,
max_queries,
max_inactive_connection_lifetime,
setup,
init,
loop,
Expand All @@ -247,6 +278,11 @@ def __init__(self, *connect_args,
if max_queries <= 0:
raise ValueError('max_queries is expected to be greater than zero')

if max_inactive_connection_lifetime < 0:
raise ValueError(
'max_inactive_connection_lifetime is expected to be greater '
'or equal to zero')

self._minsize = min_size
self._maxsize = max_size

Expand All @@ -265,6 +301,7 @@ def __init__(self, *connect_args,
connect_args=connect_args,
connect_kwargs=connect_kwargs,
max_queries=max_queries,
max_inactive_time=max_inactive_connection_lifetime,
setup=setup,
init=init)

Expand Down Expand Up @@ -511,6 +548,7 @@ def create_pool(dsn=None, *,
min_size=10,
max_size=10,
max_queries=50000,
max_inactive_connection_lifetime=300.0,
setup=None,
init=None,
loop=None,
Expand Down Expand Up @@ -548,6 +586,9 @@ def create_pool(dsn=None, *,
:param int max_size: Max number of connections in the pool.
:param int max_queries: Number of queries after a connection is closed
and replaced with a new connection.
:param float max_inactive_connection_lifetime:
Number of seconds after which inactive connections in the
pool will be closed. Pass ``0`` to disable this mechanism.
:param coroutine setup: A coroutine to prepare a connection right before
it is returned from :meth:`~pool.Pool.acquire`.
An example use case would be to automatically
Expand All @@ -567,7 +608,9 @@ def create_pool(dsn=None, *,
An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
attempted operation on a released connection.
"""
return Pool(dsn,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
**connect_kwargs)
return Pool(
dsn,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
**connect_kwargs)
94 changes: 94 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
import platform
import random
import time
import unittest

from asyncpg import _testbase as tb
Expand Down Expand Up @@ -457,6 +458,99 @@ async def worker(pool):
finally:
await pool.execute('DROP TABLE exmany')

async def test_pool_max_inactive_time_01(self):
    """A query running longer than the lifetime must not kill its connection."""
    async with self.create_pool(
            database='postgres', min_size=1, max_size=1,
            max_inactive_connection_lifetime=0.1) as pool:

        # While a connection is checked out it is never "inactive";
        # the deactivation timer only runs between release and acquire.

        original_con = pool._holders[0]._con

        for _ in range(3):
            await pool.execute('SELECT pg_sleep(0.5)')
            self.assertIs(pool._holders[0]._con, original_con)

        self.assertEqual(
            await pool.execute('SELECT 1::int'),
            'SELECT 1')
        self.assertIs(pool._holders[0]._con, original_con)

async def test_pool_max_inactive_time_02(self):
    """An idle connection older than the lifetime is dropped and replaced."""
    async with self.create_pool(
            database='postgres', min_size=1, max_size=1,
            max_inactive_connection_lifetime=0.5) as pool:

        first_con = pool._holders[0]._con

        self.assertEqual(
            await pool.execute('SELECT 1::int'),
            'SELECT 1')
        self.assertIs(pool._holders[0]._con, first_con)

        # Sleep past the 0.5s lifetime: the holder's timer should have
        # terminated the idle connection by now.
        await asyncio.sleep(1, loop=self.loop)
        self.assertIs(pool._holders[0]._con, None)

        # The next query transparently opens a fresh connection.
        self.assertEqual(
            await pool.execute('SELECT 1::int'),
            'SELECT 1')
        self.assertIsNot(pool._holders[0]._con, first_con)

async def test_pool_max_inactive_time_03(self):
    """The inactivity clock starts at release, not at acquire."""
    async with self.create_pool(
            database='postgres', min_size=1, max_size=1,
            max_inactive_connection_lifetime=1) as pool:

        held_con = pool._holders[0]._con

        # 0.5s busy + 0.6s idle exceeds the 1s lifetime in wall time,
        # but the idle portion alone does not — the connection survives.
        await pool.execute('SELECT pg_sleep(0.5)')
        await asyncio.sleep(0.6, loop=self.loop)

        self.assertIs(pool._holders[0]._con, held_con)

        self.assertEqual(
            await pool.execute('SELECT 1::int'),
            'SELECT 1')
        self.assertIs(pool._holders[0]._con, held_con)

async def test_pool_max_inactive_time_04(self):
    # Chaos test for max_inactive_connection_lifetime.
    #
    # 50 concurrent workers hammer a pool whose idle lifetime (0.1s) is
    # shorter than the workers' own sleeps, so holders keep deactivating
    # connections underneath live acquire/release traffic.
    DURATION = 2.0
    START = time.monotonic()
    N = 0

    async def worker(pool):
        nonlocal N
        # Sleep first so at least some holders' 0.1s inactivity timers
        # fire while this worker is waiting to acquire.
        await asyncio.sleep(random.random() / 10 + 0.1, loop=self.loop)
        async with pool.acquire() as con:
            if random.random() > 0.5:
                await con.execute('SELECT pg_sleep({:.2f})'.format(
                    random.random() / 10))
            self.assertEqual(
                await con.fetchval('SELECT 42::int'),
                42)

        # Re-run (tail recursion) until the time budget is spent; N
        # counts completed iterations across all workers.
        if time.monotonic() - START < DURATION:
            await worker(pool)

        N += 1

    async with self.create_pool(
            database='postgres', min_size=10, max_size=30,
            max_inactive_connection_lifetime=0.1) as pool:

        workers = [worker(pool) for _ in range(50)]
        await asyncio.gather(*workers, loop=self.loop)

    self.assertGreater(N, 50)


@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
class TestHostStandby(tb.ConnectedTestCase):
Expand Down

0 comments on commit a2935ae

Please sign in to comment.