Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Extend StreamChangeCache to support multiple entities per stream ID #7303

Merged
merged 6 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7303.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix StreamChangeCache to work with multiple entities changing on the same stream id.
13 changes: 13 additions & 0 deletions stubs/sortedcontainers/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .sorteddict import (
SortedDict,
SortedKeysView,
SortedItemsView,
SortedValuesView,
)

__all__ = [
"SortedDict",
"SortedKeysView",
"SortedItemsView",
"SortedValuesView",
]
124 changes: 124 additions & 0 deletions stubs/sortedcontainers/sorteddict.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# stub for SortedDict. This is a lightly edited copy of
# https://github.com/grantjenks/python-sortedcontainers/blob/eea42df1f7bad2792e8da77335ff888f04b9e5ae/sortedcontainers/sorteddict.pyi
# (from https://github.com/grantjenks/python-sortedcontainers/pull/107)

from typing import (
Any,
Callable,
Dict,
Hashable,
Iterator,
Iterable,
ItemsView,
KeysView,
List,
Mapping,
Optional,
Sequence,
Type,
TypeVar,
Tuple,
Union,
ValuesView,
overload,
)

_T = TypeVar("_T")
_S = TypeVar("_S")
_T_h = TypeVar("_T_h", bound=Hashable)
_KT = TypeVar("_KT", bound=Hashable) # Key type.
_VT = TypeVar("_VT") # Value type.
_KT_co = TypeVar("_KT_co", covariant=True, bound=Hashable)
_VT_co = TypeVar("_VT_co", covariant=True)
_SD = TypeVar("_SD", bound=SortedDict)
_Key = Callable[[_T], Any]

class SortedDict(Dict[_KT, _VT]):
@overload
def __init__(self, **kwargs: _VT) -> None: ...
@overload
def __init__(self, __map: Mapping[_KT, _VT], **kwargs: _VT) -> None: ...
@overload
def __init__(
self, __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT
) -> None: ...
@overload
def __init__(self, __key: _Key[_KT], **kwargs: _VT) -> None: ...
@overload
def __init__(
self, __key: _Key[_KT], __map: Mapping[_KT, _VT], **kwargs: _VT
) -> None: ...
@overload
def __init__(
self, __key: _Key[_KT], __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT
) -> None: ...
@property
def key(self) -> Optional[_Key[_KT]]: ...
@property
def iloc(self) -> SortedKeysView[_KT]: ...
def clear(self) -> None: ...
def __delitem__(self, key: _KT) -> None: ...
def __iter__(self) -> Iterator[_KT]: ...
def __reversed__(self) -> Iterator[_KT]: ...
def __setitem__(self, key: _KT, value: _VT) -> None: ...
def _setitem(self, key: _KT, value: _VT) -> None: ...
def copy(self: _SD) -> _SD: ...
def __copy__(self: _SD) -> _SD: ...
@classmethod
@overload
def fromkeys(cls, seq: Iterable[_T_h]) -> SortedDict[_T_h, None]: ...
@classmethod
@overload
def fromkeys(cls, seq: Iterable[_T_h], value: _S) -> SortedDict[_T_h, _S]: ...
def keys(self) -> SortedKeysView[_KT]: ...
def items(self) -> SortedItemsView[_KT, _VT]: ...
def values(self) -> SortedValuesView[_VT]: ...
@overload
def pop(self, key: _KT) -> _VT: ...
@overload
def pop(self, key: _KT, default: _T = ...) -> Union[_VT, _T]: ...
def popitem(self, index: int = ...) -> Tuple[_KT, _VT]: ...
def peekitem(self, index: int = ...) -> Tuple[_KT, _VT]: ...
def setdefault(self, key: _KT, default: Optional[_VT] = ...) -> _VT: ...
@overload
def update(self, __map: Mapping[_KT, _VT], **kwargs: _VT) -> None: ...
@overload
def update(self, __iterable: Iterable[Tuple[_KT, _VT]], **kwargs: _VT) -> None: ...
@overload
def update(self, **kwargs: _VT) -> None: ...
def __reduce__(
self,
) -> Tuple[
Type[SortedDict[_KT, _VT]], Tuple[Callable[[_KT], Any], List[Tuple[_KT, _VT]]],
]: ...
def __repr__(self) -> str: ...
def _check(self) -> None: ...
def islice(
self, start: Optional[int] = ..., stop: Optional[int] = ..., reverse=bool,
) -> Iterator[_KT]: ...
def bisect_left(self, value: _KT) -> int: ...
def bisect_right(self, value: _KT) -> int: ...

class SortedKeysView(KeysView[_KT_co], Sequence[_KT_co]):
@overload
def __getitem__(self, index: int) -> _KT_co: ...
@overload
def __getitem__(self, index: slice) -> List[_KT_co]: ...
def __delitem__(self, index: Union[int, slice]) -> None: ...

class SortedItemsView( # type: ignore
ItemsView[_KT_co, _VT_co], Sequence[Tuple[_KT_co, _VT_co]]
):
def __iter__(self) -> Iterator[Tuple[_KT_co, _VT_co]]: ...
@overload
def __getitem__(self, index: int) -> Tuple[_KT_co, _VT_co]: ...
@overload
def __getitem__(self, index: slice) -> List[Tuple[_KT_co, _VT_co]]: ...
def __delitem__(self, index: Union[int, slice]) -> None: ...

