-
Notifications
You must be signed in to change notification settings - Fork 157
/
lock.py
169 lines (131 loc) · 6.26 KB
/
lock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import asyncio
import uuid
from typing import Any, Dict, Generic, Union
from aiocache.base import BaseCache, CacheKeyType
class RedLock(Generic[CacheKeyType]):
    """
    Implementation of `Redlock <https://redis.io/topics/distlock>`_
    with a single instance because aiocache is focused on single
    instance cache.

    This locking has some limitations and shouldn't be used in
    situations where consistency is critical. Those locks are aimed for
    performance reasons where failing on locking from time to time
    is acceptable. TLDR: do NOT use this if you need real resource
    exclusion.

    Couple of considerations with the implementation:

    - If the lease expires and there are calls waiting, all of them
      will pass (blocking just happens for the first time).
    - When a new call arrives, it will wait always at most lease
      time. This means that the call could end up blocked longer
      than needed in case the lease from the blocker expires.

    Backend specific implementation:

    - Redis implements correctly the redlock algorithm. It sets
      the key if it doesn't exist. To release, it checks the value
      is the same as the instance trying to release and if it is,
      it removes the lock. If not it will do nothing
    - Memcached follows the same approach with a difference. Due
      to memcached lacking a way to execute the operation get and
      delete commands atomically, any client is able to release the
      lock. This is a limitation that can't be fixed without introducing
      race conditions.
    - Memory implementation is not distributed, it will only apply
      to the process running. Say you have 4 processes running
      APIs with aiocache, the locking will apply only per process
      (still useful to reduce load per process).

    Example usage::

        from aiocache import Cache
        from aiocache.lock import RedLock

        cache = Cache(Cache.REDIS)
        async with RedLock(cache, 'key', lease=1):  # Calls will wait here
            result = await cache.get('key')
            if result is not None:
                return result
            result = await super_expensive_function()
            await cache.set('key', result)

    In the example, first call will start computing the ``super_expensive_function``
    while consecutive calls will block at most 1 second. If the blocking lasts for
    more than 1 second, the calls will proceed to also calculate the
    result of ``super_expensive_function``.
    """

    # Process-local registry of waiters, keyed by the namespaced lock key.
    # Callers in this process that find the lock taken block on the event
    # until the holder releases it (or their lease-long wait times out).
    _EVENTS: Dict[str, asyncio.Event] = {}

    def __init__(self, client: BaseCache[CacheKeyType], key: str, lease: Union[int, float]):
        self.client = client
        # Suffix the key so the lock entry never collides with the cached value.
        self.key = self.client.build_key(key + "-lock")
        self.lease = lease
        # Random token identifying this holder; regenerated on each acquire.
        self._value = ""

    async def __aenter__(self):
        return await self._acquire()

    async def _acquire(self):
        # Unique token so only the instance that set the lock can release it.
        self._value = str(uuid.uuid4())
        try:
            # `_add` is set-if-not-exists: it raises ValueError when the key
            # already exists, i.e. when somebody else currently holds the lock.
            await self.client._add(self.key, self._value, ttl=self.lease)
            RedLock._EVENTS[self.key] = asyncio.Event()
        except ValueError:
            # Lock is taken: wait (at most `lease` seconds) for its release.
            await self._wait_for_release()

    async def _wait_for_release(self):
        try:
            # Bound the wait by the lease: if the holder never releases, the
            # lease has expired by then and the caller may proceed anyway.
            await asyncio.wait_for(RedLock._EVENTS[self.key].wait(), self.lease)
        except asyncio.TimeoutError:
            pass
        except KeyError:  # lock was released when wait_for was rescheduled
            pass

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self._release()

    async def _release(self):
        # The backend removes the key only if its stored value matches our
        # token, so a lock that expired and was re-acquired by someone else
        # is never released by the old holder.
        removed = await self.client._redlock_release(self.key, self._value)
        if removed:
            # Wake every local waiter blocked in `_wait_for_release`.
            RedLock._EVENTS.pop(self.key).set()
class OptimisticLock(Generic[CacheKeyType]):
    """
    Optimistic concurrency control over a single cache key
    (`optimistic lock <https://en.wikipedia.org/wiki/Optimistic_concurrency_control>`_).

    Optimistic locking lets concurrent transactions run freely and only fails
    a transaction at commit time, when it detects that another transaction
    modified the same value in the meantime. Here, entering the context
    manager records a version token for the current value of ``key``; a later
    :meth:`cas` writes only if the stored value is unchanged, and raises
    :class:`aiocache.lock.OptimisticLockError` on a conflict.

    Example usage::

        cache = Cache(Cache.REDIS)

        # The value stored in 'key' will be checked here
        async with OptimisticLock(cache, 'key') as lock:
            result = await super_expensive_call()
            await lock.cas(result)

    If any other call sets the value of ``key`` before the ``lock.cas`` is
    called, an :class:`aiocache.lock.OptimisticLockError` will be raised. A
    way to make the same call crash would be to change the value inside the
    lock like::

        cache = Cache(Cache.REDIS)

        # The value stored in 'key' will be checked here
        async with OptimisticLock(cache, 'key') as lock:
            result = await super_expensive_call()
            await cache.set('random_value')  # This will make the `lock.cas` call fail
            await lock.cas(result)

    If the lock is created with an unexisting key, there will never be conflicts.
    """

    def __init__(self, client: BaseCache[CacheKeyType], key: str):
        self._token = None
        self.key = key
        self.client = client
        self.ns_key = client.build_key(key)

    async def __aenter__(self):
        return await self._acquire()

    async def _acquire(self):
        # Snapshot the version token of whatever is currently stored.
        self._token = await self.client._gets(self.ns_key)
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        # Nothing to clean up: optimistic locking holds no server-side state.
        pass

    async def cas(self, value: Any, **kwargs: Any) -> bool:
        """
        Checks and sets the specified value for the locked key. If the value has changed
        since the lock was created, it will raise an :class:`aiocache.lock.OptimisticLockError`
        exception.

        :raises: :class:`aiocache.lock.OptimisticLockError`
        """
        stored = await self.client.set(self.key, value, _cas_token=self._token, **kwargs)
        if stored:
            return True
        raise OptimisticLockError("Value has changed since the lock started")
class OptimisticLockError(Exception):
    """
    Signals a write conflict during an optimistic lock: the stored value
    changed between the lock's creation and the ``cas`` call.
    """