Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove concept of a non-limited stream. #7011

Merged
merged 13 commits into from
Mar 20, 2020
18 changes: 12 additions & 6 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import itertools
import logging
from collections import namedtuple
from typing import Any, List, Optional
from typing import Any, List, Optional, Tuple

import attr

from synapse.types import JsonDict

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -166,16 +168,18 @@ async def get_updates(self):

return updates, current_token

async def get_updates_since(self, from_token):
async def get_updates_since(
self, from_token: int
) -> Tuple[List[Tuple[int, JsonDict]], int]:
"""Like get_updates except allows specifying from when we should
stream updates

Returns:
Deferred[Tuple[List[Tuple[int, Any]], int]:
Resolves to a pair ``(updates, current_token)``, where ``updates`` is a
list of ``(token, row)`` entries. ``row`` will be json-serialised and
sent over the replication steam.
Resolves to a pair `(updates, new_last_token)`, where `updates` is
a list of `(token, row)` entries and `new_last_token` is the new
position in stream.
"""

if from_token in ("NOW", "now"):
return [], self.current_token()

Expand All @@ -200,6 +204,8 @@ async def get_updates_since(self, from_token):
if len(updates) >= MAX_EVENTS_BEHIND:
raise Exception("stream %s has fallen behind" % (self.NAME))

# Due to the assertin above we know we're up to date, so we know that
# our new stream position is `current_token`.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return updates, current_token

def current_token(self):
Expand Down