Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Propagate cache invalidates from workers to other workers. (#6748)
Browse files Browse the repository at this point in the history
Currently if a worker invalidates a cache it will be streamed to master, which then didn't forward those to other workers.
  • Loading branch information
erikjohnston authored Jan 27, 2020
1 parent f74d178 commit d5275fc
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/6748.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Propagate cache invalidates from workers to other workers.
2 changes: 1 addition & 1 deletion synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ async def on_REMOVE_PUSHER(self, cmd):
await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)

async def on_INVALIDATE_CACHE(self, cmd):
self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)

async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.streamer.on_remote_server_up(cmd.data)
Expand Down
9 changes: 6 additions & 3 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import logging
import random
from typing import List
from typing import Any, List

from six import itervalues

Expand Down Expand Up @@ -271,11 +271,14 @@ async def on_remove_pusher(self, app_id, push_key, user_id):
self.notifier.on_new_replication_data()

@measure_func("repl.on_invalidate_cache")
def on_invalidate_cache(self, cache_func, keys):
async def on_invalidate_cache(self, cache_func: str, keys: List[Any]):
"""The client has asked us to invalidate a cache
"""
invalidate_cache_counter.inc()
getattr(self.store, cache_func).invalidate(tuple(keys))

# We invalidate the cache locally, but then also stream that to other
# workers.
await self.store.invalidate_cache_and_stream(cache_func, tuple(keys))

@measure_func("repl.on_user_ip")
async def on_user_ip(
Expand Down
22 changes: 21 additions & 1 deletion synapse/storage/data_stores/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import itertools
import logging
from typing import Any, Iterable, Optional
from typing import Any, Iterable, Optional, Tuple

from twisted.internet import defer

Expand All @@ -33,6 +33,26 @@


class CacheInvalidationStore(SQLBaseStore):
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
This should only be used to invalidate caches where slaves won't
otherwise know from other replication streams that the cache should
be invalidated.
"""
cache_func = getattr(self, cache_name, None)
if not cache_func:
return

cache_func.invalidate(keys)
await self.runInteraction(
"invalidate_cache_and_stream",
self._send_invalidation_to_replication,
cache_func.__name__,
keys,
)

def _invalidate_cache_and_stream(self, txn, cache_func, keys):
"""Invalidates the cache and adds it to the cache stream so slaves
will know to invalidate their caches.
Expand Down

0 comments on commit d5275fc

Please sign in to comment.