Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Various clean ups to room stream tokens. #8423

Merged
merged 6 commits into from
Sep 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8423.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Various refactors to simplify stream token handling.
6 changes: 3 additions & 3 deletions synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from unpaddedbase64 import encode_base64

from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
from synapse.types import JsonDict
from synapse.types import JsonDict, RoomStreamToken
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze

Expand Down Expand Up @@ -118,8 +118,8 @@ def __init__(self, internal_metadata_dict: JsonDict):
# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
# be here
before = DictProperty("before") # type: str
after = DictProperty("after") # type: str
before = DictProperty("before") # type: RoomStreamToken
after = DictProperty("after") # type: RoomStreamToken
order = DictProperty("order") # type: Tuple[int, int]

def get_dict(self) -> JsonDict:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async def export_user_data(self, user_id, writer):
if not events:
break

from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
from_key = events[-1].internal_metadata.after

events = await filter_events_for_client(self.storage, user_id, events)

Expand Down
4 changes: 1 addition & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
RoomStreamToken,
StreamToken,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
Expand Down Expand Up @@ -113,8 +112,7 @@ async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):

set_tag("user_id", user_id)
set_tag("from_token", from_token)
now_room_id = self.store.get_room_max_stream_ordering()
now_room_key = RoomStreamToken(None, now_room_id)
now_room_key = self.store.get_room_max_token()

