Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Commit

Permalink
* #231 Patch for the cpython issue with Lock
Browse files Browse the repository at this point in the history
* Make Lock compatible with 3.3 and 3.4

* Changed test to make it compatible with python 3.3 version

* Flake8 issue

* Increase coverage by reducing scope, test just the bugfix for 3.6

* One unique version of Lock.acquire function

* Use create_future provided by aioredis
  • Loading branch information
pfreixes authored and popravich committed Jun 21, 2017
1 parent 2ca2f08 commit 201451d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
43 changes: 43 additions & 0 deletions aioredis/locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from asyncio.locks import Lock as _Lock
from asyncio import coroutine
from asyncio import futures

from .util import create_future

# Fixes an issue with all Python versions that leaves pending waiters
# without being awakened when the first waiter is canceled.
# Code adapted from the PR https://github.com/python/cpython/pull/1031
# Waiting once it is merged to make a proper condition to relay on
# the stdlib implementation or this one patched


class Lock(_Lock):
@coroutine
def acquire(self):
"""Acquire a lock.
This method blocks until the lock is unlocked, then sets it to
locked and returns True.
"""
if not self._locked and all(w.cancelled() for w in self._waiters):
self._locked = True
return True

fut = create_future(self._loop)

self._waiters.append(fut)
try:
yield from fut
self._locked = True
return True
except futures.CancelledError:
if not self._locked: # pragma: no cover
self._wake_up_first()
raise
finally:
self._waiters.remove(fut)

def _wake_up_first(self):
"""Wake up the first waiter who isn't cancelled."""
for fut in self._waiters:
if not fut.done():
fut.set_result(True)
3 changes: 2 additions & 1 deletion aioredis/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .log import logger
from .util import async_task, _NOTSET
from .errors import PoolClosedError
from .locks import Lock


PY_35 = sys.version_info >= (3, 5)
Expand Down Expand Up @@ -79,7 +80,7 @@ def __init__(self, address, db=0, password=None, encoding=None,
self._pool = collections.deque(maxlen=maxsize)
self._used = set()
self._acquiring = 0
self._cond = asyncio.Condition(loop=loop)
self._cond = asyncio.Condition(lock=Lock(loop=loop), loop=loop)
self._close_state = asyncio.Event(loop=loop)
self._close_waiter = None

Expand Down
33 changes: 33 additions & 0 deletions tests/locks_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio
import pytest

from aioredis.util import async_task
from aioredis.locks import Lock


@pytest.mark.run_loop
def test_finished_waiter_cancelled(loop):
lock = Lock(loop=loop)

ta = async_task(lock.acquire(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
assert lock.locked()

tb = async_task(lock.acquire(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
assert len(lock._waiters) == 1

# Create a second waiter, wake up the first, and cancel it.
# Without the fix, the second was not woken up and the lock
# will never be locked
async_task(lock.acquire(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
lock.release()
tb.cancel()

yield from asyncio.sleep(0, loop=loop)
assert ta.done()
assert tb.cancelled()

yield from asyncio.sleep(0, loop=loop)
assert lock.locked()

0 comments on commit 201451d

Please sign in to comment.