Skip to content

Commit

Permalink
Add a queue to properly serialize events
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovsky committed Apr 14, 2023
1 parent ecfe264 commit 2b7e261
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
18 changes: 13 additions & 5 deletions src/tribler/core/components/restapi/rest/events_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import json
import time
from asyncio import CancelledError
from asyncio import CancelledError, Queue
from dataclasses import asdict
from typing import Any, Dict, List, Optional

Expand Down Expand Up @@ -55,6 +55,8 @@ def __init__(self, notifier: Notifier, public_key: str = None):
self.undelivered_error: Optional[MessageDict] = None
self.public_key = public_key
self.notifier = notifier
self.queue = Queue()
self.async_group.add_task(self.process_queue())
notifier.add_observer(notifications.circuit_removed, self.on_circuit_removed)
notifier.add_generic_observer(self.on_notification)

Expand Down Expand Up @@ -117,16 +119,22 @@ def should_skip_message(self, message: MessageDict) -> bool:
return False

def send_event(self, message: MessageDict):
"""
Put event message to a queue to be sent to GUI
"""
if not self.should_skip_message(message):
self.async_group.add_task(self._write_data(message))
self.queue.put_nowait(message)

async def process_queue(self):
while True:
message = await self.queue.get()
if not self.should_skip_message(message):
await self._write_data(message)

async def _write_data(self, message: MessageDict):
"""
Write data over the event socket if it's open.
"""
if self.should_skip_message(message):
return

self._logger.debug(f'Write message: {message}')
try:
message_bytes = self.encode_message(message)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
from asyncio import CancelledError, Event, create_task
from contextlib import suppress
Expand Down Expand Up @@ -120,6 +121,7 @@ async def test_on_tribler_exception_has_connection_to_gui(mocked_write_data, eve
# test that in case of established connection to GUI, `on_tribler_exception` will work
# as a normal events_endpoint function, that is call `_write_data`
events_endpoint.on_tribler_exception(reported_error)
await asyncio.sleep(0.01)

mocked_write_data.assert_called_once()
assert not events_endpoint.undelivered_error
Expand Down

0 comments on commit 2b7e261

Please sign in to comment.