diff --git a/src/tribler/core/components/restapi/rest/events_endpoint.py b/src/tribler/core/components/restapi/rest/events_endpoint.py index 32ee51460be..1307b927724 100644 --- a/src/tribler/core/components/restapi/rest/events_endpoint.py +++ b/src/tribler/core/components/restapi/rest/events_endpoint.py @@ -1,7 +1,7 @@ import asyncio import json import time -from asyncio import CancelledError +from asyncio import CancelledError, Queue from dataclasses import asdict from typing import Any, Dict, List, Optional @@ -55,6 +55,8 @@ def __init__(self, notifier: Notifier, public_key: str = None): self.undelivered_error: Optional[MessageDict] = None self.public_key = public_key self.notifier = notifier + self.queue = Queue() + self.async_group.add_task(self.process_queue()) notifier.add_observer(notifications.circuit_removed, self.on_circuit_removed) notifier.add_generic_observer(self.on_notification) @@ -117,16 +119,22 @@ def should_skip_message(self, message: MessageDict) -> bool: return False def send_event(self, message: MessageDict): + """ + Put event message to a queue to be sent to GUI + """ if not self.should_skip_message(message): - self.async_group.add_task(self._write_data(message)) + self.queue.put_nowait(message) + + async def process_queue(self): + while True: + message = await self.queue.get() + if not self.should_skip_message(message): + await self._write_data(message) async def _write_data(self, message: MessageDict): """ Write data over the event socket if it's open. """ - if self.should_skip_message(message): - return - self._logger.debug(f'Write message: {message}') try: message_bytes = self.encode_message(message) diff --git a/src/tribler/core/components/restapi/rest/tests/test_events_endpoint.py b/src/tribler/core/components/restapi/rest/tests/test_events_endpoint.py index 26159ac08a2..384ca0a2ab7 100644 --- a/src/tribler/core/components/restapi/rest/tests/test_events_endpoint.py +++ b/src/tribler/core/components/restapi/rest/tests/test_events_endpoint.py @@ -1,3 +1,4 @@ +import asyncio import json from asyncio import CancelledError, Event, create_task from contextlib import suppress @@ -120,6 +121,7 @@ async def test_on_tribler_exception_has_connection_to_gui(mocked_write_data, eve # test that in case of established connection to GUI, `on_tribler_exception` will work # as a normal events_endpoint function, that is call `_write_data` events_endpoint.on_tribler_exception(reported_error) + await asyncio.sleep(0.01) mocked_write_data.assert_called_once() assert not events_endpoint.undelivered_error