From 24d35ab47bdec651706a221974424409d9ab036b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:03:38 +0100 Subject: [PATCH 01/16] Add new storage functions for new replication The new replication protocol will keep all the streams separate, rather than muxing multiple streams into one. --- synapse/handlers/typing.py | 3 ++ synapse/replication/resource.py | 2 +- synapse/storage/devices.py | 6 +-- synapse/storage/events.py | 88 +++++++++++++++++++++++++++++++++ synapse/storage/pusher.py | 42 ++++++++++++++++ 5 files changed, 137 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 0eea7f8f9c29..d6809862e0de 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -293,6 +293,9 @@ def get_all_typing_updates(self, last_id, current_id): rows.sort() return rows + def get_current_token(self): + return self._latest_room_serial + class TypingNotificationEventSource(object): def __init__(self, hs): diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 03930fe958d5..2d3ec2eca21b 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -504,7 +504,7 @@ def device_lists(self, writer, current_token, limit, request_streams): if device_lists is not None and device_lists != current_position: changes = yield self.store.get_all_device_list_changes_for_remotes( - device_lists, + device_lists, current_position, ) writer.write_header_and_rows("device_lists", changes, ( "position", "user_id", "destination", diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 53e36791d584..c8d5f5ba8b8c 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -533,7 +533,7 @@ def get_user_whose_devices_changed(self, from_key): rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) defer.returnValue(set(row[0] for row in rows)) - def get_all_device_list_changes_for_remotes(self, from_key): + def get_all_device_list_changes_for_remotes(self, from_key, to_key): """Return a list of `(stream_id, user_id, destination)` which is the combined list of changes to devices, and which destinations need to be poked. `destination` may be None if no destinations need to be poked. @@ -541,11 +541,11 @@ def get_all_device_list_changes_for_remotes(self, from_key): sql = """ SELECT stream_id, user_id, destination FROM device_lists_stream LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) - WHERE stream_id > ? + WHERE ? < stream_id AND stream_id <= ? """ return self._execute( "get_all_device_list_changes_for_remotes", None, - sql, from_key, + sql, from_key, to_key ) @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 3f6833fad263..64fe937bdc1d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1771,6 +1771,94 @@ def get_current_backfill_token(self): """The current minimum token that backfilled events have reached""" return -self._backfill_id_gen.get_current_token() + def get_current_events_token(self): + """The current maximum token that events have reached""" + return self._stream_id_gen.get_current_token() + + def get_all_new_forward_event_rows(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_new_forward_event_rows(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + new_event_updates = txn.fetchall() + + if len(new_event_updates) == limit: + upper_bound = new_event_updates[-1][0] + else: + upper_bound = current_id + + sql = ( + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " INNER JOIN ex_outlier_stream USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (last_id, upper_bound)) + new_event_updates.extend(txn) + + return new_event_updates + return self.runInteraction( + "get_all_new_forward_event_rows", get_all_new_forward_event_rows + ) + + def get_all_new_backfill_event_rows(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_new_backfill_event_rows(txn): + sql = ( + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (-last_id, -current_id, limit)) + new_event_updates = txn.fetchall() + + if len(new_event_updates) == limit: + upper_bound = new_event_updates[-1][0] + else: + upper_bound = current_id + + sql = ( + "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts" + " FROM events AS e" + " INNER JOIN ex_outlier_stream USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (-last_id, -upper_bound)) + new_event_updates.extend(txn.fetchall()) + + return new_event_updates + return self.runInteraction( + "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows + ) + @cached(num_args=5, max_entries=10) def get_all_new_events(self, last_backfill_id, last_forward_id, current_backfill_id, current_forward_id, limit): diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 8cc9f0353bf6..715c8bef2417 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -135,6 +135,48 @@ def get_all_updated_pushers_txn(txn): "get_all_updated_pushers", get_all_updated_pushers_txn ) + def get_all_updated_pushers_rows(self, last_id, current_id, limit): + """Get all the pushers that have changed between the given tokens. + + Returns: + list(tuple): each tuple consists of: + stream_id (str) + user_id (str) + app_id (str) + pushkey (str) + was_deleted (bool): whether the pusher was added/updated (False) + or deleted (True) + """ + + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_pushers_rows_txn(txn): + sql = ( + "SELECT id, user_name, app_id, pushkey" + " FROM pushers" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + results = [list(row) + [False] for row in txn] + + sql = ( + "SELECT stream_id, user_id, app_id, pushkey" + " FROM deleted_pushers" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + + results.extend(list(row) + [True] for row in txn) + results.sort() # Sort so that they're ordered by stream id + + return results + return self.runInteraction( + "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn + ) + @cachedInlineCallbacks(num_args=1, max_entries=15000) def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator From 7984708a555bd5ccdef713710325bf7bec27d970 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:05:07 +0100 Subject: [PATCH 02/16] Add a simple hook to wait for replication traffic --- synapse/notifier.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/notifier.py b/synapse/notifier.py index 7eeba6d28e57..f9fcc0ca2580 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -550,3 +550,9 @@ def wait_for_replication(self, callback, timeout): break defer.returnValue(result) + + def wait_once_for_replication(self): + """Returns a deferred which resolves when there is new data for + replication to handle. + """ + return self.replication_deferred.observe() From 11880103b13146aa8e700827f36c81a26fb8e09e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:11:17 +0100 Subject: [PATCH 03/16] Make federation send queue take the current position --- synapse/federation/send_queue.py | 40 ++++++++++++++++++++------------ synapse/replication/resource.py | 2 +- 2 files changed, 26 insertions(+), 16 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index bbb019522803..4bde66fbf8c7 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -220,10 +220,15 @@ def send_device_messages(self, destination): def get_current_token(self): return self.pos - 1 - def get_replication_rows(self, token, limit, federation_ack=None): - """ + def federation_ack(self, token): + self._clear_queue_before_pos(token) + + def get_replication_rows(self, from_token, to_token, limit, federation_ack=None): + """Get rows to be sent over federation between the two tokens + Args: - token (int) + from_token (int) + to_token(int) limit (int) federation_ack (int): Optional. The position where the worker is explicitly acknowledged it has handled. Allows us to drop @@ -232,8 +237,8 @@ def get_replication_rows(self, token, limit, federation_ack=None): # TODO: Handle limit. # To handle restarts where we wrap around - if token > self.pos: - token = -1 + if from_token > self.pos: + from_token = -1 rows = [] @@ -244,10 +249,11 @@ def get_replication_rows(self, token, limit, federation_ack=None): # Fetch changed presence keys = self.presence_changed.keys() - i = keys.bisect_right(token) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 dest_user_ids = set( (pos, dest_user_id) - for pos in keys[i:] + for pos in keys[i:j] for dest_user_id in self.presence_changed[pos] ) @@ -259,8 +265,9 @@ def get_replication_rows(self, token, limit, federation_ack=None): # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() - i = keys.bisect_right(token) - keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: rows.append( @@ -272,16 +279,18 @@ def get_replication_rows(self, token, limit, federation_ack=None): # Fetch changed edus keys = self.edus.keys() - i = keys.bisect_right(token) - edus = set((k, self.edus[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) # Fetch changed failures keys = self.failures.keys() - i = keys.bisect_right(token) - failures = set((k, self.failures[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: rows.append((pos, FAILURE_TYPE, ujson.dumps({ @@ -291,8 +300,9 @@ def get_replication_rows(self, token, limit, federation_ack=None): # Fetch changed device messages keys = self.device_messages.keys() - i = keys.bisect_right(token) - device_messages = set((k, self.device_messages[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 2d3ec2eca21b..abd3fe766530 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -489,7 +489,7 @@ def federation(self, writer, current_token, limit, request_streams, federation_a if federation is not None and federation != current_position: federation_rows = self.federation_sender.get_replication_rows( - federation, limit, federation_ack=federation_ack, + federation, current_position, limit, federation_ack=federation_ack, ) upto_token = _position_from_rows(federation_rows, current_position) writer.write_header_and_rows("federation", federation_rows, ( From 8da6f0be480b67e6b917dc0e90a6f95d2dbd2311 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:58:26 +0100 Subject: [PATCH 04/16] Define the various streams we will replicate --- synapse/replication/tcp/__init__.py | 14 + synapse/replication/tcp/streams.py | 409 ++++++++++++++++++++++++++++ 2 files changed, 423 insertions(+) create mode 100644 synapse/replication/tcp/__init__.py create mode 100644 synapse/replication/tcp/streams.py diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py new file mode 100644 index 000000000000..451dae3b6c8e --- /dev/null +++ b/synapse/replication/tcp/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py new file mode 100644 index 000000000000..07adf9412e2f --- /dev/null +++ b/synapse/replication/tcp/streams.py @@ -0,0 +1,409 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Defines all the valid streams that clients can subscribe to, and the format +of the rows returned by each stream. + +Each stream is defined by the following information: + + stream name: The name of the stream + row type: The type that is used to serialise/deserialse the row + current_token: The function that returns the current token for the stream + update_function: The function that returns a list of updates between two tokens +""" + +from twisted.internet import defer +from collections import namedtuple + +import logging + + +logger = logging.getLogger(__name__) + + +MAX_EVENTS_BEHIND = 10000 + + +EventStreamRow = namedtuple("EventStreamRow", + ("event_id", "room_id", "type", "state_key", "redacts")) +BackfillStreamRow = namedtuple("BackfillStreamRow", + ("event_id", "room_id", "type", "state_key", "redacts")) +PresenceStreamRow = namedtuple("PresenceStreamRow", + ("user_id", "state", "last_active_ts", + "last_federation_update_ts", "last_user_sync_ts", + "status_msg", "currently_active")) +TypingStreamRow = namedtuple("TypingStreamRow", + ("room_id", "user_ids")) +ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", + ("room_id", "receipt_type", "user_id", "event_id", + "data")) +PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) +PushersStreamRow = namedtuple("PushersStreamRow", + ("user_id", "app_id", "pushkey", "deleted",)) +CachesStreamRow = namedtuple("CachesStreamRow", + ("cache_func", "keys", "invalidation_ts",)) +PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", + ("room_id", "visibility", "appservice_id", + "network_id",)) +DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",)) +ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) +FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",)) +TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", + ("user_id", "room_id", "data")) +AccountDataStreamRow = namedtuple("AccountDataStream", + ("user_id", "room_id", "data_type", "data")) + + +class Stream(object): + """Base class for the streams. + + Provides a `get_updates()` function that returns new updates since the last + time it was called up until the point `advance_current_token` was called. + """ + NAME = None # The name of the stream + ROW_TYPE = None # The type of the row + _LIMITED = True # Whether the update function takes a limit + + def __init__(self, hs): + # The token from which we last asked for updates + self.last_token = self.current_token() + + # The token that we will get updates up to + self.upto_token = self.current_token() + + def advance_current_token(self): + """Updates `upto_token` to "now", which updates up until which point + get_updates[_since] will fetch rows till. + """ + self.upto_token = self.current_token() + + @defer.inlineCallbacks + def get_updates(self): + """Gets all updates since the last time this function was called (or + since the stream was constructed if it hadn't been called before), + until the `upto_token` + + Returns: + (list(ROW_TYPE), int): list of updates plus the token used as an + upper bound of the updates (i.e. the "current token") + """ + updates, current_token = yield self.get_updates_since(self.last_token) + self.last_token = current_token + + defer.returnValue((updates, current_token)) + + @defer.inlineCallbacks + def get_updates_since(self, from_token): + """Like get_updates except allows specifying from when we should + stream updates + + Returns: + (list(ROW_TYPE), int): list of updates plus the token used as an + upper bound of the updates (i.e. the "current token") + """ + if from_token in ("NOW", "now"): + defer.returnValue(([], self.upto_token)) + + current_token = self.upto_token + + from_token = int(from_token) + + if from_token == current_token: + defer.returnValue(([], current_token)) + + if self._LIMITED: + rows = yield self.update_function( + from_token, current_token, + limit=MAX_EVENTS_BEHIND + 1, + ) + + if len(rows) >= MAX_EVENTS_BEHIND: + raise Exception("stream %s has fallen behined" % (self.NAME)) + else: + rows = yield self.update_function( + from_token, current_token, + ) + + updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows] + + defer.returnValue((updates, current_token)) + + def current_token(self): + """Gets the current token of the underlying streams. Should be provided + by the sub classes + + Returns: + int + """ + raise NotImplementedError() + + def update_function(self, from_token, current_token, limit=None): + """Get updates between from_token and to_token. If Stream._LIMITED is + True then limit is provided, otherwise it's not. + + Returns: + list(tuple): the first entry in the tuple is the token for that + update, and the rest of the tuple gets used to construct + a ``ROW_TYPE`` instance + """ + raise NotImplementedError() + + +class EventsStream(Stream): + """We received a new event, or an event went from being an outlier to not + """ + NAME = "events" + ROW_TYPE = EventStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + self.current_token = store.get_current_events_token + self.update_function = store.get_all_new_forward_event_rows + + super(EventsStream, self).__init__(hs) + + +class BackfillStream(Stream): + """We fetched some old events and either we had never seen that event before + or it went from being an outlier to not. + """ + NAME = "backfill" + ROW_TYPE = BackfillStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + self.current_token = store.get_current_backfill_token + self.update_function = store.get_all_new_backfill_event_rows + + super(BackfillStream, self).__init__(hs) + + +class PresenceStream(Stream): + NAME = "presence" + _LIMITED = False + ROW_TYPE = PresenceStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + presence_handler = hs.get_presence_handler() + + self.current_token = store.get_current_presence_token + self.update_function = presence_handler.get_all_presence_updates + + super(PresenceStream, self).__init__(hs) + + +class TypingStream(Stream): + NAME = "typing" + _LIMITED = False + ROW_TYPE = TypingStreamRow + + def __init__(self, hs): + typing_handler = hs.get_typing_handler() + + self.current_token = typing_handler.get_current_token + self.update_function = typing_handler.get_all_typing_updates + + super(TypingStream, self).__init__(hs) + + +class ReceiptsStream(Stream): + NAME = "receipts" + ROW_TYPE = ReceiptsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_receipt_stream_id + self.update_function = store.get_all_updated_receipts + + super(ReceiptsStream, self).__init__(hs) + + +class PushRulesStream(Stream): + """A user has changed their push rules + """ + NAME = "push_rules" + ROW_TYPE = PushRulesStreamRow + + def __init__(self, hs): + self.store = hs.get_datastore() + super(PushRulesStream, self).__init__(hs) + + def current_token(self): + push_rules_token, _ = self.store.get_push_rules_stream_token() + return push_rules_token + + @defer.inlineCallbacks + def update_function(self, from_token, to_token, limit): + rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit) + defer.returnValue([(row[0], row[2]) for row in rows]) + + +class PushersStream(Stream): + """A user has added/changed/removed a pusher + """ + NAME = "pushers" + ROW_TYPE = PushersStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_pushers_stream_token + self.update_function = store.get_all_updated_pushers_rows + + super(PushersStream, self).__init__(hs) + + +class CachesStream(Stream): + """A cache was invalidated on the master and no other stream would invalidate + the cache on the workers + """ + NAME = "caches" + ROW_TYPE = CachesStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_cache_stream_token + self.update_function = store.get_all_updated_caches + + super(CachesStream, self).__init__(hs) + + +class PublicRoomsStream(Stream): + """The public rooms list changed + """ + NAME = "public_rooms" + ROW_TYPE = PublicRoomsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_current_public_room_stream_id + self.update_function = store.get_all_new_public_rooms + + super(PublicRoomsStream, self).__init__(hs) + + +class DeviceListsStream(Stream): + """Someone added/changed/removed a device + """ + NAME = "device_lists" + _LIMITED = False + ROW_TYPE = DeviceListsStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_device_stream_token + self.update_function = store.get_all_device_list_changes_for_remotes + + super(DeviceListsStream, self).__init__(hs) + + +class ToDeviceStream(Stream): + """New to_device messages for a client + """ + NAME = "to_device" + ROW_TYPE = ToDeviceStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_to_device_stream_token + self.update_function = store.get_all_new_device_messages + + super(ToDeviceStream, self).__init__(hs) + + +class FederationStream(Stream): + """Data to be sent over federation. Only available when master has federation + sending disabled. + """ + NAME = "federation" + ROW_TYPE = FederationStreamRow + + def __init__(self, hs): + federation_sender = hs.get_federation_sender() + + self.current_token = federation_sender.get_current_token + self.update_function = federation_sender.get_replication_rows + + super(FederationStream, self).__init__(hs) + + +class TagAccountDataStream(Stream): + """Someone added/removed a tag for a room + """ + NAME = "tag_account_data" + ROW_TYPE = TagAccountDataStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_account_data_stream_id + self.update_function = store.get_all_updated_tags + + super(TagAccountDataStream, self).__init__(hs) + + +class AccountDataStream(Stream): + """Global or per room account data was changed + """ + NAME = "account_data" + ROW_TYPE = AccountDataStreamRow + + def __init__(self, hs): + self.store = hs.get_datastore() + + self.current_token = self.store.get_max_account_data_stream_id + + super(AccountDataStream, self).__init__(hs) + + @defer.inlineCallbacks + def update_function(self, from_token, to_token, limit): + global_results, room_results = yield self.store.get_all_updated_account_data( + from_token, from_token, to_token, limit + ) + + results = list(room_results) + results.extend( + (stream_id, user_id, None, account_data_type, content,) + for stream_id, user_id, account_data_type, content in global_results + ) + + defer.returnValue(results) + + +STREAMS_MAP = { + stream.NAME: stream + for stream in ( + EventsStream, + BackfillStream, + PresenceStream, + TypingStream, + ReceiptsStream, + PushRulesStream, + PushersStream, + CachesStream, + PublicRoomsStream, + DeviceListsStream, + ToDeviceStream, + FederationStream, + TagAccountDataStream, + AccountDataStream, + ) +} From 74506934356dcb10b1704e3e66d4648e99ba6308 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 15:40:37 +0100 Subject: [PATCH 05/16] Initial TCP protocol implementation This defines the low level TCP replication protocol --- docs/tcp_replication.rst | 174 ++++++++ synapse/replication/tcp/__init__.py | 32 ++ synapse/replication/tcp/commands.py | 341 ++++++++++++++++ synapse/replication/tcp/protocol.py | 601 ++++++++++++++++++++++++++++ 4 files changed, 1148 insertions(+) create mode 100644 docs/tcp_replication.rst create mode 100644 synapse/replication/tcp/commands.py create mode 100644 synapse/replication/tcp/protocol.py diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst new file mode 100644 index 000000000000..946add284915 --- /dev/null +++ b/docs/tcp_replication.rst @@ -0,0 +1,174 @@ +TCP Replication +=============== + +This describes the TCP replication protocol that replaces the HTTP protocol. + +Motivation +---------- + +The HTTP API used long poll from the workers to the master, this has the problem +of causing a lot of duplicate work on the server. This TCP protocol aims to +solve. + +Overview +-------- + +The protocol is based on fire and forget, line based commands. An example flow +would be (where '>' indicates master->worker and '<' worker->master flows):: + + > SERVER example.com + < REPLICATE events 53 + > RDATA events 54 ["$foo1:bar.com", ...] + > RDATA events 55 ["$foo4:bar.com", ...] + +The example shows the server accepting a new connection and sending its identity +with the ``SERVER`` command, followed by the client asking to subscribe to the +``events`` stream from the token ``53``. The server then periodically sends ``RDATA`` +commands which have the format ``RDATA ```, where the +format of ```` is defined by the individual streams. + +Error reporting happens by either the client or server sending an `ERROR` +command, and usually the connection will be closed. + + +Since the protocol is a simple line based, its possible to manually connect to +the server using a tool like netcat. A few things should be noted when manually +using the protocol: + * When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can + be used to get all future updates. The special stream name ``ALL`` can be used + with ``NOW`` to subscribe to all available streams. + * The federation stream is only available if federation sending has been + disabled on the main process. + * The server will only time connections out that have sent a ``PING`` command. + If a ping is sent then the connection will be closed if no further commands + are receieved within 15s. Both the client and server protocol implementations + will send an initial PING on connection and ensure at least one command every + 5s is sent (not necessarily ``PING``). + * ``RDATA`` commands *usually* include a numeric token, however if the stream + has multiple rows to replicate per token the server will send multiple + ``RDATA`` commands, with all but the last having a token of ``batch``. See + the documentation on ``commands.RdataCommand`` for further details. + + +Architecture +------------ + +The basic structure of the protocol is line based, where the initial word of +each line specifies the command. The rest of the line is parsed based on the +command. For example, the `RDATA` command is defined as:: + + RDATA + +(Note that `` may contains spaces, but cannot contain newlines.) + +Blank lines are ignored. + + +Keep alives +~~~~~~~~~~~ + +Both sides are expected to send at least one command every 5s or so, and +should send a ``PING`` command if necessary. If either side do not receive a +command within e.g. 15s then the connection should be closed. + +Because the server may be connected to manually using e.g. netcat, the timeouts +aren't enabled until an initial ``PING`` command is seen. Both the client and +server implementations below send a ``PING`` command immediately on connection to +ensure the timeouts are enabled. + +This ensures that both sides can quickly realize if the tcp connection has gone +and handle the situation appropriately. + + +Start up +~~~~~~~~ + +When a new connection is made, the server: + * Sends a ``SERVER`` command, which includes the identity of the server, allowing + the client to detect if its connected to the expected server + * Sends a ``PING`` command as above, to enable the client to time out connections + promptly. + +The client: + * Sends a ``NAME`` command, allowing the server to associate a human friendly + name with the connection. This is optional. + * Sends a ``PING`` as above + * For each stream the client wishes to subscribe to it sends a ``REPLICATE`` + with the stream_name and token it wants to subscribe from. + * On receipt of a ``SERVER`` command, checks that the server name matches the + expected server name. + + +Error handling +~~~~~~~~~~~~~~ + +If either side detects an error it can send an ``ERROR`` command and close the +connection. + +If the client side loses the connection to the server it should reconnect, +following the steps above. + + +Congestion +~~~~~~~~~~ + +If the server sends messages faster than the client can consume them the server +will first buffer a (fairly large) number of commands and then disconnect the +client. This ensures that we don't queue up an unbounded number of commands in +memory and gives us a potential oppurtunity to squawk loudly. When/if the client +recovers it can reconnect to the server and ask for missed messages. + + +Reliability +~~~~~~~~~~~ + +In general the replication stream should be consisdered an unreliable transport +since e.g. commands are not resent if the connection disappears. + +The exception to that are the replication streams, i.e. RDATA commands, since +these include tokens which can be used to restart the stream on connection +errors. + +The client should keep track of the token in the last RDATA command received +for each stream so that on reconneciton it can start streaming from the correct +place. Note: not all RDATA have valid tokens due to batching. See +``RdataCommand`` for more details. + + +Example +~~~~~~~ + +An example iteraction is shown below. Each line is prefixed with '>' or '<' to +indicate which side is sending, these are *not* included on the wire:: + + * connection established * + > SERVER localhost:8823 + > PING 1490197665618 + < NAME synapse.app.appservice + < PING 1490197665618 + < REPLICATE events 1 + < REPLICATE backfill 1 + < REPLICATE caches 1 + > POSITION events 1 + > POSITION backfill 1 + > POSITION caches 1 + > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] + > RDATA events 14 ["$149019767112vOHxz:localhost:8823", + "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null] + < PING 1490197675618 + > ERROR server stopping + * connection closed by server * + +The ``POSITION`` command sent by the server is used to set the clients position +without needing to send data with the ``RDATA`` command. + + +An example of a batched set of ``RDATA`` is:: + + > RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513] + > RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513] + > RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513] + > RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513] + +In this case the client shouldn't advance their caches token until it sees the +the last ``RDATA``. diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py index 451dae3b6c8e..8b4e886d4ef8 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py @@ -12,3 +12,35 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +"""This module implements the TCP replication protocol used by synapse to +communicate between the master process and its workers (when they're enabled). + +The protocol is based on fire and forget, line based commands. An example flow +would be (where '>' indicates master->worker and '<' worker->master flows):: + + > SERVER example.com + < REPLICATE events 53 + > RDATA events 54 ["$foo1:bar.com", ...] + > RDATA events 55 ["$foo4:bar.com", ...] + +The example shows the server accepting a new connection and sending its identity +with the `SERVER` command, followed by the client asking to subscribe to the +`events` stream from the token `53`. The server then periodically sends `RDATA` +commands which have the format `RDATA `, where the +format of `` is defined by the individual streams. + +Error reporting happens by either the client or server sending an `ERROR` +command, and usually the connection will be closed. + + +Structure of the module: + * client.py - the client classes used for workers to connect to master + * command.py - the definitions of all the valid commands + * protocol.py - contains bot the client and server protocol implementations, + these should not be used directly + * resource.py - the server classes that accepts and handle client connections + * streams.py - the definitons of all the valid streams + +Further details can be found in docs/tcp_replication.rst +""" diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py new file mode 100644 index 000000000000..68165cf2dc4c --- /dev/null +++ b/synapse/replication/tcp/commands.py @@ -0,0 +1,341 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Defines the various valid commands + +The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are +allowed to be sent by which side. +""" + +import logging +import ujson as json + + +logger = logging.getLogger(__name__) + + +class Command(object): + """The base command class. + + All subclasses must set the NAME variable which equates to the name of the + command on the wire. + + A full command line on the wire is constructed from `NAME + " " + to_line()` + + The default implementation creates a command of form ` ` + """ + NAME = None + + def __init__(self, data): + self.data = data + + @classmethod + def from_line(cls, line): + """Deserialises a line from the wire into this command. `line` does not + include the command. + """ + return cls(line) + + def to_line(self): + """Serialises the comamnd for the wire. Does not include the command + prefix. + """ + return self.data + + +class ServerCommand(Command): + """Sent by the server on new connection and includes the server_name. + + Format:: + + SERVER + """ + NAME = "SERVER" + + +class RdataCommand(Command): + """Sent by server when a subscribed stream has an update. + + Format:: + + RDATA + + The `` may either be a numeric stream id OR "batch". The latter case + is used to support sending multiple updates with the same stream ID. This + is done by sending an RDATA for each row, with all but the last RDATA having + a token of "batch" and the last having the final stream ID. + + The client should batch all incoming RDATA with a token of "batch" (per + stream_name) until it sees an RDATA with a numeric stream ID. + + `` of "batch" maps to the instance variable `token` being None. + + An example of a batched series of RDATA:: + + RDATA presence batch ["@foo:example.com", "online", ...] + RDATA presence batch ["@bar:example.com", "online", ...] + RDATA presence 59 ["@baz:example.com", "online", ...] + """ + NAME = "RDATA" + + def __init__(self, stream_name, token, row): + self.stream_name = stream_name + self.token = token + self.row = row + + @classmethod + def from_line(cls, line): + stream_name, token, row_json = line.split(" ", 2) + return cls( + stream_name, + None if token == "batch" else int(token), + json.loads(row_json) + ) + + def to_line(self): + return " ".join(( + self.stream_name, + str(self.token) if self.token is not None else "batch", + json.dumps(self.row), + )) + + +class PositionCommand(Command): + """Sent by the client to tell the client the stream postition without + needing to send an RDATA. + """ + NAME = "POSITION" + + def __init__(self, stream_name, token): + self.stream_name = stream_name + self.token = token + + @classmethod + def from_line(cls, line): + stream_name, token = line.split(" ", 1) + return cls(stream_name, int(token)) + + def to_line(self): + return " ".join((self.stream_name, str(self.token),)) + + +class ErrorCommand(Command): + """Sent by either side if there was an ERROR. The data is a string describing + the error. + """ + NAME = "ERROR" + + +class PingCommand(Command): + """Sent by either side as a keep alive. The data is arbitary (often timestamp) + """ + NAME = "PING" + + +class NameCommand(Command): + """Sent by client to inform the server of the client's identity. The data + is the name + """ + NAME = "NAME" + + +class ReplicateCommand(Command): + """Sent by the client to subscribe to the stream. + + Format:: + + REPLICATE + + Where may be either: + * a numeric stream_id to stream updates from + * "NOW" to stream all subsequent updates. + + The can be "ALL" to subscribe to all known streams, in which + case the must be set to "NOW", i.e.:: + + REPLICATE ALL NOW + """ + NAME = "REPLICATE" + + def __init__(self, stream_name, token): + self.stream_name = stream_name + self.token = token + + @classmethod + def from_line(cls, line): + stream_name, token = line.split(" ", 1) + if token in ("NOW", "now"): + token = "NOW" + else: + token = int(token) + return cls(stream_name, token) + + def to_line(self): + return " ".join((self.stream_name, str(self.token),)) + + +class UserSyncCommand(Command): + """Sent by the client to inform the server that a user has started or + stopped syncing. Used to calculate presence on the master. + + Format:: + + USER_SYNC + + Where is either "start" or "stop" + """ + NAME = "USER_SYNC" + + def __init__(self, user_id, is_syncing): + self.user_id = user_id + self.is_syncing = is_syncing + + @classmethod + def from_line(cls, line): + user_id, state = line.split(" ", 1) + + if state not in ("start", "end"): + raise Exception("Invalid USER_SYNC state %r" % (state,)) + + return cls(user_id, state == "start") + + def to_line(self): + return " ".join((self.user_id, "start" if self.is_syncing else "end")) + + +class FederationAckCommand(Command): + """Sent by the client when it has processed up to a given point in the + federation stream. This allows the master to drop in-memory caches of the + federation stream. + + This must only be sent from one worker (i.e. the one sending federation) + + Format:: + + FEDERATION_ACK + """ + NAME = "FEDERATION_ACK" + + def __init__(self, token): + self.token = token + + @classmethod + def from_line(cls, line): + return cls(int(line)) + + def to_line(self): + return str(self.token) + + +class SyncCommand(Command): + """Used for testing. The client protocol implementation allows waiting + on a SYNC command with a specified data. + """ + NAME = "SYNC" + + +class RemovePusherCommand(Command): + """Sent by the client to request the master remove the given pusher. + + Format:: + + REMOVE_PUSHER + """ + NAME = "REMOVE_PUSHER" + + def __init__(self, app_id, push_key, user_id): + self.user_id = user_id + self.app_id = app_id + self.push_key = push_key + + @classmethod + def from_line(cls, line): + app_id, push_key, user_id = line.split(" ", 2) + + return cls(app_id, push_key, user_id) + + def to_line(self): + return " ".join((self.app_id, self.push_key, self.user_id)) + + +class InvalidateCacheCommand(Command): + """Sent by the client to invalidate an upstream cache. + + THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED ACCEPT FOR THINGS THAT ARE + NOT DISASTROUS IF WE DROP ON THE FLOOR. + + Mainly used to invalidate destination retry timing caches. + + Format:: + + INVALIDATE_CACHE + + Where is a json list. + """ + NAME = "INVALIDATE_CACHE" + + def __init__(self, cache_func, keys): + self.cache_func = cache_func + self.keys = keys + + @classmethod + def from_line(cls, line): + cache_func, keys_json = line.split(" ", 1) + + return cls(cache_func, json.loads(keys_json)) + + def to_line(self): + return " ".join((self.cache_func, json.dumps(self.keys))) + + +# Map of command name to command type. +COMMAND_MAP = { + cmd.NAME: cmd + for cmd in ( + ServerCommand, + RdataCommand, + PositionCommand, + ErrorCommand, + PingCommand, + NameCommand, + ReplicateCommand, + UserSyncCommand, + FederationAckCommand, + SyncCommand, + RemovePusherCommand, + InvalidateCacheCommand, + ) +} + +# The commands the server is allowed to send +VALID_SERVER_COMMANDS = ( + ServerCommand.NAME, + RdataCommand.NAME, + PositionCommand.NAME, + ErrorCommand.NAME, + PingCommand.NAME, + SyncCommand.NAME, +) + +# The commands the client is allowed to send +VALID_CLIENT_COMMANDS = ( + NameCommand.NAME, + ReplicateCommand.NAME, + PingCommand.NAME, + UserSyncCommand.NAME, + FederationAckCommand.NAME, + RemovePusherCommand.NAME, + InvalidateCacheCommand.NAME, + ErrorCommand.NAME, +) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py new file mode 100644 index 000000000000..c1dc91bdb7eb --- /dev/null +++ b/synapse/replication/tcp/protocol.py @@ -0,0 +1,601 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module contains the implementation of both the client and server +protocols. + +The basic structure of the protocol is line based, where the initial word of +each line specifies the command. The rest of the line is parsed based on the +command. For example, the `RDATA` command is defined as:: + + RDATA + +(Note that `` may contains spaces, but cannot contain newlines.) + +Blank lines are ignored. + +# Example + +An example iteraction is shown below. Each line is prefixed with '>' or '<' to +indicate which side is sending, these are *not* included on the wire:: + + * connection established * + > SERVER localhost:8823 + > PING 1490197665618 + < NAME synapse.app.appservice + < PING 1490197665618 + < REPLICATE events 1 + < REPLICATE backfill 1 + < REPLICATE caches 1 + > POSITION events 1 + > POSITION backfill 1 + > POSITION caches 1 + > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] + > RDATA events 14 ["$149019767112vOHxz:localhost:8823", + "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null] + < PING 1490197675618 + > ERROR server stopping + * connection closed by server * +""" + +from twisted.internet import defer +from twisted.protocols.basic import LineOnlyReceiver + +from commands import ( + COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS, + ErrorCommand, ServerCommand, RdataCommand, PositionCommand, PingCommand, + NameCommand, ReplicateCommand, UserSyncCommand, SyncCommand, +) +from streams import STREAMS_MAP + +from synapse.util.stringutils import random_string + +import logging +import synapse.metrics +import struct +import fcntl + + +metrics = synapse.metrics.get_metrics_for(__name__) + +inbound_commands_counter = metrics.register_counter( + "inbound_commands", labels=["command", "name", "conn_id"], +) +outbound_commands_counter = metrics.register_counter( + "outbound_commands", labels=["command", "name", "conn_id"], +) + + +# A list of all connected protocols. This allows us to send metrics about the +# connections. +connected_connections = [] + + +logger = logging.getLogger(__name__) + + +PING_TIME = 5000 + + +class ConnectionStates(object): + CONNECTING = "connecting" + ESTABLISHED = "established" + PAUSED = "paused" + CLOSED = "closed" + + +class BaseReplicationStreamProtocol(LineOnlyReceiver): + """Base replication protocol shared between client and server. + + Reads lines (ignoring blank ones) and parses them into command classes, + asserting that they are valid for the given direction, i.e. server commands + are only sent by the server. + + On receiving a new command it calls `on_` with the parsed + command. + + It also sends `PING` periodically, and correctly times out remote connections + (if they send a `PING` command) + """ + delimiter = b'\n' + + VALID_INBOUND_COMMANDS = [] # Valid commands we expect to receive + VALID_OUTBOUND_COMMANDS = [] # Valid commans we can send + + max_line_buffer = 10000 + + def __init__(self, clock): + self.clock = clock + + self.last_received_command = self.clock.time_msec() + self.last_sent_command = 0 + self.time_we_closed = None # When we requested the connection be closed + + self.received_ping = False # Have we reecived a ping from the other side + + self.state = ConnectionStates.CONNECTING + + self.name = "anon" # The name sent by a client. + self.conn_id = random_string(5) # To dedupe in case of name clashes. + + # List of pending commands to send once we've established the connection + self.pending_commands = [] + + # The LoopingCall for sending pings. + self._send_ping_loop = None + + def connectionMade(self): + logger.info("[%s] Connection established", self.id()) + + self.state = ConnectionStates.ESTABLISHED + + connected_connections.append(self) # Register connection for metrics + + self.transport.registerProducer(self, True) # For the *Producing callbacks + + self._send_pending_commands() + + # Starts sending pings + self._send_ping_loop = self.clock.looping_call(self.send_ping, 5000) + + # Always send the initial PING so that the other side knows that they + # can time us out. + self.send_command(PingCommand(self.clock.time_msec())) + + def send_ping(self): + """Periodically sends a ping and checks if we should close the connection + due to the other side timing out. + """ + now = self.clock.time_msec() + + if self.time_we_closed: + if now - self.time_we_closed > PING_TIME * 3: + logger.info( + "[%s] Failed to close connection gracefully, aborting", self.id() + ) + self.transport.abortConnection() + else: + if now - self.last_sent_command >= PING_TIME: + self.send_command(PingCommand(now)) + + if self.received_ping and now - self.last_received_command > PING_TIME * 3: + logger.info( + "[%s] Connection hasn't received command in %r ms. Closing.", + self.id(), now - self.last_received_command + ) + self.send_error("ping timeout") + + def lineReceived(self, line): + """Called when we've received a line + """ + if line.strip() == "": + # Ignore blank lines + return + + line = line.decode("utf-8") + cmd_name, rest_of_line = line.split(" ", 1) + + if cmd_name not in self.VALID_INBOUND_COMMANDS: + logger.error("[%s] invalid command %s", self.id(), cmd_name) + self.send_error("invalid command: %s", cmd_name) + return + + self.last_received_command = self.clock.time_msec() + + inbound_commands_counter.inc(cmd_name, self.name, self.conn_id) + + cmd_cls = COMMAND_MAP[cmd_name] + try: + cmd = cmd_cls.from_line(rest_of_line) + except Exception as e: + logger.exception( + "[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line + ) + self.send_error( + "failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line) + ) + return + + # Now lets try and call on_ function + try: + getattr(self, "on_%s" % (cmd_name,))(cmd) + except Exception: + logger.exception("[%s] Failed to handle line: %r", self.id(), line) + + def close(self): + self.time_we_closed = self.clock.time_msec() + self.transport.loseConnection() + self.on_connection_closed() + + def send_error(self, error_string, *args): + """Send an error to remote and close the connection. + """ + self.send_command(ErrorCommand(error_string % args)) + self.close() + + def send_command(self, cmd, do_buffer=True): + """Send a command if connection has been established. + + Args: + cmd (Command) + do_buffer (bool): Whether to buffer the message or always attempt + to send the command. This is mostly used to send an error + message if we're about to close the connection due our buffers + becoming full. + """ + if self.state == ConnectionStates.CLOSED: + logger.info("[%s] Not sending, connection closed", self.id()) + return + + if do_buffer and self.state != ConnectionStates.ESTABLISHED: + self._queue_command(cmd) + return + + outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id) + + string = "%s %s" % (cmd.NAME, cmd.to_line(),) + if "\n" in string: + raise Exception("Unexpected newline in command: %r", string) + + self.sendLine(string.encode("utf-8")) + + self.last_sent_command = self.clock.time_msec() + + def _queue_command(self, cmd): + """Queue the command until the connection is ready to write to again. + """ + logger.info("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd) + self.pending_commands.append(cmd) + + if len(self.pending_commands) > self.max_line_buffer: + # The other side is failing to keep up and out buffers are becoming + # full, so lets close the connection. + # XXX: should we squawk more loudly? + logger.error("[%s] Remote failed to keep up", self.id()) + self.send_command(ErrorCommand("Failed to keep up"), do_buffer=False) + self.close() + + def _send_pending_commands(self): + """Send any queued commandes + """ + pending = self.pending_commands + self.pending_commands = [] + for cmd in pending: + self.send_command(cmd) + + def on_PING(self, line): + self.received_ping = True + + def on_ERROR(self, cmd): + logger.error("[%s] Remote reported error: %r", self.id(), cmd.data) + + def pauseProducing(self): + """This is called when both the kernel send buffer and the twisted + tcp connection send buffers have become full. + + We don't actually have any control over those sizes, so we buffer some + commands ourselves before knifing the connection due to the remote + failing to keep up. + """ + logger.info("[%s] Pause producing", self.id()) + self.state = ConnectionStates.PAUSED + + def resumeProducing(self): + """The remote has caught up after we started buffering! + """ + logger.info("[%s] Resume producing", self.id()) + self.state = ConnectionStates.ESTABLISHED + self._send_pending_commands() + + def stopProducing(self): + """We're never going to send any more data (normally because either + we or the remote has closed the connection) + """ + logger.info("[%s] Stop producing", self.id()) + self.on_connection_closed() + + def connectionLost(self, reason): + logger.info("[%s] Replication connection closed: %r", self.id(), reason) + + try: + # Remove us from list of connections to be monitored + connected_connections.remove(self) + except ValueError: + pass + + # Stop the looping call sending pings. + if self._send_ping_loop and self._send_ping_loop.running: + self._send_ping_loop.stop() + + self.on_connection_closed() + + def on_connection_closed(self): + logger.info("[%s] Connection was closed", self.id()) + + self.state = ConnectionStates.CLOSED + self.pending_commands = [] + + if self.transport: + self.transport.unregisterProducer() + + def __str__(self): + return "ReplicationConnection" % ( + self.name, self.conn_id, self.addr, + ) + + def id(self): + return "%s-%s" % (self.name, self.conn_id) + + +class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): + VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS + VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS + + def __init__(self, server_name, clock, streamer, addr): + BaseReplicationStreamProtocol.__init__(self, clock) # Old style class + + self.server_name = server_name + self.streamer = streamer + self.addr = addr + + # The streams the client has subscribed to and is up to date with + self.replication_streams = set() + + # The streams the client is currently subscribing to. + self.connecting_streams = set() + + # Map from stream name to list of updates to send once we've finished + # subscribing the client to the stream. + self.pending_rdata = {} + + def connectionMade(self): + self.send_command(ServerCommand(self.server_name)) + BaseReplicationStreamProtocol.connectionMade(self) + self.streamer.new_connection(self) + + def on_NAME(self, cmd): + self.name = cmd.data + + def on_USER_SYNC(self, cmd): + self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing) + + def on_REPLICATE(self, cmd): + stream_name = cmd.stream_name + token = cmd.token + + if stream_name == "ALL": + # Subscribe to all streams we're publishing to. + for stream in self.streamer.streams_by_name.iterkeys(): + self.subscribe_to_stream(stream, token) + else: + self.subscribe_to_stream(stream_name, token) + + def on_FEDERATION_ACK(self, cmd): + self.streamer.federation_ack(cmd.token) + + def on_REMOVE_PUSHER(self, cmd): + self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id) + + def onINVALIDATE_CACHE(self, cmd): + self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + + @defer.inlineCallbacks + def subscribe_to_stream(self, stream_name, token): + """Subscribe the remote to a streams. + + This invloves checking if they've missed anything and sending those + updates down if they have. During that time new updates for the stream + are queued and sent once we've sent down any missed updates. + """ + self.replication_streams.discard(stream_name) + self.connecting_streams.add(stream_name) + + try: + # Get missing updates + updates, current_token = yield self.streamer.get_stream_updates( + stream_name, token, + ) + + # Send all the missing updates + for update in updates: + token, row = update[0], update[1] + self.send_command(RdataCommand(stream_name, token, row)) + + # Now we can send any updates that came in while we were subscribing + pending_rdata = self.pending_rdata.pop(stream_name, []) + for token, update in pending_rdata: + self.send_command(RdataCommand(stream_name, token, update)) + + # We send a POSITION command to ensure that they have an up to + # date token (especially useful if we didn't send any updates + # above) + self.send_command(PositionCommand(stream_name, current_token)) + + # They're now fully subscribed + self.replication_streams.add(stream_name) + except Exception as e: + logger.exception("[%s] Failed to handle REPLICATE command", self.id()) + self.send_error("failed to handle replicate: %r", e) + finally: + self.connecting_streams.discard(stream_name) + + def stream_update(self, stream_name, token, data): + """Called when a new update is available to stream to clients. + + We need to check if the client is interested in the stream or not + """ + if stream_name in self.replication_streams: + # The client is subscribed to the stream + self.send_command(RdataCommand(stream_name, token, data)) + elif stream_name in self.connecting_streams: + # The client is being subscribed to the stream + logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token) + self.pending_rdata.setdefault(stream_name, []).append((token, data)) + else: + # The client isn't subscribed + logger.debug("[%s] Dropping RDATA %r %r", self.id(), stream_name, token) + + def send_sync(self, data): + self.send_command(SyncCommand(data)) + + def on_connection_closed(self): + BaseReplicationStreamProtocol.on_connection_closed(self) + logger.info("[%s] Replication connection closed", self.id()) + self.streamer.lost_connection(self) + + +class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): + VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS + VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS + + def __init__(self, client_name, server_name, clock, handler): + BaseReplicationStreamProtocol.__init__(self, clock) + + self.client_name = client_name + self.server_name = server_name + self.handler = handler + + # Map of stream to batched updates. See RdataCommand for info on how + # batching works. + self.pending_batches = {} + + def connectionMade(self): + self.send_command(NameCommand(self.client_name)) + BaseReplicationStreamProtocol.connectionMade(self) + + # Once we've connected subscribe to the necessary streams + for stream_name, token in self.handler.get_streams_to_replicate().iteritems(): + self.replicate(stream_name, token) + + # Tell the server if we have any users currently syncing (should only + # happen on synchrotrons) + currently_syncing = self.handler.get_currently_syncing_users() + for user_id in currently_syncing: + self.send_command(UserSyncCommand(user_id, True)) + + # We've now finished connecting to so inform the client handler + self.handler.update_connection(self) + + def on_SERVER(self, cmd): + if cmd.data != self.server_name: + logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) + self.transport.abortConnection() + + def on_RDATA(self, cmd): + try: + row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row) + except Exception: + logger.exception( + "[%s] Failed to parse RDATA: %r %r", + self.id(), cmd.stream_name, cmd.row + ) + raise + + if cmd.token is None: + # I.e. this is part of a batch of updates for this stream. Batch + # until we get an update for the stream with a non None token + self.pending_batches.setdefault(cmd.stream_name, []).append(row) + else: + # Check if this is the last of a batch of updates + rows = self.pending_batches.pop(cmd.stream_name, []) + rows.append(row) + + self.handler.on_rdata(cmd.stream_name, cmd.token, rows) + + def on_POSITION(self, cmd): + self.handler.on_position(cmd.stream_name, cmd.token) + + def on_SYNC(self, cmd): + self.handler.on_sync(cmd.data) + + def replicate(self, stream_name, token): + """Send the subscription request to the server + """ + if stream_name not in STREAMS_MAP: + raise Exception("Invalid stream name %r" % (stream_name,)) + + logger.info( + "[%s] Subscribing to replication stream: %r from %r", + self.id(), stream_name, token + ) + + self.send_command(ReplicateCommand(stream_name, token)) + + def on_connection_closed(self): + BaseReplicationStreamProtocol.on_connection_closed(self) + self.handler.update_connection(None) + + +# The following simply registers metrics for the replication connections + +metrics.register_callback( + "pending_commands", + lambda: { + (p.name, p.conn_id): len(p.pending_commands) + for p in connected_connections + }, + labels=["name", "conn_id"], +) + + +def transport_buffer_size(protocol): + if protocol.transport: + size = len(protocol.transport.dataBuffer) + protocol.transport._tempDataLen + return size + return 0 + + +metrics.register_callback( + "transport_send_buffer", + lambda: { + (p.name, p.conn_id): transport_buffer_size(p) + for p in connected_connections + }, + labels=["name", "conn_id"], +) + + +def transport_kernel_read_buffer_size(protocol, read=True): + SIOCINQ = 0x541B + SIOCOUTQ = 0x5411 + + if protocol.transport: + fileno = protocol.transport.getHandle().fileno() + if read: + op = SIOCINQ + else: + op = SIOCOUTQ + size = struct.unpack("I", fcntl.ioctl(fileno, op, '\0\0\0\0'))[0] + return size + return 0 + + +metrics.register_callback( + "transport_kernel_send_buffer", + lambda: { + (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False) + for p in connected_connections + }, + labels=["name", "conn_id"], +) + + +metrics.register_callback( + "transport_kernel_read_buffer", + lambda: { + (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True) + for p in connected_connections + }, + labels=["name", "conn_id"], +) From 4d7fc7f9771d60745a41770cc645fcdb482a65f2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 15:42:07 +0100 Subject: [PATCH 06/16] Add server side resource for tcp replication --- synapse/replication/tcp/resource.py | 300 ++++++++++++++++++++++++++++ 1 file changed, 300 insertions(+) create mode 100644 synapse/replication/tcp/resource.py diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py new file mode 100644 index 000000000000..d5da0496a813 --- /dev/null +++ b/synapse/replication/tcp/resource.py @@ -0,0 +1,300 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The server side of the replication stream. +""" + +from twisted.internet import defer, reactor +from twisted.internet.protocol import Factory + +from streams import STREAMS_MAP, FederationStream +from protocol import ServerReplicationStreamProtocol + +from synapse.util.logcontext import preserve_fn +from synapse.util.metrics import Measure, measure_func + +import logging +import synapse.metrics + + +metrics = synapse.metrics.get_metrics_for(__name__) +stream_updates_counter = metrics.register_counter( + "stream_updates", labels=["stream_name"] +) +user_sync_counter = metrics.register_counter("user_sync") +federation_ack_counter = metrics.register_counter("federation_ack") +remove_pusher_counter = metrics.register_counter("remove_pusher") +invalidate_cache_counter = metrics.register_counter("invalidate_cache") + +logger = logging.getLogger(__name__) + + +class ReplicationStreamProtocolFactory(Factory): + """Factory for new replication connections. + """ + def __init__(self, hs): + self.streamer = ReplicationStreamer(hs) + self.clock = hs.get_clock() + self.server_name = hs.config.server_name + + def buildProtocol(self, addr): + return ServerReplicationStreamProtocol( + self.server_name, + self.clock, + self.streamer, + addr + ) + + +class ReplicationStreamer(object): + """Handles replication connections. + + This needs to be poked when new replication data may be available. When new + data is available it will propagate to all connected clients. + """ + + def __init__(self, hs): + self.store = hs.get_datastore() + self.notifier = hs.get_notifier() + self.presence_handler = hs.get_presence_handler() + self.clock = hs.get_clock() + + # Current connections. + self.connections = [] + + metrics.register_callback("total_connections", lambda: len(self.connections)) + + # List of streams that clients can subscribe to. + # We only support federation stream if federation sending hase been + # disabled on the master. + self.streams = [ + stream(hs) for stream in STREAMS_MAP.itervalues() + if stream != FederationStream or not hs.config.send_federation + ] + + self.streams_by_name = {stream.NAME: stream for stream in self.streams} + + metrics.register_callback( + "connections_per_stream", + lambda: { + (stream_name,): len([ + conn for conn in self.connections + if stream_name in conn.replication_streams + ]) + for stream_name in self.streams_by_name + }, + labels=["stream_name"], + ) + + self.federation_sender = None + if not hs.config.send_federation: + self.federation_sender = hs.get_federation_sender() + + # Start listening for updates from the notifier + preserve_fn(self.notifier_listener)() + + # Keeps track of whether we are currently checking for updates + self.is_looping = False + self.pending_updates = False + + reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown) + + def on_shutdown(self): + # close all connections on shutdown + for conn in self.connections: + conn.send_error("server shutting down") + + @defer.inlineCallbacks + def notifier_listener(self): + """Sits forever looping on the notifier waiting for new data. + """ + while True: + yield self.notifier.wait_once_for_replication() + logger.debug("Woken up by notifier") + # We need to call this each time we get woken up, as per docstring + preserve_fn(self.on_notifier_poke)() + + @defer.inlineCallbacks + def on_notifier_poke(self): + """Checks if there is actually any new data and sends it to the + connections if there are. + + This should get called each time new data is available, even if it + is currently being executed, so that nothing gets missed + """ + if not self.connections: + # Don't bother if nothing is listening + return + + # If we're in the process of checking for new updates, mark that fact + # and return + if self.is_looping: + logger.debug("Noitifier poke loop already running") + self.pending_updates = True + return + + self.pending_updates = True + self.is_looping = True + + try: + # Keep looping while there have been pokes about potential updates. + # This protects against the race where a stream we already checked + # gets an update while we're handling other streams. + while self.pending_updates: + self.pending_updates = False + + with Measure(self.clock, "repl.stream.get_updates"): + # First we tell the streams that they should update their + # current tokens. + for stream in self.streams: + stream.advance_current_token() + + for stream in self.streams: + if stream.last_token == stream.upto_token: + continue + + logger.debug( + "Getting stream: %s: %s -> %s", + stream.NAME, stream.last_token, stream.upto_token + ) + updates, current_token = yield stream.get_updates() + + logger.debug( + "Sending %d updates to %d connections", + len(updates), len(self.connections), + ) + + if updates: + logger.info( + "Streaming: %s -> %s", stream.NAME, updates[-1][0] + ) + stream_updates_counter.inc_by(len(updates), stream.NAME) + + # Some streams return multiple rows with the same stream IDs, + # we need to make sure they get sent out in batches. We do + # this by setting the current token to all but the last of + # a series of updates with the same token to have a None + # token. See RdataCommand for more details. + batched_updates = _batch_updates(updates) + + for conn in self.connections: + for token, row in batched_updates: + try: + conn.stream_update(stream.NAME, token, row) + except Exception: + logger.exception("Failed to replicate") + + logger.debug("No more pending updates, breaking poke loop") + finally: + self.pending_updates = False + self.is_looping = False + + @measure_func("repl.get_stream_updates") + def get_stream_updates(self, stream_name, token): + """For a given stream get all updates since token. This is called when + a client first subscribes to a stream. + """ + stream = self.streams_by_name.get(stream_name, None) + if not stream: + raise Exception("unknown stream %s", stream_name) + + return stream.get_updates_since(token) + + @measure_func("repl.federation_ack") + def federation_ack(self, token): + """We've received an ack for federation stream from a client. + """ + federation_ack_counter.inc() + if self.federation_sender: + self.federation_sender.federation_ack(token) + + @measure_func("repl.on_user_sync") + def on_user_sync(self, conn_id, user_id, is_syncing): + """A client has started/stopped syncing on a worker. + """ + user_sync_counter.inc() + self.presence_handler.update_external_syncs_row( + conn_id, user_id, is_syncing + ) + + @measure_func("repl.on_remove_pusher") + @defer.inlineCallbacks + def on_remove_pusher(self, app_id, push_key, user_id): + """A client has asked us to remove a pusher + """ + remove_pusher_counter.inc() + yield self.store.delete_pusher_by_app_id_pushkey_user_id( + app_id=app_id, pushkey=push_key, user_id=user_id + ) + + self.notifier.on_new_replication_data() + + @measure_func("repl.on_invalidate_cache") + def on_invalidate_cache(self, cache_func, keys): + """The client has asked us to invalidate a cache + """ + invalidate_cache_counter.inc() + getattr(self.store, cache_func).invalidate(tuple(keys)) + + def send_sync_to_all_connections(self, data): + """Sends a SYNC command to all clients. + + Used in tests. + """ + for conn in self.connections: + conn.send_sync(data) + + def new_connection(self, connection): + """A new client connection has been established + """ + self.connections.append(connection) + + def lost_connection(self, connection): + """A client connection has been lost + """ + try: + self.connections.remove(connection) + except ValueError: + pass + + # We need to tell the presence handler that the connection has been + # lost so that it can handle any ongoing syncs on that connection. + self.presence_handler.update_external_syncs_clear(connection.conn_id) + + +def _batch_updates(updates): + """Takes a list of updates of form [(token, row)] and sets the token to + None for all rows where the next row has the same token. This is used to + implement batching. + + For example: + + [(1, _), (1, _), (2, _), (3, _), (3, _)] + + becomes: + + [(None, _), (1, _), (2, _), (None, _), (3, _)] + """ + if not updates: + return [] + + new_updates = [] + for i, update in enumerate(updates[:-1]): + if update[0] == updates[i + 1][0]: + new_updates.append((None, update[1])) + else: + new_updates.append(update) + + new_updates.append(updates[-1]) + return new_updates From e9dd8370b0ae8ecc1ac3bf13857a230ae9f72199 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 15:56:25 +0100 Subject: [PATCH 07/16] Add functions to presence to support remote syncs The TCP replication protocol streams deltas of who has started or stopped syncing. This is different from the HTTP API which periodically sends the full list of users who are syncing. This commit adds support for the new TCP style of sending deltas. --- synapse/handlers/presence.py | 66 ++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1ede117c7959..9f9257029e45 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -30,6 +30,7 @@ from synapse.storage.presence import UserPresenceState from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.util.async import Linearizer from synapse.util.logcontext import preserve_fn from synapse.util.logutils import log_function from synapse.util.metrics import Measure @@ -187,6 +188,7 @@ def __init__(self, hs): # process_id to millisecond timestamp last updated. self.external_process_to_current_syncs = {} self.external_process_last_updated_ms = {} + self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") # Start a LoopingCall in 30s that fires every 5s. # The initial delay is to allow disconnected clients a chance to @@ -508,6 +510,70 @@ def update_external_syncs(self, process_id, syncing_user_ids): self.external_process_last_updated_ms[process_id] = self.clock.time_msec() self.external_process_to_current_syncs[process_id] = syncing_user_ids + @defer.inlineCallbacks + def update_external_syncs_row(self, process_id, user_id, is_syncing): + """Update the syncing users for an external process as a delta. + + Args: + process_id (str): An identifier for the process the users are + syncing against. This allows synapse to process updates + as user start and stop syncing against a given process. + user_id (str): The user who has started or stopped syncing + is_syncing (bool): Whether or not the user is now syncing + """ + with (yield self.external_sync_linearizer.queue(process_id)): + prev_state = yield self.current_state_for_user(user_id) + + process_presence = self.external_process_to_current_syncs.setdefault( + process_id, set() + ) + time_now_ms = self.clock.time_msec() + + updates = [] + if is_syncing and user_id not in process_presence: + if prev_state.state == PresenceState.OFFLINE: + updates.append(prev_state.copy_and_replace( + state=PresenceState.ONLINE, + last_active_ts=time_now_ms, + last_user_sync_ts=time_now_ms, + )) + else: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + process_presence.add(user_id) + elif user_id in process_presence: + updates.append(prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + )) + + if updates: + yield self._update_states(updates) + + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + + @defer.inlineCallbacks + def update_external_syncs_clear(self, process_id): + """Marks all users that had been marked as syncing by a given process + as offline. + + Used when the process has stopped/disappeared. + """ + with (yield self.external_sync_linearizer.queue(process_id)): + process_presence = self.external_process_to_current_syncs.pop( + process_id, set() + ) + prev_states = yield self.current_state_for_users(process_presence) + time_now_ms = self.clock.time_msec() + + yield self._update_states([ + prev_state.copy_and_replace( + last_user_sync_ts=time_now_ms, + ) + for prev_state in prev_states.itervalues() + ]) + self.external_process_last_updated_ms.pop(process_id, None) + @defer.inlineCallbacks def current_state_for_user(self, user_id): """Get the current presence state for a user. From 3ba2859e0c1fb30a22b70c25de89eb4242d14d6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Mar 2017 13:31:10 +0100 Subject: [PATCH 08/16] Add tcp replication listener type and hook it up --- synapse/app/homeserver.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 2cdd2d39ffa9..990eb477e5cb 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -56,6 +56,7 @@ from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.federation.transport.server import TransportLayerServer from synapse.util.rlimit import change_resource_limit @@ -222,6 +223,16 @@ def start_listening(self): ), interface=address ) + elif listener["type"] == "replication": + bind_addresses = listener["bind_addresses"] + for address in bind_addresses: + factory = ReplicationStreamProtocolFactory(self) + server_listener = reactor.listenTCP( + listener["port"], factory, interface=address + ) + reactor.addSystemEventTrigger( + "before", "shutdown", server_listener.stopListening, + ) else: logger.warn("Unrecognized listener type: %s", listener["type"]) From 31e0fe90315c46d62163cd5cb78cf9302e9dd0c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Mar 2017 13:54:15 +0100 Subject: [PATCH 09/16] Fix indentation in docs/ --- docs/tcp_replication.rst | 53 +++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index 946add284915..be0aa6b28c0b 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -34,20 +34,21 @@ command, and usually the connection will be closed. Since the protocol is a simple line based, its possible to manually connect to the server using a tool like netcat. A few things should be noted when manually using the protocol: - * When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can - be used to get all future updates. The special stream name ``ALL`` can be used - with ``NOW`` to subscribe to all available streams. - * The federation stream is only available if federation sending has been - disabled on the main process. - * The server will only time connections out that have sent a ``PING`` command. - If a ping is sent then the connection will be closed if no further commands - are receieved within 15s. Both the client and server protocol implementations - will send an initial PING on connection and ensure at least one command every - 5s is sent (not necessarily ``PING``). - * ``RDATA`` commands *usually* include a numeric token, however if the stream - has multiple rows to replicate per token the server will send multiple - ``RDATA`` commands, with all but the last having a token of ``batch``. See - the documentation on ``commands.RdataCommand`` for further details. + +* When subscribing to a stream using ``REPLICATE``, the special token ``NOW`` can + be used to get all future updates. The special stream name ``ALL`` can be used + with ``NOW`` to subscribe to all available streams. +* The federation stream is only available if federation sending has been + disabled on the main process. +* The server will only time connections out that have sent a ``PING`` command. + If a ping is sent then the connection will be closed if no further commands + are receieved within 15s. Both the client and server protocol implementations + will send an initial PING on connection and ensure at least one command every + 5s is sent (not necessarily ``PING``). +* ``RDATA`` commands *usually* include a numeric token, however if the stream + has multiple rows to replicate per token the server will send multiple + ``RDATA`` commands, with all but the last having a token of ``batch``. See + the documentation on ``commands.RdataCommand`` for further details. Architecture @@ -84,19 +85,21 @@ Start up ~~~~~~~~ When a new connection is made, the server: - * Sends a ``SERVER`` command, which includes the identity of the server, allowing - the client to detect if its connected to the expected server - * Sends a ``PING`` command as above, to enable the client to time out connections - promptly. + +* Sends a ``SERVER`` command, which includes the identity of the server, allowing + the client to detect if its connected to the expected server +* Sends a ``PING`` command as above, to enable the client to time out connections + promptly. The client: - * Sends a ``NAME`` command, allowing the server to associate a human friendly - name with the connection. This is optional. - * Sends a ``PING`` as above - * For each stream the client wishes to subscribe to it sends a ``REPLICATE`` - with the stream_name and token it wants to subscribe from. - * On receipt of a ``SERVER`` command, checks that the server name matches the - expected server name. + +* Sends a ``NAME`` command, allowing the server to associate a human friendly + name with the connection. This is optional. +* Sends a ``PING`` as above +* For each stream the client wishes to subscribe to it sends a ``REPLICATE`` + with the stream_name and token it wants to subscribe from. +* On receipt of a ``SERVER`` command, checks that the server name matches the + expected server name. Error handling From 63fcc42990765563867536de550d18412f172626 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 Mar 2017 14:26:08 +0100 Subject: [PATCH 10/16] Remove user from process_presence when stops syncing --- synapse/handlers/presence.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9f9257029e45..ac6d1107bf97 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -546,6 +546,7 @@ def update_external_syncs_row(self, process_id, user_id, is_syncing): updates.append(prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, )) + process_presence.discard(user_id) if updates: yield self._update_states(updates) From bfcf016714575edb0ad2c19b2f00694d62ca08ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:19:24 +0100 Subject: [PATCH 11/16] Fix up docs --- docs/tcp_replication.rst | 16 ++++++++-------- synapse/replication/tcp/__init__.py | 18 +----------------- synapse/replication/tcp/streams.py | 4 ++-- synapse/storage/pusher.py | 2 +- 4 files changed, 12 insertions(+), 28 deletions(-) diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index be0aa6b28c0b..7393527f6fd7 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -1,20 +1,20 @@ TCP Replication =============== -This describes the TCP replication protocol that replaces the HTTP protocol. - Motivation ---------- -The HTTP API used long poll from the workers to the master, this has the problem -of causing a lot of duplicate work on the server. This TCP protocol aims to -solve. +Previously the workers used an HTTP long poll mechanism to get updates from the +master, which had the problem of causing a lot of duplicate work on the server. +This TCP protocol replaces those APIs with the aim of increased efficiency. + + Overview -------- The protocol is based on fire and forget, line based commands. An example flow -would be (where '>' indicates master->worker and '<' worker->master flows):: +would be (where '>' indicates master to worker and '<' worker to master flows):: > SERVER example.com < REPLICATE events 53 @@ -24,7 +24,7 @@ would be (where '>' indicates master->worker and '<' worker->master flows):: The example shows the server accepting a new connection and sending its identity with the ``SERVER`` command, followed by the client asking to subscribe to the ``events`` stream from the token ``53``. The server then periodically sends ``RDATA`` -commands which have the format ``RDATA ```, where the +commands which have the format ``RDATA ``, where the format of ```` is defined by the individual streams. Error reporting happens by either the client or server sending an `ERROR` @@ -125,7 +125,7 @@ recovers it can reconnect to the server and ask for missed messages. Reliability ~~~~~~~~~~~ -In general the replication stream should be consisdered an unreliable transport +In general the replication stream should be considered an unreliable transport since e.g. commands are not resent if the connection disappears. The exception to that are the replication streams, i.e. RDATA commands, since diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py index 8b4e886d4ef8..81c2ea7ee9f6 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py @@ -16,22 +16,7 @@ """This module implements the TCP replication protocol used by synapse to communicate between the master process and its workers (when they're enabled). -The protocol is based on fire and forget, line based commands. An example flow -would be (where '>' indicates master->worker and '<' worker->master flows):: - - > SERVER example.com - < REPLICATE events 53 - > RDATA events 54 ["$foo1:bar.com", ...] - > RDATA events 55 ["$foo4:bar.com", ...] - -The example shows the server accepting a new connection and sending its identity -with the `SERVER` command, followed by the client asking to subscribe to the -`events` stream from the token `53`. The server then periodically sends `RDATA` -commands which have the format `RDATA `, where the -format of `` is defined by the individual streams. - -Error reporting happens by either the client or server sending an `ERROR` -command, and usually the connection will be closed. +Further details can be found in docs/tcp_replication.rst Structure of the module: @@ -42,5 +27,4 @@ * resource.py - the server classes that accepts and handle client connections * streams.py - the definitons of all the valid streams -Further details can be found in docs/tcp_replication.rst """ diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 07adf9412e2f..fada40c6efd3 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -154,8 +154,8 @@ def update_function(self, from_token, current_token, limit=None): True then limit is provided, otherwise it's not. Returns: - list(tuple): the first entry in the tuple is the token for that - update, and the rest of the tuple gets used to construct + Deferred(list(tuple)): the first entry in the tuple is the token for + that update, and the rest of the tuple gets used to construct a ``ROW_TYPE`` instance """ raise NotImplementedError() diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 715c8bef2417..34d2f82b7f93 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -139,7 +139,7 @@ def get_all_updated_pushers_rows(self, last_id, current_id, limit): """Get all the pushers that have changed between the given tokens. Returns: - list(tuple): each tuple consists of: + Deferred(list(tuple)): each tuple consists of: stream_id (str) user_id (str) app_id (str) From b4276a3896b7fec702feac4a60391ec81e595322 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:34:40 +0100 Subject: [PATCH 12/16] Add a brief list of commands to docs --- docs/tcp_replication.rst | 46 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/docs/tcp_replication.rst b/docs/tcp_replication.rst index 7393527f6fd7..62225ba6f4ca 100644 --- a/docs/tcp_replication.rst +++ b/docs/tcp_replication.rst @@ -175,3 +175,49 @@ An example of a batched set of ``RDATA`` is:: In this case the client shouldn't advance their caches token until it sees the the last ``RDATA``. + + +List of commands +~~~~~~~~~~~~~~~~ + +The list of valid commands, with which side can send it: server (S) or client (C): + +SERVER (S) + Sent at the start to identify which server the client is talking to + +RDATA (S) + A single update in a stream + +POSITION (S) + The position of the stream has been updated + +ERROR (S, C) + There was an error + +PING (S, C) + Sent periodically to ensure the connection is still alive + +NAME (C) + Sent at the start by client to inform the server who they are + +REPLICATE (C) + Asks the server to replicate a given stream + +USER_SYNC (C) + A user has started or stopped syncing + +FEDERATION_ACK (C) + Acknowledge receipt of some federation data + +REMOVE_PUSHER (C) + Inform the server a pusher should be removed + +INVALIDATE_CACHE (C) + Inform the server a cache should be invalidated + +SYNC (S, C) + Used exclusively in tests + + +See ``synapse/replication/tcp/commands.py`` for a detailed description and the +format of each command. From 9d0170ac6cb54f5fe86cf1a5121d688772a544f0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:36:32 +0100 Subject: [PATCH 13/16] Fix up presence --- synapse/handlers/presence.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ac6d1107bf97..9e147606596e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -546,12 +546,14 @@ def update_external_syncs_row(self, process_id, user_id, is_syncing): updates.append(prev_state.copy_and_replace( last_user_sync_ts=time_now_ms, )) + + if not is_syncing: process_presence.discard(user_id) if updates: yield self._update_states(updates) - self.external_process_last_updated_ms[process_id] = self.clock.time_msec() + self.external_process_last_updated_ms[process_id] = time_now_ms @defer.inlineCallbacks def update_external_syncs_clear(self, process_id): From 36d2b66f90dcdfae843c4b0ee7cba1dea23486f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 11:46:20 +0100 Subject: [PATCH 14/16] Add a timestamp to USER_SYNC command This timestamp is used to indicate when the user last sync'd --- synapse/handlers/presence.py | 14 +++++++------- synapse/replication/tcp/commands.py | 15 ++++++++++----- synapse/replication/tcp/protocol.py | 7 +++++-- synapse/replication/tcp/resource.py | 4 ++-- 4 files changed, 24 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9e147606596e..53baf3e79ad2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -511,7 +511,7 @@ def update_external_syncs(self, process_id, syncing_user_ids): self.external_process_to_current_syncs[process_id] = syncing_user_ids @defer.inlineCallbacks - def update_external_syncs_row(self, process_id, user_id, is_syncing): + def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec): """Update the syncing users for an external process as a delta. Args: @@ -520,6 +520,7 @@ def update_external_syncs_row(self, process_id, user_id, is_syncing): as user start and stop syncing against a given process. user_id (str): The user who has started or stopped syncing is_syncing (bool): Whether or not the user is now syncing + sync_time_msec(int): Time in ms when the user was last syncing """ with (yield self.external_sync_linearizer.queue(process_id)): prev_state = yield self.current_state_for_user(user_id) @@ -527,24 +528,23 @@ def update_external_syncs_row(self, process_id, user_id, is_syncing): process_presence = self.external_process_to_current_syncs.setdefault( process_id, set() ) - time_now_ms = self.clock.time_msec() updates = [] if is_syncing and user_id not in process_presence: if prev_state.state == PresenceState.OFFLINE: updates.append(prev_state.copy_and_replace( state=PresenceState.ONLINE, - last_active_ts=time_now_ms, - last_user_sync_ts=time_now_ms, + last_active_ts=sync_time_msec, + last_user_sync_ts=sync_time_msec, )) else: updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, + last_user_sync_ts=sync_time_msec, )) process_presence.add(user_id) elif user_id in process_presence: updates.append(prev_state.copy_and_replace( - last_user_sync_ts=time_now_ms, + last_user_sync_ts=sync_time_msec, )) if not is_syncing: @@ -553,7 +553,7 @@ def update_external_syncs_row(self, process_id, user_id, is_syncing): if updates: yield self._update_states(updates) - self.external_process_last_updated_ms[process_id] = time_now_ms + self.external_process_last_updated_ms[process_id] = self.clock.time_msec() @defer.inlineCallbacks def update_external_syncs_clear(self, process_id): diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 68165cf2dc4c..84d2a2272ad7 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -189,29 +189,34 @@ class UserSyncCommand(Command): """Sent by the client to inform the server that a user has started or stopped syncing. Used to calculate presence on the master. + Includes a timestamp of when the last user sync was. + Format:: - USER_SYNC + USER_SYNC Where is either "start" or "stop" """ NAME = "USER_SYNC" - def __init__(self, user_id, is_syncing): + def __init__(self, user_id, is_syncing, last_sync_ms): self.user_id = user_id self.is_syncing = is_syncing + self.last_sync_ms = last_sync_ms @classmethod def from_line(cls, line): - user_id, state = line.split(" ", 1) + user_id, state, last_sync_ms = line.split(" ", 2) if state not in ("start", "end"): raise Exception("Invalid USER_SYNC state %r" % (state,)) - return cls(user_id, state == "start") + return cls(user_id, state == "start", int(last_sync_ms)) def to_line(self): - return " ".join((self.user_id, "start" if self.is_syncing else "end")) + return " ".join(( + self.user_id, "start" if self.is_syncing else "end", str(self.last_sync_ms), + )) class FederationAckCommand(Command): diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index c1dc91bdb7eb..80f732b4551d 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -368,7 +368,9 @@ def on_NAME(self, cmd): self.name = cmd.data def on_USER_SYNC(self, cmd): - self.streamer.on_user_sync(self.conn_id, cmd.user_id, cmd.is_syncing) + self.streamer.on_user_sync( + self.conn_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms, + ) def on_REPLICATE(self, cmd): stream_name = cmd.stream_name @@ -481,8 +483,9 @@ def connectionMade(self): # Tell the server if we have any users currently syncing (should only # happen on synchrotrons) currently_syncing = self.handler.get_currently_syncing_users() + now = self.clock.time_msec() for user_id in currently_syncing: - self.send_command(UserSyncCommand(user_id, True)) + self.send_command(UserSyncCommand(user_id, True, now)) # We've now finished connecting to so inform the client handler self.handler.update_connection(self) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index d5da0496a813..243a81d488f1 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -220,12 +220,12 @@ def federation_ack(self, token): self.federation_sender.federation_ack(token) @measure_func("repl.on_user_sync") - def on_user_sync(self, conn_id, user_id, is_syncing): + def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms): """A client has started/stopped syncing on a worker. """ user_sync_counter.inc() self.presence_handler.update_external_syncs_row( - conn_id, user_id, is_syncing + conn_id, user_id, is_syncing, last_sync_ms, ) @measure_func("repl.on_remove_pusher") From 1df7c28661207df8575fd519ce9c23690b9156ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2017 13:36:38 +0100 Subject: [PATCH 15/16] Use callbacks to notify tcp replication rather than deferreds --- synapse/notifier.py | 17 +++++++++++------ synapse/replication/tcp/resource.py | 15 +-------------- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index f9fcc0ca2580..4fda184b7a3c 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -163,6 +163,8 @@ def __init__(self, hs): self.store = hs.get_datastore() self.pending_new_room_events = [] + self.replication_callbacks = [] + self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() @@ -202,6 +204,12 @@ def count_listeners(): lambda: len(self.user_to_user_stream), ) + def add_replication_callback(self, cb): + """Add a callback that will be called when some new data is available. + Callback is not given any arguments. + """ + self.replication_callbacks.append(cb) + @preserve_fn def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): @@ -510,6 +518,9 @@ def notify_replication(self): self.replication_deferred = ObservableDeferred(defer.Deferred()) deferred.callback(None) + for cb in self.replication_callbacks: + preserve_fn(cb)() + @defer.inlineCallbacks def wait_for_replication(self, callback, timeout): """Wait for an event to happen. @@ -550,9 +561,3 @@ def wait_for_replication(self, callback, timeout): break defer.returnValue(result) - - def wait_once_for_replication(self): - """Returns a deferred which resolves when there is new data for - replication to handle. - """ - return self.replication_deferred.observe() diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 243a81d488f1..b70fa7334fc7 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -21,7 +21,6 @@ from streams import STREAMS_MAP, FederationStream from protocol import ServerReplicationStreamProtocol -from synapse.util.logcontext import preserve_fn from synapse.util.metrics import Measure, measure_func import logging @@ -66,7 +65,6 @@ class ReplicationStreamer(object): def __init__(self, hs): self.store = hs.get_datastore() - self.notifier = hs.get_notifier() self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() @@ -101,8 +99,7 @@ def __init__(self, hs): if not hs.config.send_federation: self.federation_sender = hs.get_federation_sender() - # Start listening for updates from the notifier - preserve_fn(self.notifier_listener)() + hs.get_notifier().add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False @@ -115,16 +112,6 @@ def on_shutdown(self): for conn in self.connections: conn.send_error("server shutting down") - @defer.inlineCallbacks - def notifier_listener(self): - """Sits forever looping on the notifier waiting for new data. - """ - while True: - yield self.notifier.wait_once_for_replication() - logger.debug("Woken up by notifier") - # We need to call this each time we get woken up, as per docstring - preserve_fn(self.on_notifier_poke)() - @defer.inlineCallbacks def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the From 0a6a966e2b86ebe423709c030634c04c1093ab0b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 3 Apr 2017 15:22:56 +0100 Subject: [PATCH 16/16] Always advance stream tokens --- synapse/replication/tcp/resource.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index b70fa7334fc7..0d7ea573189e 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -121,7 +121,10 @@ def on_notifier_poke(self): is currently being executed, so that nothing gets missed """ if not self.connections: - # Don't bother if nothing is listening + # Don't bother if nothing is listening. We still need to advance + # the stream tokens otherwise they'll fall beihind forever + for stream in self.streams: + stream.advance_current_token() return # If we're in the process of checking for new updates, mark that fact