Skip to content

Commit

Permalink
feat: optimize task state checks (#466)
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Dec 12, 2024
1 parent 3a0c2cc commit 0e6e9f0
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 12 deletions.
8 changes: 6 additions & 2 deletions a_sync/asyncio/create_task.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ cdef void __prune_persisted_tasks():
cdef object task
cdef dict context
for task in tuple(__persisted_tasks):
if task.done() and (e := task.exception()):
if _is_done(task) and (e := task.exception()):
# force exceptions related to this lib to bubble up
if not isinstance(e, exceptions.PersistedTaskException):
c_logger.exception(e)
Expand All @@ -152,6 +152,10 @@ cdef void __prune_persisted_tasks():
__persisted_tasks.discard(task)
cdef inline bint _is_done(fut: asyncio.Future):
return <str>fut._state != "PENDING"
async def __persisted_task_exc_wrap(task: "asyncio.Task[T]") -> T:
"""
Wrap a task to handle its exception in a specialized manner.
Expand All @@ -171,4 +175,4 @@ async def __persisted_task_exc_wrap(task: "asyncio.Task[T]") -> T:
raise exceptions.PersistedTaskException(e, task) from e
__all__ = ["create_task"]
__all__ = ["create_task"]
6 changes: 5 additions & 1 deletion a_sync/primitives/locks/event.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ cdef class CythonEvent(_DebugDaemonMixin):
self._value = True

for fut in self._waiters:
if not fut.done():
if _is_not_done(fut):
fut.set_result(True)

cpdef void clear(self):
Expand Down Expand Up @@ -181,3 +181,7 @@ cdef class CythonEvent(_DebugDaemonMixin):
del self # no need to hold a reference here
await asyncio.sleep(interval)
loops += 1


cdef inline bint _is_not_done(fut: asyncio.Future):
return <str>fut._state == "PENDING"
17 changes: 12 additions & 5 deletions a_sync/primitives/locks/prio_semaphore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
if not cms:
return False
return any(
cm._Semaphore__waiters and any(not w.cancelled() for w in cm._Semaphore__waiters)
cm._Semaphore__waiters and any(_is_not_cancelled(w) for w in cm._Semaphore__waiters)
for cm in cms.values()
)

Expand Down Expand Up @@ -233,7 +233,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
while manager._Semaphore__waiters:
waiter = manager._Semaphore__waiters.popleft()
self._potential_lost_waiters.remove(waiter)
if not waiter.done():
if _is_not_done(waiter):
waiter.set_result(None)
woke_up = True
if debug_logs:
Expand All @@ -260,15 +260,15 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
if not debug_logs:
while self._potential_lost_waiters:
waiter = self._potential_lost_waiters.pop(0)
if not waiter.done():
if _is_not_done(waiter):
waiter.set_result(None)
return
return

while self._potential_lost_waiters:
waiter = self._potential_lost_waiters.pop(0)
c_logger._log(DEBUG, "we found a lost waiter %s", (waiter, ))
if not waiter.done():
if _is_not_done(waiter):
waiter.set_result(None)
c_logger._log(DEBUG, "woke up lost waiter %s", (waiter, ))
return
Expand Down Expand Up @@ -380,7 +380,7 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore):
except:
# See the similar code in Queue.get.
fut.cancel()
if self._parent._Semaphore__value > 0 and not fut.cancelled():
if self._parent._Semaphore__value > 0 and _is_not_cancelled(fut):
self._parent._wake_up_next()
raise
self._parent._Semaphore__value -= 1
Expand Down Expand Up @@ -409,6 +409,13 @@ cdef class _AbstractPrioritySemaphoreContextManager(Semaphore):
self._parent.c_release()


cdef inline bint _is_not_done(fut: asyncio.Future):
return <str>fut._state == "PENDING"

cdef inline bint _is_not_cancelled(fut: asyncio.Future):
return <str>fut._state != "CANCELLED"


cdef class _PrioritySemaphoreContextManager(_AbstractPrioritySemaphoreContextManager):
"""Context manager for numeric priority semaphores."""

Expand Down
15 changes: 11 additions & 4 deletions a_sync/primitives/locks/semaphore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ cdef class Semaphore(_DebugDaemonMixin):
if self._Semaphore__value == 0:
return True
cdef object waiters = self._Semaphore__waiters
if any(not w.cancelled() for w in (waiters or ())):
if any(_is_not_cancelled(w) for w in (waiters or ())):
return True
def __len__(self) -> int:
Expand Down Expand Up @@ -218,7 +218,7 @@ cdef class Semaphore(_DebugDaemonMixin):
finally:
self._Semaphore__waiters.remove(fut)
except asyncio.exceptions.CancelledError:
if not fut.cancelled():
if _is_not_cancelled(fut):
self._Semaphore__value += 1
self._c_wake_up_next()
raise
Expand Down Expand Up @@ -274,7 +274,7 @@ cdef class Semaphore(_DebugDaemonMixin):
return

for fut in self._Semaphore__waiters:
if not fut.done():
if _is_not_done(fut):
self._Semaphore__value -= 1
fut.set_result(True)
return
Expand All @@ -285,7 +285,7 @@ cdef class Semaphore(_DebugDaemonMixin):
return

for fut in self._Semaphore__waiters:
if not fut.done():
if _is_not_done(fut):
self._Semaphore__value -= 1
fut.set_result(True)
return
Expand Down Expand Up @@ -313,6 +313,13 @@ cdef class Semaphore(_DebugDaemonMixin):
)


cdef inline bint _is_not_done(fut: asyncio.Future):
return <str>fut._state == "PENDING"

cdef inline bint _is_not_cancelled(fut: asyncio.Future):
return <str>fut._state != "CANCELLED"


cdef class DummySemaphore(Semaphore):
"""
A dummy semaphore that implements the standard :class:`asyncio.Semaphore` API but does nothing.
Expand Down

0 comments on commit 0e6e9f0

Please sign in to comment.