Skip to content

Commit

Permalink
fix: websocket provider receives nothing forever when the peer is clo…
Browse files Browse the repository at this point in the history
…sed or gone
  • Loading branch information
kaiix authored and fselmo committed Jul 12, 2024
1 parent 616212e commit 9a70e5b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
21 changes: 12 additions & 9 deletions web3/_utils/module_testing/module_testing_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
from collections import (
deque,
)
import pytest
import time
from typing import (
Expand All @@ -9,6 +6,7 @@
Collection,
Dict,
Generator,
Optional,
Sequence,
Union,
)
Expand All @@ -35,6 +33,7 @@
)
from web3._utils.request import (
async_cache_and_return_session,
asyncio,
cache_and_return_session,
)
from web3.types import (
Expand Down Expand Up @@ -188,9 +187,13 @@ class WebsocketMessageStreamMock:
closed: bool = False

def __init__(
self, messages: Collection[bytes] = None, raise_exception: Exception = None
self,
messages: Optional[Collection[bytes]] = None,
raise_exception: Optional[Exception] = None,
) -> None:
self.messages = deque(messages) if messages else deque()
self.queue = asyncio.Queue[bytes]()
for msg in messages or []:
self.queue.put_nowait(msg)
self.raise_exception = raise_exception

def __await__(self) -> Generator[Any, Any, "Self"]:
Expand All @@ -203,13 +206,13 @@ def __aiter__(self) -> "Self":
return self

async def __anext__(self) -> bytes:
return await self.recv()

async def recv(self) -> bytes:
if self.raise_exception:
raise self.raise_exception

elif len(self.messages) == 0:
raise StopAsyncIteration

return self.messages.popleft()
return await self.queue.get()

@staticmethod
async def pong() -> Literal[False]:
Expand Down
5 changes: 2 additions & 3 deletions web3/providers/websocket/websocket_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ async def make_request(self, method: RPCEndpoint, params: Any) -> RPCResponse:
return response

async def _provider_specific_message_listener(self) -> None:
async for raw_message in self._ws:
await asyncio.sleep(0)

while True:
raw_message = await self._ws.recv()
response = json.loads(raw_message)
subscription = response.get("method") == "eth_subscription"
await self._request_processor.cache_raw_response(
Expand Down

0 comments on commit 9a70e5b

Please sign in to comment.