Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce duplication in event ordering lookup methods #8453

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8453.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove redundant event ordering lookup methods.
4 changes: 1 addition & 3 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,7 @@ async def get_messages(
# case "JOIN" would have been returned.
assert member_event_id

leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
leave_token = await self.store.get_event_ordering(member_event_id)
assert leave_token.topological is not None

if leave_token.topological < curr_topo:
Expand Down
6 changes: 4 additions & 2 deletions synapse/handlers/read_marker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ async def received_client_read_marker(self, room_id, user_id, event_id):

if existing_read_marker:
# Only update if the new marker is ahead in the stream
should_update = await self.store.is_event_after(
event_id, existing_read_marker["event_id"]
token1 = await self.store.get_event_ordering(event_id)
token2 = await self.store.get_event_ordering(
existing_read_marker["event_id"]
)
should_update = token1 > token2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

attr objects by default allow comparisons I'm assuming? (I don't see anything special in RoomStreamToken to support this.)

Copy link
Member

@erikjohnston erikjohnston Oct 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh, #8439 removes this as RoomStreamTokens should not be comparable (as they'll be vector clocks in some cases).


if should_update:
content = {"event_id": event_id}
Expand Down
25 changes: 2 additions & 23 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from twisted.internet import defer

from synapse.api.constants import EventTypes
from synapse.api.errors import NotFoundError, SynapseError
from synapse.api.errors import NotFoundError
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
EventFormatVersions,
Expand All @@ -43,7 +43,7 @@
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import Collection, get_domain_from_id
from synapse.util.caches.descriptors import Cache, cached
from synapse.util.caches.descriptors import Cache
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure

Expand Down Expand Up @@ -1260,27 +1260,6 @@ def get_deltas_for_stream_id_txn(txn, stream_id):

return rows, to_token, True

async def is_event_after(self, event_id1, event_id2):
    """Returns True if event_id1 is after event_id2 in the stream

    Orderings are compared as (topological_ordering, stream_ordering)
    pairs, so the topological position wins and the stream position
    breaks ties.
    """
    ordering_1 = await self.get_event_ordering(event_id1)
    ordering_2 = await self.get_event_ordering(event_id2)
    # get_event_ordering returns a 2-tuple; comparing the tuples directly
    # is the same lexicographic comparison as unpacking them first.
    return ordering_1 > ordering_2

@cached(max_entries=5000)
async def get_event_ordering(self, event_id):
    """Look up the ordering of an event in the room's timeline.

    Args:
        event_id: the ID of the event to look up.

    Returns:
        A (topological_ordering, stream_ordering) tuple of ints.

    Raises:
        SynapseError: 404 if no such event is found.
    """
    row = await self.db_pool.simple_select_one(
        table="events",
        retcols=["topological_ordering", "stream_ordering"],
        keyvalues={"event_id": event_id},
        allow_none=True,
    )
    if not row:
        raise SynapseError(404, "Could not find event %s" % (event_id,))
    return (int(row["topological_ordering"]), int(row["stream_ordering"]))

async def get_next_event_to_expire(self) -> Optional[Tuple[str, int]]:
"""Retrieve the entry with the lowest expiry timestamp in the event_expiry
table, or None if there's no more event to expire.
Expand Down
11 changes: 9 additions & 2 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

from twisted.internet import defer

from synapse.api.errors import StoreError
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
Expand All @@ -54,6 +55,7 @@
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache

if TYPE_CHECKING:
Expand Down Expand Up @@ -614,7 +616,8 @@ async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
row["instance_name"] or "master", row["stream_ordering"]
)

async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
@cached(max_entries=5000)
async def get_event_ordering(self, event_id: str) -> RoomStreamToken:
"""The stream token for an event
Args:
event_id: The id of the event to look up a stream token for.
Expand All @@ -627,8 +630,12 @@ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToke
table="events",
keyvalues={"event_id": event_id},
retcols=("stream_ordering", "topological_ordering"),
desc="get_topological_token_for_event",
desc="get_event_ordering",
allow_none=True,
)
if not row:
raise StoreError(404, "Could not find event %s" % (event_id,))

return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])

async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
Expand Down
8 changes: 2 additions & 6 deletions tests/rest/client/v1/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,17 +902,13 @@ def test_room_messages_purge(self):

# Send a first message in the room, which will be removed by the purge.
first_event_id = self.helper.send(self.room_id, "message 1")["event_id"]
first_token = self.get_success(
store.get_topological_token_for_event(first_event_id)
)
first_token = self.get_success(store.get_event_ordering(first_event_id))
first_token_str = self.get_success(first_token.to_string(store))

# Send a second message in the room, which won't be removed, and which we'll
# use as the marker to purge events before.
second_event_id = self.helper.send(self.room_id, "message 2")["event_id"]
second_token = self.get_success(
store.get_topological_token_for_event(second_event_id)
)
second_token = self.get_success(store.get_event_ordering(second_event_id))
second_token_str = self.get_success(second_token.to_string(store))

# Send a third event in the room to ensure we don't fall under any edge case
Expand Down
8 changes: 2 additions & 6 deletions tests/storage/test_purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ def test_purge(self):
storage = self.hs.get_storage()

# Get the topological token
token = self.get_success(
store.get_topological_token_for_event(last["event_id"])
)
token = self.get_success(store.get_event_ordering(last["event_id"]))
token_str = self.get_success(token.to_string(self.hs.get_datastore()))

# Purge everything before this topological token
Expand Down Expand Up @@ -77,9 +75,7 @@ def test_purge_wont_delete_extrems(self):
storage = self.hs.get_datastore()

# Set the topological token higher than it should be
token = self.get_success(
storage.get_topological_token_for_event(last["event_id"])
)
token = self.get_success(storage.get_event_ordering(last["event_id"]))
event = "t{}-{}".format(token.topological + 1, token.stream + 1)

# Purge everything before this topological token
Expand Down