
Rename Cache to DeferredCache, and related changes #8548

Merged · 5 commits · Oct 15, 2020
Changes from 1 commit
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/client_ips.py
@@ -26,7 +26,7 @@ def __init__(self, database: DatabasePool, db_conn, hs):

         self.client_ip_last_seen = Cache(
             name="client_ip_last_seen", keylen=4, max_entries=50000
-        )
+        )  # type: Cache[tuple, int]
Member Author commented:

mypy grumbled about this once I added the Generic params to Cache.
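
As a minimal sketch of what mypy objects to (toy names, not the Synapse code): once `Cache` is parameterised with `Generic[KT, VT]`, mypy must solve the type variables at each construction site. The constructor arguments here never mention `KT` or `VT`, so the call is ambiguous and mypy demands an annotation; the `# type:` comment (the pre-Python-3.6 spelling of a variable annotation) supplies it.

```python
from typing import Dict, Generic, TypeVar

KT = TypeVar("KT")
VT = TypeVar("VT")


class Cache(Generic[KT, VT]):
    """A toy stand-in for the Cache class in this diff."""

    def __init__(self, name: str, max_entries: int = 1000):
        self.name = name
        self.max_entries = max_entries
        self._data = {}  # type: Dict[KT, VT]

    def prefill(self, key: KT, value: VT) -> None:
        self._data[key] = value


# Nothing in the constructor arguments involves KT or VT, so mypy cannot
# infer them and reports 'Need type annotation for "client_ip_last_seen"'.
# The type comment pins the parameters down:
client_ip_last_seen = Cache(name="client_ip_last_seen")  # type: Cache[tuple, int]
```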


     async def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
         now = int(self._clock.time_msec())
81 changes: 59 additions & 22 deletions synapse/util/caches/descriptors.py
@@ -13,12 +13,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import enum
 import functools
 import inspect
 import logging
 import threading
-from typing import Any, Callable, Generic, Optional, Tuple, TypeVar, Union, cast
+from typing import (
+    Any,
+    Callable,
+    Generic,
+    Iterable,
+    MutableMapping,
+    Optional,
+    Tuple,
+    TypeVar,
+    Union,
+    cast,
+)
 from weakref import WeakValueDictionary

 from prometheus_client import Gauge
@@ -38,6 +49,8 @@
 CacheKey = Union[Tuple, Any]

 F = TypeVar("F", bound=Callable[..., Any])
+KT = TypeVar("KT")
+VT = TypeVar("VT")


 class _CachedFunction(Generic[F]):
@@ -61,13 +74,19 @@ class _CachedFunction(Generic[F]):
     ["name"],
 )

-_CacheSentinel = object()

+class _Sentinel(enum.Enum):
+    # defining a sentinel in this way allows mypy to correctly handle the
+    # type of a dictionary lookup.
+    sentinel = object()


 class CacheEntry:
     __slots__ = ["deferred", "callbacks", "invalidated"]

-    def __init__(self, deferred, callbacks):
+    def __init__(
+        self, deferred: ObservableDeferred, callbacks: Iterable[Callable[[], None]]
+    ):
         self.deferred = deferred
         self.callbacks = set(callbacks)
         self.invalidated = False
@@ -80,7 +99,13 @@ def invalidate(self):
         self.callbacks.clear()


-class Cache:
+class Cache(Generic[KT, VT]):
+    """Wraps an LruCache, adding support for Deferred results.
+
+    It expects that each entry added with set() will be a Deferred; likewise get()
+    may return an ObservableDeferred.
+    """
+
     __slots__ = (
         "cache",
         "name",
@@ -103,19 +128,23 @@ def __init__(
         Args:
             name: The name of the cache
             max_entries: Maximum amount of entries that the cache will hold
-            keylen: The length of the tuple used as the cache key
+            keylen: The length of the tuple used as the cache key. Ignored unless
+               `tree` is True.
             tree: Use a TreeCache instead of a dict as the underlying cache type
             iterable: If True, count each item in the cached object as an entry,
                 rather than each cached object
             apply_cache_factor_from_config: Whether cache factors specified in the
                 config file affect `max_entries`
-
-        Returns:
-            Cache
         """
         cache_type = TreeCache if tree else dict
-        self._pending_deferred_cache = cache_type()
+
+        # _pending_deferred_cache maps from the key value to a `CacheEntry` object.
+        self._pending_deferred_cache = (
+            cache_type()
+        )  # type: MutableMapping[KT, CacheEntry]
+
+        # cache is used for completed results and maps to the result itself, rather than
+        # a Deferred.
         self.cache = LruCache(
             max_size=max_entries,
             keylen=keylen,
@@ -155,7 +184,13 @@ def check_thread(self):
                 "Cache objects can only be accessed from the main thread"
             )

-    def get(self, key, default=_CacheSentinel, callback=None, update_metrics=True):
+    def get(
+        self,
+        key: KT,
+        default=_Sentinel.sentinel,
+        callback: Optional[Callable[[], None]] = None,
+        update_metrics: bool = True,
+    ):
         """Looks the key up in the caches.

         Args:
@@ -166,30 +201,32 @@ def get(self, key, default=_CacheSentinel, callback=None, update_metrics=True):
             update_metrics (bool): whether to update the cache hit rate metrics

         Returns:
-            Either an ObservableDeferred or the raw result
+            Either an ObservableDeferred or the result itself
         """
         callbacks = [callback] if callback else []
-        val = self._pending_deferred_cache.get(key, _CacheSentinel)
-        if val is not _CacheSentinel:
+        val = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
+        if val is not _Sentinel.sentinel:
             val.callbacks.update(callbacks)
             if update_metrics:
                 self.metrics.inc_hits()
             return val.deferred

-        val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
-        if val is not _CacheSentinel:
+        val = self.cache.get(key, _Sentinel.sentinel, callbacks=callbacks)
+        if val is not _Sentinel.sentinel:
             self.metrics.inc_hits()
             return val

         if update_metrics:
             self.metrics.inc_misses()

-        if default is _CacheSentinel:
+        if default is _Sentinel.sentinel:
             raise KeyError()
         else:
             return default

-    def set(self, key, value, callback=None):
+    def set(
+        self, key: KT, value: defer.Deferred, callback: Optional[Callable[[], None]] = None
+    ) -> ObservableDeferred:
         if not isinstance(value, defer.Deferred):
             raise TypeError("not a Deferred")
@@ -248,7 +285,7 @@ def eb(_fail):
             observer.addCallbacks(cb, eb)
             return observable

-    def prefill(self, key, value, callback=None):
+    def prefill(self, key: KT, value: VT, callback: Callable[[], None] = None):
         callbacks = [callback] if callback else []
         self.cache.set(key, value, callbacks=callbacks)
@@ -267,15 +304,15 @@ def invalidate(self, key):
         if entry:
             entry.invalidate()

-    def invalidate_many(self, key):
+    def invalidate_many(self, key: KT):
         self.check_thread()
         if not isinstance(key, tuple):
             raise TypeError("The cache key must be a tuple not %r" % (type(key),))
         self.cache.del_multi(key)

         # if we have a pending lookup for this key, remove it from the
         # _pending_deferred_cache, as above
-        entry_dict = self._pending_deferred_cache.pop(key, None)
+        entry_dict = self._pending_deferred_cache.pop(cast(KT, key), None)
         if entry_dict is not None:
             for entry in iterate_tree_cache_entry(entry_dict):
                 entry.invalidate()
@@ -396,7 +433,7 @@ def __get__(self, obj, owner):
             keylen=self.num_args,
             tree=self.tree,
             iterable=self.iterable,
-        )
+        )  # type: Cache[Tuple, Any]

         def get_cache_key_gen(args, kwargs):
             """Given some args/kwargs return a generator that resolves into
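
An aside on the `_Sentinel` enum introduced in this file: the sketch below (toy code, not part of the PR) illustrates the type-narrowing behaviour that the inline comment refers to. With a bare `_CacheSentinel = object()`, a dictionary lookup comes back as `Union[VT, object]`, and an `is not` check cannot narrow that union; a single-member enum gives the sentinel a distinct type that mypy can eliminate after the identity check.

```python
import enum
from typing import Dict, Optional


class _Sentinel(enum.Enum):
    # a single-member enum gives the sentinel its own type, so mypy can
    # narrow Union[int, _Sentinel] after an `is` comparison
    sentinel = object()


def lookup(cache: Dict[str, int], key: str) -> Optional[int]:
    val = cache.get(key, _Sentinel.sentinel)  # inferred as Union[int, _Sentinel]
    if val is not _Sentinel.sentinel:
        # mypy narrows val to int here; with a plain object() sentinel it
        # would remain Union[int, object] and this return would not type-check
        return val
    return None
```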
3 changes: 2 additions & 1 deletion synapse/util/caches/lrucache.py
@@ -64,7 +64,8 @@ def __init__(
         Args:
             max_size: The maximum amount of entries the cache can hold

-            keylen: The length of the tuple used as the cache key
+            keylen: The length of the tuple used as the cache key. Ignored unless
+               cache_type is `TreeCache`.

             cache_type (type):
                 type of underlying cache to be used. Typically one of dict
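
Why `keylen` only matters for a tree cache: a `TreeCache`-style store keys a fixed-length tuple one level at a time in a nested mapping, so deleting a key prefix drops every entry beneath it, which is what `Cache.invalidate_many` relies on when it calls `del_multi`. The following is a toy illustration written under that assumption, not the actual `TreeCache` implementation:

```python
from typing import Any, Dict, Tuple


class ToyTreeCache:
    """A toy prefix-invalidating cache: keys are fixed-length tuples stored
    in a nested dict, one level per tuple element (hence keylen)."""

    def __init__(self, keylen: int):
        self.keylen = keylen
        self._root = {}  # type: Dict[Any, Any]

    def set(self, key: Tuple, value: Any) -> None:
        assert len(key) == self.keylen
        node = self._root
        for part in key[:-1]:
            node = node.setdefault(part, {})
        node[key[-1]] = value

    def get(self, key: Tuple, default: Any = None) -> Any:
        node = self._root  # type: Any
        for part in key:
            if not isinstance(node, dict) or part not in node:
                return default
            node = node[part]
        return node

    def del_multi(self, key_prefix: Tuple) -> None:
        # drop the whole subtree under key_prefix, invalidating every full
        # key that starts with it
        node = self._root
        for part in key_prefix[:-1]:
            node = node.get(part, {})
        node.pop(key_prefix[-1], None)


# invalidating the ("alice",) prefix removes both of alice's entries at once
cache = ToyTreeCache(keylen=2)
cache.set(("alice", "phone"), 1)
cache.set(("alice", "laptop"), 2)
cache.set(("bob", "phone"), 3)
cache.del_multi(("alice",))
assert cache.get(("alice", "phone")) is None
assert cache.get(("bob", "phone")) == 3
```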