From 9489f3fab51212d57440c95ec8a7e1bd8e7f2b35 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 5 Nov 2022 19:12:11 +0000 Subject: [PATCH 1/7] Merge `SlavedPushRuleStore` parts into `PushRulesWorkerStore` --- synapse/app/admin_cmd.py | 4 +-- synapse/app/generic_worker.py | 4 +-- .../replication/slave/storage/push_rule.py | 35 ------------------- synapse/storage/databases/main/push_rule.py | 19 ++++++---- 4 files changed, 17 insertions(+), 45 deletions(-) delete mode 100644 synapse/replication/slave/storage/push_rule.py diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 3c8c00ea5bc4..0942311267a6 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -31,7 +31,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.server import HomeServer from synapse.storage.database import DatabasePool, LoggingDatabaseConnection from synapse.storage.databases.main.account_data import AccountDataWorkerStore @@ -40,6 +39,7 @@ ApplicationServiceWorkerStore, ) from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore +from synapse.storage.databases.main.push_rule import PushRulesWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.registration import RegistrationWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore @@ -53,12 +53,12 @@ class AdminCmdSlavedStore( SlavedFilteringStore, - SlavedPushRuleStore, SlavedEventStore, SlavedDeviceStore, TagsWorkerStore, DeviceInboxWorkerStore, AccountDataWorkerStore, + PushRulesWorkerStore, ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore, RegistrationWorkerStore, diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index cb5892f041e9..98e4b8364723 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -52,7 +52,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client import ( @@ -111,6 +110,7 @@ ) from synapse.storage.databases.main.presence import PresenceStore from synapse.storage.databases.main.profile import ProfileWorkerStore +from synapse.storage.databases.main.push_rule import PushRulesWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.registration import RegistrationWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore @@ -233,7 +233,6 @@ class GenericWorkerSlavedStore( PresenceStore, DeviceInboxWorkerStore, SlavedDeviceStore, - SlavedPushRuleStore, TagsWorkerStore, AccountDataWorkerStore, SlavedPusherStore, @@ -244,6 +243,7 @@ class GenericWorkerSlavedStore( RoomWorkerStore, RoomBatchStore, DirectoryWorkerStore, + PushRulesWorkerStore, ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore, ProfileWorkerStore, diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py deleted file mode 100644 index 5e65eaf1e084..000000000000 --- a/synapse/replication/slave/storage/push_rule.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2015, 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import Any, Iterable - -from synapse.replication.tcp.streams import PushRulesStream -from synapse.storage.databases.main.push_rule import PushRulesWorkerStore - -from .events import SlavedEventStore - - -class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): - def get_max_push_rules_stream_id(self) -> int: - return self._push_rules_stream_id_gen.get_current_token() - - def process_replication_rows( - self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] - ) -> None: - if stream_name == PushRulesStream.NAME: - self._push_rules_stream_id_gen.advance(instance_name, token) - for row in rows: - self.get_push_rules_for_user.invalidate((row.user_id,)) - self.push_rules_stream_cache.entity_has_changed(row.user_id, token) - return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index b6c15f29f8f3..8ae10f61270f 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -12,13 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import abc import logging from typing import ( TYPE_CHECKING, Any, Collection, Dict, + Iterable, List, Mapping, Optional, @@ -31,6 +31,7 @@ from synapse.api.errors import StoreError from synapse.config.homeserver import ExperimentalConfig from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.replication.tcp.streams import PushRulesStream from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -90,8 +91,6 @@ def _load_rules( return filtered_rules -# The ABCMeta metaclass ensures that it cannot be instantiated without -# the abstract methods being implemented. class PushRulesWorkerStore( ApplicationServiceWorkerStore, PusherWorkerStore, @@ -99,7 +98,6 @@ class PushRulesWorkerStore( ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore, - metaclass=abc.ABCMeta, ): """This is an abstract base class where subclasses must implement `get_max_push_rules_stream_id` which can be called in the initializer. @@ -136,14 +134,23 @@ def __init__( prefilled_cache=push_rules_prefill, ) - @abc.abstractmethod def get_max_push_rules_stream_id(self) -> int: """Get the position of the push rules stream. Returns: int """ - raise NotImplementedError() + return self._push_rules_stream_id_gen.get_current_token() + + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: + if stream_name == PushRulesStream.NAME: + self._push_rules_stream_id_gen.advance(instance_name, token) + for row in rows: + self.get_push_rules_for_user.invalidate((row.user_id,)) + self.push_rules_stream_cache.entity_has_changed(row.user_id, token) + return super().process_replication_rows(stream_name, instance_name, token, rows) @cached(max_entries=5000) async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules: From a07263e476bdd85c3731487c28dcee8e3d8eb92b Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 5 Nov 2022 19:16:55 +0000 Subject: [PATCH 2/7] Add changelog file --- changelog.d/14375.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/14375.misc diff --git a/changelog.d/14375.misc b/changelog.d/14375.misc new file mode 100644 index 000000000000..d0369b9b8c2d --- /dev/null +++ b/changelog.d/14375.misc @@ -0,0 +1 @@ +Cleanup old worker datastore classes. Contributed by Nick @ Beeper (@fizzadar). From 2994782ae41562aecfcec752d0ab5aaee719b1cf Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 5 Nov 2022 19:58:27 +0000 Subject: [PATCH 3/7] Merge `SlavedPusherStore` parts into `PusherWorkerStore` --- synapse/app/generic_worker.py | 4 +- synapse/replication/slave/storage/pushers.py | 47 -------------------- synapse/storage/databases/main/pusher.py | 41 ++++++++++++++--- 3 files changed, 37 insertions(+), 55 deletions(-) delete mode 100644 synapse/replication/slave/storage/pushers.py diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 98e4b8364723..76dabeffa42b 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -52,7 +52,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client import ( account_data, @@ -111,6 +110,7 @@ from synapse.storage.databases.main.presence import PresenceStore from synapse.storage.databases.main.profile import ProfileWorkerStore from synapse.storage.databases.main.push_rule import PushRulesWorkerStore +from synapse.storage.databases.main.pusher import PusherWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.registration import RegistrationWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore @@ -235,7 +235,6 @@ class GenericWorkerSlavedStore( SlavedDeviceStore, TagsWorkerStore, AccountDataWorkerStore, - SlavedPusherStore, CensorEventsStore, ClientIpWorkerStore, SlavedEventStore, @@ -251,6 +250,7 @@ class GenericWorkerSlavedStore( MonthlyActiveUsersWorkerStore, MediaRepositoryStore, ServerMetricsStore, + PusherWorkerStore, ReceiptsWorkerStore, RegistrationWorkerStore, SearchStore, diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py deleted file mode 100644 index 44ed20e4243e..000000000000 --- a/synapse/replication/slave/storage/pushers.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import TYPE_CHECKING, Any, Iterable - -from synapse.replication.tcp.streams import PushersStream -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection -from synapse.storage.databases.main.pusher import PusherWorkerStore - -from ._slaved_id_tracker import SlavedIdTracker - -if TYPE_CHECKING: - from synapse.server import HomeServer - - -class SlavedPusherStore(PusherWorkerStore): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - super().__init__(database, db_conn, hs) - self._pushers_id_gen = SlavedIdTracker( # type: ignore - db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] - ) - - def get_pushers_stream_token(self) -> int: - return self._pushers_id_gen.get_current_token() - - def process_replication_rows( - self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] - ) -> None: - if stream_name == PushersStream.NAME: - self._pushers_id_gen.advance(instance_name, token) - return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 01206950a9d4..4a01562d4552 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -27,13 +27,19 @@ ) from synapse.push import PusherConfig, ThrottleParams +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.replication.tcp.streams import PushersStream from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.util.id_generators import StreamIdGenerator +from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, + AbstractStreamIdTracker, + StreamIdGenerator, +) from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -52,9 +58,21 @@ def __init__( hs: "HomeServer", ): super().__init__(database, db_conn, hs) - self._pushers_id_gen = StreamIdGenerator( - db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")] - ) + + if hs.config.worker.worker_app is None: + self._pushers_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + db_conn, + "pushers", + "id", + extra_tables=[("deleted_pushers", "stream_id")], + ) + else: + self._pushers_id_gen = SlavedIdTracker( + db_conn, + "pushers", + "id", + extra_tables=[("deleted_pushers", "stream_id")], + ) self.db_pool.updates.register_background_update_handler( "remove_deactivated_pushers", @@ -96,6 +114,16 @@ def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: yield PusherConfig(**r) + def get_pushers_stream_token(self) -> int: + return self._pushers_id_gen.get_current_token() + + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: + if stream_name == PushersStream.NAME: + self._pushers_id_gen.advance(instance_name, token) + return super().process_replication_rows(stream_name, instance_name, token, rows) + async def get_pushers_by_app_id_and_pushkey( self, app_id: str, pushkey: str ) -> Iterator[PusherConfig]: @@ -545,8 +573,9 @@ def set_device_id_for_pushers_txn(txn: LoggingTransaction) -> int: class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): - def get_pushers_stream_token(self) -> int: - return self._pushers_id_gen.get_current_token() + # Because we have write access, this will be a StreamIdGenerator + # (see PusherWorkerStore.__init__) + _pushers_id_gen: AbstractStreamIdGenerator async def add_pusher( self, From 6cfd419a336d152b85fcc7b22064b4eccf628793 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 5 Nov 2022 20:08:40 +0000 Subject: [PATCH 4/7] Merge `SlavedDeviceStore` parts into `DeviceWorkerStore` --- synapse/app/admin_cmd.py | 4 +- synapse/app/generic_worker.py | 4 +- synapse/replication/slave/storage/devices.py | 79 ------------------- synapse/storage/databases/main/__init__.py | 16 ---- synapse/storage/databases/main/devices.py | 81 +++++++++++++++++--- 5 files changed, 75 insertions(+), 109 deletions(-) delete mode 100644 synapse/replication/slave/storage/devices.py diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 0942311267a6..5f837263b2d1 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -28,7 +28,6 @@ from synapse.config.logger import setup_logging from synapse.events import EventBase from synapse.handlers.admin import ExfiltrationWriter -from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.server import HomeServer @@ -39,6 +38,7 @@ ApplicationServiceWorkerStore, ) from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore +from synapse.storage.databases.main.devices import DeviceWorkerStore from synapse.storage.databases.main.push_rule import PushRulesWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.registration import RegistrationWorkerStore @@ -54,7 +54,7 @@ class AdminCmdSlavedStore( SlavedFilteringStore, SlavedEventStore, - SlavedDeviceStore, + DeviceWorkerStore, TagsWorkerStore, DeviceInboxWorkerStore, AccountDataWorkerStore, diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 76dabeffa42b..2aa2db2b3c8e 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -48,7 +48,6 @@ from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource -from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.keys import SlavedKeyStore @@ -99,6 +98,7 @@ from synapse.storage.databases.main.censor_events import CensorEventsStore from synapse.storage.databases.main.client_ips import ClientIpWorkerStore from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore +from synapse.storage.databases.main.devices import DeviceWorkerStore from synapse.storage.databases.main.directory import DirectoryWorkerStore from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore from synapse.storage.databases.main.lock import LockStore @@ -232,7 +232,7 @@ class GenericWorkerSlavedStore( EndToEndRoomKeyStore, PresenceStore, DeviceInboxWorkerStore, - SlavedDeviceStore, + DeviceWorkerStore, TagsWorkerStore, AccountDataWorkerStore, CensorEventsStore, diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py deleted file mode 100644 index 6fcade510aac..000000000000 --- a/synapse/replication/slave/storage/devices.py +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TYPE_CHECKING, Any, Iterable - -from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker -from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection -from synapse.storage.databases.main.devices import DeviceWorkerStore - -if TYPE_CHECKING: - from synapse.server import HomeServer - - -class SlavedDeviceStore(DeviceWorkerStore): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - self.hs = hs - - self._device_list_id_gen = SlavedIdTracker( - db_conn, - "device_lists_stream", - "stream_id", - extra_tables=[ - ("user_signature_stream", "stream_id"), - ("device_lists_outbound_pokes", "stream_id"), - ("device_lists_changes_in_room", "stream_id"), - ], - ) - - super().__init__(database, db_conn, hs) - - def get_device_stream_token(self) -> int: - return self._device_list_id_gen.get_current_token() - - def process_replication_rows( - self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] - ) -> None: - if stream_name == DeviceListsStream.NAME: - self._device_list_id_gen.advance(instance_name, token) - self._invalidate_caches_for_devices(token, rows) - elif stream_name == UserSignatureStream.NAME: - self._device_list_id_gen.advance(instance_name, token) - for row in rows: - self._user_signature_stream_cache.entity_has_changed(row.user_id, token) - return super().process_replication_rows(stream_name, instance_name, token, rows) - - def _invalidate_caches_for_devices( - self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow] - ) -> None: - for row in rows: - # The entities are either user IDs (starting with '@') whose devices - # have changed, or remote servers that we need to tell about - # changes. - if row.entity.startswith("@"): - self._device_list_stream_cache.entity_has_changed(row.entity, token) - self.get_cached_devices_for_user.invalidate((row.entity,)) - self._get_cached_user_device.invalidate((row.entity,)) - self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,)) - - else: - self._device_list_federation_stream_cache.entity_has_changed( - row.entity, token - ) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index cfaedf5e0ca9..003c4eab8113 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -26,7 +26,6 @@ from synapse.storage.databases.main.stats import UserSortOrder from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.types import Cursor -from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import JsonDict, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -138,17 +137,6 @@ def __init__( self._clock = hs.get_clock() self.database_engine = database.engine - self._device_list_id_gen = StreamIdGenerator( - db_conn, - "device_lists_stream", - "stream_id", - extra_tables=[ - ("user_signature_stream", "stream_id"), - ("device_lists_outbound_pokes", "stream_id"), - ("device_lists_changes_in_room", "stream_id"), - ], - ) - super().__init__(database, db_conn, hs) events_max = self._stream_id_gen.get_current_token() @@ -169,10 +157,6 @@ def __init__( self._stream_order_on_start = self.get_room_max_stream_ordering() self._min_stream_order_on_start = self.get_room_min_stream_ordering() - def get_device_stream_token(self) -> int: - # TODO: shouldn't this be moved to `DeviceWorkerStore`? - return self._device_list_id_gen.get_current_token() - async def get_users(self) -> List[JsonDict]: """Function to retrieve a list of users in users table. diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 979dd4e17e3b..aa58c2adc3ad 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import abc import logging from typing import ( TYPE_CHECKING, @@ -39,6 +38,8 @@ whitelisted_homeserver, ) from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker +from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, @@ -49,6 +50,11 @@ from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.types import Cursor +from synapse.storage.util.id_generators import ( + AbstractStreamIdGenerator, + AbstractStreamIdTracker, + StreamIdGenerator, +) from synapse.types import JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList @@ -80,9 +86,32 @@ def __init__( ): super().__init__(database, db_conn, hs) + if hs.config.worker.worker_app is None: + self._device_list_id_gen: AbstractStreamIdTracker = StreamIdGenerator( + db_conn, + "device_lists_stream", + "stream_id", + extra_tables=[ + ("user_signature_stream", "stream_id"), + ("device_lists_outbound_pokes", "stream_id"), + ("device_lists_changes_in_room", "stream_id"), + ], + ) + else: + self._device_list_id_gen = SlavedIdTracker( + db_conn, + "device_lists_stream", + "stream_id", + extra_tables=[ + ("user_signature_stream", "stream_id"), + ("device_lists_outbound_pokes", "stream_id"), + ("device_lists_changes_in_room", "stream_id"), + ], + ) + # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). - device_list_max = self._device_list_id_gen.get_current_token() # type: ignore[attr-defined] + device_list_max = self._device_list_id_gen.get_current_token() device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( db_conn, "device_lists_stream", @@ -136,6 +165,39 @@ def __init__( self._prune_old_outbound_device_pokes, 60 * 60 * 1000 ) + def process_replication_rows( + self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] + ) -> None: + if stream_name == DeviceListsStream.NAME: + self._device_list_id_gen.advance(instance_name, token) + self._invalidate_caches_for_devices(token, rows) + elif stream_name == UserSignatureStream.NAME: + self._device_list_id_gen.advance(instance_name, token) + for row in rows: + self._user_signature_stream_cache.entity_has_changed(row.user_id, token) + return super().process_replication_rows(stream_name, instance_name, token, rows) + + def _invalidate_caches_for_devices( + self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow] + ) -> None: + for row in rows: + # The entities are either user IDs (starting with '@') whose devices + # have changed, or remote servers that we need to tell about + # changes. + if row.entity.startswith("@"): + self._device_list_stream_cache.entity_has_changed(row.entity, token) + self.get_cached_devices_for_user.invalidate((row.entity,)) + self._get_cached_user_device.invalidate((row.entity,)) + self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,)) + + else: + self._device_list_federation_stream_cache.entity_has_changed( + row.entity, token + ) + + def get_device_stream_token(self) -> int: + return self._device_list_id_gen.get_current_token() + async def count_devices_by_users(self, user_ids: Optional[List[str]] = None) -> int: """Retrieve number of all devices of given users. Only returns number of devices that are not marked as hidden. @@ -677,11 +739,6 @@ def _add_user_signature_change_txn( }, ) - @abc.abstractmethod - def get_device_stream_token(self) -> int: - """Get the current stream id from the _device_list_id_gen""" - ... - @trace @cancellable async def get_user_devices_from_cache( @@ -1481,6 +1538,10 @@ def _txn(txn: LoggingTransaction) -> int: class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): + # Because we have write access, this will be a StreamIdGenerator + # (see DeviceWorkerStore.__init__) + _device_list_id_gen: AbstractStreamIdGenerator + def __init__( self, database: DatabasePool, @@ -1805,7 +1866,7 @@ def add_device_changes_txn( context, ) - async with self._device_list_id_gen.get_next_mult( # type: ignore[attr-defined] + async with self._device_list_id_gen.get_next_mult( len(device_ids) ) as stream_ids: await self.db_pool.runInteraction( @@ -2044,7 +2105,7 @@ def add_device_list_outbound_pokes_txn( [], ) - async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: # type: ignore[attr-defined] + async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: return await self.db_pool.runInteraction( "add_device_list_outbound_pokes", add_device_list_outbound_pokes_txn, @@ -2058,7 +2119,7 @@ async def add_remote_device_list_to_pending( updates during partial joins. """ - async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined] + async with self._device_list_id_gen.get_next() as stream_id: await self.db_pool.simple_upsert( table="device_lists_remote_pending", keyvalues={ From af0750276ac462d7244b9f740173a35545f059cd Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 5 Nov 2022 20:48:30 +0000 Subject: [PATCH 5/7] Merge `SlavedEventStore` parts into `EventsWorkerStore` --- synapse/app/admin_cmd.py | 24 +++++- synapse/app/generic_worker.py | 22 +++++- synapse/replication/slave/storage/events.py | 79 ------------------- synapse/storage/databases/main/__init__.py | 19 ----- .../storage/databases/main/events_worker.py | 16 ++++ synapse/storage/databases/main/stream.py | 1 + .../replication/slave/storage/test_events.py | 6 +- 7 files changed, 61 insertions(+), 106 deletions(-) delete mode 100644 synapse/replication/slave/storage/events.py diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 5f837263b2d1..2dfb23f8a1c1 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -28,7 +28,6 @@ from synapse.config.logger import setup_logging from synapse.events import EventBase from synapse.handlers.admin import ExfiltrationWriter -from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.server import HomeServer from synapse.storage.database import DatabasePool, LoggingDatabaseConnection @@ -39,11 +38,22 @@ ) from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore from synapse.storage.databases.main.devices import DeviceWorkerStore +from synapse.storage.databases.main.event_federation import EventFederationWorkerStore +from synapse.storage.databases.main.event_push_actions import ( + EventPushActionsWorkerStore, +) +from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.push_rule import PushRulesWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.registration import RegistrationWorkerStore +from synapse.storage.databases.main.relations import RelationsWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore +from synapse.storage.databases.main.roommember import RoomMemberWorkerStore +from synapse.storage.databases.main.signatures import SignatureWorkerStore +from synapse.storage.databases.main.state import StateGroupWorkerStore +from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.tags import TagsWorkerStore +from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore from synapse.types import StateMap from synapse.util import SYNAPSE_VERSION from synapse.util.logcontext import LoggingContext @@ -53,7 +63,6 @@ class AdminCmdSlavedStore( SlavedFilteringStore, - SlavedEventStore, DeviceWorkerStore, TagsWorkerStore, DeviceInboxWorkerStore, @@ -61,8 +70,17 @@ class AdminCmdSlavedStore( PushRulesWorkerStore, ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore, - RegistrationWorkerStore, + RoomMemberWorkerStore, + RelationsWorkerStore, + EventFederationWorkerStore, + EventPushActionsWorkerStore, + StateGroupWorkerStore, + SignatureWorkerStore, + UserErasureWorkerStore, ReceiptsWorkerStore, + StreamWorkerStore, + EventsWorkerStore, + RegistrationWorkerStore, RoomWorkerStore, ): def __init__( diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 2aa2db2b3c8e..d334da54e91d 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -48,7 +48,6 @@ from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource -from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.rest.admin import register_servlets_for_media_repo @@ -101,6 +100,11 @@ from synapse.storage.databases.main.devices import DeviceWorkerStore from synapse.storage.databases.main.directory import DirectoryWorkerStore from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore +from synapse.storage.databases.main.event_federation import EventFederationWorkerStore +from synapse.storage.databases.main.event_push_actions import ( + EventPushActionsWorkerStore, +) +from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.lock import LockStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore from synapse.storage.databases.main.metrics import ServerMetricsStore @@ -113,15 +117,21 @@ from synapse.storage.databases.main.pusher import PusherWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.registration import RegistrationWorkerStore +from synapse.storage.databases.main.relations import RelationsWorkerStore from synapse.storage.databases.main.room import RoomWorkerStore from synapse.storage.databases.main.room_batch import RoomBatchStore +from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.databases.main.search import SearchStore from synapse.storage.databases.main.session import SessionStore +from synapse.storage.databases.main.signatures import SignatureWorkerStore +from synapse.storage.databases.main.state import StateGroupWorkerStore from synapse.storage.databases.main.stats import StatsStore +from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.tags import TagsWorkerStore from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore +from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore from synapse.types import JsonDict from synapse.util import SYNAPSE_VERSION from synapse.util.httpresourcetree import create_resource_tree @@ -237,7 +247,6 @@ class GenericWorkerSlavedStore( AccountDataWorkerStore, CensorEventsStore, ClientIpWorkerStore, - SlavedEventStore, SlavedKeyStore, RoomWorkerStore, RoomBatchStore, @@ -251,7 +260,16 @@ class GenericWorkerSlavedStore( MediaRepositoryStore, ServerMetricsStore, PusherWorkerStore, + RoomMemberWorkerStore, + RelationsWorkerStore, + EventFederationWorkerStore, + EventPushActionsWorkerStore, + StateGroupWorkerStore, + SignatureWorkerStore, + UserErasureWorkerStore, ReceiptsWorkerStore, + StreamWorkerStore, + EventsWorkerStore, RegistrationWorkerStore, SearchStore, TransactionWorkerStore, diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py deleted file mode 100644 index fe47778cb127..000000000000 --- a/synapse/replication/slave/storage/events.py +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging -from typing import TYPE_CHECKING - -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection -from synapse.storage.databases.main.event_federation import EventFederationWorkerStore -from synapse.storage.databases.main.event_push_actions import ( - EventPushActionsWorkerStore, -) -from synapse.storage.databases.main.events_worker import EventsWorkerStore -from synapse.storage.databases.main.relations import RelationsWorkerStore -from synapse.storage.databases.main.roommember import RoomMemberWorkerStore -from synapse.storage.databases.main.signatures import SignatureWorkerStore -from synapse.storage.databases.main.state import StateGroupWorkerStore -from synapse.storage.databases.main.stream import StreamWorkerStore -from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore -from synapse.util.caches.stream_change_cache import StreamChangeCache - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - - -# So, um, we want to borrow a load of functions intended for reading from -# a DataStore, but we don't want to take functions that either write to the -# DataStore or are cached and don't have cache invalidation logic. -# -# Rather than write duplicate versions of those functions, or lift them to -# a common base class, we going to grab the underlying __func__ object from -# the method descriptor on the DataStore and chuck them into our class. - - -class SlavedEventStore( - EventFederationWorkerStore, - RoomMemberWorkerStore, - EventPushActionsWorkerStore, - StreamWorkerStore, - StateGroupWorkerStore, - SignatureWorkerStore, - EventsWorkerStore, - UserErasureWorkerStore, - RelationsWorkerStore, -): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - super().__init__(database, db_conn, hs) - - events_max = self._stream_id_gen.get_current_token() - curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( - db_conn, - "current_state_delta_stream", - entity_column="room_id", - stream_column="stream_id", - max_value=events_max, # As we share the stream id with events token - limit=1000, - ) - self._curr_state_delta_stream_cache = StreamChangeCache( - "_curr_state_delta_stream_cache", - min_curr_state_delta_id, - prefilled_cache=curr_state_delta_prefill, - ) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 003c4eab8113..0e47592be31e 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -27,7 +27,6 @@ from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.types import Cursor from synapse.types import JsonDict, get_domain_from_id -from synapse.util.caches.stream_change_cache import StreamChangeCache from .account_data import AccountDataStore from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore @@ -139,24 +138,6 @@ def __init__( super().__init__(database, db_conn, hs) - events_max = self._stream_id_gen.get_current_token() - curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( - db_conn, - "current_state_delta_stream", - entity_column="room_id", - stream_column="stream_id", - max_value=events_max, # As we share the stream id with events token - limit=1000, - ) - self._curr_state_delta_stream_cache = StreamChangeCache( - "_curr_state_delta_stream_cache", - min_curr_state_delta_id, - prefilled_cache=curr_state_delta_prefill, - ) - - self._stream_order_on_start = self.get_room_max_stream_ordering() - self._min_stream_order_on_start = self.get_room_min_stream_ordering() - async def get_users(self) -> List[JsonDict]: """Function to retrieve a list of users in users table. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 69fea452ad0b..a79091952a56 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -81,6 +81,7 @@ from synapse.util.async_helpers import ObservableDeferred, delay_cancellation from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import AsyncLruCache +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -233,6 +234,21 @@ def __init__( db_conn, "events", "stream_ordering", step=-1 ) + events_max = self._stream_id_gen.get_current_token() + curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict( + db_conn, + "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache( + "_curr_state_delta_stream_cache", + min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + if hs.config.worker.run_background_tasks: # We periodically clean out old transaction ID mappings self._clock.looping_call( diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 09ce855aa8a0..cc27ec38042b 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -415,6 +415,7 @@ def __init__( ) self._stream_order_on_start = self.get_room_max_stream_ordering() + self._min_stream_order_on_start = self.get_room_min_stream_ordering() def get_room_max_stream_ordering(self) -> int: """Get the stream_ordering of regular events that we have committed up to diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index d42e36cdf1e0..96f388092310 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -21,11 +21,11 @@ from synapse.api.room_versions import RoomVersions from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict from synapse.handlers.room import RoomEventSource -from synapse.replication.slave.storage.events import SlavedEventStore from synapse.storage.databases.main.event_push_actions import ( NotifCounts, RoomNotifCounts, ) +from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser from synapse.types import PersistedEventPosition @@ -58,9 +58,9 @@ def unpatch(): return unpatch -class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): +class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase): - STORE_TYPE = SlavedEventStore + STORE_TYPE = EventsWorkerStore def setUp(self): # Patch up the equality operator for events so that we can check From e17010c8afb7a325baf87e9bd1ad809a1e2d00e6 Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 5 Nov 2022 21:24:36 +0000 Subject: [PATCH 6/7] Create `FilteringWorkerStore` to replace `SlavedFilteringStore` --- synapse/app/admin_cmd.py | 4 +-- synapse/app/generic_worker.py | 4 +-- .../replication/slave/storage/filtering.py | 35 ------------------- synapse/storage/databases/main/filtering.py | 4 ++- 4 files changed, 7 insertions(+), 40 deletions(-) delete mode 100644 synapse/replication/slave/storage/filtering.py diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index 2dfb23f8a1c1..165d1c5db06b 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -28,7 +28,6 @@ from synapse.config.logger import setup_logging from synapse.events import EventBase from synapse.handlers.admin import ExfiltrationWriter -from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.server import HomeServer from synapse.storage.database import DatabasePool, LoggingDatabaseConnection from synapse.storage.databases.main.account_data import AccountDataWorkerStore @@ -43,6 +42,7 @@ EventPushActionsWorkerStore, ) from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.storage.databases.main.filtering import FilteringWorkerStore from synapse.storage.databases.main.push_rule import PushRulesWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.registration import RegistrationWorkerStore @@ -62,7 +62,7 @@ class AdminCmdSlavedStore( - SlavedFilteringStore, + FilteringWorkerStore, DeviceWorkerStore, TagsWorkerStore, DeviceInboxWorkerStore, diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index d334da54e91d..669bfc2143c8 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -48,7 +48,6 @@ from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource -from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client import ( @@ -105,6 +104,7 @@ EventPushActionsWorkerStore, ) from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.storage.databases.main.filtering import FilteringWorkerStore from synapse.storage.databases.main.lock import LockStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore from synapse.storage.databases.main.metrics import ServerMetricsStore @@ -255,7 +255,7 @@ class GenericWorkerSlavedStore( ApplicationServiceTransactionWorkerStore, ApplicationServiceWorkerStore, ProfileWorkerStore, - SlavedFilteringStore, + FilteringWorkerStore, MonthlyActiveUsersWorkerStore, MediaRepositoryStore, ServerMetricsStore, diff --git a/synapse/replication/slave/storage/filtering.py b/synapse/replication/slave/storage/filtering.py deleted file mode 100644 index c52679cd60f3..000000000000 --- a/synapse/replication/slave/storage/filtering.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TYPE_CHECKING - -from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection -from synapse.storage.databases.main.filtering import FilteringStore - -if TYPE_CHECKING: - from synapse.server import HomeServer - - -class SlavedFilteringStore(SQLBaseStore): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - super().__init__(database, db_conn, hs) - - # Filters are immutable so this cache doesn't need to be expired - get_user_filter = FilteringStore.__dict__["get_user_filter"] diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py index cb9ee08fa8e8..12f3b601f11c 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py @@ -24,7 +24,7 @@ from synapse.util.caches.descriptors import cached -class FilteringStore(SQLBaseStore): +class FilteringWorkerStore(SQLBaseStore): @cached(num_args=2) async def get_user_filter( self, user_localpart: str, filter_id: Union[int, str] @@ -46,6 +46,8 @@ async def get_user_filter( return db_to_json(def_json) + +class FilteringStore(FilteringWorkerStore): async def add_user_filter(self, user_localpart: str, user_filter: JsonDict) -> int: def_json = encode_canonical_json(user_filter) From 8982940023bcae49f5732cb51d99346162411a7a Mon Sep 17 00:00:00 2001 From: Nick Barrett Date: Sat, 5 Nov 2022 21:28:57 +0000 Subject: [PATCH 7/7] Remove unnecessary `SlavedKeyStore` --- synapse/app/generic_worker.py | 6 ++++-- synapse/replication/slave/storage/keys.py | 20 -------------------- 2 files changed, 4 insertions(+), 22 deletions(-) delete mode 100644 synapse/replication/slave/storage/keys.py diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 669bfc2143c8..51446b49cd86 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -48,7 +48,6 @@ from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource -from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.client import ( account_data, @@ -105,6 +104,7 @@ ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.filtering import FilteringWorkerStore +from synapse.storage.databases.main.keys import KeyStore from synapse.storage.databases.main.lock import LockStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore from synapse.storage.databases.main.metrics import ServerMetricsStore @@ -247,7 +247,9 @@ class GenericWorkerSlavedStore( AccountDataWorkerStore, CensorEventsStore, ClientIpWorkerStore, - SlavedKeyStore, + # KeyStore isn't really safe to use from a worker, but for now we do so and hope that + # the races it creates aren't too bad. + KeyStore, RoomWorkerStore, RoomBatchStore, DirectoryWorkerStore, diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py deleted file mode 100644 index a00b38c512a0..000000000000 --- a/synapse/replication/slave/storage/keys.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.storage.databases.main.keys import KeyStore - -# KeyStore isn't really safe to use from a worker, but for now we do so and hope that -# the races it creates aren't too bad. - -SlavedKeyStore = KeyStore