Skip to content

Commit

Permalink
feat: don't flush entire global cache on transient errors (#654)
Browse files Browse the repository at this point in the history
It turns out this was unnecessary and can cause problems of its own.

Fixes #649 #634
  • Loading branch information
Chris Rossi authored May 19, 2021
1 parent dc082fd commit cbf2d7d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 122 deletions.
15 changes: 4 additions & 11 deletions google/cloud/ndb/_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,15 @@ def wrap(wrapped):
def retry(wrapped, transient_errors):
@functools.wraps(wrapped)
@tasklets.tasklet
def retry_wrapper(*args, **kwargs):
def retry_wrapper(key, *args, **kwargs):
sleep_generator = core_retry.exponential_sleep_generator(0.1, 1)
attempts = 5
for sleep_time in sleep_generator: # pragma: NO BRANCH
# pragma is required because loop never exits normally, it only gets
# raised out of.
attempts -= 1
try:
result = yield wrapped(*args, **kwargs)
result = yield wrapped(key, *args, **kwargs)
raise tasklets.Return(result)
except transient_errors:
if not attempts:
Expand All @@ -163,7 +163,7 @@ def retry_wrapper(*args, **kwargs):

@functools.wraps(wrapped)
@tasklets.tasklet
def wrapper(*args, **kwargs):
def wrapper(key, *args, **kwargs):
cache = _global_cache()

is_read = read
Expand All @@ -177,17 +177,10 @@ def wrapper(*args, **kwargs):
function = wrapped

try:
if cache.clear_cache_soon:
warnings.warn("Clearing global cache...", RuntimeWarning)
cache.clear()
cache.clear_cache_soon = False

result = yield function(*args, **kwargs)
result = yield function(key, *args, **kwargs)
raise tasklets.Return(result)

except cache.transient_errors as error:
cache.clear_cache_soon = True

if strict:
raise

