Skip to content

Commit

Permalink
Move catchup of replication streams to worker. (matrix-org#7024)
Browse files Browse the repository at this point in the history
This changes the replication protocol so that the server does not send down `RDATA` for rows that happened before the client connected. Instead, the server will send a `POSITION` and clients then query the database (or master out of band) to get up to date.
  • Loading branch information
erikjohnston authored and phil-flex committed Jun 16, 2020
1 parent 21c674e commit 61d9ffd
Show file tree
Hide file tree
Showing 24 changed files with 635 additions and 487 deletions.
1 change: 1 addition & 0 deletions changelog.d/7024.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move catchup of replication streams logic to worker.
46 changes: 13 additions & 33 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ example flow would be (where '>' indicates master to worker and
'<' worker to master flows):

> SERVER example.com
< REPLICATE events 53
< REPLICATE
> POSITION events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its
identity with the `SERVER` command, followed by the client asking to
subscribe to the `events` stream from the token `53`. The server then
periodically sends `RDATA` commands which have the format
`RDATA <stream_name> <token> <row>`, where the format of `<row>` is
defined by the individual streams.
The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client server to respond with the
position of all streams. The server then periodically sends `RDATA` commands
which have the format `RDATA <stream_name> <token> <row>`, where the format of
`<row>` is defined by the individual streams.

Error reporting happens by either the client or server sending an ERROR
command, and usually the connection will be closed.
Expand All @@ -32,9 +32,6 @@ Since the protocol is a simple line based, its possible to manually
connect to the server using a tool like netcat. A few things should be
noted when manually using the protocol:

- When subscribing to a stream using `REPLICATE`, the special token
`NOW` can be used to get all future updates. The special stream name
`ALL` can be used with `NOW` to subscribe to all available streams.
- The federation stream is only available if federation sending has
been disabled on the main process.
- The server will only time connections out that have sent a `PING`
Expand Down Expand Up @@ -91,9 +88,7 @@ The client:
- Sends a `NAME` command, allowing the server to associate a human
friendly name with the connection. This is optional.
- Sends a `PING` as above
- For each stream the client wishes to subscribe to it sends a
`REPLICATE` with the `stream_name` and token it wants to subscribe
from.
- Sends a `REPLICATE` to get the current position of all streams.
- On receipt of a `SERVER` command, checks that the server name
matches the expected server name.

Expand Down Expand Up @@ -140,9 +135,7 @@ the wire:
> PING 1490197665618
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE events 1
< REPLICATE backfill 1
< REPLICATE caches 1
< REPLICATE
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
Expand Down Expand Up @@ -181,9 +174,9 @@ client (C):

#### POSITION (S)

The position of the stream has been updated. Sent to the client
after all missing updates for a stream have been sent to the client
and they're now up to date.
On receipt of a POSITION command clients should check if they have missed any
updates, and if so then fetch them out of band. Sent in response to a
REPLICATE command (but can happen at any time).

#### ERROR (S, C)

Expand All @@ -199,20 +192,7 @@ client (C):

#### REPLICATE (C)

Asks the server to replicate a given stream. The syntax is:

```
REPLICATE <stream_name> <token>
```

Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.

The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
Asks the server for the current position of all streams.

#### USER_SYNC (C)

Expand Down
3 changes: 3 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,9 @@ def process_replication_rows(self, token, rows):
self._room_serials[row.room_id] = token
self._room_typing[row.room_id] = row.user_ids

def get_current_token(self) -> int:
return self._latest_room_serial


class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
Expand Down
9 changes: 9 additions & 0 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,4 +499,13 @@ def wake_destination(self, destination: str):
self._get_per_destination_queue(destination).attempt_new_transaction()

def get_current_token(self) -> int:
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return 0

async def get_replication_rows(
self, from_token, to_token, limit, federation_ack=None
):
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return []
2 changes: 2 additions & 0 deletions synapse/replication/http/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
membership,
register,
send_event,
streams,
)

REPLICATION_PREFIX = "/_synapse/replication"
Expand All @@ -38,3 +39,4 @@ def register_servlets(self, hs):
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
streams.register_servlets(hs, self)
78 changes: 78 additions & 0 deletions synapse/replication/http/streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_integer
from synapse.replication.http._base import ReplicationEndpoint

logger = logging.getLogger(__name__)


class ReplicationGetStreamUpdates(ReplicationEndpoint):
"""Fetches stream updates from a server. Used for streams not persisted to
the database, e.g. typing notifications.
The API looks like:
GET /_synapse/replication/get_repl_stream_updates/events?from_token=0&to_token=10&limit=100
200 OK
{
updates: [ ... ],
upto_token: 10,
limited: False,
}
"""