class SortedValuesView(ValuesView[_VT_co], Sequence[_VT_co]):
@overload
def __getitem__(self, index: int) -> _VT_co: ...
@overload
def __getitem__(self, index: slice) -> List[_VT_co]: ...
def __delitem__(self, index: Union[int, slice]) -> None: ...
117 changes: 71 additions & 46 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import logging
from typing import Dict, Iterable, List, Mapping, Optional, Set

from six import integer_types

Expand All @@ -23,8 +24,11 @@

logger = logging.getLogger(__name__)

# for now, assume all entities in the cache are strings
EntityType = str

class StreamChangeCache(object):

class StreamChangeCache:
"""Keeps track of the stream positions of the latest change in a set of entities.

Typically the entity will be a room or user id.
Expand All @@ -34,10 +38,23 @@ class StreamChangeCache(object):
old then the cache will simply return all given entities.
"""

def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
def __init__(
self,
name: str,
current_stream_pos: int,
max_size=10000,
prefilled_cache: Optional[Mapping[EntityType, int]] = None,
):
self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {}
self._cache = SortedDict()
self._entity_to_key = {} # type: Dict[EntityType, int]

# map from stream id to the a set of entities which changed at that stream id.
self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]]

# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
# stream_pos for which we know _cache is valid.
#
self._earliest_known_stream_pos = current_stream_pos
self.name = name
self.metrics = caches.register_cache("cache", self.name, self._cache)
Expand All @@ -46,7 +63,7 @@ def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=Non
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)

def has_entity_changed(self, entity, stream_pos):
def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
"""Returns True if the entity may have been updated since stream_pos
"""
assert type(stream_pos) in integer_types
Expand All @@ -67,36 +84,31 @@ def has_entity_changed(self, entity, stream_pos):
self.metrics.inc_hits()
return False

def get_entities_changed(self, entities, stream_pos):
def get_entities_changed(
self, entities: Iterable[EntityType], stream_pos: int
) -> Set[EntityType]:
"""
Returns subset of entities that have had new things since the given
position. Entities unknown to the cache will be returned. If the
position is too old it will just return the given list.
"""
assert type(stream_pos) is int

if stream_pos >= self._earliest_known_stream_pos:
changed_entities = {
self._cache[k]
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
}

result = changed_entities.intersection(entities)

changed_entities = self.get_all_entities_changed(stream_pos)
if changed_entities is not None:
result = set(changed_entities).intersection(entities)
self.metrics.inc_hits()
else:
result = set(entities)
self.metrics.inc_misses()

return result

def has_any_entity_changed(self, stream_pos):
def has_any_entity_changed(self, stream_pos: int) -> bool:
"""Returns if any entity has changed
"""
assert type(stream_pos) is int

if not self._cache:
# If we have no cache, nothing can have changed.
# If the cache is empty, nothing can have changed.
return False

if stream_pos >= self._earliest_known_stream_pos:
Expand All @@ -106,45 +118,58 @@ def has_any_entity_changed(self, stream_pos):
self.metrics.inc_misses()
return True

def get_all_entities_changed(self, stream_pos):
"""Returns all entites that have had new things since the given
def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
"""Returns all entities that have had new things since the given
position. If the position is too old it will return None.

Returns the entities in the order that they were changed.
"""
assert type(stream_pos) is int

if stream_pos >= self._earliest_known_stream_pos:
return [
self._cache[k]
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
]
else:
if stream_pos < self._earliest_known_stream_pos:
return None

def entity_has_changed(self, entity, stream_pos):
changed_entities = [] # type: List[EntityType]

for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
changed_entities.extend(self._cache[k])
return changed_entities

def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
"""Informs the cache that the entity has been changed at the given
position.
"""
assert type(stream_pos) is int

# FIXME: add a sanity check here that we are not overwriting existing
# data in self._cache

if stream_pos > self._earliest_known_stream_pos:
old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
stream_pos = max(stream_pos, old_pos)
self._cache.pop(old_pos, None)
self._cache[stream_pos] = entity
self._entity_to_key[entity] = stream_pos

while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(
k, self._earliest_known_stream_pos
)
self._entity_to_key.pop(r, None)

def get_max_pos_of_last_change(self, entity):
if stream_pos <= self._earliest_known_stream_pos:
return

old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
if old_pos >= stream_pos:
# nothing to do
return
e = self._cache[old_pos]
e.remove(entity)
if not e:
# cache at this point is now empty
del self._cache[old_pos]

e1 = self._cache.get(stream_pos)
if e1 is None:
e1 = self._cache[stream_pos] = set()
e1.add(entity)
self._entity_to_key[entity] = stream_pos

# if the cache is too big, remove entries
while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
for entity in r:
del self._entity_to_key[entity]

def get_max_pos_of_last_change(self, entity: EntityType) -> int:

"""Returns an upper bound of the stream id of the last change to an
entity.
"""
Expand Down
Loading