Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Propagate cache invalidates from workers to other workers. (#6748)
Browse files Browse the repository at this point in the history
* commit 'd5275fc55':
  Propagate cache invalidates from workers to other workers. (#6748)
  • Loading branch information
anoadragon453 committed Mar 23, 2020
2 parents 645a30b + d5275fc commit c83cd65
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/6748.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Propagate cache invalidates from workers to other workers.
2 changes: 1 addition & 1 deletion synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ async def on_REMOVE_PUSHER(self, cmd):
await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)

async def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data)
Expand Down
9 changes: 6 additions & 3 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import logging
import random
from typing import List
from typing import Any, List

from six import itervalues

Expand Down Expand Up @@ -271,11 +271,14 @@ async def on_remove_pusher(self, app_id, push_key, user_id):
self.notifier.on_new_replication_data()

@measure_func("repl.on_invalidate_cache")
def on_invalidate_cache(self, cache_func, keys):
async def on_invalidate_cache(self, cache_func: str, keys: List[Any]):
"""The client has asked us to invalidate a cache
"""
invalidate_cache_counter.inc()
getattr(self.store, cache_func).invalidate(tuple(keys))

# We invalidate the cache locally, but then also stream that to other
# workers.
await self.store.invalidate_cache_and_stream(cache_func, tuple(keys))

@measure_func("repl.on_user_ip")
async def on_user_ip(
Expand Down
22 changes: 21 additions & 1 deletion synapse/storage/data_stores/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import itertools
import logging
from typing import Any, Iterable, Optional
from typing import Any, Iterable, Optional, Tuple

from twisted.internet import defer

Expand All @@ -33,6 +33,26 @@


class CacheInvalidationStore(SQLBaseStore):
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
This should only be used to invalidate caches where slaves won't
otherwise know from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
if not cache_func:
return

cache_func.invalidate(keys)
await self.runInteraction(
"invalidate_cache_and_stream",
self._send_invalidation_to_replication,
cache_func.__name__,
keys,
)

def _invalidate_cache_and_stream(self, txn, cache_func, keys):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
Expand Down

0 comments on commit c83cd65

Please sign in to comment.