Skip to content

Commit

Permalink
* Reworked ability to configure UpdateStatuses (for instance to upd…
Browse files Browse the repository at this point in the history
…ate lock duration)

  * Split default implementation into an interface and a default implementation
  * Added docs & updated version
  • Loading branch information
zmumi committed Sep 30, 2024
1 parent cbbe415 commit 8aa8573
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 49 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
3.1.0
-----

* Added ability to configure `UpdateStatuses` (for instance to update lock duration)
* Split default implementation into an interface and a default implementation

3.0.0
-----

Expand Down
17 changes: 9 additions & 8 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,15 @@ Example how to customize default config (everything gets overridden):
@memoize(
configuration=MutableCacheConfiguration
.initialized_with(DefaultInMemoryCacheConfiguration())
.set_method_timeout(value=timedelta(minutes=2))
.set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
expire_after=timedelta(minutes=5)))
.set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
.set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
.set_storage(LocalInMemoryCacheStorage())
.set_postprocessing(DeepcopyPostprocessing())
.initialized_with(DefaultInMemoryCacheConfiguration())
.set_method_timeout(value=timedelta(minutes=2))
.set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
expire_after=timedelta(minutes=5)))
.set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
.set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
.set_storage(LocalInMemoryCacheStorage())
.set_postprocessing(DeepcopyPostprocessing()),
update_statuses=InMemoryLocks(update_lock_timeout=timedelta(minutes=5))
)
async def cached():
return 'dummy'
Expand Down
18 changes: 10 additions & 8 deletions examples/configuration/custom_configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
from memoize.eviction import LeastRecentlyUpdatedEvictionStrategy
from memoize.key import EncodedMethodNameAndArgsKeyExtractor
from memoize.postprocessing import DeepcopyPostprocessing
from memoize.statuses import InMemoryLocks
from memoize.storage import LocalInMemoryCacheStorage
from memoize.wrapper import memoize


@memoize(
configuration=MutableCacheConfiguration
.initialized_with(DefaultInMemoryCacheConfiguration())
.set_method_timeout(value=timedelta(minutes=2))
.set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
expire_after=timedelta(minutes=5)))
.set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
.set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
.set_storage(LocalInMemoryCacheStorage())
.set_postprocessing(DeepcopyPostprocessing())
.initialized_with(DefaultInMemoryCacheConfiguration())
.set_method_timeout(value=timedelta(minutes=2))
.set_entry_builder(ProvidedLifeSpanCacheEntryBuilder(update_after=timedelta(minutes=2),
expire_after=timedelta(minutes=5)))
.set_eviction_strategy(LeastRecentlyUpdatedEvictionStrategy(capacity=2048))
.set_key_extractor(EncodedMethodNameAndArgsKeyExtractor(skip_first_arg_as_self=False))
.set_storage(LocalInMemoryCacheStorage())
.set_postprocessing(DeepcopyPostprocessing()),
update_statuses=InMemoryLocks(update_lock_timeout=timedelta(minutes=5))
)
async def cached():
return 'dummy'
53 changes: 38 additions & 15 deletions memoize/statuses.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,61 @@
"""
[Internal use only] Encapsulates update state management.
[API] Encapsulates update state management.
"""
import asyncio
import datetime
import logging
from abc import ABCMeta, abstractmethod
from asyncio import Future
from typing import Dict, Awaitable, Union

from memoize.entry import CacheKey, CacheEntry


class UpdateStatuses:
class UpdateStatuses(metaclass=ABCMeta):
@abstractmethod
def is_being_updated(self, key: CacheKey) -> bool:
"""Check if update for given key is in progress. Obtained info is valid until control gets back to IO-loop."""
raise NotImplementedError()

@abstractmethod
def mark_being_updated(self, key: CacheKey) -> None:
"""Inform that update has been started.
Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost)..
Calls to 'is_being_updated' will return True until 'mark_updated' will be called."""
raise NotImplementedError()

def mark_updated(self, key: CacheKey, entry: CacheEntry) -> None:
"""Inform that update has been finished.
Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called."""
raise NotImplementedError()

