Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge SlavedEventStore parts into EventsWorkerStore
Browse files Browse the repository at this point in the history
  • Loading branch information
Fizzadar committed Nov 5, 2022
1 parent 6cfd419 commit af07502
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 106 deletions.
24 changes: 21 additions & 3 deletions synapse/app/admin_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from synapse.config.logger import setup_logging
from synapse.events import EventBase
from synapse.handlers.admin import ExfiltrationWriter
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.server import HomeServer
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
Expand All @@ -39,11 +38,22 @@
)
from synapse.storage.databases.main.deviceinbox import DeviceInboxWorkerStore
from synapse.storage.databases.main.devices import DeviceWorkerStore
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.registration import RegistrationWorkerStore
from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.types import StateMap
from synapse.util import SYNAPSE_VERSION
from synapse.util.logcontext import LoggingContext
Expand All @@ -53,16 +63,24 @@

class AdminCmdSlavedStore(
SlavedFilteringStore,
SlavedEventStore,
DeviceWorkerStore,
TagsWorkerStore,
DeviceInboxWorkerStore,
AccountDataWorkerStore,
PushRulesWorkerStore,
ApplicationServiceTransactionWorkerStore,
ApplicationServiceWorkerStore,
RegistrationWorkerStore,
RoomMemberWorkerStore,
RelationsWorkerStore,
EventFederationWorkerStore,
EventPushActionsWorkerStore,
StateGroupWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
ReceiptsWorkerStore,
StreamWorkerStore,
EventsWorkerStore,
RegistrationWorkerStore,
RoomWorkerStore,
):
def __init__(
Expand Down
22 changes: 20 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.rest.admin import register_servlets_for_media_repo
Expand Down Expand Up @@ -101,6 +100,11 @@
from synapse.storage.databases.main.devices import DeviceWorkerStore
from synapse.storage.databases.main.directory import DirectoryWorkerStore
from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
from synapse.storage.databases.main.event_federation import EventFederationWorkerStore
from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.lock import LockStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
Expand All @@ -113,15 +117,21 @@
from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.registration import RegistrationWorkerStore
from synapse.storage.databases.main.relations import RelationsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.storage.databases.main.room_batch import RoomBatchStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.databases.main.search import SearchStore
from synapse.storage.databases.main.session import SessionStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
from synapse.storage.databases.main.user_erasure_store import UserErasureWorkerStore
from synapse.types import JsonDict
from synapse.util import SYNAPSE_VERSION
from synapse.util.httpresourcetree import create_resource_tree
Expand Down Expand Up @@ -237,7 +247,6 @@ class GenericWorkerSlavedStore(
AccountDataWorkerStore,
CensorEventsStore,
ClientIpWorkerStore,
SlavedEventStore,
SlavedKeyStore,
RoomWorkerStore,
RoomBatchStore,
Expand All @@ -251,7 +260,16 @@ class GenericWorkerSlavedStore(
MediaRepositoryStore,
ServerMetricsStore,
PusherWorkerStore,
RoomMemberWorkerStore,
RelationsWorkerStore,
EventFederationWorkerStore,
EventPushActionsWorkerStore,
StateGroupWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
ReceiptsWorkerStore,
StreamWorkerStore,
EventsWorkerStore,
RegistrationWorkerStore,
SearchStore,
TransactionWorkerStore,
Expand Down
79 changes: 0 additions & 79 deletions synapse/replication/slave/storage/events.py

This file was deleted.

19 changes: 0 additions & 19 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.types import Cursor
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.caches.stream_change_cache import StreamChangeCache

from .account_data import AccountDataStore
from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
Expand Down Expand Up @@ -139,24 +138,6 @@ def __init__(

super().__init__(database, db_conn, hs)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
db_conn,
"current_state_delta_stream",
entity_column="room_id",
stream_column="stream_id",
max_value=events_max, # As we share the stream id with events token
limit=1000,
)
self._curr_state_delta_stream_cache = StreamChangeCache(
"_curr_state_delta_stream_cache",
min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)

self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()

async def get_users(self) -> List[JsonDict]:
"""Function to retrieve a list of users in users table.
Expand Down
16 changes: 16 additions & 0 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import AsyncLruCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -233,6 +234,21 @@ def __init__(
db_conn, "events", "stream_ordering", step=-1
)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db_pool.get_cache_dict(
db_conn,
"current_state_delta_stream",
entity_column="room_id",
stream_column="stream_id",
max_value=events_max, # As we share the stream id with events token
limit=1000,
)
self._curr_state_delta_stream_cache: StreamChangeCache = StreamChangeCache(
"_curr_state_delta_stream_cache",
min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)

if hs.config.worker.run_background_tasks:
# We periodically clean out old transaction ID mappings
self._clock.looping_call(
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def __init__(
)

self._stream_order_on_start = self.get_room_max_stream_ordering()
self._min_stream_order_on_start = self.get_room_min_stream_ordering()

def get_room_max_stream_ordering(self) -> int:
"""Get the stream_ordering of regular events that we have committed up to
Expand Down
6 changes: 3 additions & 3 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
from synapse.handlers.room import RoomEventSource
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.databases.main.event_push_actions import (
NotifCounts,
RoomNotifCounts,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
from synapse.types import PersistedEventPosition

Expand Down Expand Up @@ -58,9 +58,9 @@ def unpatch():
return unpatch


class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
class EventsWorkerStoreTestCase(BaseSlavedStoreTestCase):

STORE_TYPE = SlavedEventStore
STORE_TYPE = EventsWorkerStore

def setUp(self):
# Patch up the equality operator for events so that we can check
Expand Down

0 comments on commit af07502

Please sign in to comment.