NAME = "get_repl_stream_updates"
PATH_ARGS = ("stream_name",)
METHOD = "GET"

def __init__(self, hs):
super().__init__(hs)

# We pull the streams from the replication steamer (if we try and make
# them ourselves we end up in an import loop).
self.streams = hs.get_replication_streamer().get_streams()

@staticmethod
def _serialize_payload(stream_name, from_token, upto_token, limit):
return {"from_token": from_token, "upto_token": upto_token, "limit": limit}

async def _handle_request(self, request, stream_name):
stream = self.streams.get(stream_name)
if stream is None:
raise SynapseError(400, "Unknown stream")

from_token = parse_integer(request, "from_token", required=True)
upto_token = parse_integer(request, "upto_token", required=True)
limit = parse_integer(request, "limit", required=True)

updates, upto_token, limited = await stream.get_updates_since(
from_token, upto_token, limit
)

return (
200,
{"updates": updates, "upto_token": upto_token, "limited": limited},
)


def register_servlets(hs, http_server):
ReplicationGetStreamUpdates(hs).register(http_server)
14 changes: 11 additions & 3 deletions synapse/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

import six

from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME
from synapse.storage.data_stores.main.cache import (
CURRENT_STATE_CACHE_NAME,
CacheInvalidationWorkerStore,
)
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine

Expand All @@ -35,7 +37,7 @@ def __func__(inp):
return inp.__func__


class BaseSlavedStore(SQLBaseStore):
class BaseSlavedStore(CacheInvalidationWorkerStore):
def __init__(self, database: Database, db_conn, hs):
super(BaseSlavedStore, self).__init__(database, db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
Expand All @@ -60,6 +62,12 @@ def stream_positions(self) -> Dict[str, int]:
pos["caches"] = self._cache_id_gen.get_current_token()
return pos

def get_cache_stream_token(self):
if self._cache_id_gen:
return self._cache_id_gen.get_current_token()
else:
return 0

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "caches":
if self._cache_id_gen:
Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/slave/storage/pushers.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def stream_positions(self):
result["pushers"] = self._pushers_id_gen.get_current_token()
return result

def get_pushers_stream_token(self):
return self._pushers_id_gen.get_current_token()

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "pushers":
self._pushers_id_gen.advance(token)
Expand Down
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name
self.handler = handler
self.server_name = hs.config.server_name
self.hs = hs
self._clock = hs.get_clock() # As self.clock is defined in super class

hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
Expand All @@ -65,7 +66,7 @@ def startedConnecting(self, connector):
def buildProtocol(self, addr):
logger.info("Connected to replication: %r", addr)
return ClientReplicationStreamProtocol(
self.client_name, self.server_name, self._clock, self.handler
self.hs, self.client_name, self.server_name, self._clock, self.handler,
)

def clientConnectionLost(self, connector, reason):
Expand Down
34 changes: 8 additions & 26 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ class PositionCommand(Command):
"""Sent by the server to tell the client the stream postition without
needing to send an RDATA.
Sent to the client after all missing updates for a stream have been sent
to the client and they're now up to date.
On receipt of a POSITION command clients should check if they have missed
any updates, and if so then fetch them out of band.
"""

NAME = "POSITION"
Expand Down Expand Up @@ -179,42 +179,24 @@ class NameCommand(Command):


class ReplicateCommand(Command):
"""Sent by the client to subscribe to the stream.
"""Sent by the client to subscribe to streams.
Format::
REPLICATE <stream_name> <token>
Where <token> may be either:
* a numeric stream_id to stream updates from
* "NOW" to stream all subsequent updates.
The <stream_name> can be "ALL" to subscribe to all known streams, in which
case the <token> must be set to "NOW", i.e.::
REPLICATE ALL NOW
REPLICATE
"""

NAME = "REPLICATE"

def __init__(self, stream_name, token):
self.stream_name = stream_name
self.token = token
def __init__(self):
pass

@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
if token in ("NOW", "now"):
token = "NOW"
else:
token = int(token)
return cls(stream_name, token)
return cls()

def to_line(self):
return " ".join((self.stream_name, str(self.token)))

def get_logcontext_id(self):
return "REPLICATE-" + self.stream_name
return ""


class UserSyncCommand(Command):
Expand Down
Loading

0 comments on commit 61d9ffd

Please sign in to comment.