@abstractmethod
def mark_update_aborted(self, key: CacheKey, exception: Exception) -> None:
"""Inform that update failed to complete.
Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.
Accepts exception to propagate it across all clients awaiting an update."""
raise NotImplementedError()

@abstractmethod
def await_updated(self, key: CacheKey) -> Awaitable[Union[CacheEntry, Exception]]:
"""Wait (asynchronously) until update in progress has benn finished.
Returns awaitable with the updated entry
(or awaitable with an exception if update failed/timed-out).
Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost)."""
raise NotImplementedError()


class InMemoryLocks(UpdateStatuses):
"""Manages in-memory locks to prevent dog-piling. """
def __init__(self, update_lock_timeout: datetime.timedelta = datetime.timedelta(minutes=5)) -> None:
self.logger = logging.getLogger(__name__)
self._update_lock_timeout = update_lock_timeout
self._updates_in_progress: Dict[CacheKey, Future] = {}

def is_being_updated(self, key: CacheKey) -> bool:
"""Checks if update for given key is in progress. Obtained info is valid until control gets back to IO-loop."""
return key in self._updates_in_progress

def mark_being_updated(self, key: CacheKey) -> None:
"""Informs that update has been started.
Should be called only if 'is_being_updated' returned False (and since then IO-loop has not been lost)..
Calls to 'is_being_updated' will return True until 'mark_updated' will be called."""
if key in self._updates_in_progress:
raise ValueError('Key {} is already being updated'.format(key))

Expand All @@ -42,27 +74,18 @@ def complete_on_timeout_passed():
callback=complete_on_timeout_passed)

def mark_updated(self, key: CacheKey, entry: CacheEntry) -> None:
"""Informs that update has been finished.
Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called."""
if key not in self._updates_in_progress:
raise ValueError('Key {} is not being updated'.format(key))
update = self._updates_in_progress.pop(key)
update.set_result(entry)

def mark_update_aborted(self, key: CacheKey, exception: Exception) -> None:
"""Informs that update failed to complete.
Calls to 'is_being_updated' will return False until 'mark_being_updated' will be called.
Accepts exception to propagate it across all clients awaiting an update."""
if key not in self._updates_in_progress:
raise ValueError('Key {} is not being updated'.format(key))
update = self._updates_in_progress.pop(key)
update.set_result(exception)

def await_updated(self, key: CacheKey) -> Awaitable[Union[CacheEntry, Exception]]:
"""Waits (asynchronously) until update in progress has benn finished.
Returns awaitable with the updated entry
(or awaitable with an exception if update failed/timed-out).
Should be called only if 'is_being_updated' returned True (and since then IO-loop has not been lost)."""
if not self.is_being_updated(key):
raise ValueError('Key {} is not being updated'.format(key))
return self._updates_in_progress[key]
36 changes: 21 additions & 15 deletions memoize/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
from memoize.entry import CacheKey, CacheEntry
from memoize.exceptions import CachedMethodFailedException
from memoize.invalidation import InvalidationSupport
from memoize.statuses import UpdateStatuses
from memoize.statuses import UpdateStatuses, InMemoryLocks


