From 8f0870c5e314730a74cca0ca74461c51b744efd9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Apr 2020 11:14:46 +0100 Subject: [PATCH 1/5] On catchup, process each row with its own stream id Other parts of the code (such as the StreamChangeCache) assume that there will not be multiple changes with the same stream id. --- changelog.d/7286.misc | 1 + synapse/replication/tcp/handler.py | 6 ++---- synapse/util/caches/stream_change_cache.py | 5 +++++ 3 files changed, 8 insertions(+), 4 deletions(-) create mode 100644 changelog.d/7286.misc diff --git a/changelog.d/7286.misc b/changelog.d/7286.misc new file mode 100644 index 000000000000..676f285377f5 --- /dev/null +++ b/changelog.d/7286.misc @@ -0,0 +1 @@ +Move catchup of replication streams logic to worker. diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 2f5a29914103..676f4a2a9302 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -268,11 +268,9 @@ async def on_POSITION(self, cmd: PositionCommand): missing_updates, ) = await stream.get_updates_since(current_token, cmd.token) - if updates: + for token, row in updates: await self.on_rdata( - cmd.stream_name, - current_token, - [stream.parse_row(update[1]) for update in updates], + cmd.stream_name, token, [stream.parse_row(row)], ) # We've now caught up to position sent to us, notify handler. diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 235f64049c95..6afda0144043 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -126,6 +126,11 @@ def entity_has_changed(self, entity, stream_pos): """ assert type(stream_pos) is int + if stream_pos in self._cache: + raise NotImplementedError( + "more than one entity changing at a stream position" + ) + if stream_pos > self._earliest_known_stream_pos: old_pos = self._entity_to_key.get(entity, None) if old_pos is not None: From c7c0fe37e5c91b20a302c1a9b210dfd22a4a91ac Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 16 Apr 2020 23:25:11 +0100 Subject: [PATCH 2/5] Batch up the updates returned by get_updates_since --- synapse/replication/tcp/handler.py | 68 ++++++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 676f4a2a9302..a2ce7d2a8a82 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -15,7 +15,18 @@ # limitations under the License. import logging -from typing import Any, Callable, Dict, List, Optional, Set +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + Optional, + Set, + Tuple, + TypeVar, +) from prometheus_client import Counter @@ -268,9 +279,14 @@ async def on_POSITION(self, cmd: PositionCommand): missing_updates, ) = await stream.get_updates_since(current_token, cmd.token) - for token, row in updates: + # TODO: add some tests for this + + # Some streams return multiple rows with the same stream IDs, + # which need to be processed in batches. + + for token, rows in _batch_updates(updates): await self.on_rdata( - cmd.stream_name, token, [stream.parse_row(row)], + cmd.stream_name, token, [stream.parse_row(row) for row in rows], ) # We've now caught up to position sent to us, notify handler. 
@@ -402,3 +418,49 @@ def stream_update(self, stream_name: str, token: str, data: Any):
         We need to check if the client is interested in the stream or not
         """
         self.send_command(RdataCommand(stream_name, token, data))
+
+
+UpdateToken = TypeVar("UpdateToken")
+UpdateRow = TypeVar("UpdateRow")
+
+
+def _batch_updates(
+    updates: Iterable[Tuple[UpdateToken, UpdateRow]]
+) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
+    """Collect stream updates with the same token together
+
+    Given a series of updates returned by Stream.get_updates_since(), collects
+    the updates which share the same stream_id together.
+
+    For example:
+
+        [(1, a), (1, b), (2, c), (3, d), (3, e)]
+
+    becomes:
+
+        [
+            (1, [a, b]),
+            (2, [c]),
+            (3, [d, e]),
+        ]
+    """
+
+    update_iter = iter(updates)
+    token, row = next(update_iter)
+
+    current_batch_token = token
+    current_batch = [row]
+
+    for token, row in update_iter:
+        if token != current_batch_token:
+            # different token to the previous row: flush the previous
+            # batch and start anew
+            yield current_batch_token, current_batch
+            current_batch_token = token
+            current_batch = []
+
+        current_batch.append(row)
+
+    # flush the final batch
+    if current_batch_token is not None:
+        yield current_batch_token, current_batch

From f1709e2d3edce4741e5f57a973745a85ebf128f1 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Fri, 17 Apr 2020 10:39:30 +0100
Subject: [PATCH 3/5] Fix _batch_updates for py37

turns out that as of py37/PEP 479, if you let a StopIteration bubble out of a
generator (as opposed to implicitly raising one by returning from said
generator), it gets turned into a RuntimeError. Fix it by having `next`
return None rather than raising StopIteration, and handling that.

---
 synapse/replication/tcp/handler.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index a2ce7d2a8a82..e32e68e8c4bb 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -446,10 +446,14 @@ def _batch_updates(
     """
 
     update_iter = iter(updates)
-    token, row = next(update_iter)
 
-    current_batch_token = token
-    current_batch = [row]
+    first_update = next(update_iter, None)
+    if first_update is None:
+        # empty input
+        return
+
+    current_batch_token = first_update[0]
+    current_batch = [first_update[1]]
 
     for token, row in update_iter:
         if token != current_batch_token:
@@ -462,5 +466,4 @@ def _batch_updates(
         current_batch.append(row)
 
     # flush the final batch
-    if current_batch_token is not None:
-        yield current_batch_token, current_batch
+    yield current_batch_token, current_batch

From dc35b6840a55198b1dae7f7c1c4aa61d3e43f8cd Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Fri, 17 Apr 2020 11:40:00 +0100
Subject: [PATCH 4/5] Relax StreamChangeCache sanity-check

It's pointless but valid to invalidate the same entity twice at the same
stream id.
--- synapse/util/caches/stream_change_cache.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 6afda0144043..f41ea5bd3c72 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -126,10 +126,14 @@ def entity_has_changed(self, entity, stream_pos): """ assert type(stream_pos) is int - if stream_pos in self._cache: - raise NotImplementedError( - "more than one entity changing at a stream position" - ) + # sanity-check that we are not going to overwrite existing data. + current = self._cache.get(stream_pos) + if current is not None: + if current != entity: + raise NotImplementedError( + "more than one entity changing at a stream position" + ) + return if stream_pos > self._earliest_known_stream_pos: old_pos = self._entity_to_key.get(entity, None) From 378d34e970fbd5e8ce5251ce1abdbf480be147ee Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 17 Apr 2020 14:23:27 +0100 Subject: [PATCH 5/5] Remove the sanity check Argh. It turns out that if you introduce sanity-checks, you discover the places in which we are not adhering to them. --- synapse/util/caches/stream_change_cache.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index f41ea5bd3c72..c61d36a82e3e 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -126,14 +126,8 @@ def entity_has_changed(self, entity, stream_pos): """ assert type(stream_pos) is int - # sanity-check that we are not going to overwrite existing data. - current = self._cache.get(stream_pos) - if current is not None: - if current != entity: - raise NotImplementedError( - "more than one entity changing at a stream position" - ) - return + # FIXME: add a sanity check here that we are not overwriting existing + # data in self._cache if stream_pos > self._earliest_known_stream_pos: old_pos = self._entity_to_key.get(entity, None)
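
A note on the rationale for PATCH 1/5: the StreamChangeCache keeps, in
effect, one change per stream id (the patch's own hunk compares
`self._cache.get(stream_pos)` against a single entity), so replaying an
entire catchup batch under one token clobbers all but the last entity.
The toy stand-in below is a deliberately simplified sketch of that failure
mode, not the real StreamChangeCache API:

    # Toy illustration of the assumption behind PATCH 1/5. The real cache
    # also handles eviction and ordering; this only models "one change per
    # stream id".

    cache = {}  # stream_pos -> entity (simplified)

    def entity_has_changed(entity, stream_pos):
        cache[stream_pos] = entity

    # old on_POSITION behaviour: every update in the batch reported at
    # current_token
    for entity in ["@a:hs", "@b:hs", "@c:hs"]:
        entity_has_changed(entity, stream_pos=7)
    print(cache)  # {7: '@c:hs'}: the changes for '@a:hs' and '@b:hs' are lost

    # new behaviour: each row is processed with its own stream id
    cache.clear()
    for stream_pos, entity in [(5, "@a:hs"), (6, "@b:hs"), (7, "@c:hs")]:
        entity_has_changed(entity, stream_pos)
    print(cache)  # {5: '@a:hs', 6: '@b:hs', 7: '@c:hs'}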
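
PATCH 2/5 leaves a `# TODO: add some tests for this` next to the new
batching loop. A minimal sketch of what such tests could look like,
written against the final (post-PATCH 3/5) behaviour; the test names are
illustrative, and it assumes synapse is importable so `_batch_updates`
can be pulled in from its real location:

    from synapse.replication.tcp.handler import _batch_updates

    def test_batches_rows_sharing_a_token():
        # the docstring example: runs of equal tokens are grouped, in order
        updates = [(1, "a"), (1, "b"), (2, "c"), (3, "d"), (3, "e")]
        assert list(_batch_updates(updates)) == [
            (1, ["a", "b"]),
            (2, ["c"]),
            (3, ["d", "e"]),
        ]

    def test_single_update_is_a_batch_of_one():
        assert list(_batch_updates([(1, "a")])) == [(1, ["a"])]

    def test_empty_input_yields_nothing():
        # before PATCH 3/5 this raised RuntimeError on py37, because the
        # StopIteration from next() escaped the generator body
        assert list(_batch_updates([])) == []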
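
For reference, the py37 behaviour that PATCH 3/5 works around can be
reproduced standalone, with no synapse involved. Since PEP 479 became the
default in Python 3.7, a StopIteration escaping a generator body is
converted to RuntimeError rather than silently ending the generator:

    def broken(updates):
        it = iter(updates)
        token, row = next(it)  # raises StopIteration when updates is empty
        yield token, [row]

    def fixed(updates):
        it = iter(updates)
        first = next(it, None)  # None sentinel instead of StopIteration
        if first is None:
            return  # a plain return just ends the generator
        yield first[0], [first[1]]

    print(list(fixed([])))  # []
    list(broken([]))        # RuntimeError: generator raised StopIteration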