This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Move catchup of replication streams to worker. #7024

Merged
merged 17 commits into from
Mar 25, 2020
1 change: 1 addition & 0 deletions changelog.d/7024.misc
@@ -0,0 +1 @@
Move catchup of replication streams logic to worker.
46 changes: 13 additions & 33 deletions docs/tcp_replication.md
@@ -14,16 +14,16 @@ example flow would be (where '>' indicates master to worker and
'<' worker to master flows):

> SERVER example.com
< REPLICATE events 53
< REPLICATE
> POSITION events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its
identity with the `SERVER` command, followed by the client asking to
subscribe to the `events` stream from the token `53`. The server then
periodically sends `RDATA` commands which have the format
`RDATA <stream_name> <token> <row>`, where the format of `<row>` is
defined by the individual streams.
The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client sending a `REPLICATE` command
asking the server to respond with the position of all streams. The server then
periodically sends `RDATA` commands which have the format
`RDATA <stream_name> <token> <row>`, where the format of `<row>` is defined by
the individual streams.
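
As an illustration only (not part of this change), a client might parse these lines roughly as follows; the JSON row encoding and the special `batch` token reflect how the protocol sends multi-row updates, and everything else here is assumed for the example:

```python
import json

def parse_replication_line(line: str):
    """Split one protocol line into (command name, parsed arguments)."""
    name, _, rest = line.strip().partition(" ")
    if name == "POSITION":
        stream_name, token = rest.split(" ")
        return name, (stream_name, int(token))
    if name == "RDATA":
        stream_name, token, row = rest.split(" ", 2)
        # The special token "batch" means more rows for the same token follow.
        position = None if token == "batch" else int(token)
        return name, (stream_name, position, json.loads(row))
    # SERVER, REPLICATE, PING, ERROR, ... carry at most a single argument.
    return name, rest

print(parse_replication_line('RDATA events 54 ["$foo1:bar.com", null]'))
# -> ('RDATA', ('events', 54, ['$foo1:bar.com', None]))
```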

Error reporting happens by either the client or server sending an ERROR
command, and usually the connection will be closed.
@@ -32,9 +32,6 @@ Since the protocol is simple and line-based, it's possible to manually
connect to the server using a tool like netcat. A few things should be
noted when manually using the protocol:

- When subscribing to a stream using `REPLICATE`, the special token
`NOW` can be used to get all future updates. The special stream name
`ALL` can be used with `NOW` to subscribe to all available streams.
- The federation stream is only available if federation sending has
been disabled on the main process.
- The server will only time connections out that have sent a `PING`
@@ -91,9 +88,7 @@ The client:
- Sends a `NAME` command, allowing the server to associate a human
friendly name with the connection. This is optional.
- Sends a `PING` as above
- For each stream the client wishes to subscribe to it sends a
`REPLICATE` with the `stream_name` and token it wants to subscribe
from.
- Sends a `REPLICATE` to get the current position of all streams.
- On receipt of a `SERVER` command, checks that the server name
matches the expected server name.

@@ -140,9 +135,7 @@ the wire:
> PING 1490197665618
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE events 1
< REPLICATE backfill 1
< REPLICATE caches 1
< REPLICATE
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
@@ -181,9 +174,9 @@ client (C):

#### POSITION (S)

The position of the stream has been updated. Sent to the client
after all missing updates for a stream have been sent to the client
and they're now up to date.
On receipt of a POSITION command clients should check if they have missed any
updates, and if so then fetch them out of band. Sent in response to a
REPLICATE command (but can happen at any time).

Member:
am I right in thinking it is only sent on connect? might be worth saying that? or otherwise saying when it is sent?

Member (Author):
Generally happens in response to a REPLICATE command, but in a redis world they'll be flying around whenever anyone connects and sends a REPLICATE. Will update comment.
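
As a rough illustration of that catch-up behaviour (a sketch only; `get_current_position` and `fetch_updates_out_of_band` are hypothetical helpers, not Synapse APIs):

```python
async def on_POSITION(stream_name: str, remote_token: int) -> None:
    """Handle a POSITION for one stream: fetch anything we have missed."""
    local_token = get_current_position(stream_name)  # hypothetical helper
    if remote_token > local_token:
        # We are behind: pull the missing updates out of band (e.g. over the
        # replication HTTP API) rather than waiting for future RDATA.
        await fetch_updates_out_of_band(stream_name, local_token, remote_token)
```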

#### ERROR (S, C)

@@ -199,20 +192,7 @@ client (C):

#### REPLICATE (C)

Asks the server to replicate a given stream. The syntax is:

```
REPLICATE <stream_name> <token>
```

Where `<token>` may be either:
* a numeric stream_id to stream updates since (exclusive)
* `NOW` to stream all subsequent updates.

The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.
Asks the server for the current position of all streams.

#### USER_SYNC (C)

3 changes: 3 additions & 0 deletions synapse/app/generic_worker.py
@@ -401,6 +401,9 @@ def process_replication_rows(self, token, rows):
self._room_serials[row.room_id] = token
self._room_typing[row.room_id] = row.user_ids

def get_current_token(self) -> int:
return self._latest_room_serial

Member:
where is this used?

Member (Author):
This gets used by the TypingStream.current_token, which now gets called on workers (this is the equivalent of calling a slaved ID gen current_token function).


class GenericWorkerSlavedStore(
# FIXME(#3714): We need to add UserDirectoryStore as we write directly
9 changes: 9 additions & 0 deletions synapse/federation/sender/__init__.py
@@ -499,4 +499,13 @@ def wake_destination(self, destination: str):
self._get_per_destination_queue(destination).attempt_new_transaction()

def get_current_token(self) -> int:
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return 0

async def get_replication_rows(
self, from_token, to_token, limit, federation_ack=None
):
# Dummy implementation for case where federation sender isn't offloaded
# to a worker.
return []
2 changes: 2 additions & 0 deletions synapse/replication/http/__init__.py
@@ -21,6 +21,7 @@
membership,
register,
send_event,
streams,
)

REPLICATION_PREFIX = "/_synapse/replication"
@@ -38,3 +39,4 @@ def register_servlets(self, hs):
login.register_servlets(hs, self)
register.register_servlets(hs, self)
devices.register_servlets(hs, self)
streams.register_servlets(hs, self)
78 changes: 78 additions & 0 deletions synapse/replication/http/streams.py
@@ -0,0 +1,78 @@
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_integer
from synapse.replication.http._base import ReplicationEndpoint

logger = logging.getLogger(__name__)


class ReplicationGetStreamUpdates(ReplicationEndpoint):
"""Fetches stream updates from a server. Used for streams not persisted to
the database, e.g. typing notifications.

The API looks like:

GET /_synapse/replication/get_repl_stream_updates/events?from_token=0&upto_token=10&limit=100

200 OK

{
updates: [ ... ],
upto_token: 10,
limited: False,
}

"""

Member:
from_token is inclusive, to_token is exclusive?

Member (Author):
I thought we generally avoided saying inclusive vs exclusive, and rather thought about tokens as being "between" items? I don't know if that is helpful, can add the comment in

NAME = "get_repl_stream_updates"
PATH_ARGS = ("stream_name",)
METHOD = "GET"

def __init__(self, hs):
super().__init__(hs)

# We pull the streams from the replication streamer (if we try and make
# them ourselves we end up in an import loop).
self.streams = hs.get_replication_streamer().get_streams()

@staticmethod
def _serialize_payload(stream_name, from_token, upto_token, limit):
return {"from_token": from_token, "upto_token": upto_token, "limit": limit}

async def _handle_request(self, request, stream_name):
stream = self.streams.get(stream_name)
if stream is None:
raise SynapseError(400, "Unknown stream")

from_token = parse_integer(request, "from_token", required=True)
upto_token = parse_integer(request, "upto_token", required=True)
limit = parse_integer(request, "limit", required=True)

updates, upto_token, limited = await stream.get_updates_since(
from_token, upto_token, limit
)

return (
200,
{"updates": updates, "upto_token": upto_token, "limited": limited},
)


def register_servlets(hs, http_server):
ReplicationGetStreamUpdates(hs).register(http_server)
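
For context (not part of the diff): a worker would typically call this endpoint through the client generated by `ReplicationEndpoint.make_client`, following the pattern used by Synapse's other replication HTTP endpoints. A rough sketch, with the handler class, stream name and token values purely illustrative:

```python
from synapse.replication.http.streams import ReplicationGetStreamUpdates

class ExampleWorkerCatchup:
    """Illustrative only: pages through missed updates for one stream."""

    def __init__(self, hs):
        # Builds an async callable that performs the HTTP request to the
        # process running the replication streamer.
        self._get_stream_updates = ReplicationGetStreamUpdates.make_client(hs)

    async def catch_up(self, stream_name: str, from_token: int, upto_token: int):
        while True:
            result = await self._get_stream_updates(
                stream_name=stream_name,
                from_token=from_token,
                upto_token=upto_token,
                limit=100,
            )
            # ... process result["updates"] here ...
            from_token = result["upto_token"]
            if not result["limited"]:
                break
```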
14 changes: 11 additions & 3 deletions synapse/replication/slave/storage/_base.py
@@ -18,8 +18,10 @@

import six

from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME
from synapse.storage.data_stores.main.cache import (
CURRENT_STATE_CACHE_NAME,
CacheInvalidationWorkerStore,
)
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine

@@ -35,7 +37,7 @@ def __func__(inp):
return inp.__func__


class BaseSlavedStore(SQLBaseStore):
class BaseSlavedStore(CacheInvalidationWorkerStore):
def __init__(self, database: Database, db_conn, hs):
super(BaseSlavedStore, self).__init__(database, db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
@@ -60,6 +62,12 @@ def stream_positions(self) -> Dict[str, int]:
pos["caches"] = self._cache_id_gen.get_current_token()
return pos

def get_cache_stream_token(self):
if self._cache_id_gen:
return self._cache_id_gen.get_current_token()
else:
return 0

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "caches":
if self._cache_id_gen:
3 changes: 3 additions & 0 deletions synapse/replication/slave/storage/pushers.py
@@ -33,6 +33,9 @@ def stream_positions(self):
result["pushers"] = self._pushers_id_gen.get_current_token()
return result

def get_pushers_stream_token(self):
return self._pushers_id_gen.get_current_token()

def process_replication_rows(self, stream_name, token, rows):
if stream_name == "pushers":
self._pushers_id_gen.advance(token)
3 changes: 2 additions & 1 deletion synapse/replication/tcp/client.py
@@ -55,6 +55,7 @@ def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name
self.handler = handler
self.server_name = hs.config.server_name
self.hs = hs
self._clock = hs.get_clock() # As self.clock is defined in super class

hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
@@ -65,7 +66,7 @@ def startedConnecting(self, connector):
def buildProtocol(self, addr):
logger.info("Connected to replication: %r", addr)
return ClientReplicationStreamProtocol(
self.client_name, self.server_name, self._clock, self.handler
self.hs, self.client_name, self.server_name, self._clock, self.handler,
)

def clientConnectionLost(self, connector, reason):
Expand Down
34 changes: 8 additions & 26 deletions synapse/replication/tcp/commands.py
@@ -136,8 +136,8 @@ class PositionCommand(Command):
"""Sent by the server to tell the client the stream postition without
needing to send an RDATA.

Sent to the client after all missing updates for a stream have been sent
to the client and they're now up to date.
On receipt of a POSITION command clients should check if they have missed
any updates, and if so then fetch them out of band.
"""

NAME = "POSITION"
@@ -179,42 +179,24 @@ class NameCommand(Command):


class ReplicateCommand(Command):
"""Sent by the client to subscribe to the stream.
"""Sent by the client to subscribe to streams.

Format::

REPLICATE <stream_name> <token>

Where <token> may be either:
* a numeric stream_id to stream updates from
* "NOW" to stream all subsequent updates.

The <stream_name> can be "ALL" to subscribe to all known streams, in which
case the <token> must be set to "NOW", i.e.::

REPLICATE ALL NOW
REPLICATE
"""

NAME = "REPLICATE"

def __init__(self, stream_name, token):
self.stream_name = stream_name
self.token = token
def __init__(self):
pass

@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
if token in ("NOW", "now"):
token = "NOW"
else:
token = int(token)
return cls(stream_name, token)
return cls()

def to_line(self):
return " ".join((self.stream_name, str(self.token)))

def get_logcontext_id(self):
return "REPLICATE-" + self.stream_name
return ""


class UserSyncCommand(Command):
Expand Down