Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge user_dir worker
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Feb 21, 2020
1 parent 27260e1 commit 8b53661
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 211 deletions.
18 changes: 17 additions & 1 deletion synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
RoomStateRestServlet,
)
from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha import groups, sync
from synapse.rest.client.v2_alpha import groups, sync, user_directory
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
Expand Down Expand Up @@ -453,6 +453,8 @@ def _listen_http(self, listener_config):
InitialSyncRestServlet(self).register(resource)
RoomInitialSyncRestServlet(self).register(resource)

user_directory.register_servlets(self, resource)

# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
Expand Down Expand Up @@ -687,6 +689,7 @@ def start(config_options):
"synapse.app.media_repository",
"synapse.app.pusher",
"synapse.app.synchrotron",
"synapse.app.user_dir",
)

if config.worker_app == "synapse.app.appservice":
Expand Down Expand Up @@ -715,6 +718,19 @@ def start(config_options):
# Force the pushers to start since they will be disabled in the main config
config.start_pushers = True

if config.worker_app == "synapse.app.user_dir":
if config.update_user_directory:
sys.stderr.write(
"\nThe update_user_directory must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``update_user_directory: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.update_user_directory = True

synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

ss = GenericWorkerServer(
Expand Down
213 changes: 3 additions & 210 deletions synapse/app/user_dir.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,217 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import sys

from twisted.internet import defer, reactor
from twisted.web.resource import NoResource

import synapse
from synapse import events
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamCurrentStateRow,
)
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string

logger = logging.getLogger("synapse.app.user_dir")


class UserDirectorySlaveStore(
SlavedEventStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
UserDirectoryStore,
BaseSlavedStore,
):
def __init__(self, database: Database, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(database, db_conn, hs)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
db_conn,
"current_state_delta_stream",
entity_column="room_id",
stream_column="stream_id",
max_value=events_max, # As we share the stream id with events token
limit=1000,
)
self._curr_state_delta_stream_cache = StreamChangeCache(
"_curr_state_delta_stream_cache",
min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)

def stream_positions(self):
result = super(UserDirectorySlaveStore, self).stream_positions()
return result

def process_replication_rows(self, stream_name, token, rows):
if stream_name == EventsStream.NAME:
self._stream_id_gen.advance(token)
for row in rows:
if row.type != EventsStreamCurrentStateRow.TypeId:
continue
self._curr_state_delta_stream_cache.entity_has_changed(
row.data.room_id, token
)
return super(UserDirectorySlaveStore, self).process_replication_rows(
stream_name, token, rows
)


class UserDirectoryServer(HomeServer):
DATASTORE_CLASS = UserDirectorySlaveStore

def _listen_http(self, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
site_tag = listener_config.get("tag", port)
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
user_directory.register_servlets(self, resource)
resources.update(
{
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
"/_matrix/client/v2_alpha": resource,
"/_matrix/client/api/v1": resource,
}
)

root_resource = create_resource_tree(resources, NoResource())

_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
)

logger.info("Synapse user_dir now listening on port %d", port)

def start_listening(self, listeners):
for listener in listeners:
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
_base.listen_tcp(
listener["bind_addresses"],
listener["port"],
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
"Metrics listener configured, but "
"enable_metrics is not True!"
)
)
else:
_base.listen_metrics(listener["bind_addresses"], listener["port"])
else:
logger.warning("Unrecognized listener type: %s", listener["type"])

self.get_tcp_replication().start_replication(self)

def build_tcp_replication(self):
return UserDirectoryReplicationHandler(self)


class UserDirectoryReplicationHandler(ReplicationClientHandler):
def __init__(self, hs):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()

async def on_rdata(self, stream_name, token, rows):
await super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == EventsStream.NAME:
run_in_background(self._notify_directory)

@defer.inlineCallbacks
def _notify_directory(self):
try:
yield self.user_directory.notify_new_event()
except Exception:
logger.exception("Error notifiying user directory of state update")


def start(config_options):
try:
config = HomeServerConfig.load_config("Synapse user directory", config_options)
except ConfigError as e:
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)

assert config.worker_app == "synapse.app.user_dir"

events.USE_FROZEN_DICTS = config.use_frozen_dicts

if config.update_user_directory:
sys.stderr.write(
"\nThe update_user_directory must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker."
"\nPlease add ``update_user_directory: false`` to the main config"
"\n"
)
sys.exit(1)

# Force the pushers to start since they will be disabled in the main config
config.update_user_directory = True

ss = UserDirectoryServer(
config.server_name,
config=config,
version_string="Synapse/" + get_version_string(synapse),
)

setup_logging(ss, config, use_worker_options=True)

ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
)

_base.start_worker_reactor("synapse-user-dir", config)

from synapse.app.generic_worker import start
from synapse.util.logcontext import LoggingContext

if __name__ == "__main__":
with LoggingContext("main"):
Expand Down
20 changes: 20 additions & 0 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from synapse.storage.data_stores.main.stream import StreamWorkerStore
from synapse.storage.data_stores.main.user_erasure_store import UserErasureWorkerStore
from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
Expand Down Expand Up @@ -68,6 +69,21 @@ def __init__(self, database: Database, db_conn, hs):

super(SlavedEventStore, self).__init__(database, db_conn, hs)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
db_conn,
"current_state_delta_stream",
entity_column="room_id",
stream_column="stream_id",
max_value=events_max, # As we share the stream id with events token
limit=1000,
)
self._curr_state_delta_stream_cache = StreamChangeCache(
"_curr_state_delta_stream_cache",
min_curr_state_delta_id,
prefilled_cache=curr_state_delta_prefill,
)

# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.

Expand Down Expand Up @@ -120,6 +136,10 @@ def _process_event_stream_row(self, token, row):
backfilled=False,
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
self._curr_state_delta_stream_cache.entity_has_changed(
row.data.room_id, token
)

if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
Expand Down

0 comments on commit 8b53661

Please sign in to comment.