From bb150be1ad7922b88057b713bdd81e369aa7c6c0 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 12:57:39 +0000 Subject: [PATCH 01/17] Annotate get_all_updated_caches_txn --- synapse/storage/databases/main/cache.py | 30 ++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 36e8422fc63b..e6098152ec6a 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -25,7 +25,11 @@ EventsStreamEventRow, ) from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) from synapse.storage.engines import PostgresEngine from synapse.util.iterutils import batch_iter @@ -39,16 +43,24 @@ # based on the current state when notifying workers over replication. CURRENT_STATE_CACHE_NAME = "cs_cache_fake" +# Corresponds to the (cache_func, keys, invalidation_ts) db columns. +_CacheData = Tuple[str, Optional[List[str]], Optional[int]] + class CacheInvalidationWorkerStore(SQLBaseStore): - def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): super().__init__(database, db_conn, hs) self._instance_name = hs.get_instance_name() async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int - ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + ) -> Tuple[List[Tuple[int, _CacheData]], int, bool]: """Get updates for caches replication stream. 
Args: @@ -73,7 +85,9 @@ async def get_all_updated_caches( if last_id == current_id: return [], current_id, False - def get_all_updated_caches_txn(txn): + def get_all_updated_caches_txn( + txn: LoggingTransaction, + ) -> Tuple[List[Tuple[int, _CacheData]], int, bool]: # We purposefully don't bound by the current token, as we want to # send across cache invalidations as quickly as possible. Cache # invalidations are idempotent, so duplicates are fine. @@ -85,7 +99,13 @@ def get_all_updated_caches_txn(txn): LIMIT ? """ txn.execute(sql, (last_id, instance_name, limit)) - updates = [(row[0], row[1:]) for row in txn] + updates: List[Tuple[int, _CacheData]] = [] + row: Tuple[int, str, Optional[List[str]], Optional[int]] + # Type safety: iterating over `txn` yields `Tuple`, i.e. + # `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a + # variadic tuple to a fixed length tuple and flags it up as an error. + for row in txn: # type: ignore[assignment] + updates.append((row[0], row[1:])) limited = False upto_token = current_id if len(updates) >= limit: From e140beeb569449ca2b8107572564c9e7092666e3 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 13:15:59 +0000 Subject: [PATCH 02/17] Easier function annotations --- synapse/storage/databases/main/cache.py | 27 ++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index e6098152ec6a..32d2fa1b395d 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -15,7 +15,7 @@ import itertools import logging -from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes from synapse.replication.tcp.streams import BackfillStream, CachesStream @@ -31,6 +31,7 @@ LoggingTransaction, ) from synapse.storage.engines 
import PostgresEngine +from synapse.util.caches.descriptors import _CachedFunction from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -212,7 +213,9 @@ def _invalidate_caches_for_event( self.get_aggregation_groups_for_event.invalidate((relates_to,)) self.get_applicable_edit.invalidate((relates_to,)) - async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]): + async def invalidate_cache_and_stream( + self, cache_name: str, keys: Tuple[str, ...] + ) -> None: """Invalidates the cache and adds it to the cache stream so slaves will know to invalidate their caches. @@ -232,7 +235,9 @@ async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, .. keys, ) - def _invalidate_cache_and_stream(self, txn, cache_func, keys): + def _invalidate_cache_and_stream( + self, txn: LoggingTransaction, cache_func: _CachedFunction, keys: Iterable[str] + ) -> None: """Invalidates the cache and adds it to the cache stream so slaves will know to invalidate their caches. @@ -243,7 +248,9 @@ def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) self._send_invalidation_to_replication(txn, cache_func.__name__, keys) - def _invalidate_all_cache_and_stream(self, txn, cache_func): + def _invalidate_all_cache_and_stream( + self, txn: LoggingTransaction, cache_func: _CachedFunction + ) -> None: """Invalidates the entire cache and adds it to the cache stream so slaves will know to invalidate their caches. """ @@ -251,7 +258,9 @@ def _invalidate_all_cache_and_stream(self, txn, cache_func): txn.call_after(cache_func.invalidate_all) self._send_invalidation_to_replication(txn, cache_func.__name__, None) - def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): + def _invalidate_state_caches_and_stream( + self, txn: LoggingTransaction, room_id: str, members_changed: Collection[str] + ) -> None: """Special case invalidation of caches based on current state. 
We special case this so that we can batch the cache invalidations into a @@ -259,8 +268,8 @@ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): Args: txn - room_id (str): Room where state changed - members_changed (iterable[str]): The user_ids of members that have changed + room_id: Room where state changed + members_changed: The user_ids of members that have changed """ txn.call_after(self._invalidate_state_caches, room_id, members_changed) @@ -282,8 +291,8 @@ def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed): ) def _send_invalidation_to_replication( - self, txn, cache_name: str, keys: Optional[Iterable[Any]] - ): + self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[str]] + ) -> None: """Notifies replication that given cache has been invalidated. Note that this does *not* invalidate the cache locally. From 9a316ecd3cee5225bf7b8510a531da1567b23e3f Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 13:58:26 +0000 Subject: [PATCH 03/17] SCARY: Pull in EventFederationWorkerStore for e.g. get_latest_event_ids_in_room Needed on CacheInvalidationWorkerStore for many functions, e.g. `have_seen_event`. Fixup MRO in CensorEventsStore. The BaseSlavedStore is trickier. 
The removals should be safe because the removed classes are in parents, via SlavedEventStore -> BaseSlavedStore -> CacheInvalidationWorkerStore --- synapse/replication/slave/storage/events.py | 3 --- synapse/storage/databases/main/cache.py | 4 ++-- synapse/storage/databases/main/censor_events.py | 2 +- .../storage/databases/main/events_forward_extremities.py | 6 +----- 4 files changed, 4 insertions(+), 11 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 63ed50caa5eb..6080d9fb293f 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -47,13 +47,10 @@ class SlavedEventStore( - EventFederationWorkerStore, RoomMemberWorkerStore, EventPushActionsWorkerStore, StreamWorkerStore, StateGroupWorkerStore, - EventsWorkerStore, - SignatureWorkerStore, UserErasureWorkerStore, RelationsWorkerStore, BaseSlavedStore, diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 32d2fa1b395d..fff0824b6a33 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -24,12 +24,12 @@ EventsStreamCurrentStateRow, EventsStreamEventRow, ) -from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.databases.main.event_federation import EventFederationWorkerStore from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import _CachedFunction from synapse.util.iterutils import batch_iter @@ -48,7 +48,7 @@ _CacheData = Tuple[str, Optional[List[str]], Optional[int]] -class CacheInvalidationWorkerStore(SQLBaseStore): +class CacheInvalidationWorkerStore(EventFederationWorkerStore): def __init__( self, database: DatabasePool, diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 
0f56e10220d0..0d600208accd 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) -class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBaseStore): +class CensorEventsStore(CacheInvalidationWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index 68901b43352b..404cd96278be 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -18,15 +18,11 @@ from synapse.api.errors import SynapseError from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main import CacheInvalidationWorkerStore -from synapse.storage.databases.main.event_federation import EventFederationWorkerStore logger = logging.getLogger(__name__) -class EventForwardExtremitiesStore( - EventFederationWorkerStore, - CacheInvalidationWorkerStore, -): +class EventForwardExtremitiesStore(CacheInvalidationWorkerStore): async def delete_forward_extremities_for_room(self, room_id: str) -> int: """Delete any extra forward extremities for a room. 
From f7111a98057f7b7c85e73ac6f0f9b602d6950655 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 15:08:09 +0000 Subject: [PATCH 04/17] mark _cache_id_gen as a required mixin property --- synapse/replication/slave/storage/_base.py | 4 +--- synapse/storage/databases/main/__init__.py | 1 - synapse/storage/databases/main/cache.py | 6 ++++++ 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 7ecb446e7c78..96fa38305fe2 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -30,9 +30,7 @@ class BaseSlavedStore(CacheInvalidationWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) if isinstance(self.database_engine, PostgresEngine): - self._cache_id_gen: Optional[ - MultiWriterIdGenerator - ] = MultiWriterIdGenerator( + self._cache_id_gen = MultiWriterIdGenerator( db_conn, database, stream_name="caches", diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 9ff2d8d8c35a..e22aa0b9bc6b 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -154,7 +154,6 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): db_conn, "local_group_updates", "stream_id" ) - self._cache_id_gen: Optional[MultiWriterIdGenerator] if isinstance(self.database_engine, PostgresEngine): # We set the `writers` to an empty list here as we don't care about # missing updates over restarts, as we'll not have anything in our diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index fff0824b6a33..41eec2742731 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -31,6 +31,7 @@ ) from synapse.storage.databases.main.event_federation import 
EventFederationWorkerStore from synapse.storage.engines import PostgresEngine +from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import _CachedFunction from synapse.util.iterutils import batch_iter @@ -49,6 +50,10 @@ class CacheInvalidationWorkerStore(EventFederationWorkerStore): + # This class must be mixed in with a child class which provides the following + # attribute. TODO: can we get static analysis to enforce this? + _cache_id_gen: Optional[MultiWriterIdGenerator] + def __init__( self, database: DatabasePool, @@ -313,6 +318,7 @@ def _send_invalidation_to_replication( # the transaction. However, we want to only get an ID when we want # to use it, here, so we need to call __enter__ manually, and have # __exit__ called after the transaction finishes. + assert self._cache_id_gen is not None stream_id = self._cache_id_gen.get_next_txn(txn) txn.call_after(self.hs.get_notifier().on_new_replication_data) From 58c20bbcb789be9973b197782d594b8273e82176 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 15:09:57 +0000 Subject: [PATCH 05/17] LoggingDatabaseConnection.__enter__ returns itself Could use a fancy TypeVar here but I restrained myself. --- synapse/storage/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index d4cab69ebfe5..8611fe492c0a 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -175,7 +175,7 @@ def commit(self) -> None: def rollback(self) -> None: self.conn.rollback() - def __enter__(self) -> "Connection": + def __enter__(self) -> "LoggingDatabaseConnection": self.conn.__enter__() return self From 1dea20ca75476cce6a72d790868b127d945bd769 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 15:41:32 +0000 Subject: [PATCH 06/17] CacheInvalidationWorkerStore needs RelationsWorkerStore for e.g. get_relations_for_event. Remove it from SlavedEventsStore. 
(Still there via SlavedEventStore -> BaseSlavedStore -> CacheInvalidationWorkerStore -> RelationsWorkerStore.) --- synapse/replication/slave/storage/events.py | 1 - synapse/storage/databases/main/cache.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 6080d9fb293f..5856425c1776 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -52,7 +52,6 @@ class SlavedEventStore( StreamWorkerStore, StateGroupWorkerStore, UserErasureWorkerStore, - RelationsWorkerStore, BaseSlavedStore, ): def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 41eec2742731..d5fab29905c9 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -30,6 +30,7 @@ LoggingTransaction, ) from synapse.storage.databases.main.event_federation import EventFederationWorkerStore +from synapse.storage.databases.main.relations import RelationsWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import _CachedFunction @@ -49,7 +50,7 @@ _CacheData = Tuple[str, Optional[List[str]], Optional[int]] -class CacheInvalidationWorkerStore(EventFederationWorkerStore): +class CacheInvalidationWorkerStore(EventFederationWorkerStore, RelationsWorkerStore): # This class must be mixed in with a child class which provides the following # attribute. TODO: can we get static analysis to enforce this? 
_cache_id_gen: Optional[MultiWriterIdGenerator] From 0e7d79b61d39ec66ebb492b90005af85de9433ae Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 16:10:26 +0000 Subject: [PATCH 07/17] Pull in EventPushActionsWorkerStore --- synapse/replication/slave/storage/events.py | 1 - synapse/storage/databases/main/cache.py | 9 ++++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 5856425c1776..37827f12ad2d 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -48,7 +48,6 @@ class SlavedEventStore( RoomMemberWorkerStore, - EventPushActionsWorkerStore, StreamWorkerStore, StateGroupWorkerStore, UserErasureWorkerStore, diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index d5fab29905c9..b936cdb9d529 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -30,6 +30,9 @@ LoggingTransaction, ) from synapse.storage.databases.main.event_federation import EventFederationWorkerStore +from synapse.storage.databases.main.event_push_actions import ( + EventPushActionsWorkerStore, +) from synapse.storage.databases.main.relations import RelationsWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator @@ -50,7 +53,11 @@ _CacheData = Tuple[str, Optional[List[str]], Optional[int]] -class CacheInvalidationWorkerStore(EventFederationWorkerStore, RelationsWorkerStore): +class CacheInvalidationWorkerStore( + EventFederationWorkerStore, + RelationsWorkerStore, + EventPushActionsWorkerStore, +): # This class must be mixed in with a child class which provides the following # attribute. TODO: can we get static analysis to enforce this? 
_cache_id_gen: Optional[MultiWriterIdGenerator] From d251ab87859290df36218efe15b72bf2d52a245c Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 16:23:09 +0000 Subject: [PATCH 08/17] Cache needs StreamWorkerStore for _membership_stream_cache --- synapse/replication/slave/storage/events.py | 2 +- synapse/storage/databases/main/cache.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 37827f12ad2d..4fe829aa5b10 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -48,10 +48,10 @@ class SlavedEventStore( RoomMemberWorkerStore, - StreamWorkerStore, StateGroupWorkerStore, UserErasureWorkerStore, BaseSlavedStore, + StreamWorkerStore, ): def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index b936cdb9d529..7c9c93526e85 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -34,6 +34,7 @@ EventPushActionsWorkerStore, ) from synapse.storage.databases.main.relations import RelationsWorkerStore +from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.util.caches.descriptors import _CachedFunction @@ -57,6 +58,7 @@ class CacheInvalidationWorkerStore( EventFederationWorkerStore, RelationsWorkerStore, EventPushActionsWorkerStore, + StreamWorkerStore, ): # This class must be mixed in with a child class which provides the following # attribute. TODO: can we get static analysis to enforce this? 
From ea34499c9baf8c25d0c28c09c23c9d4044d370d4 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 16:28:44 +0000 Subject: [PATCH 09/17] Pull in StatsStore I need StateDeltasStore for _curr_state_delta_stream_cache. I also need the `clock` attribute, which pathologically only seems to be defined on StatsStore. StatsStore is a subclass of StateDeltasStore, so let's just use that to keep the list of parents minimal. To be explicit: I think it's very very odd that only StatsStore provides a `clock`. I think pulling it in here is silly: why should CacheInvalidationWorkerStore care about stats? Consider this a protest commit to raise attention to the bizarre status quo. --- synapse/app/generic_worker.py | 1 - synapse/storage/databases/main/__init__.py | 1 - synapse/storage/databases/main/cache.py | 2 ++ synapse/storage/databases/main/registration.py | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 46f0feff7000..54cd2cbc1361 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -223,7 +223,6 @@ class GenericWorkerSlavedStore( # FIXME(#3714): We need to add UserDirectoryStore as we write directly # rather than going via the correct worker. 
UserDirectoryStore, - StatsStore, UIAuthWorkerStore, EndToEndRoomKeyStore, PresenceStore, diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index e22aa0b9bc6b..8c3c48a43ead 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -119,7 +119,6 @@ class DataStore( GroupServerStore, UserErasureStore, MonthlyActiveUsersStore, - StatsStore, RelationsStore, CensorEventsStore, UIAuthStore, diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 7c9c93526e85..cbd224d4c2bf 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -34,6 +34,7 @@ EventPushActionsWorkerStore, ) from synapse.storage.databases.main.relations import RelationsWorkerStore +from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator @@ -59,6 +60,7 @@ class CacheInvalidationWorkerStore( RelationsWorkerStore, EventPushActionsWorkerStore, StreamWorkerStore, + StatsStore, ): # This class must be mixed in with a child class which provides the following # attribute. TODO: can we get static analysis to enforce this? 
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 6c7d6ba50848..556878dd02ca 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1877,7 +1877,7 @@ async def is_guest(self, user_id: str) -> bool: return res if res else False -class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): +class RegistrationStore(RegistrationBackgroundUpdateStore, StatsStore): def __init__( self, database: DatabasePool, From 13b9509e7648bc259d03cb2cea86cbb28848f926 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Mon, 15 Nov 2021 17:24:39 +0000 Subject: [PATCH 10/17] Pull in RoomMemberWorkerStore for get_rooms_for_user_with_stream_ordering. This was the most painful MRO problem to resolve. Remove BaseSlavedStore and SlavedEventStore from GenericWorkerSlavedStore. These are already pulled in indirectly by inheriting from SlavedPushRuleStore. --- synapse/app/admin_cmd.py | 1 - synapse/app/generic_worker.py | 5 ----- synapse/replication/slave/storage/events.py | 1 - synapse/replication/slave/storage/push_rule.py | 2 +- synapse/storage/databases/main/cache.py | 2 ++ synapse/storage/databases/main/push_rule.py | 1 - 6 files changed, 3 insertions(+), 9 deletions(-) diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 42238f7f280b..8b6e508ca36e 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -61,7 +61,6 @@ class AdminCmdSlavedStore( SlavedPushRuleStore, SlavedEventStore, SlavedClientIpStore, - BaseSlavedStore, RoomWorkerStore, ): pass diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 54cd2cbc1361..2549a145b6ca 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -47,14 +47,12 @@ from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import 
REPLICATION_PREFIX, ReplicationRestResource -from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.directory import DirectoryStore -from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.keys import SlavedKeyStore @@ -114,7 +112,6 @@ from synapse.storage.databases.main.room import RoomWorkerStore from synapse.storage.databases.main.search import SearchStore from synapse.storage.databases.main.session import SessionStore -from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore @@ -235,7 +232,6 @@ class GenericWorkerSlavedStore( SlavedPusherStore, CensorEventsStore, ClientIpWorkerStore, - SlavedEventStore, SlavedKeyStore, RoomWorkerStore, DirectoryStore, @@ -251,7 +247,6 @@ class GenericWorkerSlavedStore( TransactionWorkerStore, LockStore, SessionStore, - BaseSlavedStore, ): # Properties that multiple storage classes define. Tell mypy what the # expected type is. 
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 4fe829aa5b10..032e921a1e69 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -47,7 +47,6 @@ class SlavedEventStore( - RoomMemberWorkerStore, StateGroupWorkerStore, UserErasureWorkerStore, BaseSlavedStore, diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 4d5f86286213..061511d78b1a 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -20,7 +20,7 @@ from .events import SlavedEventStore -class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): +class SlavedPushRuleStore(PushRulesWorkerStore, SlavedEventStore): def get_max_push_rules_stream_id(self): return self._push_rules_stream_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index cbd224d4c2bf..b067050fb89c 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -34,6 +34,7 @@ EventPushActionsWorkerStore, ) from synapse.storage.databases.main.relations import RelationsWorkerStore +from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.engines import PostgresEngine @@ -61,6 +62,7 @@ class CacheInvalidationWorkerStore( EventPushActionsWorkerStore, StreamWorkerStore, StatsStore, + RoomMemberWorkerStore, ): # This class must be mixed in with a child class which provides the following # attribute. TODO: can we get static analysis to enforce this? 
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index fa782023d4ee..2e1102cbcac2 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -71,7 +71,6 @@ class PushRulesWorkerStore( PusherWorkerStore, RoomMemberWorkerStore, EventsWorkerStore, - SQLBaseStore, metaclass=abc.ABCMeta, ): """This is an abstract base class where subclasses must implement From ee1803551dbd826dacf61c6f00d7a3a8b3407888 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 16 Nov 2021 11:43:32 +0000 Subject: [PATCH 11/17] Mypy now passes. --- changelog.d/11354.misc | 1 + mypy.ini | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 changelog.d/11354.misc diff --git a/changelog.d/11354.misc b/changelog.d/11354.misc new file mode 100644 index 000000000000..86594a332db9 --- /dev/null +++ b/changelog.d/11354.misc @@ -0,0 +1 @@ +Add type hints to storage classes. diff --git a/mypy.ini b/mypy.ini index b2953974ea10..c133e1715d65 100644 --- a/mypy.ini +++ b/mypy.ini @@ -26,7 +26,6 @@ exclude = (?x) |synapse/storage/databases/__init__.py |synapse/storage/databases/main/__init__.py |synapse/storage/databases/main/account_data.py - |synapse/storage/databases/main/cache.py |synapse/storage/databases/main/devices.py |synapse/storage/databases/main/e2e_room_keys.py |synapse/storage/databases/main/end_to_end_keys.py From 7f6c2aab5749a8d38ead9703c4efdf411e20931b Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 16 Nov 2021 12:11:47 +0000 Subject: [PATCH 12/17] Fixup imports --- synapse/app/admin_cmd.py | 1 - synapse/replication/slave/storage/_base.py | 2 +- synapse/replication/slave/storage/events.py | 8 -------- synapse/storage/databases/main/__init__.py | 1 - synapse/storage/databases/main/cache.py | 2 +- synapse/storage/databases/main/censor_events.py | 2 -- synapse/storage/databases/main/push_rule.py | 2 +- 7 files changed, 3 insertions(+), 15 deletions(-) diff --git 
a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 8b6e508ca36e..138c9dda0c88 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -28,7 +28,6 @@ from synapse.config.logger import setup_logging from synapse.events import EventBase from synapse.handlers.admin import ExfiltrationWriter -from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 96fa38305fe2..7dac1f9ff394 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from synapse.storage.database import DatabasePool from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 032e921a1e69..d8c831a02eac 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,14 +16,6 @@ from typing import TYPE_CHECKING from synapse.storage.database import DatabasePool -from synapse.storage.databases.main.event_federation import EventFederationWorkerStore -from synapse.storage.databases.main.event_push_actions import ( - EventPushActionsWorkerStore, -) -from synapse.storage.databases.main.events_worker import EventsWorkerStore -from synapse.storage.databases.main.relations import RelationsWorkerStore -from synapse.storage.databases.main.roommember import RoomMemberWorkerStore -from synapse.storage.databases.main.signatures import SignatureWorkerStore from 
synapse.storage.databases.main.state import StateGroupWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 8c3c48a43ead..e5a2369a22cb 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -67,7 +67,6 @@ from .session import SessionStore from .signatures import SignatureStore from .state import StateStore -from .stats import StatsStore from .stream import StreamStore from .tags import TagsStore from .transactions import TransactionWorkerStore diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index b067050fb89c..92630f3eccb1 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -15,7 +15,7 @@ import itertools import logging -from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple +from typing import TYPE_CHECKING, Collection, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes from synapse.replication.tcp.streams import BackfillStream, CachesStream diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 0d600208accd..0a087fdb2e92 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -17,10 +17,8 @@ from synapse.events.utils import prune_event_dict from synapse.metrics.background_process_metrics import wrap_as_background_process -from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore -from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.util import json_encoder if 
TYPE_CHECKING: diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 2e1102cbcac2..695d1304582b 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -19,7 +19,7 @@ from synapse.api.errors import NotFoundError, StoreError from synapse.push.baserules import list_with_base_rules from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker -from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage._base import db_to_json from synapse.storage.database import DatabasePool from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore From dfe4779f43f19410940126ae00416001d73a9232 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 16 Nov 2021 12:22:13 +0000 Subject: [PATCH 13/17] Fixup portdb script Being honest I just tried to do the minimal thing possible to get it to work. 
--- scripts/synapse_port_db | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 640ff15277db..8ac345c86852 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -35,6 +35,7 @@ from synapse.logging.context import ( make_deferred_yieldable, run_in_background, ) +from synapse.replication.slave.storage.events import SlavedEventStore from synapse.storage.database import DatabasePool, make_conn from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore @@ -57,7 +58,6 @@ from synapse.storage.databases.main.room import RoomBackgroundUpdateStore from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore from synapse.storage.databases.main.search import SearchBackgroundUpdateStore from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore -from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.user_directory import ( UserDirectoryBackgroundUpdateStore, ) @@ -179,10 +179,10 @@ class Store( MainStateBackgroundUpdateStore, UserDirectoryBackgroundUpdateStore, EndToEndKeyBackgroundStore, - StatsStore, PusherWorkerStore, PresenceBackgroundUpdateStore, GroupServerWorkerStore, + SlavedEventStore, ): def execute(self, f, *args, **kwargs): return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs) @@ -229,6 +229,10 @@ class MockHomeserver: def get_instance_name(self): return "master" + def should_send_federation(self) -> bool: + "Should this server be sending federation traffic directly?" 
+ return False + class Porter(object): def __init__(self, **kwargs): From 1138f18cde1898e4cf129e44a9f05db9e8456b8e Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 18 Nov 2021 18:28:24 +0000 Subject: [PATCH 14/17] Use _clock; weaken StatsStore to StateDeltasStore And while we're at it, StatsStore doesn't need its own clock attribute. --- synapse/storage/databases/main/cache.py | 6 +++--- synapse/storage/databases/main/stats.py | 5 ++--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 92630f3eccb1..7189d945c355 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -35,7 +35,7 @@ ) from synapse.storage.databases.main.relations import RelationsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore -from synapse.storage.databases.main.stats import StatsStore +from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator @@ -61,7 +61,7 @@ class CacheInvalidationWorkerStore( RelationsWorkerStore, EventPushActionsWorkerStore, StreamWorkerStore, - StatsStore, + StateDeltasStore, RoomMemberWorkerStore, ): # This class must be mixed in with a child class which provides the following @@ -347,7 +347,7 @@ def _send_invalidation_to_replication( "instance_name": self._instance_name, "cache_func": cache_name, "keys": keys, - "invalidation_ts": self.clock.time_msec(), + "invalidation_ts": self._clock.time_msec(), }, ) diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index 5d7b59d861c9..f735a4217658 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -100,7 +100,6 @@ def __init__(self, database: 
DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) self.server_name = hs.hostname - self.clock = self.hs.get_clock() self.stats_enabled = hs.config.stats.stats_enabled self.stats_delta_processing_lock = DeferredLock() @@ -601,7 +600,7 @@ def _fetch_current_state_stats(txn): local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)] await self.update_stats_delta( - ts=self.clock.time_msec(), + ts=self._clock.time_msec(), stats_type="room", stats_id=room_id, fields={}, @@ -638,7 +637,7 @@ def _calculate_and_set_initial_state_for_user_txn(txn): ) await self.update_stats_delta( - ts=self.clock.time_msec(), + ts=self._clock.time_msec(), stats_type="user", stats_id=user_id, fields={}, From e5d15cc67776c8dd15cd30302b79297f5582457a Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 18 Nov 2021 18:42:36 +0000 Subject: [PATCH 15/17] Fix typo --- synapse/storage/databases/main/cache.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 7189d945c355..066365a8b5c4 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -121,7 +121,7 @@ def get_all_updated_caches_txn( txn.execute(sql, (last_id, instance_name, limit)) updates: List[Tuple[int, _CacheData]] = [] row: Tuple[int, str, Optional[List[str]], Optional[int]] - # Type saftey: iterating over `txn` yields `Tuple`, i.e. + # Type safety: iterating over `txn` yields `Tuple`, i.e. # `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a # variadic tuple to a fixed length tuple and flags it up as an error. 
for row in txn: # type: ignore[assignment] From 46861909452923855a53c43536550bb5e46bb482 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Thu, 18 Nov 2021 18:48:08 +0000 Subject: [PATCH 16/17] SlavedEventStore doesn't need StreamWorkerStore already has it via SlavedEventStore -> BaseEventStore -> CacheInvalidationWorkerStore -> StreamWorkerStore --- synapse/replication/slave/storage/events.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index d8c831a02eac..864b0719767e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -17,7 +17,6 @@ from synapse.storage.database import DatabasePool from synapse.storage.databases.main.state import StateGroupWorkerStore -from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -42,7 +41,6 @@ class SlavedEventStore( StateGroupWorkerStore, UserErasureWorkerStore, BaseSlavedStore, - StreamWorkerStore, ): def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) From 6c7aecd67fc45dae0688ff12b0b6864b0358f494 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Wed, 24 Nov 2021 12:35:19 +0000 Subject: [PATCH 17/17] Use _clock again and note the typing problem --- synapse/visibility.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/visibility.py b/synapse/visibility.py index 17532059e9f8..cca1b275b2de 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -146,7 +146,9 @@ def allowed(event: EventBase) -> Optional[EventBase]: max_lifetime = retention_policy.get("max_lifetime") if max_lifetime is not None: - oldest_allowed_ts = storage.main.clock.time_msec() - max_lifetime + # TODO: reveal_type(storage.main) yields Any. 
Can we find a way of + # telling mypy that storage.main is a generic `DataStoreT`? + oldest_allowed_ts = storage.main._clock.time_msec() - max_lifetime if event.origin_server_ts < oldest_allowed_ts: return None