room_ids = await self.store.get_rooms_for_user(user_id)

Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ async def _room_initial_sync_parted(
if limit is None:
limit = 10

stream_token = await self.store.get_stream_token_for_event(member_event_id)
leave_position = await self.store.get_position_for_event(member_event_id)
stream_token = leave_position.to_room_stream_token()

messages, token = await self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token
Expand Down
5 changes: 2 additions & 3 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import Requester, RoomStreamToken
from synapse.types import Requester
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -373,10 +373,9 @@ async def get_messages(
# case "JOIN" would have been returned.
assert member_event_id

leave_token_str = await self.store.get_topological_token_for_event(
leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token_str)
assert leave_token.topological is not None

if leave_token.topological < curr_topo:
Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,14 +1134,14 @@ async def get_new_events(
events[:] = events[:limit]

if events:
end_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
end_key = events[-1].internal_metadata.after
else:
end_key = to_key

return (events, end_key)

def get_current_key(self) -> RoomStreamToken:
return RoomStreamToken(None, self.store.get_room_max_stream_ordering())
return self.store.get_room_max_token()

def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
Expand Down
20 changes: 14 additions & 6 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ async def _load_filtered_recents(
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)
room_key = recents[0].internal_metadata.before

prev_batch_token = now_token.copy_and_replace("room_key", room_key)

Expand Down Expand Up @@ -1595,16 +1595,24 @@ async def _get_rooms_changed(

if leave_events:
leave_event = leave_events[-1]
leave_stream_token = await self.store.get_stream_token_for_event(
leave_position = await self.store.get_position_for_event(
leave_event.event_id
)
leave_token = since_token.copy_and_replace(
"room_key", leave_stream_token
)

if since_token and since_token.is_after(leave_token):
# If the leave event happened before the since token then we
# bail.
if since_token and not leave_position.persisted_after(
since_token.room_key
):
continue

# We can safely convert the position of the leave event into a
# stream token as it'll only be used in the context of this
# room. (c.f. the docstring of `to_room_stream_token`).
leave_token = since_token.copy_and_replace(
"room_key", leave_position.to_room_stream_token()
)

# If this is an out of band message, like a remote invite
# rejection, we include it in the recents batch. Otherwise, we
# let _load_filtered_recents handle fetching the correct
Expand Down
4 changes: 2 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def new_listener(self, token: StreamToken) -> _NotificationListener:
"""
# Immediately wake up stream if something has already happened
# since their last token.
if self.last_notified_token.is_after(token):
if self.last_notified_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
Expand Down Expand Up @@ -470,7 +470,7 @@ async def get_events_for(
async def check_for_updates(
before_token: StreamToken, after_token: StreamToken
) -> EventStreamResult:
if not after_token.is_after(before_token):
if after_token == before_token:
return EventStreamResult([], (from_token, from_token))

events = [] # type: List[EventBase]
Expand Down
6 changes: 2 additions & 4 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
EventsStreamEventRow,
EventsStreamRow,
)
from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
from synapse.types import PersistedEventPosition, UserID
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -152,9 +152,7 @@ async def on_rdata(
if event.type == EventTypes.Member:
extra_users = (UserID.from_string(event.state_key),)

max_token = RoomStreamToken(
None, self.store.get_room_max_stream_ordering()
)
max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
self.notifier.on_new_room_event(
event, event_pos, max_token, extra_users
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ async def on_POST(self, request, room_id, event_id):
if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")

token = await self.store.get_topological_token_for_event(event_id)
room_token = await self.store.get_topological_token_for_event(event_id)
token = str(room_token)

logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
elif "purge_up_to_ts" in body:
Expand Down
38 changes: 21 additions & 17 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
- topological tokens: "t%d-%d", where the integers map to the topological
and stream ordering columns respectively.
"""

import abc
import logging
from collections import namedtuple
Expand All @@ -54,7 +53,7 @@
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.types import Collection, RoomStreamToken
from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache

if TYPE_CHECKING:
Expand Down Expand Up @@ -305,6 +304,9 @@ def get_room_max_stream_ordering(self) -> int:
def get_room_min_stream_ordering(self) -> int:
raise NotImplementedError()

def get_room_max_token(self) -> RoomStreamToken:
return RoomStreamToken(None, self.get_room_max_stream_ordering())

async def get_room_events_stream_for_rooms(
self,
room_ids: Collection[str],
Expand Down Expand Up @@ -611,34 +613,36 @@ def get_stream_id_for_event_txn(
allow_none=allow_none,
)

async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken:
"""The stream token for an event
Args:
event_id: The id of the event to look up a stream token for.
Raises:
StoreError if the event wasn't in the database.
Returns:
A stream token.
async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
"""Get the persisted position for an event
"""
stream_id = await self.get_stream_id_for_event(event_id)
return RoomStreamToken(None, stream_id)
row = await self.db_pool.simple_select_one(
table="events",
keyvalues={"event_id": event_id},
retcols=("stream_ordering", "instance_name"),
desc="get_position_for_event",
)

return PersistedEventPosition(
row["instance_name"] or "master", row["stream_ordering"]
)

async def get_topological_token_for_event(self, event_id: str) -> str:
async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
"""The stream token for an event
Args:
event_id: The id of the event to look up a stream token for.
Raises:
StoreError if the event wasn't in the database.
Returns:
A "t%d-%d" topological token.
A `RoomStreamToken` topological token.
"""
row = await self.db_pool.simple_select_one(
table="events",
keyvalues={"event_id": event_id},
retcols=("stream_ordering", "topological_ordering"),
desc="get_topological_token_for_event",
)
return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])

async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
"""Gets the topological token in a room after or at the given stream
Expand Down Expand Up @@ -687,8 +691,8 @@ def _set_before_and_after(
else:
topo = None
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
internal.before = RoomStreamToken(topo, stream - 1)
internal.after = RoomStreamToken(topo, stream)
internal.order = (int(topo) if topo else 0, int(stream))

async def get_events_around(
Expand Down
5 changes: 2 additions & 3 deletions synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async def persist_events(
defer.gatherResults(deferreds, consumeErrors=True)
)

return RoomStreamToken(None, self.main_store.get_current_events_token())
return self.main_store.get_room_max_token()

async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
Expand All @@ -247,11 +247,10 @@ async def persist_event(

await make_deferred_yieldable(deferred)

max_persisted_id = self.main_store.get_current_events_token()
event_stream_id = event.internal_metadata.stream_ordering

pos = PersistedEventPosition(self._instance_name, event_stream_id)
return pos, RoomStreamToken(None, max_persisted_id)
return pos, self.main_store.get_room_max_token()

def _maybe_start_persisting(self, room_id: str):
async def persisting_queue(item):
Expand Down
53 changes: 33 additions & 20 deletions synapse/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,18 @@ def parse_stream_token(cls, string: str) -> "RoomStreamToken":
pass
raise SynapseError(400, "Invalid token %r" % (string,))

def copy_and_advance(self, other: "RoomStreamToken") -> "RoomStreamToken":
"""Return a new token such that if an event is after both this token and
the other token, then its after the returned token too.
"""

if self.topological or other.topological:
raise Exception("Can't advance topological tokens")

max_stream = max(self.stream, other.stream)

return RoomStreamToken(None, max_stream)

def as_tuple(self) -> Tuple[Optional[int], int]:
return (self.topological, self.stream)

Expand Down Expand Up @@ -458,31 +470,20 @@ def to_string(self):
def room_stream_id(self):
return self.room_key.stream

def is_after(self, other):
"""Does this token contain events that the other doesn't?"""
return (
(other.room_stream_id < self.room_stream_id)
or (int(other.presence_key) < int(self.presence_key))
or (int(other.typing_key) < int(self.typing_key))
or (int(other.receipt_key) < int(self.receipt_key))
or (int(other.account_data_key) < int(self.account_data_key))
or (int(other.push_rules_key) < int(self.push_rules_key))
or (int(other.to_device_key) < int(self.to_device_key))
or (int(other.device_list_key) < int(self.device_list_key))
or (int(other.groups_key) < int(self.groups_key))
)

def copy_and_advance(self, key, new_value) -> "StreamToken":
"""Advance the given key in the token to a new value if and only if the
new value is after the old value.
"""
new_token = self.copy_and_replace(key, new_value)
if key == "room_key":
new_id = new_token.room_stream_id
old_id = self.room_stream_id
else:
new_id = int(getattr(new_token, key))
old_id = int(getattr(self, key))
new_token = self.copy_and_replace(
"room_key", self.room_key.copy_and_advance(new_value)
)
return new_token

new_token = self.copy_and_replace(key, new_value)
new_id = int(getattr(new_token, key))
old_id = int(getattr(self, key))

if old_id < new_id:
return new_token
else:
Expand All @@ -509,6 +510,18 @@ class PersistedEventPosition:
def persisted_after(self, token: RoomStreamToken) -> bool:
return token.stream < self.stream

def to_room_stream_token(self) -> RoomStreamToken:
"""Converts the position to a room stream token such that events
persisted in the same room after this position will be after the
returned `RoomStreamToken`.

Note: no guarantees are made about ordering w.r.t. events in other
rooms.
"""
# Doing the naive thing satisfies the desired properties described in
# the docstring.
return RoomStreamToken(None, self.stream)


class ThirdPartyInstanceID(
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
Expand Down
8 changes: 4 additions & 4 deletions tests/rest/client/v1/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,15 +902,15 @@ def test_room_messages_purge(self):

# Send a first message in the room, which will be removed by the purge.
first_event_id = self.helper.send(self.room_id, "message 1")["event_id"]
first_token = self.get_success(
store.get_topological_token_for_event(first_event_id)
first_token = str(
self.get_success(store.get_topological_token_for_event(first_event_id))
)

# Send a second message in the room, which won't be removed, and which we'll
# use as the marker to purge events before.
second_event_id = self.helper.send(self.room_id, "message 2")["event_id"]
second_token = self.get_success(
store.get_topological_token_for_event(second_event_id)
second_token = str(
self.get_success(store.get_topological_token_for_event(second_event_id))
)

# Send a third event in the room to ensure we don't fall under any edge case
Expand Down
Loading