diff --git a/a_sync/asyncio/create_task.pyx b/a_sync/asyncio/create_task.pyx index 129c0a5c..f7af0a31 100644 --- a/a_sync/asyncio/create_task.pyx +++ b/a_sync/asyncio/create_task.pyx @@ -134,7 +134,7 @@ cdef void __prune_persisted_tasks(): cdef object task cdef dict context for task in tuple(__persisted_tasks): - if task.done() and (e := task.exception()): + if _is_done(task) and (e := task.exception()): # force exceptions related to this lib to bubble up if not isinstance(e, exceptions.PersistedTaskException): c_logger.exception(e) @@ -152,6 +152,10 @@ cdef void __prune_persisted_tasks(): __persisted_tasks.discard(task) +cdef inline bint _is_done(fut: asyncio.Future): + return fut._state != "PENDING" + + async def __persisted_task_exc_wrap(task: "asyncio.Task[T]") -> T: """ Wrap a task to handle its exception in a specialized manner. @@ -171,4 +175,4 @@ async def __persisted_task_exc_wrap(task: "asyncio.Task[T]") -> T: raise exceptions.PersistedTaskException(e, task) from e -__all__ = ["create_task"] +__all__ = ["create_task"] \ No newline at end of file diff --git a/a_sync/primitives/locks/event.pyx b/a_sync/primitives/locks/event.pyx index 4179b699..430dacc8 100644 --- a/a_sync/primitives/locks/event.pyx +++ b/a_sync/primitives/locks/event.pyx @@ -114,7 +114,7 @@ cdef class CythonEvent(_DebugDaemonMixin): self._value = True for fut in self._waiters: - if not fut.done(): + if _is_not_done(fut): fut.set_result(True) cpdef void clear(self): @@ -181,3 +181,7 @@ cdef class CythonEvent(_DebugDaemonMixin): del self # no need to hold a reference here await asyncio.sleep(interval) loops += 1 + + +cdef inline bint _is_not_done(fut: asyncio.Future): + return fut._state == "PENDING" \ No newline at end of file diff --git a/a_sync/primitives/locks/prio_semaphore.pyx b/a_sync/primitives/locks/prio_semaphore.pyx index 6a706812..a03356d5 100644 --- a/a_sync/primitives/locks/prio_semaphore.pyx +++ b/a_sync/primitives/locks/prio_semaphore.pyx @@ -169,7 +169,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore): if not cms: return False return any( - cm._Semaphore__waiters and any(not w.cancelled() for w in cm._Semaphore__waiters) + cm._Semaphore__waiters and any(_is_not_cancelled(w) for w in cm._Semaphore__waiters) for cm in cms.values() ) @@ -233,7 +233,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore): while manager._Semaphore__waiters: waiter = manager._Semaphore__waiters.popleft() self._potential_lost_waiters.remove(waiter) - if not waiter.done(): + if _is_not_done(waiter): waiter.set_result(None) woke_up = True if debug_logs: @@ -260,7 +260,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore): if not debug_logs: while self._potential_lost_waiters: waiter = self._potential_lost_waiters.pop(0) - if not waiter.done(): + if _is_not_done(waiter): waiter.set_result(None) return return @@ -268,7 +268,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore): while self._potential_lost_waiters: waiter = self._potential_lost_waiters.pop(0) c_logger._log(DEBUG, "we found a lost waiter %s", (waiter, )) - if not waiter.done(): + if _is_not_done(waiter): waiter.set_result(None) c_logger._log(DEBUG, "woke up lost waiter %s", (waiter, )) return @@ -380,7 +380,7 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore): except: # See the similar code in Queue.get. fut.cancel() - if self._parent._Semaphore__value > 0 and not fut.cancelled(): + if self._parent._Semaphore__value > 0 and _is_not_cancelled(fut): self._parent._wake_up_next() raise self._parent._Semaphore__value -= 1 @@ -409,6 +409,13 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore): self._parent.c_release() +cdef inline bint _is_not_done(fut: asyncio.Future): + return fut._state == "PENDING" + +cdef inline bint _is_not_cancelled(fut: asyncio.Future): + return fut._state != "CANCELLED" + + cdef class _PrioritySemaphoreContextManager(_AbstractPrioritySemaphoreContextManager): """Context manager for numeric priority semaphores.""" diff --git a/a_sync/primitives/locks/semaphore.pyx b/a_sync/primitives/locks/semaphore.pyx index ea6f43da..5c02ab12 100644 --- a/a_sync/primitives/locks/semaphore.pyx +++ b/a_sync/primitives/locks/semaphore.pyx @@ -140,7 +140,7 @@ cdef class Semaphore(_DebugDaemonMixin): if self._Semaphore__value == 0: return True cdef object waiters = self._Semaphore__waiters - if any(not w.cancelled() for w in (waiters or ())): + if any(_is_not_cancelled(w) for w in (waiters or ())): return True def __len__(self) -> int: @@ -218,7 +218,7 @@ cdef class Semaphore(_DebugDaemonMixin): finally: self._Semaphore__waiters.remove(fut) except asyncio.exceptions.CancelledError: - if not fut.cancelled(): + if _is_not_cancelled(fut): self._Semaphore__value += 1 self._c_wake_up_next() raise @@ -274,7 +274,7 @@ cdef class Semaphore(_DebugDaemonMixin): return for fut in self._Semaphore__waiters: - if not fut.done(): + if _is_not_done(fut): self._Semaphore__value -= 1 fut.set_result(True) return @@ -285,7 +285,7 @@ cdef class Semaphore(_DebugDaemonMixin): return for fut in self._Semaphore__waiters: - if not fut.done(): + if _is_not_done(fut): self._Semaphore__value -= 1 fut.set_result(True) return @@ -313,6 +313,13 @@ cdef class Semaphore(_DebugDaemonMixin): ) +cdef inline bint _is_not_done(fut: asyncio.Future): + return fut._state == "PENDING" + +cdef inline bint _is_not_cancelled(fut: asyncio.Future): + return fut._state != "CANCELLED" + + cdef class DummySemaphore(Semaphore): """ A dummy semaphore that implements the standard :class:`asyncio.Semaphore` API but does nothing.