From 3a86ea5000c0c0e18b2bd5797cb2f243cc8d4103 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 10:45:14 +0100 Subject: [PATCH 1/7] Fix race between RDATA and POSITION commands. Also fixes an exception caused by incorrectly assuming `_pending_batches` contained `RdataCommand` rather than stream rows. --- synapse/replication/tcp/handler.py | 50 +++++++++++++++++------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 12a1cfd6d1c4..452fe9ef7087 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -92,16 +92,28 @@ async def on_RDATA(self, cmd: RdataCommand): logger.exception("Failed to parse RDATA: %r %r", stream_name, cmd.row) raise - if cmd.token is None or stream_name not in self._streams_connected: - # I.e. either this is part of a batch of updates for this stream (in - # which case batch until we get an update for the stream with a non - # None token) or we're currently connecting so we queue up rows. - self._pending_batches.setdefault(stream_name, []).append(row) - else: - # Check if this is the last of a batch of updates - rows = self._pending_batches.pop(stream_name, []) - rows.append(row) - await self.on_rdata(stream_name, cmd.token, rows) + # We linearize here for two reasons: + # 1. so we don't try and concurrently handle multiple rows for the + # same stream, and + # 2. so we don't race with getting a POSITION command and fetching + # missing RDATA. + with await self._position_linearizer.queue(cmd.stream_name): + if stream_name not in self._streams_connected: + logger.warning( + "Discarding RDATA for unconnected stream %s", stream_name + ) + return + + if cmd.token is None: + # I.e. either this is part of a batch of updates for this stream (in + # which case batch until we get an update for the stream with a non + # None token) or we're currently connecting so we queue up rows. + self._pending_batches.setdefault(stream_name, []).append(row) + else: + # Check if this is the last of a batch of updates + rows = self._pending_batches.pop(stream_name, []) + rows.append(row) + await self.on_rdata(stream_name, cmd.token, rows) async def on_rdata(self, stream_name: str, token: int, rows: list): """Called to handle a batch of replication data with a given stream token. @@ -124,12 +136,13 @@ async def on_POSITION(self, cmd: PositionCommand): # We protect catching up with a linearizer in case the replication # connection reconnects under us. with await self._position_linearizer.queue(cmd.stream_name): - # We're about to go and catch up with the stream, so mark as connecting - # to stop RDATA being handled at the same time by removing stream from - # list of connected streams. We also clear any batched up RDATA from - # before we got the POSITION. + # We're about to go and catch up with the stream, so remove from set + # of connected streams. self._streams_connected.discard(cmd.stream_name) - self._pending_batches.clear() + + # We clear the pending batches for the stream as the fetching + # updates below will fetch all rows in the batch. + self._pending_batches.pop(cmd.stream_name, []) # Find where we previously streamed up to. current_token = self._replication_data_handler.get_streams_to_replicate().get( @@ -158,13 +171,6 @@ async def on_POSITION(self, cmd: PositionCommand): # We've now caught up to position sent to us, notify handler. 
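            # (A summary aside, as comments, of the sequence this handler now
            # follows; the names are the ones used in this file:
            #   1. take the per-stream linearizer lock, so on_RDATA for the
            #      same stream cannot interleave with catch-up;
            #   2. discard the stream from _streams_connected, so rows that
            #      arrive mid-catch-up are queued or dropped, not applied;
            #   3. clear _pending_batches for the stream, since the fetch
            #      below re-fetches those rows anyway;
            #   4. page through stream.get_updates_since() until we reach
            #      cmd.token, passing each page to on_rdata();
            #   5. notify the handler of the final position and mark the
            #      stream connected again.)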
await self._replication_data_handler.on_position(cmd.stream_name, cmd.token)
 
-        # Handle any RDATA that came in while we were catching up.
-        rows = self._pending_batches.pop(cmd.stream_name, [])
-        if rows:
-            await self._replication_data_handler.on_rdata(
-                cmd.stream_name, rows[-1].token, rows
-            )
-
         self._streams_connected.add(cmd.stream_name)

     async def on_SYNC(self, cmd: SyncCommand):

From 7280f95ecafd9cd1e1f5a8a641328a325ecd9617 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Apr 2020 10:47:41 +0100
Subject: [PATCH 2/7] Fix 'GenericWorkerSlavedStore' object has no attribute 'get_all_push_rule_updates'

---
 synapse/storage/data_stores/main/push_rule.py | 40 +++++++++----------
 1 file changed, 20 insertions(+), 20 deletions(-)

diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index 46f9bda773eb..b3faafa0a44d 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -334,6 +334,26 @@ def bulk_get_push_rules_enabled(self, user_ids):
             results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
         return results
 
+    def get_all_push_rule_updates(self, last_id, current_id, limit):
+        """Get all the push rule changes that have happened on the server"""
+        if last_id == current_id:
+            return defer.succeed([])
+
+        def get_all_push_rule_updates_txn(txn):
+            sql = (
+                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
+                " op, priority_class, priority, conditions, actions"
+                " FROM push_rules_stream"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            return txn.fetchall()
+
+        return self.db.runInteraction(
+            "get_all_push_rule_updates", get_all_push_rule_updates_txn
+        )
+
 
 class PushRuleStore(PushRulesWorkerStore):
     @defer.inlineCallbacks
@@ -685,26 +705,6 @@ def _insert_push_rules_update_txn(
             self.push_rules_stream_cache.entity_has_changed, user_id, stream_id
         )
 
-    def get_all_push_rule_updates(self, last_id, current_id, limit):
-        """Get all the push rule changes that have happened on the server"""
-        if last_id == current_id:
-            return defer.succeed([])
-
-        def get_all_push_rule_updates_txn(txn):
-            sql = (
-                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
-                " op, priority_class, priority, conditions, actions"
-                " FROM push_rules_stream"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
-            txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
-
-        return self.db.runInteraction(
-            "get_all_push_rule_updates", get_all_push_rule_updates_txn
-        )
-
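# An illustrative sketch, separate from the patch above: the WHERE clause in
# get_all_push_rule_updates selects last_id < stream_id <= current_id with a
# LIMIT, which lets a consumer page forward without skipping or repeating
# rows. `fetch_page` is a hypothetical stand-in for the storage call.
def drain_push_rule_updates(fetch_page, last_id, current_id, limit=100):
    rows = []
    while last_id < current_id:
        batch = fetch_page(last_id, current_id, limit)
        if not batch:
            break  # no rows left in the window
        rows.extend(batch)
        # stream_id is the first column in the SELECT, so the last row's
        # stream_id becomes the new lower bound for the next page.
        last_id = batch[-1][0]
    return rows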
     def get_push_rules_stream_token(self):
         """Get the position of the push rules stream.
 
         Returns a pair of a stream id for the push_rules stream and the

From af12f540e4ae508721545c4c6967d0e6172a109d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Apr 2020 10:52:43 +0100
Subject: [PATCH 3/7] Fix fetching missing updates over replication

---
 synapse/replication/tcp/streams/_base.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index c14dff6c6484..f56a0fd4b58e 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -168,12 +168,13 @@ def make_http_update_function(
         async def update_function(
             from_token: int, upto_token: int, limit: int
         ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
-            return await client(
+            result = await client(
                 stream_name=stream_name,
                 from_token=from_token,
                 upto_token=upto_token,
                 limit=limit,
             )
+            return result["updates"], result["upto_token"], result["limited"]
 
         return update_function

From dfb4d01ba661ad18ad6087fad1598ea29d099457 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Apr 2020 11:00:28 +0100
Subject: [PATCH 4/7] Newsfile

---
 changelog.d/7226.misc | 1 +
 1 file changed, 1 insertion(+)
 create mode 100644 changelog.d/7226.misc

diff --git a/changelog.d/7226.misc b/changelog.d/7226.misc
new file mode 100644
index 000000000000..676f285377f5
--- /dev/null
+++ b/changelog.d/7226.misc
@@ -0,0 +1 @@
+Move catchup of replication streams logic to worker.

From 84ac795e04f6d2205e529f22c86ee06523d92812 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 6 Apr 2020 15:54:56 +0100
Subject: [PATCH 5/7] Add/fixup comments

---
 synapse/replication/tcp/handler.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 452fe9ef7087..842dd8d720aa 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -99,15 +99,21 @@ async def on_RDATA(self, cmd: RdataCommand):
         # missing RDATA.
         with await self._position_linearizer.queue(cmd.stream_name):
             if stream_name not in self._streams_connected:
+                # If the stream isn't marked as connected then we haven't seen a
+                # `POSITION` command yet, and so we may have missed some rows.
+                # Let's drop the row for now, on the assumption we'll receive a
+                # `POSITION` soon and we'll catch up correctly then.
                 logger.warning(
-                    "Discarding RDATA for unconnected stream %s", stream_name
+                    "Discarding RDATA for unconnected stream %s -> ",
+                    stream_name,
+                    cmd.token,
                 )
                 return
 
             if cmd.token is None:
-                # I.e. either this is part of a batch of updates for this stream (in
+                # I.e. this is part of a batch of updates for this stream (in
                 # which case batch until we get an update for the stream with a non
-                # None token) or we're currently connecting so we queue up rows.
+                # None token).
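                # (An illustrative aside, not part of the patch: on the wire a
                # multi-row transaction arrives as several RDATA commands, e.g.
                #
                #     RDATA events batch ["row 1"]
                #     RDATA events batch ["row 2"]
                #     RDATA events 42 ["row 3"]
                #
                # Only the final command carries a real token, so the first two
                # rows sit in _pending_batches until the third arrives, at which
                # point the whole batch is handed to on_rdata below.)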
self._pending_batches.setdefault(stream_name, []).append(row) else: # Check if this is the last of a batch of updates From a2e0bb9512226d3966f68ff37f8d755e7599c7bf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 18:14:09 +0100 Subject: [PATCH 6/7] Only resync on POSITION if cmd.token != current_token --- synapse/replication/tcp/handler.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 842dd8d720aa..b415beb0fea7 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -161,12 +161,17 @@ async def on_POSITION(self, cmd: PositionCommand): ) return - # Fetch all updates between then and now. - limited = True - while limited: - updates, current_token, limited = await stream.get_updates_since( - current_token, cmd.token - ) + # If the position token matches our current token then we're up to + # date and there's nothing to do. Otherwise, fetch all updates + # between then and now. + missing_updates = cmd.token != current_token + while missing_updates: + ( + updates, + current_token, + missing_updates, + ) = await stream.get_updates_since(current_token, cmd.token) + if updates: await self.on_rdata( cmd.stream_name, From 55eccdc094fe99c1619d046693c401cb5896f92c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 6 Apr 2020 18:20:13 +0100 Subject: [PATCH 7/7] Address review comments --- synapse/replication/tcp/handler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b415beb0fea7..31584a971cfb 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -104,7 +104,7 @@ async def on_RDATA(self, cmd: RdataCommand): # Let's drop the row for now, on the assumption we'll receive a # `POSITION` soon and we'll catch up correctly then. logger.warning( - "Discarding RDATA for unconnected stream %s -> ", + "Discarding RDATA for unconnected stream %s -> %s", stream_name, cmd.token, ) @@ -146,8 +146,8 @@ async def on_POSITION(self, cmd: PositionCommand): # of connected streams. self._streams_connected.discard(cmd.stream_name) - # We clear the pending batches for the stream as the fetching - # updates below will fetch all rows in the batch. + # We clear the pending batches for the stream as the fetching of the + # missing updates below will fetch all rows in the batch. self._pending_batches.pop(cmd.stream_name, []) # Find where we previously streamed up to.
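
Taken together, patches 3 and 6 pin down the catch-up contract: an update
function returns (updates, upto_token, limited), and the caller loops until
nothing comes back limited. Below is a minimal, self-contained sketch of that
loop; the toy in-memory stream, ROWS, and the page size of 5 are invented for
illustration, standing in for stream.get_updates_since and the real storage.

import asyncio
from typing import List, Tuple

# Toy in-memory stream: rows keyed by monotonically increasing tokens.
ROWS = {token: ("row-%d" % token,) for token in range(1, 21)}

async def get_updates_since(
    from_token: int, upto_token: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
    # Same shape as the update_function in patch 3: return rows with
    # from_token < token <= upto_token, at most `limit` of them, plus the
    # token reached and whether the result was truncated.
    tokens = [t for t in sorted(ROWS) if from_token < t <= upto_token]
    page, rest = tokens[:limit], tokens[limit:]
    updates = [(t, ROWS[t]) for t in page]
    new_token = page[-1] if page else upto_token
    return updates, new_token, bool(rest)

async def catch_up(current_token: int, target_token: int) -> int:
    # Mirror of the loop in patch 6: only resync if the POSITION token
    # differs from ours, then keep paging while fetches come back limited.
    missing_updates = target_token != current_token
    while missing_updates:
        updates, current_token, missing_updates = await get_updates_since(
            current_token, target_token, limit=5
        )
        if updates:
            # A real handler would pass the page to on_rdata() here.
            print("applying %d rows up to token %d" % (len(updates), current_token))
    return current_token

print(asyncio.run(catch_up(3, 20)))  # prints four pages, then 20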