diff --git a/distributed/cli/dask_worker.py b/distributed/cli/dask_worker.py index 870564dc7f3..93c64dbb584 100755 --- a/distributed/cli/dask_worker.py +++ b/distributed/cli/dask_worker.py @@ -14,7 +14,7 @@ import click from tlz import valmap -from tornado.ioloop import IOLoop, TimeoutError +from tornado.ioloop import TimeoutError import dask from dask.system import CPU_COUNT @@ -448,14 +448,11 @@ def del_pid_file(): signal_fired = False async def run(): - loop = IOLoop.current() - nannies = [ t( scheduler, scheduler_file=scheduler_file, nthreads=nthreads, - loop=loop, resources=resources, security=sec, contact_address=contact_address, diff --git a/distributed/comm/inproc.py b/distributed/comm/inproc.py index 4a23a1c461a..9bd3b22bb8e 100644 --- a/distributed/comm/inproc.py +++ b/distributed/comm/inproc.py @@ -88,6 +88,13 @@ class QueueEmpty(Exception): pass +def _set_result_unless_cancelled(fut, result): + """Helper setting the result only if the future was not cancelled.""" + if fut.cancelled(): + return + fut.set_result(result) + + class Queue: """ A single-reader, single-writer, non-threadsafe, peekable queue. @@ -119,7 +126,7 @@ def put_nowait(self, value): if fut is not None: assert len(q) == 0 self._read_future = None - fut.set_result(value) + _set_result_unless_cancelled(fut, value) else: q.append(value) diff --git a/distributed/core.py b/distributed/core.py index 85f72f49fb5..14b37f4281c 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -11,15 +11,14 @@ import warnings import weakref from collections import defaultdict, deque -from collections.abc import Container +from collections.abc import Container, Coroutine from contextlib import suppress from enum import Enum from functools import partial -from typing import Any, Callable, ClassVar, TypedDict +from typing import TYPE_CHECKING, Any, Callable, ClassVar, TypedDict, TypeVar import tblib from tlz import merge -from tornado import gen from tornado.ioloop import IOLoop, PeriodicCallback import dask @@ -38,13 +37,22 @@ from distributed.metrics import time from distributed.system_monitor import SystemMonitor from distributed.utils import ( + NoOpAwaitable, get_traceback, has_keyword, - is_coroutine_function, + iscoroutinefunction, recursive_to_dict, truncate_exception, ) +if TYPE_CHECKING: + from typing_extensions import ParamSpec + + P = ParamSpec("P") + R = TypeVar("R") + T = TypeVar("T") + Coro = Coroutine[Any, Any, T] + class Status(Enum): """ @@ -109,6 +117,161 @@ def _expects_comm(func: Callable) -> bool: return False +class _LoopBoundMixin: + """Backport of the private asyncio.mixins._LoopBoundMixin from 3.11""" + + _global_lock = threading.Lock() + + _loop = None + + def _get_loop(self): + loop = asyncio.get_running_loop() + + if self._loop is None: + with self._global_lock: + if self._loop is None: + self._loop = loop + if loop is not self._loop: + raise RuntimeError(f"{self!r} is bound to a different event loop") + return loop + + +class AsyncTaskGroupClosedError(RuntimeError): + pass + + +def _delayed(corofunc: Callable[P, Coro[T]], delay: float) -> Callable[P, Coro[T]]: + """Decorator to delay the evaluation of a coroutine function by the given delay in seconds.""" + + async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: + await asyncio.sleep(delay) + return await corofunc(*args, **kwargs) + + return wrapper + + +class AsyncTaskGroup(_LoopBoundMixin): + """Collection tracking all currently running asynchronous tasks within a group""" + + #: If True, the group is closed and does not allow adding new tasks. + closed: bool + + def __init__(self) -> None: + self.closed = False + self._ongoing_tasks: set[asyncio.Task[None]] = set() + + def call_soon( + self, afunc: Callable[P, Coro[None]], /, *args: P.args, **kwargs: P.kwargs + ) -> None: + """Schedule a coroutine function to be executed as an `asyncio.Task`. + + The coroutine function `afunc` is scheduled with `args` arguments and `kwargs` keyword arguments + as an `asyncio.Task`. + + Parameters + ---------- + afunc + Coroutine function to schedule. + *args + Arguments to be passed to `afunc`. + **kwargs + Keyword arguments to be passed to `afunc` + + Returns + ------- + None + + Raises + ------ + AsyncTaskGroupClosedError + If the task group is closed. + """ + if self.closed: # Avoid creating a coroutine + raise AsyncTaskGroupClosedError( + "Cannot schedule a new coroutine function as the group is already closed." + ) + task = self._get_loop().create_task(afunc(*args, **kwargs)) + task.add_done_callback(self._ongoing_tasks.remove) + self._ongoing_tasks.add(task) + return None + + def call_later( + self, + delay: float, + afunc: Callable[P, Coro[None]], + /, + *args: P.args, + **kwargs: P.kwargs, + ) -> None: + """Schedule a coroutine function to be executed after `delay` seconds as an `asyncio.Task`. + + The coroutine function `afunc` is scheduled with `args` arguments and `kwargs` keyword arguments + as an `asyncio.Task` that is executed after `delay` seconds. + + Parameters + ---------- + delay + Delay in seconds. + afunc + Coroutine function to schedule. + *args + Arguments to be passed to `afunc`. + **kwargs + Keyword arguments to be passed to `afunc` + + Returns + ------- + The None + + Raises + ------ + AsyncTaskGroupClosedError + If the task group is closed. + """ + self.call_soon(_delayed(afunc, delay), *args, **kwargs) + + def close(self) -> None: + """Closes the task group so that no new tasks can be scheduled. + + Existing tasks continue to run. + """ + self.closed = True + + async def stop(self, timeout: float = 1) -> None: + """Close the group and stop all currently running tasks. + + Closes the task group and waits `timeout` seconds for all tasks to gracefully finish. + After the timeout, all remaining tasks are cancelled. + """ + self.close() + + current_task = asyncio.current_task(self._get_loop()) + tasks_to_stop = [t for t in self._ongoing_tasks if t is not current_task] + + if tasks_to_stop: + # Wrap gather in task to avoid Python3.8 issue, + # see https://github.com/dask/distributed/pull/6478#discussion_r885696827 + async def gather(): + return await asyncio.gather(*tasks_to_stop, return_exceptions=True) + + try: + await asyncio.wait_for( + gather(), + timeout, + ) + except asyncio.TimeoutError: + # The timeout on gather has cancelled the tasks, so this will not hang indefinitely + await asyncio.gather(*tasks_to_stop, return_exceptions=True) + + if [t for t in self._ongoing_tasks if t is not current_task]: + raise RuntimeError( + f"Expected all ongoing tasks to be cancelled and removed, found {self._ongoing_tasks}." + ) + + def __len__(self): + return len(self._ongoing_tasks) + + class Server: """Dask Distributed Server @@ -195,7 +358,8 @@ def __init__( self.monitor = SystemMonitor() self.counters = None self.digests = None - self._ongoing_coroutines = set() + self._ongoing_background_tasks = AsyncTaskGroup() + self._ongoing_comm_handlers = AsyncTaskGroup() self._event_finished = asyncio.Event() self.listeners = [] @@ -228,7 +392,7 @@ def stop() -> bool: self.counters = defaultdict(partial(Counter, loop=self.io_loop)) - self.periodic_callbacks = dict() + self.periodic_callbacks = {} pc = PeriodicCallback( self.monitor.update, @@ -349,29 +513,29 @@ def start_periodic_callbacks(self): """Start Periodic Callbacks consistently This starts all PeriodicCallbacks stored in self.periodic_callbacks if - they are not yet running. It does this safely on the IOLoop. + they are not yet running. It does this safely by checking that it is using the + correct event loop. """ - self._last_tick = time() - - def start_pcs(): - for pc in self.periodic_callbacks.values(): - if not pc.is_running(): - pc.start() + if self.io_loop.asyncio_loop is not asyncio.get_running_loop(): + raise RuntimeError(f"{self!r} is bound to a different event loop") - self.io_loop.add_callback(start_pcs) + self._last_tick = time() + for pc in self.periodic_callbacks.values(): + if not pc.is_running(): + pc.start() def stop(self): if not self.__stopped: self.__stopped = True + for listener in self.listeners: - # Delay closing the server socket until the next IO loop tick. - # Otherwise race conditions can appear if an event handler - # for an accept() call is already scheduled by the IO loop, - # raising EBADF. - # The demonstrator for this is Worker.terminate(), which - # closes the server socket in response to an incoming message. - # See https://github.com/tornadoweb/tornado/issues/2069 - self.io_loop.add_callback(listener.stop) + + async def stop_listener(listener): + v = listener.stop() + if inspect.isawaitable(v): + await v + + self._ongoing_background_tasks.call_soon(stop_listener, listener) @property def listener(self): @@ -508,7 +672,15 @@ async def listen(self, port_or_addr=None, allow_offload=True, **kwargs): ) self.listeners.append(listener) - async def handle_comm(self, comm): + def handle_comm(self, comm): + """Start a background task that dispatches new communications to coroutine-handlers""" + try: + self._ongoing_background_tasks.call_soon(self._handle_comm, comm) + except AsyncTaskGroupClosedError: + comm.abort() + return NoOpAwaitable() + + async def _handle_comm(self, comm): """Dispatch new communications to coroutine-handlers Handlers is a dictionary mapping operation names to functions or @@ -602,11 +774,6 @@ async def handle_comm(self, comm): else: result = handler(**msg) if inspect.iscoroutine(result): - result = asyncio.create_task( - result, name=f"handle-comm-{address}-{op}" - ) - self._ongoing_coroutines.add(result) - result.add_done_callback(self._ongoing_coroutines.remove) result = await result elif inspect.isawaitable(result): raise RuntimeError( @@ -673,9 +840,11 @@ async def handle_stream(self, comm, extra=None): closed = True break handler = self.stream_handlers[op] - if is_coroutine_function(handler): - self.loop.add_callback(handler, **merge(extra, msg)) - await gen.sleep(0) + if iscoroutinefunction(handler): + self._ongoing_background_tasks.call_soon( + handler, **merge(extra, msg) + ) + await asyncio.sleep(0) else: handler(**merge(extra, msg)) else: @@ -696,36 +865,29 @@ async def handle_stream(self, comm, extra=None): assert comm.closed() async def close(self, timeout=None): - for pc in self.periodic_callbacks.values(): - pc.stop() + try: + for pc in self.periodic_callbacks.values(): + pc.stop() - if not self.__stopped: - self.__stopped = True - _stops = set() - for listener in self.listeners: - future = listener.stop() - if inspect.isawaitable(future): - _stops.add(future) - await asyncio.gather(*_stops) - - def _ongoing_tasks(): - return ( - t for t in self._ongoing_coroutines if t is not asyncio.current_task() - ) + if not self.__stopped: + self.__stopped = True + _stops = set() + for listener in self.listeners: + future = listener.stop() + if inspect.isawaitable(future): + _stops.add(future) + await asyncio.gather(*_stops) - # TODO: Deal with exceptions - try: - # Give the handlers a bit of time to finish gracefully - await asyncio.wait_for( - asyncio.gather(*_ongoing_tasks(), return_exceptions=True), 1 - ) - except asyncio.TimeoutError: - # the timeout on gather should've cancelled all the tasks - await asyncio.gather(*_ongoing_tasks(), return_exceptions=True) + # TODO: Deal with exceptions + await self._ongoing_background_tasks.stop(timeout=1) - await self.rpc.close() - await asyncio.gather(*[comm.close() for comm in list(self._comms)]) - self._event_finished.set() + # TODO: Deal with exceptions + await self._ongoing_comm_handlers.stop(timeout=1) + + await self.rpc.close() + await asyncio.gather(*[comm.close() for comm in list(self._comms)]) + finally: + self._event_finished.set() def pingpong(comm): @@ -892,12 +1054,10 @@ async def _close_comm(comm): tasks = [] for comm in list(self.comms): if comm and not comm.closed(): - # IOLoop.current().add_callback(_close_comm, comm) task = asyncio.ensure_future(_close_comm(comm)) tasks.append(task) for comm in list(self._created): if comm and not comm.closed(): - # IOLoop.current().add_callback(_close_comm, comm) task = asyncio.ensure_future(_close_comm(comm)) tasks.append(task) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 27b2bd3f6f4..893c5a9a14c 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -225,7 +225,11 @@ async def test_adapt_quickly(): await cluster - while len(cluster.scheduler.workers) > 1 or len(cluster.worker_spec) > 1: + while ( + len(cluster.scheduler.workers) > 1 + or len(cluster.worker_spec) > 1 + or len(cluster.workers) > 1 + ): await asyncio.sleep(0.01) # Don't scale up for large sequential computations diff --git a/distributed/nanny.py b/distributed/nanny.py index de871254ceb..b2579b28f10 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -498,7 +498,7 @@ def run(self, comm, *args, **kwargs): return run(self, comm, *args, **kwargs) def _on_exit_sync(self, exitcode): - self.loop.add_callback(self._on_exit, exitcode) + self._ongoing_background_tasks.call_soon(self._on_exit, exitcode) @log_errors async def _on_exit(self, exitcode): @@ -595,7 +595,7 @@ async def _log_event(self, topic, msg): ) def log_event(self, topic, msg): - self.loop.add_callback(self._log_event, topic, msg) + self._ongoing_background_tasks.call_soon(self._log_event, topic, msg) class WorkerProcess: diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3af733923d5..e508791a740 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -28,7 +28,6 @@ Set, ) from contextlib import suppress -from datetime import timedelta from functools import partial from numbers import Number from typing import Any, ClassVar, Literal, cast @@ -3340,7 +3339,7 @@ async def start_unsafe(self): for k, v in self.services.items(): logger.info("%11s at: %25s", k, "%s:%d" % (listen_ip, v.port)) - self.loop.add_callback(self.reevaluate_occupancy) + self._ongoing_background_tasks.call_soon(self.reevaluate_occupancy) if self.scheduler_file: with open(self.scheduler_file, "w") as f: @@ -4284,7 +4283,7 @@ async def remove_worker( self.bandwidth_workers.pop((address, w), None) self.bandwidth_workers.pop((w, address), None) - def remove_worker_from_events(): + async def remove_worker_from_events(): # If the worker isn't registered anymore after the delay, remove from events if address not in self.workers and address in self.events: del self.events[address] @@ -4292,7 +4291,10 @@ def remove_worker_from_events(): cleanup_delay = parse_timedelta( dask.config.get("distributed.scheduler.events-cleanup-delay") ) - self.loop.call_later(cleanup_delay, remove_worker_from_events) + + self._ongoing_background_tasks.call_later( + cleanup_delay, remove_worker_from_events + ) logger.debug("Removed worker %s", ws) return "OK" @@ -4644,7 +4646,7 @@ def remove_client(self, client: str, stimulus_id: str = None) -> None: except Exception as e: logger.exception(e) - def remove_client_from_events(): + async def remove_client_from_events(): # If the client isn't registered anymore after the delay, remove from events if client not in self.clients and client in self.events: del self.events[client] @@ -4652,7 +4654,10 @@ def remove_client_from_events(): cleanup_delay = parse_timedelta( dask.config.get("distributed.scheduler.events-cleanup-delay") ) - self.loop.call_later(cleanup_delay, remove_client_from_events) + if not self._ongoing_background_tasks.closed: + self._ongoing_background_tasks.call_later( + cleanup_delay, remove_client_from_events + ) def send_task_to_worker(self, worker, ts: TaskState, duration: float = -1): """Send a single computational task to a worker""" @@ -4900,7 +4905,7 @@ def worker_send(self, worker: str, msg: dict[str, Any]) -> None: try: stream_comms[worker].send(msg) except (CommClosedError, AttributeError): - self.loop.add_callback( + self._ongoing_background_tasks.call_soon( self.remove_worker, address=worker, stimulus_id=f"worker-send-comm-fail-{time()}", @@ -4949,7 +4954,7 @@ def send_all(self, client_msgs: dict, worker_msgs: dict): # worker already gone pass except (CommClosedError, AttributeError): - self.loop.add_callback( + self._ongoing_background_tasks.call_soon( self.remove_worker, address=worker, stimulus_id=f"send-all-comm-fail-{time()}", @@ -6964,7 +6969,7 @@ async def get_worker_monitor_info(self, recent=False, starts=None): # Cleanup # ########### - def reevaluate_occupancy(self, worker_index: int = 0): + async def reevaluate_occupancy(self, worker_index: int = 0): """Periodically reassess task duration time The expected duration of a task can change over time. Unfortunately we @@ -6980,33 +6985,29 @@ def reevaluate_occupancy(self, worker_index: int = 0): think about. """ try: - if self.status == Status.closed: - return - last = time() - next_time = timedelta(seconds=0.1) - - if self.proc.cpu_percent() < 50: - workers: list = list(self.workers.values()) - nworkers: int = len(workers) - i: int - for i in range(nworkers): - ws: WorkerState = workers[worker_index % nworkers] - worker_index += 1 - try: - if ws is None or not ws.processing: - continue - self._reevaluate_occupancy_worker(ws) - finally: - del ws # lose ref - - duration = time() - last - if duration > 0.005: # 5ms since last release - next_time = timedelta(seconds=duration * 5) # 25ms gap - break + while self.status != Status.closed: + last = time() + delay = 0.1 - self.loop.add_timeout( - next_time, self.reevaluate_occupancy, worker_index=worker_index - ) + if self.proc.cpu_percent() < 50: + workers: list = list(self.workers.values()) + nworkers: int = len(workers) + i: int + for i in range(nworkers): + ws: WorkerState = workers[worker_index % nworkers] + worker_index += 1 + try: + if ws is None or not ws.processing: + continue + self._reevaluate_occupancy_worker(ws) + finally: + del ws # lose ref + + duration = time() - last + if duration > 0.005: # 5ms since last release + delay = duration * 5 # 25ms gap + break + await asyncio.sleep(delay) except Exception: logger.error("Error in reevaluate occupancy", exc_info=True) @@ -7049,7 +7050,7 @@ def check_idle(self): "Scheduler closing after being idle for %s", format_time(self.idle_timeout), ) - self.loop.add_callback(self.close) + self._ongoing_background_tasks.call_soon(self.close) def adaptive_target(self, target_duration=None): """Desired number of workers based on the current workload diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 347c54cfcc0..68cce197212 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -5135,7 +5135,7 @@ def long_running(lock, entered): assert s.total_occupancy == 0 assert ws.occupancy == 0 - s.reevaluate_occupancy(0) + s._ongoing_background_tasks.call_soon(s.reevaluate_occupancy, 0) assert s.workers[a.address].occupancy == 0 await l.release() diff --git a/distributed/tests/test_core.py b/distributed/tests/test_core.py index 7422181d024..754729810b9 100644 --- a/distributed/tests/test_core.py +++ b/distributed/tests/test_core.py @@ -3,6 +3,7 @@ import os import socket import threading +import time as timemod import weakref import pytest @@ -12,6 +13,8 @@ from distributed.comm.core import CommClosedError from distributed.core import ( + AsyncTaskGroup, + AsyncTaskGroupClosedError, ConnectionPool, Server, Status, @@ -73,6 +76,161 @@ def echo_no_serialize(comm, x): return {"result": x} +def test_async_task_group_initialization(): + group = AsyncTaskGroup() + assert not group.closed + assert len(group) == 0 + + +async def _wait_for_n_loop_cycles(n): + for i in range(n): + await asyncio.sleep(0) + + +@gen_test() +async def test_async_task_group_call_soon_executes_task_in_background(): + group = AsyncTaskGroup() + ev = asyncio.Event() + flag = False + + async def set_flag(): + nonlocal flag + await ev.wait() + flag = True + + assert group.call_soon(set_flag) is None + assert len(group) == 1 + ev.set() + await _wait_for_n_loop_cycles(2) + assert len(group) == 0 + assert flag + + +@gen_test() +async def test_async_task_group_call_later_executes_delayed_task_in_background(): + group = AsyncTaskGroup() + flag = False + + async def set_flag(): + nonlocal flag + flag = True + + start = timemod.monotonic() + assert group.call_later(1, set_flag) is None + assert len(group) == 1 + # the task must complete in exactly 1 event loop cycle + await asyncio.sleep(1) + await _wait_for_n_loop_cycles(2) + end = timemod.monotonic() + assert len(group) == 0 + assert flag + assert end - start > 1 - timemod.get_clock_info("monotonic").resolution + + +def test_async_task_group_close_closes(): + group = AsyncTaskGroup() + group.close() + assert group.closed + + # Test idempotency + group.close() + assert group.closed + + +@gen_test() +async def test_async_task_group_close_does_not_cancel_existing_tasks(): + group = AsyncTaskGroup() + + ev = asyncio.Event() + flag = False + + async def set_flag(): + nonlocal flag + await ev.wait() + flag = True + return None + + assert group.call_soon(set_flag) is None + + group.close() + + assert len(group) == 1 + + ev.set() + await _wait_for_n_loop_cycles(2) + assert len(group) == 0 + + +@gen_test() +async def test_async_task_group_close_prohibits_new_tasks(): + group = AsyncTaskGroup() + group.close() + + ev = asyncio.Event() + flag = False + + async def set_flag(): + nonlocal flag + await ev.wait() + flag = True + return True + + with pytest.raises(AsyncTaskGroupClosedError): + group.call_soon(set_flag) + assert len(group) == 0 + + with pytest.raises(AsyncTaskGroupClosedError): + group.call_later(1, set_flag) + assert len(group) == 0 + + await asyncio.sleep(0.01) + assert not flag + + +@gen_test() +async def test_async_task_group_stop_allows_shutdown(): + group = AsyncTaskGroup() + + task = None + + async def set_flag(): + nonlocal task + while not group.closed: + await asyncio.sleep(0.01) + task = asyncio.current_task() + return None + + assert group.call_soon(set_flag) is None + assert len(group) == 1 + # when given a grace period of 1 second tasks are allowed to poll group.stop + # before awaiting other async functions + await group.stop(timeout=1) + assert task.done() + assert not task.cancelled() + + +@gen_test() +async def test_async_task_group_stop_cancels_long_running(): + group = AsyncTaskGroup() + + task = None + flag = False + + async def set_flag(): + nonlocal task + task = asyncio.current_task() + await asyncio.sleep(10) + nonlocal flag + flag = True + return True + + assert group.call_soon(set_flag) is None + assert len(group) == 1 + await group.stop(timeout=1) + assert not flag + assert task.cancelled() + + @gen_test() async def test_server_status_is_always_enum(): """Assignments with strings is forbidden""" @@ -928,7 +1086,7 @@ async def sleep(comm=None): comm = await remote.live_comm() await comm.write({"op": "sleep"}) - await async_wait_for(lambda: not server._ongoing_coroutines, 10) + await async_wait_for(lambda: not server._ongoing_comm_handlers, 10) listeners = server.listeners assert len(listeners) == len(ports) @@ -942,7 +1100,7 @@ async def sleep(comm=None): await assert_cannot_connect(f"tcp://{ip}:{port}") # weakref set/dict should be cleaned up - assert not len(server._ongoing_coroutines) + assert not len(server._ongoing_comm_handlers) @gen_test() diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index dfa58a83579..d3712c30d5d 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -577,7 +577,7 @@ async def test_scheduler_crash_doesnt_restart(s, a): bcomm.abort() await s.close() - while a.status != Status.closing_gracefully: + while a.status not in {Status.closing_gracefully, Status.closed}: await asyncio.sleep(0.01) await a.finished() diff --git a/distributed/worker.py b/distributed/worker.py index 817e541886c..eb6c3c39bfd 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1609,7 +1609,7 @@ async def batched_send_connect(): bcomm.start(comm) - self.loop.add_callback(batched_send_connect) + self._ongoing_background_tasks.call_soon(batched_send_connect) self.stream_comms[address].send(msg) @@ -3044,7 +3044,7 @@ async def run(server, comm, function, args=(), kwargs=None, wait=True): if wait: result = await function(*args, **kwargs) else: - server.loop.add_callback(function, *args, **kwargs) + server._ongoing_background_tasks.call_soon(function, *args, **kwargs) result = None except Exception as e: