From 447ed603832f65060651b0bf4c8d0707a45ca54f Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 1 May 2023 18:16:41 +0200 Subject: [PATCH 1/2] Fix: avoid message ack exceptions in message WS Problem: when a large number of websockets are open simultaneously, at some point a message acknowledgement is not performed and the channel/connection gets closed by the RabbitMQ broker. Solution: use the WS in no_ack mode, we do not benefit from acknowledgements in this use case anyway. --- src/aleph/web/controllers/messages.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py index 3eda18589..426faed3c 100644 --- a/src/aleph/web/controllers/messages.py +++ b/src/aleph/web/controllers/messages.py @@ -262,10 +262,8 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse: await ws.send_str(format_message(message).json()) try: - async with mq_queue.iterator() as queue_iter: + async with mq_queue.iterator(no_ack=True) as queue_iter: async for mq_message in queue_iter: - # Always acknowledge the message - await mq_message.ack() if ws.closed: break From 885cf4fe113eff8bbc7cddacc27dfe6664a47769 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Fri, 28 Apr 2023 12:00:48 +0200 Subject: [PATCH 2/2] Fix: properly terminate websocket requests Problem: Websocket requests to api/ws0/messages never terminate, resulting in resources leaking (ex: RabbitMQ queues). Solution: split the implementation in two tasks: * the main task reads the websocket and expects a `close` message. * the secondary task publishes messages from the RabbitMQ queue to the websocket. Once a close event is detected, the main task cancels the secondary task and deletes the queue. --- setup.cfg | 1 + src/aleph/web/controllers/messages.py | 83 +++++++++++++++++++++------ src/aleph/web/controllers/p2p.py | 3 +- 3 files changed, 68 insertions(+), 19 deletions(-) diff --git a/setup.cfg b/setup.cfg index 6d8134e91..ac97faaed 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,7 @@ setup_requires = # Note: eth/web3 dependencies updates are sensitive and can trigger a lot of dependency conflicts. # Update with care. Dependencies that were added to make it all work are annotated accordingly. install_requires = + aio_pika==9.0.5 aiocache==0.11.1 aiohttp-cors==0.7.0 aiohttp-jinja2==1.5 diff --git a/src/aleph/web/controllers/messages.py b/src/aleph/web/controllers/messages.py index 426faed3c..745530a1b 100644 --- a/src/aleph/web/controllers/messages.py +++ b/src/aleph/web/controllers/messages.py @@ -1,10 +1,11 @@ +import asyncio import logging from typing import List, Optional, Any, Dict, Iterable import aio_pika.abc -from aiohttp import web +import aiohttp.web_ws +from aiohttp import web, WSMsgType from aleph_message.models import MessageType, ItemHash, Chain -from configmanager import Config from pydantic import BaseModel, Field, validator, ValidationError, root_validator import aleph.toolkit.json as aleph_json @@ -233,21 +234,23 @@ async def view_messages_list(request: web.Request) -> web.Response: ) -async def messages_ws(request: web.Request) -> web.WebSocketResponse: - ws = web.WebSocketResponse() - await ws.prepare(request) - - mq_channel: aio_pika.abc.AbstractChannel = get_mq_channel_from_request(request) - session_factory: DbSessionFactory = get_session_factory_from_request(request) - config = get_config_from_request(request) +async def _message_ws_read_from_queue( + ws: aiohttp.web_ws.WebSocketResponse, + mq_queue: aio_pika.abc.AbstractQueue, + request: web.Request, +) -> None: + """ + Task receiving new aleph.im messages from the processing pipeline to a websocket. - mq_queue = await mq_make_aleph_message_topic_queue( - channel=mq_channel, config=config, routing_key="processed.*" - ) + :param ws: Websocket. + :param mq_queue: Message queue object. + :param request: Websocket HTTP request object. + """ query_params = WsMessageQueryParams.parse_obj(request.query) - find_filters = query_params.dict(exclude_none=True) + session_factory = get_session_factory_from_request(request) + find_filters = query_params.dict(exclude_none=True) history = query_params.history if history: @@ -264,10 +267,6 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse: try: async with mq_queue.iterator(no_ack=True) as queue_iter: async for mq_message in queue_iter: - - if ws.closed: - break - item_hash = aleph_json.loads(mq_message.body)["item_hash"] # A bastardized way to apply the filters on the message as well. # TODO: put the filter key/values in the RabbitMQ message? @@ -282,8 +281,58 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse: await ws.send_str(format_message(message).json()) except ConnectionResetError: + # We can detect the WS closing in this task in addition to the main one. + # warning. The main task will also detect the close event. + # We ignore this exception to avoid the "task exception was never retrieved" + LOGGER.info("Cannot send messages because the websocket is closed") pass + except asyncio.CancelledError: + LOGGER.info("MQ -> WS task cancelled") + raise + + +async def messages_ws(request: web.Request) -> web.WebSocketResponse: + ws = web.WebSocketResponse() + await ws.prepare(request) + + config = get_config_from_request(request) + mq_channel = get_mq_channel_from_request(request) + + mq_queue = await mq_make_aleph_message_topic_queue( + channel=mq_channel, config=config, routing_key="processed.*" + ) + + # Start a task to handle outgoing traffic to the websocket. + queue_task = asyncio.create_task( + _message_ws_read_from_queue( + ws=ws, + request=request, + mq_queue=mq_queue, + ) + ) + + # Wait for the websocket to close. + try: + while not ws.closed: + # Users can potentially send anything to the websocket. Ignore these messages + # and only handle "close" messages. + ws_msg = await ws.receive() + LOGGER.info("rx ws msg: %s", str(ws_msg)) + if ws_msg.type == WSMsgType.CLOSE: + LOGGER.debug("ws close received") + break + + finally: + # Cancel the MQ -> ws task + queue_task.cancel() + await asyncio.wait([queue_task]) + + # Always delete the queue, auto-delete queues are only deleted once the channel is closed + # and that's not meant to happen for the API. + # await mq_queue.purge(no_wait=True) + await mq_queue.delete(if_unused=False, if_empty=False) + return ws diff --git a/src/aleph/web/controllers/p2p.py b/src/aleph/web/controllers/p2p.py index 6ee0ba88a..f44753eab 100644 --- a/src/aleph/web/controllers/p2p.py +++ b/src/aleph/web/controllers/p2p.py @@ -1,7 +1,7 @@ import asyncio import json import logging -from typing import Dict, cast, Optional, Any, Mapping, List, Collection, Union +from typing import Dict, cast, Optional, Any, Mapping, List, Union import aio_pika.abc from aiohttp import web @@ -23,7 +23,6 @@ get_config_from_request, get_ipfs_service_from_request, get_p2p_client_from_request, - get_mq_conn_from_request, get_mq_channel_from_request, ) from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue