Unable to detect disconnect when using NOTIFY/LISTEN #249

Closed
danielnelson opened this issue Jan 4, 2017 · 2 comments

Comments

@danielnelson

I want to make sure I can recover if my database connection goes down, but when listening on a notification channel no error is raised.

I'm using the following for testing; it sends a notification every couple of seconds. If you start it and then shut down the database, the NOTIFY side reconnects, but the listen task is unaware that it has lost its connection.

import asyncio
import aiopg


async def notify(pool):
    while True:
        try:
            async with pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute("NOTIFY channel")
                    print('NOTIFY channel')
                    await asyncio.sleep(2)
        except Exception as exc:
            print('NOTIFY error %s' % exc)
            await asyncio.sleep(2)


async def listen(pool):
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("LISTEN channel")
                print('LISTEN channel')
                while True:
                    await conn.notifies.get()
                    print('Notified')
    except Exception as exc:
        print('LISTEN error %s' % exc)


async def main():
    async with aiopg.create_pool(dsn) as pool:
        listener = listen(pool)
        notifier = notify(pool)
        await asyncio.gather(listener, notifier)
    print("ALL DONE")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
@RekGRpth

I solved it with this patch to aiopg:

diff --git a/aiopg/connection.py b/aiopg/connection.py
index 8718bec..8086db9 100755
--- a/aiopg/connection.py
+++ b/aiopg/connection.py
@@ -160,6 +160,7 @@ class Connection:
                     exc = exc2
             if waiter is not None and not waiter.done():
                 waiter.set_exception(exc)
+            self._notifies.put_nowait(None)
         else:
             if self._fileno is None:
                 # connection closed

and this change to the listener code:

<     try:
<         async with pool.acquire() as conn:
<             async with conn.cursor() as cur:
<                 await cur.execute("LISTEN channel")
<                 print('LISTEN channel')
<                 while True:
<                     await conn.notifies.get()
<                     print('Notified')
<     except Exception as exc:
<         print('LISTEN error %s' % exc)
---
>     while True:
>         try:
>             async with pool.acquire() as conn:
>                 async with conn.cursor() as cur:
>                     await cur.execute("LISTEN channel")
>                     print('LISTEN channel')
>                     while True:
>                         message = await conn.notifies.get()
>                         if message is None: break
>                         print('Notified')
>         except Exception as exc:
>             print('LISTEN error %s' % exc)
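
The patch and the listener change together form a sentinel pattern: the error path pushes `None` into the notification queue, which wakes a consumer blocked in `get()` so it can leave its loop and reconnect. Here is a minimal, self-contained sketch of that idea, with a plain `asyncio.Queue` standing in for `conn.notifies`. The names `consume` and `demo` are hypothetical and are not part of aiopg.

```python
import asyncio


async def consume(queue, received):
    """Collect messages until the None sentinel arrives, then return."""
    while True:
        message = await queue.get()
        if message is None:
            # Sentinel: the producer side signalled that the connection is gone.
            return "disconnected"
        received.append(message)


async def demo():
    queue = asyncio.Queue()  # stands in for conn.notifies (assumption)
    received = []
    consumer = asyncio.ensure_future(consume(queue, received))
    queue.put_nowait("payload 1")
    queue.put_nowait("payload 2")
    queue.put_nowait(None)  # what the patched error path would do
    status = await consumer
    return status, received
```

Without the sentinel, the consumer would stay blocked in `queue.get()` indefinitely, which matches the hang described in this issue.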

My full code:

import asyncio
import aiopg
import datetime

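def hook(func):
    # Re-run the wrapped coroutine forever, sleeping 2 seconds after each failure.
    async def wrapped(*args, **kwargs):
        while True:
            try:
                await func(*args, **kwargs)
            except Exception as exc:
                # Stop retrying once the event loop has been closed.
                if exc.__class__ == RuntimeError and exc.args and exc.args[0] == 'Event loop is closed':
                    return
                print('%s:%s' % (func.__name__, exc))
                await asyncio.sleep(2)
    return wrapped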

@hook
async def notify(pool):
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            sql = '''NOTIFY channel, '%s';''' % datetime.datetime.now()
            print(sql)
            await cur.execute(sql)
            await asyncio.sleep(2)

@hook
async def listen(pool):
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            sql = 'LISTEN channel;'
            print(sql)
            await cur.execute(sql)
            while True:
                message = await conn.notifies.get()
                print(message)
                if message is None: break

async def main():
    async with aiopg.create_pool('') as pool: await asyncio.gather(listen(pool), notify(pool))

try: asyncio.get_event_loop().run_until_complete(main())
except KeyboardInterrupt: asyncio.get_event_loop().stop()

@sanderegg

Hello, I also use the LISTEN/NOTIFY pattern from aiopg, in an aiohttp-based application. When the Postgres instance is restarted for whatever reason, no exception is raised and my "listening" code hangs forever (see the example below). Has a fix for this been released, or am I doing something wrong? This is with aiopg 1.0.0.

async def listen(app: web.Application):
    listen_query = f"LISTEN {DB_CHANNEL_NAME};"
    db_engine: Engine = app[APP_DB_ENGINE_KEY]
    async with db_engine.acquire() as conn:
        await conn.execute(listen_query)

        while True:
            # NOTE: this waits for a new notification so the engine is locked here
            notification = await conn.connection.notifies.get()
            log.debug(
                "received update from database: %s", pformat(notification.payload)
            )
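
One possible workaround that does not require patching the library is to wait on the queue with a timeout and run a health check whenever the wait expires. This is only a sketch under stated assumptions: `get_or_fail`, `ConnectionLost`, and the `is_alive` callback are hypothetical names, and how to implement `is_alive` reliably against a real aiopg connection is left open here.

```python
import asyncio


class ConnectionLost(Exception):
    """Raised when the health check reports that the connection is dead."""


async def get_or_fail(queue, is_alive, poll_interval=5.0):
    """Return the next queue item, checking the connection while idle.

    queue: an asyncio.Queue of notifications.
    is_alive: hypothetical async callable returning True while the
        connection is usable.
    """
    while True:
        try:
            return await asyncio.wait_for(queue.get(), timeout=poll_interval)
        except asyncio.TimeoutError:
            # No notification arrived in time; verify the connection still works.
            if not await is_alive():
                raise ConnectionLost("listening connection is no longer usable")
```

In the listener above, `queue` would be `conn.connection.notifies`, and the caller would catch `ConnectionLost`, release the connection, and issue `LISTEN` again on a fresh one. The trade-off is that a disconnect is only noticed after up to `poll_interval` seconds.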
