From 201451d6210f35e6521e7df7c52051dbacfa94f1 Mon Sep 17 00:00:00 2001 From: Pau Freixes Date: Wed, 21 Jun 2017 08:46:50 +0200 Subject: [PATCH] * #231 Patch for the cpython issue with Lock * Make Lock compatible with 3.3 and 3.4 * Changed test to make it compatible with python 3.3 version * Flake8 issue * Increase coverage by reducing scope, test just the bugfix for 3.6 * One unique version of Lock.acquire function * Use create_future provided by aioredis --- aioredis/locks.py | 43 +++++++++++++++++++++++++++++++++++++++++++ aioredis/pool.py | 3 ++- tests/locks_test.py | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 aioredis/locks.py create mode 100644 tests/locks_test.py diff --git a/aioredis/locks.py b/aioredis/locks.py new file mode 100644 index 000000000..c49e9b2a4 --- /dev/null +++ b/aioredis/locks.py @@ -0,0 +1,43 @@ +from asyncio.locks import Lock as _Lock +from asyncio import coroutine +from asyncio import futures + +from .util import create_future + +# Fixes an issue with all Python versions that leaves pending waiters +# without being awakened when the first waiter is canceled. +# Code adapted from the PR https://github.com/python/cpython/pull/1031 +# Waiting once it is merged to make a proper condition to relay on +# the stdlib implementation or this one patched + + +class Lock(_Lock): + @coroutine + def acquire(self): + """Acquire a lock. + This method blocks until the lock is unlocked, then sets it to + locked and returns True. + """ + if not self._locked and all(w.cancelled() for w in self._waiters): + self._locked = True + return True + + fut = create_future(self._loop) + + self._waiters.append(fut) + try: + yield from fut + self._locked = True + return True + except futures.CancelledError: + if not self._locked: # pragma: no cover + self._wake_up_first() + raise + finally: + self._waiters.remove(fut) + + def _wake_up_first(self): + """Wake up the first waiter who isn't cancelled.""" + for fut in self._waiters: + if not fut.done(): + fut.set_result(True) diff --git a/aioredis/pool.py b/aioredis/pool.py index f98f08ff8..a477182e6 100644 --- a/aioredis/pool.py +++ b/aioredis/pool.py @@ -7,6 +7,7 @@ from .log import logger from .util import async_task, _NOTSET from .errors import PoolClosedError +from .locks import Lock PY_35 = sys.version_info >= (3, 5) @@ -79,7 +80,7 @@ def __init__(self, address, db=0, password=None, encoding=None, self._pool = collections.deque(maxlen=maxsize) self._used = set() self._acquiring = 0 - self._cond = asyncio.Condition(loop=loop) + self._cond = asyncio.Condition(lock=Lock(loop=loop), loop=loop) self._close_state = asyncio.Event(loop=loop) self._close_waiter = None diff --git a/tests/locks_test.py b/tests/locks_test.py new file mode 100644 index 000000000..385d11684 --- /dev/null +++ b/tests/locks_test.py @@ -0,0 +1,33 @@ +import asyncio +import pytest + +from aioredis.util import async_task +from aioredis.locks import Lock + + +@pytest.mark.run_loop +def test_finished_waiter_cancelled(loop): + lock = Lock(loop=loop) + + ta = async_task(lock.acquire(), loop=loop) + yield from asyncio.sleep(0, loop=loop) + assert lock.locked() + + tb = async_task(lock.acquire(), loop=loop) + yield from asyncio.sleep(0, loop=loop) + assert len(lock._waiters) == 1 + + # Create a second waiter, wake up the first, and cancel it. + # Without the fix, the second was not woken up and the lock + # will never be locked + async_task(lock.acquire(), loop=loop) + yield from asyncio.sleep(0, loop=loop) + lock.release() + tb.cancel() + + yield from asyncio.sleep(0, loop=loop) + assert ta.done() + assert tb.cancelled() + + yield from asyncio.sleep(0, loop=loop) + assert lock.locked()