Expand Down
38 changes: 12 additions & 26 deletions google/cloud/ndb/global_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ class GlobalCache(object):
This should be overridden by subclasses.
"""

clear_cache_soon = False
strict_read = True
strict_write = True

Expand Down Expand Up @@ -267,13 +266,10 @@ class RedisCache(GlobalCache):
the application. If :data:`True`, connection errors during write will be
raised as exceptions in the application. Because write operations involve
cache invalidation, setting this to :data:`False` may allow other clients to
retrieve stale data from the cache. If there is a connection error, an
internal flag will be set to clear the cache the next time any method is
called on this object, to try and minimize the opportunity for clients to
read stale data from the cache. If :data:`True`, in the event of connection
errors, cache operations will be retried a number of times before eventually
raising the connection error to the application layer, if it does not
resolve after retrying. Setting this to :data:`True` will cause NDB
retrieve stale data from the cache. If :data:`True`, in the event of
connection errors, cache operations will be retried a number of times before
eventually raising the connection error to the application layer, if it does
not resolve after retrying. Setting this to :data:`True` will cause NDB
operations to take longer to complete if there are transient errors in the
cache layer. Default: :data:`True`.
"""
Expand Down Expand Up @@ -309,10 +305,7 @@ def from_environment(cls, strict_read=False, strict_write=True):
exception in the application. If :data:`True`, connection errors during
write will be raised as exceptions in the application. Because write
operations involve cache invalidation, setting this to :data:`False` may
allow other clients to retrieve stale data from the cache. If there is
a connection error, an internal flag will be set to clear the cache the
next time any method is called on this object, to try and minimize the
opportunity for clients to read stale data from the cache. If
allow other clients to retrieve stale data from the cache. If
:data:`True`, in the event of connection errors, cache operations will
be retried a number of times before eventually raising the connection
error to the application layer, if it does not resolve after retrying.
Expand Down Expand Up @@ -444,16 +437,12 @@ class MemcacheCache(GlobalCache):
exception in the application. If :data:`True`, connection errors during
write will be raised as exceptions in the application. Because write
operations involve cache invalidation, setting this to :data:`False` may
allow other clients to retrieve stale data from the cache. If there is
a connection error, an internal flag will be set to clear the cache the
next time any method is called on this object, to try and minimize the
opportunity for clients to read stale data from the cache. If
:data:`True`, in the event of connection errors, cache operations will
be retried a number of times before eventually raising the connection
error to the application layer, if it does not resolve after retrying.
Setting this to :data:`True` will cause NDB operations to take longer to
complete if there are transient errors in the cache layer. Default:
:data:`True`.
allow other clients to retrieve stale data from the cache. If :data:`True`,
in the event of connection errors, cache operations will be retried a number
of times before eventually raising the connection error to the application
layer, if it does not resolve after retrying. Setting this to :data:`True`
will cause NDB operations to take longer to complete if there are transient
errors in the cache layer. Default: :data:`True`.
"""

transient_errors = (
Expand Down Expand Up @@ -512,10 +501,7 @@ def from_environment(cls, max_pool_size=4, strict_read=False, strict_write=True)
exception in the application. If :data:`True`, connection errors during
write will be raised as exceptions in the application. Because write
operations involve cache invalidation, setting this to :data:`False` may
allow other clients to retrieve stale data from the cache. If there is
a connection error, an internal flag will be set to clear the cache the
next time any method is called on this object, to try and minimize the
opportunity for clients to read stale data from the cache. If
allow other clients to retrieve stale data from the cache. If
:data:`True`, in the event of connection errors, cache operations will
be retried a number of times before eventually raising the connection
error to the application layer, if it does not resolve after retrying.
Expand Down
99 changes: 14 additions & 85 deletions tests/unit/test__cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,69 +104,15 @@ def test_global_get(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_read=False,
spec=("transient_errors", "clear_cache_soon", "strict_read"),
spec=("transient_errors", "strict_read"),
)

assert _cache.global_get(b"foo").result() == "hi mom!"
_batch.get_batch.assert_called_once_with(_cache._GlobalCacheGetBatch)
batch.add.assert_called_once_with(b"foo")


@pytest.mark.usefixtures("in_context")
@mock.patch("google.cloud.ndb._cache._global_cache")
@mock.patch("google.cloud.ndb._cache._batch")
def test_global_get_clear_cache_soon(_batch, _global_cache):
batch = _batch.get_batch.return_value
future = _future_result("hi mom!")
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=True,
strict_read=False,
spec=("transient_errors", "clear_cache_soon", "clear", "strict_read"),
)

with warnings.catch_warnings(record=True) as logged:
assert _cache.global_get(b"foo").result() == "hi mom!"
assert len(logged) == 1

_batch.get_batch.assert_called_once_with(_cache._GlobalCacheGetBatch)
batch.add.assert_called_once_with(b"foo")
_global_cache.return_value.clear.assert_called_once_with()


@pytest.mark.usefixtures("in_context")
@mock.patch("google.cloud.ndb._cache._global_cache")
@mock.patch("google.cloud.ndb._cache._batch")
def test_global_get_clear_cache_soon_with_error(_batch, _global_cache):
"""Regression test for #633
https://github.com/googleapis/python-ndb/issues/633
"""

class TransientError(Exception):
pass

batch = _batch.get_batch.return_value
future = _future_result("hi mom!")
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(TransientError),
clear_cache_soon=True,
strict_read=False,
clear=mock.Mock(side_effect=TransientError("oops!"), spec=()),
spec=("transient_errors", "clear_cache_soon", "clear", "strict_read"),
)

with warnings.catch_warnings(record=True) as logged:
assert _cache.global_get(b"foo").result() is None
assert len(logged) == 2

_global_cache.return_value.clear.assert_called_once_with()


@pytest.mark.usefixtures("in_context")
@mock.patch("google.cloud.ndb.tasklets.sleep")
@mock.patch("google.cloud.ndb._cache._global_cache")
Expand All @@ -181,17 +127,15 @@ class TransientError(Exception):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(TransientError,),
clear_cache_soon=False,
strict_read=True,
spec=("transient_errors", "clear_cache_soon", "strict_read"),
spec=("transient_errors", "strict_read"),
)

with pytest.raises(TransientError):
_cache.global_get(b"foo").result()

_batch.get_batch.assert_called_with(_cache._GlobalCacheGetBatch)
batch.add.assert_called_with(b"foo")
assert _global_cache.return_value.clear_cache_soon is True


@pytest.mark.usefixtures("in_context")
Expand All @@ -210,15 +154,13 @@ class TransientError(Exception):
]
_global_cache.return_value = mock.Mock(
transient_errors=(TransientError,),
clear_cache_soon=False,
strict_read=True,
spec=("transient_errors", "clear_cache_soon", "strict_read"),
spec=("transient_errors", "strict_read"),
)

assert _cache.global_get(b"foo").result() == "hi mom!"
_batch.get_batch.assert_called_with(_cache._GlobalCacheGetBatch)
batch.add.assert_called_with(b"foo")
assert _global_cache.return_value.clear_cache_soon is False


@pytest.mark.usefixtures("in_context")
Expand All @@ -233,9 +175,8 @@ class TransientError(Exception):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(TransientError,),
clear_cache_soon=False,
strict_read=False,
spec=("transient_errors", "clear_cache_soon", "strict_read"),
spec=("transient_errors", "strict_read"),
)

with warnings.catch_warnings(record=True) as logged:
Expand Down Expand Up @@ -325,9 +266,8 @@ def test_without_expires(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_write=False,
spec=("transient_errors", "clear_cache_soon", "strict_write"),
spec=("transient_errors", "strict_write"),
)

assert _cache.global_set(b"key", b"value").result() == "hi mom!"
Expand All @@ -348,16 +288,14 @@ class TransientError(Exception):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(TransientError,),
clear_cache_soon=False,
spec=("transient_errors", "clear_cache_soon", "strict_write"),
spec=("transient_errors", "strict_write"),
)

with pytest.raises(TransientError):
_cache.global_set(b"key", b"value").result()

_batch.get_batch.assert_called_with(_cache._GlobalCacheSetBatch, {})
batch.add.assert_called_with(b"key", b"value")
assert _global_cache.return_value.clear_cache_soon is True

@staticmethod
@mock.patch("google.cloud.ndb._cache._global_cache")
Expand All @@ -373,9 +311,8 @@ class TransientError(Exception):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(TransientError,),
clear_cache_soon=False,
strict_write=False,
spec=("transient_errors", "clear_cache_soon", "strict_write"),
spec=("transient_errors", "strict_write"),
)

with warnings.catch_warnings(record=True) as logged:
Expand All @@ -384,7 +321,6 @@ class TransientError(Exception):

_batch.get_batch.assert_called_once_with(_cache._GlobalCacheSetBatch, {})
batch.add.assert_called_once_with(b"key", b"value")
assert _global_cache.return_value.clear_cache_soon is True

@staticmethod
@mock.patch("google.cloud.ndb._cache._global_cache")
Expand All @@ -395,9 +331,8 @@ def test_with_expires(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_write=False,
spec=("transient_errors", "clear_cache_soon", "strict_write"),
spec=("transient_errors", "strict_write"),
)

future = _cache.global_set(b"key", b"value", expires=5)
Expand Down Expand Up @@ -475,9 +410,8 @@ def test_global_delete(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_write=False,
spec=("transient_errors", "clear_cache_soon", "strict_write"),
spec=("transient_errors", "strict_write"),
)

assert _cache.global_delete(b"key").result() == "hi mom!"
Expand Down Expand Up @@ -511,9 +445,8 @@ def test_global_watch(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_read=False,
spec=("transient_errors", "clear_cache_soon", "strict_read"),
spec=("transient_errors", "strict_read"),
)

assert _cache.global_watch(b"key").result() == "hi mom!"
Expand Down Expand Up @@ -547,9 +480,8 @@ def test_global_unwatch(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_write=False,
spec=("transient_errors", "clear_cache_soon", "strict_write"),
spec=("transient_errors", "strict_write"),
)

assert _cache.global_unwatch(b"key").result() == "hi mom!"
Expand Down Expand Up @@ -585,9 +517,8 @@ def test_without_expires(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_read=False,
spec=("transient_errors", "clear_cache_soon", "strict_read"),
spec=("transient_errors", "strict_read"),
)

future = _cache.global_compare_and_swap(b"key", b"value")
Expand All @@ -606,9 +537,8 @@ def test_with_expires(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_read=False,
spec=("transient_errors", "clear_cache_soon", "strict_read"),
spec=("transient_errors", "strict_read"),
)

future = _cache.global_compare_and_swap(b"key", b"value", expires=5)
Expand Down Expand Up @@ -668,9 +598,8 @@ def test_global_lock(_batch, _global_cache):
batch.add.return_value = future
_global_cache.return_value = mock.Mock(
transient_errors=(),
clear_cache_soon=False,
strict_write=False,
spec=("transient_errors", "clear_cache_soon", "strict_write"),
spec=("transient_errors", "strict_write"),
)

assert _cache.global_lock(b"key").result() == "hi mom!"
Expand Down

0 comments on commit cbf2d7d

Please sign in to comment.