def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration = None,
invalidation: InvalidationSupport = None, update_status_tracker: Optional[UpdateStatuses] = None):
invalidation: InvalidationSupport = None, update_statuses: UpdateStatuses = None):
"""Wraps function with memoization.
If entry reaches time it should be updated, refresh is performed in background,
Expand All @@ -41,7 +41,8 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration
:param function method: function to be decorated
:param CacheConfiguration configuration: cache configuration; default: DefaultInMemoryCacheConfiguration
:param InvalidationSupport invalidation: pass created instance of InvalidationSupport to have it configured
:param UpdateStatuses update_status_tracker: optional precreated state tracker to allow observability of this state or non-default update lock timeout
:param UpdateStatuses update_statuses: allows to override how cache updates are tracked (e.g. lock config);
default: InMemoryStatuses
:raises: CachedMethodFailedException upon call: if cached method timed-out or thrown an exception
:raises: NotConfiguredCacheCalledException upon call: if provided configuration is not ready
Expand All @@ -50,19 +51,24 @@ def memoize(method: Optional[Callable] = None, configuration: CacheConfiguration
if method is None:
if configuration is None:
configuration = DefaultInMemoryCacheConfiguration()
return functools.partial(memoize, configuration=configuration, invalidation=invalidation, update_status_tracker=update_status_tracker)
return functools.partial(
memoize,
configuration=configuration,
invalidation=invalidation,
update_statuses=update_statuses
)

if invalidation is not None and not invalidation._initialized() and configuration is not None:
invalidation._initialize(configuration.storage(), configuration.key_extractor(), method)

logger = logging.getLogger('{}@{}'.format(memoize.__name__, method.__name__))
logger.debug('wrapping %s with memoization - configuration: %s', method.__name__, configuration)

if update_status_tracker is None:
update_status_tracker = UpdateStatuses()
if update_statuses is None:
update_statuses = InMemoryLocks()

async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration) -> bool:
if update_status_tracker.is_being_updated(key):
if update_statuses.is_being_updated(key):
return False
try:
await configuration_snapshot.storage().release(key)
Expand All @@ -76,24 +82,24 @@ async def try_release(key: CacheKey, configuration_snapshot: CacheConfiguration)
async def refresh(actual_entry: Optional[CacheEntry], key: CacheKey,
value_future_provider: Callable[[], asyncio.Future],
configuration_snapshot: CacheConfiguration):
if actual_entry is None and update_status_tracker.is_being_updated(key):
if actual_entry is None and update_statuses.is_being_updated(key):
logger.debug('As entry expired, waiting for results of concurrent refresh %s', key)
entry = await update_status_tracker.await_updated(key)
entry = await update_statuses.await_updated(key)
if isinstance(entry, Exception):
raise CachedMethodFailedException('Concurrent refresh failed to complete') from entry
return entry
elif actual_entry is not None and update_status_tracker.is_being_updated(key):
elif actual_entry is not None and update_statuses.is_being_updated(key):
logger.debug('As update point reached but concurrent update already in progress, '
'relying on concurrent refresh to finish %s', key)
return actual_entry
elif not update_status_tracker.is_being_updated(key):
update_status_tracker.mark_being_updated(key)
elif not update_statuses.is_being_updated(key):
update_statuses.mark_being_updated(key)
try:
value_future = value_future_provider()
value = await value_future
offered_entry = configuration_snapshot.entry_builder().build(key, value)
await configuration_snapshot.storage().offer(key, offered_entry)
update_status_tracker.mark_updated(key, offered_entry)
update_statuses.mark_updated(key, offered_entry)
logger.debug('Successfully refreshed cache for key %s', key)

eviction_strategy = configuration_snapshot.eviction_strategy()
Expand All @@ -108,11 +114,11 @@ async def refresh(actual_entry: Optional[CacheEntry], key: CacheKey,
return offered_entry
except asyncio.TimeoutError as e:
logger.debug('Timeout for %s: %s', key, e)
update_status_tracker.mark_update_aborted(key, e)
update_statuses.mark_update_aborted(key, e)
raise CachedMethodFailedException('Refresh timed out') from e
except Exception as e:
logger.debug('Error while refreshing cache for %s: %s', key, e)
update_status_tracker.mark_update_aborted(key, e)
update_statuses.mark_update_aborted(key, e)
raise CachedMethodFailedException('Refresh failed to complete') from e

@functools.wraps(method)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def prepare_description():

setup(
name='py-memoize',
version='3.0.0',
version='3.1.0',
author='Michal Zmuda',
author_email='zmu.michal@gmail.com',
url='https://github.com/DreamLab/memoize',
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_statuses.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

from datetime import timedelta

from memoize.statuses import UpdateStatuses
from memoize.statuses import InMemoryLocks, UpdateStatuses


@pytest.mark.asyncio(scope="class")
class TestStatuses:

def setup_method(self):
self.update_statuses = UpdateStatuses()
self.update_statuses: UpdateStatuses = InMemoryLocks()

async def test_should_not_be_updating(self):
# given/when/then
Expand Down

0 comments on commit 8aa8573

Please sign in to comment.