Skip to content

Commit

Permalink
Merge pull request #1 from soar/feature/connection-recycling
Browse files Browse the repository at this point in the history
connection recycling
  • Loading branch information
soar authored Sep 1, 2017
2 parents 983ecc3 + 8f5deab commit 9bdff25
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 15 deletions.
5 changes: 2 additions & 3 deletions aiopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import errno
import select
import sys
import time
import traceback
import warnings
import weakref
Expand Down Expand Up @@ -112,7 +111,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs):
assert self._conn.isexecuting(), "Is conn an async at all???"
self._fileno = self._conn.fileno()
self._timeout = timeout
self._last_usage = time.time()
self._last_usage = self._loop.time()
self._waiter = waiter
self._writing = False
self._cancelling = False
Expand Down Expand Up @@ -266,7 +265,7 @@ def cursor(self, name=None, cursor_factory=None,
psycopg in asynchronous mode.
"""
self._last_usage = time.time()
self._last_usage = self._loop.time()
coro = self._cursor(name=name, cursor_factory=cursor_factory,
scrollable=scrollable, withhold=withhold,
timeout=timeout)
Expand Down
15 changes: 7 additions & 8 deletions aiopg/pool.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import collections
import sys
import time
import warnings


Expand All @@ -18,12 +17,12 @@


def create_pool(dsn=None, *, minsize=1, maxsize=10,
loop=None, timeout=TIMEOUT, recycle=-1,
loop=None, timeout=TIMEOUT, pool_recycle=-1,
enable_json=True, enable_hstore=True, enable_uuid=True,
echo=False, on_connect=None,
**kwargs):
coro = _create_pool(dsn=dsn, minsize=minsize, maxsize=maxsize, loop=loop,
timeout=timeout, recycle=recycle,
timeout=timeout, pool_recycle=pool_recycle,
enable_json=enable_json, enable_hstore=enable_hstore,
enable_uuid=enable_uuid, echo=echo,
on_connect=on_connect, **kwargs)
Expand All @@ -32,7 +31,7 @@ def create_pool(dsn=None, *, minsize=1, maxsize=10,

@asyncio.coroutine
def _create_pool(dsn=None, *, minsize=1, maxsize=10,
loop=None, timeout=TIMEOUT, recycle=-1,
loop=None, timeout=TIMEOUT, pool_recycle=-1,
enable_json=True, enable_hstore=True, enable_uuid=True,
echo=False, on_connect=None,
**kwargs):
Expand All @@ -42,7 +41,7 @@ def _create_pool(dsn=None, *, minsize=1, maxsize=10,
pool = Pool(dsn, minsize, maxsize, loop, timeout,
enable_json=enable_json, enable_hstore=enable_hstore,
enable_uuid=enable_uuid, echo=echo, on_connect=on_connect,
recycle=recycle, **kwargs)
pool_recycle=pool_recycle, **kwargs)
if minsize > 0:
with (yield from pool._cond):
yield from pool._fill_free_pool(False)
Expand All @@ -54,7 +53,7 @@ class Pool(asyncio.AbstractServer):

def __init__(self, dsn, minsize, maxsize, loop, timeout, *,
enable_json, enable_hstore, enable_uuid, echo,
on_connect, recycle, **kwargs):
on_connect, pool_recycle, **kwargs):
if minsize < 0:
raise ValueError("minsize should be zero or greater")
if maxsize < minsize and maxsize != 0:
Expand All @@ -63,7 +62,7 @@ def __init__(self, dsn, minsize, maxsize, loop, timeout, *,
self._minsize = minsize
self._loop = loop
self._timeout = timeout
self._recycle = recycle
self._recycle = pool_recycle
self._enable_json = enable_json
self._enable_hstore = enable_hstore
self._enable_uuid = enable_uuid
Expand Down Expand Up @@ -191,7 +190,7 @@ def _fill_free_pool(self, override_min):
if conn.closed:
self._free.pop()
elif self._recycle > -1 \
and time.time() - conn.last_usage > self._recycle:
and self._loop.time() - conn.last_usage > self._recycle:
conn.close()
self._free.pop()
else:
Expand Down
10 changes: 6 additions & 4 deletions aiopg/sa/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def _exec_default(self, default):


def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None,
dialect=_dialect, timeout=TIMEOUT, recycle=-1, **kwargs):
dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1,
**kwargs):
"""A coroutine for Engine creation.
Returns Engine instance with embedded connection pool.
Expand All @@ -54,18 +55,19 @@ def create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None,

coro = _create_engine(dsn=dsn, minsize=minsize, maxsize=maxsize,
loop=loop, dialect=dialect, timeout=timeout,
recycle=recycle, **kwargs)
pool_recycle=pool_recycle, **kwargs)
return _EngineContextManager(coro)


@asyncio.coroutine
def _create_engine(dsn=None, *, minsize=1, maxsize=10, loop=None,
dialect=_dialect, timeout=TIMEOUT, recycle=-1, **kwargs):
dialect=_dialect, timeout=TIMEOUT, pool_recycle=-1,
**kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pool = yield from aiopg.create_pool(dsn, minsize=minsize, maxsize=maxsize,
loop=loop, timeout=timeout,
recycle=recycle, **kwargs)
pool_recycle=pool_recycle, **kwargs)
conn = yield from pool.acquire()
try:
real_dsn = conn.dsn
Expand Down
21 changes: 21 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,27 @@ def sleep(conn):
assert (1,) == val


@asyncio.coroutine
def test_pool_with_connection_recycling(create_pool, loop):
pool = yield from create_pool(minsize=1,
maxsize=1,
pool_recycle=3)
with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
val = yield from cur.fetchone()
assert (1,) == val

yield from asyncio.sleep(5, loop=loop)

assert 1 == pool.freesize
with (yield from pool) as conn:
cur = yield from conn.cursor()
yield from cur.execute('SELECT 1;')
val = yield from cur.fetchone()
assert (1,) == val


@asyncio.coroutine
def test_connection_in_good_state_after_timeout_in_transaction(create_pool):
@asyncio.coroutine
Expand Down

0 comments on commit 9bdff25

Please sign in to comment.