Skip to content

Commit

Permalink
Fix "Unable to detect disconnect when using NOTIFY/LISTEN", Closes ai…
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Carneiro committed Apr 9, 2019
1 parent 9fdf7b9 commit 72e34d5
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 2 deletions.
5 changes: 3 additions & 2 deletions aiopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)

from .cursor import Cursor
from .utils import _ContextManager, create_future
from .utils import _ContextManager, create_future, ClosableQueue

__all__ = ('connect',)

Expand Down Expand Up @@ -121,7 +121,7 @@ def __init__(self, dsn, loop, timeout, waiter, echo, **kwargs):
self._cancellation_waiter = None
self._echo = echo
self._cursor_instance = None
self._notifies = asyncio.Queue(loop=loop)
self._notifies = ClosableQueue(loop=loop)
self._weakref = weakref.ref(self)
self._loop.add_reader(self._fileno, self._ready, self._weakref)

Expand Down Expand Up @@ -164,6 +164,7 @@ def _ready(weak_self):
# chain exception otherwise
exc2.__cause__ = exc
exc = exc2
self.notifies.close(exc)
if waiter is not None and not waiter.done():
waiter.set_exception(exc)
else:
Expand Down
27 changes: 27 additions & 0 deletions aiopg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,30 @@ def __exit__(self, *args):
self._pool = None
self._conn = None
self._cur = None


class ClosableQueue(asyncio.Queue):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__close_exception = None
self.__close_event = asyncio.Event()

def close(self, exception):
self.__close_exception = exception
self.__close_event.set()

async def get(self):
get = self._loop.create_task(super().get())
closed = self._loop.create_task(self.__close_event.wait())
_, pending = await asyncio.wait([get, closed],
return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
if get.done():
return get.result()
assert closed.done()
ex = self.__close_exception
self.__close_exception = None
self.__close_event.clear()
raise ex
35 changes: 35 additions & 0 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,3 +643,38 @@ async def test_connection_on_server_restart(connect, pg_server, docker):
delay *= 2
else:
pytest.fail("Cannot connect to the restarted server")


async def test_connection_notify_on_server_restart(connect, pg_server, docker,
loop):
conn = await connect()

async def read_notifies():
while True:
await conn.notifies.get()

reader = loop.create_task(read_notifies())
await asyncio.sleep(0.1)

docker.restart(container=pg_server['Id'])

try:
with pytest.raises(psycopg2.OperationalError):
await asyncio.wait_for(reader, 10)
finally:
conn.close()
reader.cancel()

# Wait for postgres to be up and running again before moving on
# so as the restart won't affect other tests
delay = 0.001
for i in range(100):
try:
conn = await connect()
conn.close()
break
except psycopg2.Error:
time.sleep(delay)
delay *= 2
else:
pytest.fail("Cannot connect to the restarted server")
30 changes: 30 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import asyncio
from aiopg.utils import ClosableQueue


async def test_closable_queue_noclose():
queue = ClosableQueue()
await queue.put(1)
v = await queue.get()
assert v == 1


async def test_closable_queue_close(loop):
queue = ClosableQueue()
v1 = None

async def read():
nonlocal v1
v1 = await queue.get()
await queue.get()

reader = loop.create_task(read())
await queue.put(1)
await asyncio.sleep(0.1)
assert v1 == 1

queue.close(RuntimeError("connection closed"))
try:
await reader
except RuntimeError as ex:
assert ex.args == ("connection closed",)

0 comments on commit 72e34d5

Please sign in to comment.