-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathwebsocket_manager.py
139 lines (112 loc) · 4.43 KB
/
websocket_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import asyncio
from typing import Dict
from uuid import uuid4
from fastapi import WebSocket
from fastapi_websocket_pubsub import PubSubClient
from shared import WEBHOOKS_URL
from shared.log_config import get_logger
# Module-level logger, named after this module and created via shared.log_config.
logger = get_logger(__name__)
class WebsocketManager:
    """
    A class for managing websocket connections and establishing PubSub callbacks.

    Each successful call to `subscribe` creates its own PubSubClient, which is
    kept in a class-level registry under a generated ID so that it can later be
    disconnected via `unsubscribe` or `disconnect_all`.
    """

    # Registry of started PubSubClients, keyed by the ID returned from `subscribe`.
    # It lives on the class because every method here is a staticmethod.
    _clients: Dict[str, PubSubClient] = {}

    @staticmethod
    async def subscribe(
        websocket: WebSocket,
        *,
        group_id: str = "",
        wallet_id: str = "",
        topic: str = "",
    ) -> str:
        """
        Subscribe a websocket connection to a specific topic.

        The subscribed topic starts with `group_id`; `:wallet_id` and `:topic`
        are appended, in that order, for whichever of them is non-empty.

        Returns a string representing a unique ID for this client.

        Raises:
            WebsocketTimeout: if the PubSubClient could not be started in time.
        """
        subscribed_topic = group_id  # general prefix
        if wallet_id:
            subscribed_topic += f":{wallet_id}"
        if topic:
            subscribed_topic += f":{topic}"

        async def callback(data: str, topic: str) -> None:
            """
            Callback function for handling received webhook events.

            Forwards the received payload, unchanged, to the websocket.
            """
            logger.debug("Handling Websocket callback on topic: {}", topic)
            await websocket.send_text(data)

        client = PubSubClient()
        logger.debug("Subscribing PubSubClient to `{}`", subscribed_topic)
        client.subscribe(subscribed_topic, callback)
        await WebsocketManager.start_pubsub_client(client)

        # Only register the client once it has started successfully, so a failed
        # start never leaves an entry behind in the registry.
        uuid = uuid4().hex
        WebsocketManager._clients[uuid] = client
        logger.debug(
            "Successfully started PubSubClient, listening to `{}`", subscribed_topic
        )
        return uuid

    @staticmethod
    async def unsubscribe(uuid: str) -> None:
        """
        Disconnect the client registered under `uuid` and forget it.

        Raises:
            KeyError: if no client is registered under `uuid`.
            WebsocketTimeout: if disconnecting the client times out. The client
                is removed from the registry regardless.
        """
        logger.info("Unsubscribing a client")
        # Remove the entry before disconnecting: if the disconnect times out the
        # registry must not keep a stale reference to this client.
        client = WebsocketManager._clients.pop(uuid)
        await WebsocketManager.disconnect(client)
        logger.info("Successfully unsubscribed client")

    @staticmethod
    async def start_pubsub_client(client: PubSubClient, timeout: float = 5) -> None:
        """
        Start listening for webhook events on the Webhooks pubsub endpoint with a specified timeout.

        Args:
            client: The PubSubClient to start.
            timeout: Seconds to wait for the client to become ready.

        Raises:
            WebsocketTimeout: if the client is not ready within `timeout` seconds.
                A disconnect of the client is attempted before raising.
        """

        async def ensure_connection_ready() -> None:
            """
            Ensure the connection is established before proceeding
            """
            websocket_url = convert_url_to_websocket(WEBHOOKS_URL)
            client.start_client(websocket_url + "/pubsub")
            await client.wait_until_ready()

        try:
            logger.debug("Starting PubSubClient for new websocket connection")
            await asyncio.wait_for(ensure_connection_ready(), timeout=timeout)
        except asyncio.TimeoutError as e:
            logger.warning("Starting a PubSubClient has timed out after {}s.", timeout)
            try:
                await WebsocketManager.disconnect(client)
            except WebsocketTimeout:
                # disconnect() has already logged its own timeout. Swallow it here
                # so the caller sees the start timeout, which is the root cause.
                pass
            raise WebsocketTimeout("Starting PubSubClient has timed out.") from e

    @staticmethod
    async def disconnect(client: PubSubClient, timeout: float = 3) -> None:
        """
        Shutdown the Websocket client with a specified timeout.

        This does not touch the client registry; callers are responsible for
        removing the client from it.

        Args:
            client: The PubSubClient to disconnect.
            timeout: Seconds to wait for the disconnect to complete.

        Raises:
            WebsocketTimeout: if the disconnect does not complete within `timeout` seconds.
        """
        logger.debug("Disconnecting Websocket client")

        async def wait_for_disconnect() -> None:
            await client.disconnect()

        try:
            await asyncio.wait_for(wait_for_disconnect(), timeout=timeout)
        except asyncio.TimeoutError as e:
            logger.warning(
                "Disconnecting a PubSubClient has timed out after {}s.", timeout
            )
            raise WebsocketTimeout("PubSubClient disconnect has timed out.") from e

    @staticmethod
    async def disconnect_all() -> None:
        """
        Disconnect all Websocket clients and clear the connections.

        Clients whose disconnect times out are skipped; they are still removed
        from the registry.
        """
        clients = WebsocketManager._clients
        if clients:
            logger.debug("Disconnecting {} Websocket clients", len(clients))

        # Take clients out of the registry one at a time instead of iterating it:
        # other coroutines may subscribe/unsubscribe while we await a disconnect,
        # and mutating a dict during iteration raises RuntimeError.
        while clients:
            _, client = clients.popitem()
            try:
                await WebsocketManager.disconnect(client)
            except WebsocketTimeout:
                continue
class WebsocketTimeout(Exception):
    """
    Signals that a Websocket operation did not complete within its timeout.
    """
def convert_url_to_websocket(url: str) -> str:
    """
    Convert an HTTP or HTTPS URL to WebSocket (WS or WSS) URL.

    Only the scheme is rewritten (`http` -> `ws`, `https` -> `wss`); the rest of
    the URL is returned untouched. The scheme is matched case-insensitively.
    Any input that does not have an `http://` or `https://` scheme - including
    scheme-less strings that merely begin with "http" - is returned unchanged.
    """
    scheme, separator, remainder = url.partition("://")
    websocket_scheme = {"http": "ws", "https": "wss"}.get(scheme.lower())
    if not separator or websocket_scheme is None:
        # No scheme at all, or a scheme we do not translate (e.g. already ws/wss).
        return url
    return f"{websocket_scheme}://{remainder}"