Skip to content

Commit

Permalink
Implement suggestions from the review
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Mar 1, 2023
1 parent 30df5bb commit 9344426
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
26 changes: 16 additions & 10 deletions src/tribler/core/utilities/async_group/async_group.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import logging
from asyncio import CancelledError, Future, Task
from contextlib import suppress
from typing import Iterable, List, Set
from typing import Coroutine, Iterable, List, Set

from tribler.core.utilities.async_group.exceptions import CancelledException
from tribler.core.utilities.async_group.exceptions import CanceledException


class AsyncGroup:
Expand All @@ -22,14 +23,15 @@ class AsyncGroup:
"""

def __init__(self):
self._logger = logging.getLogger(self.__class__.__name__)
self._futures: Set[Future] = set()
self._cancelled = False
self._canceled = False

def add_task(self, coroutine) -> Task:
def add_task(self, coroutine: Coroutine) -> Task:
"""Add a coroutine to the group.
"""
if self._cancelled:
raise CancelledException()
if self._canceled:
raise CanceledException()

task = asyncio.create_task(coroutine)
self._futures.add(task)
Expand All @@ -47,10 +49,10 @@ async def cancel(self) -> List[Future]:
Only active futures will be cancelled.
"""
if self._cancelled:
if self._canceled:
return []

self._cancelled = True
self._canceled = True

active = list(self._active(self._futures))
for future in active:
Expand All @@ -63,11 +65,15 @@ async def cancel(self) -> List[Future]:

@property
def cancelled(self):
return self._cancelled
return self._canceled

def _done_callback(self, future: Future):
self._futures.remove(future)
self._futures.discard(future)

@staticmethod
def _active(futures: Iterable[Future]) -> Iterable[Future]:
return (future for future in futures if not future.done())

def __del__(self):
if active := list(self._active(self._futures)):
self._logger.error(f'AsyncGroup is destroying but {len(active)} futures are active')
2 changes: 1 addition & 1 deletion src/tribler/core/utilities/async_group/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
class CancelledException(Exception):
class CanceledException(Exception):
"""A coroutine can not be added to a cancelled AsyncGroup"""
26 changes: 24 additions & 2 deletions src/tribler/core/utilities/async_group/tests/test_async_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from contextlib import suppress

import pytest
from _pytest.logging import LogCaptureFixture

from tribler.core.utilities.async_group.async_group import AsyncGroup
from tribler.core.utilities.async_group.exceptions import CancelledException
from tribler.core.utilities.async_group.exceptions import CanceledException


# pylint: disable=redefined-outer-name, protected-access
Expand Down Expand Up @@ -40,7 +41,7 @@ async def test_add_task(group: AsyncGroup):
async def test_add_task_when_cancelled(group: AsyncGroup):
await group.cancel()

with pytest.raises(CancelledException):
with pytest.raises(CanceledException):
group.add_task(void())


Expand Down Expand Up @@ -114,3 +115,24 @@ async def test_auto_cleanup(group: AsyncGroup):
await asyncio.gather(*group._futures, return_exceptions=True)

assert not group._futures


async def test_del_error(group: AsyncGroup, caplog: LogCaptureFixture):
""" In this test we add a single coroutine to the group and call __del__ before the coroutine is completed.
The group should add an error message to a log.
"""
group.add_task(void())
group.__del__()
assert f'AsyncGroup is destroying but 1 futures are active' in caplog.text


async def test_del_no_error(group: AsyncGroup, caplog: LogCaptureFixture):
""" In this test we add a single coroutine to the group and call __del__ after the coroutine is completed.
The group should not add an error message to a log.
"""
group.add_task(void())
await group.wait()
group.__del__()
assert f'AsyncGroup is destroying but 1 futures are active' not in caplog.text

0 comments on commit 9344426

Please sign in to comment.