Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: websocket issues #426

Merged
merged 2 commits into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ setup_requires =
# Note: eth/web3 dependencies updates are sensitive and can trigger a lot of dependency conflicts.
# Update with care. Dependencies that were added to make it all work are annotated accordingly.
install_requires =
aio_pika==9.0.5
aiocache==0.11.1
aiohttp-cors==0.7.0
aiohttp-jinja2==1.5
Expand Down
87 changes: 67 additions & 20 deletions src/aleph/web/controllers/messages.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import logging
from typing import List, Optional, Any, Dict, Iterable

import aio_pika.abc
from aiohttp import web
import aiohttp.web_ws
from aiohttp import web, WSMsgType
from aleph_message.models import MessageType, ItemHash, Chain
from configmanager import Config
from pydantic import BaseModel, Field, validator, ValidationError, root_validator

import aleph.toolkit.json as aleph_json
Expand Down Expand Up @@ -233,21 +234,23 @@ async def view_messages_list(request: web.Request) -> web.Response:
)


async def messages_ws(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)

mq_channel: aio_pika.abc.AbstractChannel = get_mq_channel_from_request(request)
session_factory: DbSessionFactory = get_session_factory_from_request(request)
config = get_config_from_request(request)
async def _message_ws_read_from_queue(
ws: aiohttp.web_ws.WebSocketResponse,
mq_queue: aio_pika.abc.AbstractQueue,
request: web.Request,
) -> None:
"""
Task receiving new aleph.im messages from the processing pipeline to a websocket.

mq_queue = await mq_make_aleph_message_topic_queue(
channel=mq_channel, config=config, routing_key="processed.*"
)
:param ws: Websocket.
:param mq_queue: Message queue object.
:param request: Websocket HTTP request object.
"""

query_params = WsMessageQueryParams.parse_obj(request.query)
find_filters = query_params.dict(exclude_none=True)
session_factory = get_session_factory_from_request(request)

find_filters = query_params.dict(exclude_none=True)
history = query_params.history

if history:
Expand All @@ -262,14 +265,8 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse:
await ws.send_str(format_message(message).json())

try:
async with mq_queue.iterator() as queue_iter:
async with mq_queue.iterator(no_ack=True) as queue_iter:
async for mq_message in queue_iter:
# Always acknowledge the message
await mq_message.ack()

if ws.closed:
break

item_hash = aleph_json.loads(mq_message.body)["item_hash"]
# A bastardized way to apply the filters on the message as well.
# TODO: put the filter key/values in the RabbitMQ message?
Expand All @@ -284,8 +281,58 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse:
await ws.send_str(format_message(message).json())

except ConnectionResetError:
# We can detect the WS closing in this task in addition to the main one.
# warning. The main task will also detect the close event.
# We ignore this exception to avoid the "task exception was never retrieved"
LOGGER.info("Cannot send messages because the websocket is closed")
pass

except asyncio.CancelledError:
LOGGER.info("MQ -> WS task cancelled")
raise


async def messages_ws(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)

config = get_config_from_request(request)
mq_channel = get_mq_channel_from_request(request)

mq_queue = await mq_make_aleph_message_topic_queue(
channel=mq_channel, config=config, routing_key="processed.*"
)

# Start a task to handle outgoing traffic to the websocket.
queue_task = asyncio.create_task(
_message_ws_read_from_queue(
ws=ws,
request=request,
mq_queue=mq_queue,
)
)

# Wait for the websocket to close.
try:
while not ws.closed:
# Users can potentially send anything to the websocket. Ignore these messages
# and only handle "close" messages.
ws_msg = await ws.receive()
LOGGER.info("rx ws msg: %s", str(ws_msg))
if ws_msg.type == WSMsgType.CLOSE:
LOGGER.debug("ws close received")
break

finally:
# Cancel the MQ -> ws task
queue_task.cancel()
await asyncio.wait([queue_task])

# Always delete the queue, auto-delete queues are only deleted once the channel is closed
# and that's not meant to happen for the API.
# await mq_queue.purge(no_wait=True)
await mq_queue.delete(if_unused=False, if_empty=False)

return ws


Expand Down
3 changes: 1 addition & 2 deletions src/aleph/web/controllers/p2p.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import json
import logging
from typing import Dict, cast, Optional, Any, Mapping, List, Collection, Union
from typing import Dict, cast, Optional, Any, Mapping, List, Union

import aio_pika.abc
from aiohttp import web
Expand All @@ -23,7 +23,6 @@
get_config_from_request,
get_ipfs_service_from_request,
get_p2p_client_from_request,
get_mq_conn_from_request,
get_mq_channel_from_request,
)
from aleph.web.controllers.utils import mq_make_aleph_message_topic_queue
Expand Down