Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move catchup of replication streams to worker. #7024

Merged
merged 17 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7024.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move the catch-up logic for replication streams to the worker.
3 changes: 3 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ def process_replication_rows(self, token, rows):
self._room_serials[row.room_id] = token
self._room_typing[row.room_id] = row.user_ids

def get_current_token(self) -> int:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used by TypingStream.current_token, which now gets called on workers (it is the equivalent of calling a slaved ID generator's current_token function).

return self._latest_room_serial


class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
Expand Down
9 changes: 9 additions & 0 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,4 +499,13 @@ def wake_destination(self, destination: str):
self._get_per_destination_queue(destination).attempt_new_transaction()

def get_current_token(self) -> int:
    """Return a placeholder current token.

    Dummy implementation for the case where the federation sender isn't
    offloaded to a worker: the token reported is always 0.
    """
    return 0

async def get_replication_rows(
    self, from_token, to_token, limit, federation_ack=None
):
    """Return an empty list of replication rows.

    Dummy implementation for the case where the federation sender isn't
    offloaded to a worker. None of the arguments are consulted; the result
    is always a new empty list.
    """
    return []
2 changes: 2 additions & 0 deletions synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
membership,
register,
send_event,
streams,
)

REPLICATION_PREFIX = "/_synapse/replication"
Expand All @@ -38,3 +39,4 @@ def register_servlets(self, hs):
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
streams.register_servlets(hs, self)
65 changes: 65 additions & 0 deletions synapse/replication/http/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_integer
from synapse.replication.http._base import ReplicationEndpoint

logger = logging.getLogger(__name__)


class ReplicationGetStreamUpdates(ReplicationEndpoint):
"""Fetches stream updates from a server. Used for streams not persisted to
the database, e.g. typing notifications.
"""
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

NAME = "get_repl_stream_updates"
PATH_ARGS = ("stream_name",)
METHOD = "GET"

def __init__(self, hs):
super(ReplicationGetStreamUpdates, self).__init__(hs)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

from synapse.replication.tcp.streams import STREAMS_MAP
richvdh marked this conversation as resolved.
Show resolved Hide resolved

self.streams = {stream.NAME: stream(hs) for stream in STREAMS_MAP.values()}

@staticmethod
def _serialize_payload(stream_name, from_token, upto_token, limit):
return {"from_token": from_token, "upto_token": upto_token, "limit": limit}

async def _handle_request(self, request, stream_name):
    """Handle a request for the updates of one stream.

    Looks the stream up by name in ``self.streams``, reads the three required
    integer parameters from the request, and asks the stream for its updates
    via ``get_updates_since``.

    Returns:
        A ``(200, body)`` tuple where ``body`` has the keys ``updates``,
        ``upto_token`` and ``limited``. The ``upto_token`` in the body is the
        one returned by the stream, not the one supplied in the request.

    Raises:
        SynapseError: with code 400 if ``stream_name`` is not a known stream.
    """
    stream = self.streams.get(stream_name)
    if stream is None:
        raise SynapseError(400, "Unknown stream")

    # All three parameters are mandatory; they are parsed in the same order
    # as before: from_token, upto_token, limit.
    from_token, requested_upto, limit = (
        parse_integer(request, name, required=True)
        for name in ("from_token", "upto_token", "limit")
    )

    result = await stream.get_updates_since(from_token, requested_upto, limit)
    updates, upto_token, limited = result

    body = {"updates": updates, "upto_token": upto_token, "limited": limited}
    return 200, body


def register_servlets(hs, http_server):
    """Create the stream-updates endpoint for ``hs`` and register it on ``http_server``."""
    endpoint = ReplicationGetStreamUpdates(hs)
    endpoint.register(http_server)
14 changes: 11 additions & 3 deletions synapse/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import six

from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME
from synapse.storage.data_stores.main.cache import (
CURRENT_STATE_CACHE_NAME,
CacheInvalidationWorkerStore,
)
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine

Expand All @@ -35,7 +37,7 @@ def __func__(inp):
return inp.__func__


class BaseSlavedStore(SQLBaseStore):
class BaseSlavedStore(CacheInvalidationWorkerStore):
def __init__(self, database: Database, db_conn, hs):
super(BaseSlavedStore, self).__init__(database, db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
Expand All @@ -60,6 +62,12 @@ def stream_positions(self) -> Dict[str, int]:
pos["caches"] = self._cache_id_gen.get_current_token()
return pos

def get_cache_stream_token(self):
    """Return the current token of the cache ID generator.

    Falls back to 0 when ``self._cache_id_gen`` is falsy (e.g. ``None``).
    """
    if not self._cache_id_gen:
        return 0
    return self._cache_id_gen.get_current_token()

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "caches":
if self._cache_id_gen:
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/slave/storage/pushers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def stream_positions(self):
result["pushers"] = self._pushers_id_gen.get_current_token()
return result

def get_pushers_stream_token(self):
    """Return the current token reported by ``self._pushers_id_gen``."""
    id_gen = self._pushers_id_gen
    return id_gen.get_current_token()

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "pushers":
self._pushers_id_gen.advance(token)
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name
self.handler = handler
self.server_name = hs.config.server_name
self.hs = hs
self._clock = hs.get_clock() # As self.clock is defined in super class

hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
Expand All @@ -65,7 +66,7 @@ def startedConnecting(self, connector):
def buildProtocol(self, addr):
logger.info("Connected to replication: %r", addr)
return ClientReplicationStreamProtocol(
self.client_name, self.server_name, self._clock, self.handler
self.hs, self.client_name, self.server_name, self._clock, self.handler,
)

def clientConnectionLost(self, connector, reason):
Expand Down
23 changes: 5 additions & 18 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,35 +183,22 @@ class ReplicateCommand(Command):

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Format::

REPLICATE <stream_name> <token>
REPLICATE <stream_name>

Where <token> may be either:
* a numeric stream_id to stream updates from
* "NOW" to stream all subsequent updates.

The <stream_name> can be "ALL" to subscribe to all known streams, in which
case the <token> must be set to "NOW", i.e.::

REPLICATE ALL NOW
The <stream_name> can be "ALL" to subscribe to all known streams.
"""

NAME = "REPLICATE"

def __init__(self, stream_name, token):
def __init__(self, stream_name):
self.stream_name = stream_name
self.token = token

@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
if token in ("NOW", "now"):
token = "NOW"
else:
token = int(token)
return cls(stream_name, token)
return cls(line)

def to_line(self):
return " ".join((self.stream_name, str(self.token)))
return self.stream_name

def get_logcontext_id(self):
    """Return the logcontext ID: ``REPLICATE-`` followed by the stream name."""
    return "".join(("REPLICATE-", self.stream_name))
Expand Down
Loading