Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Propagate cache invalidates from workers to other workers. #6748

Merged
merged 3 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6748.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Propagate cache invalidates from workers to other workers.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this fix an actual bug we can cite here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, it was all best effort anyway. Annoyingly this doesn't automatically fix the destination retry cache stuff as we don't stream invalidations for that currently

2 changes: 1 addition & 1 deletion synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ async def on_REMOVE_PUSHER(self, cmd):
await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)

async def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data)
Expand Down
9 changes: 6 additions & 3 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import logging
import random
from typing import List
from typing import Any, List

from six import itervalues

Expand Down Expand Up @@ -271,11 +271,14 @@ async def on_remove_pusher(self, app_id, push_key, user_id):
self.notifier.on_new_replication_data()

@measure_func("repl.on_invalidate_cache")
def on_invalidate_cache(self, cache_func, keys):
async def on_invalidate_cache(self, cache_func: str, keys: List[Any]):
"""The client has asked us to invalidate a cache
"""
invalidate_cache_counter.inc()
getattr(self.store, cache_func).invalidate(tuple(keys))

# We invalidate the cache locally, but then also stream that to other
# workers.
await self.store.invalidate_cache_and_stream(cache_func, tuple(keys))

@measure_func("repl.on_user_ip")
async def on_user_ip(
Expand Down
22 changes: 21 additions & 1 deletion synapse/storage/data_stores/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import itertools
import logging
from typing import Any, Iterable, Optional
from typing import Any, Iterable, Optional, Tuple

from twisted.internet import defer

Expand All @@ -33,6 +33,26 @@


class CacheInvalidationStore(SQLBaseStore):
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
This should only be used to invalidate caches where slaves won't
otherwise know from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
if not cache_func:
return

cache_func.invalidate(keys)
await self.runInteraction(
"invalidate_cache_and_stream",
self._send_invalidation_to_replication,
cache_func.__name__,
keys,
)

def _invalidate_cache_and_stream(self, txn, cache_func, keys):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
Expand Down