diff --git a/changelog.d/8453.misc b/changelog.d/8453.misc new file mode 100644 index 000000000000..5115419ea749 --- /dev/null +++ b/changelog.d/8453.misc @@ -0,0 +1 @@ +Remove redundant event ordering lookup methods. diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 2c2a633938ba..37743e861082 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -373,9 +373,7 @@ async def get_messages( # case "JOIN" would have been returned. assert member_event_id - leave_token = await self.store.get_topological_token_for_event( - member_event_id - ) + leave_token = await self.store.get_event_ordering(member_event_id) assert leave_token.topological is not None if leave_token.topological < curr_topo: diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index c32f314a1c0e..81d599ed256c 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -47,9 +47,11 @@ async def received_client_read_marker(self, room_id, user_id, event_id): if existing_read_marker: # Only update if the new marker is ahead in the stream - should_update = await self.store.is_event_after( - event_id, existing_read_marker["event_id"] + token1 = await self.store.get_event_ordering(event_id) + token2 = await self.store.get_event_ordering( + existing_read_marker["event_id"] ) + should_update = token1 > token2 if should_update: content = {"event_id": event_id} diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index b7ed8ca6ab06..adbf46ac0c51 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -25,7 +25,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes -from synapse.api.errors import NotFoundError, SynapseError +from synapse.api.errors import NotFoundError from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, @@ -43,7 +43,7 @@ from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.types import Collection, get_domain_from_id -from synapse.util.caches.descriptors import Cache, cached +from synapse.util.caches.descriptors import Cache from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure @@ -1260,27 +1260,6 @@ def get_deltas_for_stream_id_txn(txn, stream_id): return rows, to_token, True - async def is_event_after(self, event_id1, event_id2): - """Returns True if event_id1 is after event_id2 in the stream - """ - to_1, so_1 = await self.get_event_ordering(event_id1) - to_2, so_2 = await self.get_event_ordering(event_id2) - return (to_1, so_1) > (to_2, so_2) - - @cached(max_entries=5000) - async def get_event_ordering(self, event_id): - res = await self.db_pool.simple_select_one( - table="events", - retcols=["topological_ordering", "stream_ordering"], - keyvalues={"event_id": event_id}, - allow_none=True, - ) - - if not res: - raise SynapseError(404, "Could not find event %s" % (event_id,)) - - return (int(res["topological_ordering"]), int(res["stream_ordering"])) - async def get_next_event_to_expire(self) -> Optional[Tuple[str, int]]: """Retrieve the entry with the lowest expiry timestamp in the event_expiry table, or None if there's no more event to expire. diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index a94bec1ac528..78a6b80ebf9b 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -42,6 +42,7 @@ from twisted.internet import defer +from synapse.api.errors import StoreError from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -54,6 +55,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.types import Collection, PersistedEventPosition, RoomStreamToken +from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache if TYPE_CHECKING: @@ -614,7 +616,8 @@ async def get_position_for_event(self, event_id: str) -> PersistedEventPosition: row["instance_name"] or "master", row["stream_ordering"] ) - async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken: + @cached(max_entries=5000) + async def get_event_ordering(self, event_id: str) -> RoomStreamToken: """The stream token for an event Args: event_id: The id of the event to look up a stream token for. @@ -627,8 +630,12 @@ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToke table="events", keyvalues={"event_id": event_id}, retcols=("stream_ordering", "topological_ordering"), - desc="get_topological_token_for_event", + desc="get_event_ordering", + allow_none=True, ) + if not row: + raise StoreError(404, "Could not find event %s" % (event_id,)) + return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 0d809d25d5d4..e8c55d6b3299 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -902,17 +902,13 @@ def test_room_messages_purge(self): # Send a first message in the room, which will be removed by the purge. first_event_id = self.helper.send(self.room_id, "message 1")["event_id"] - first_token = self.get_success( - store.get_topological_token_for_event(first_event_id) - ) + first_token = self.get_success(store.get_event_ordering(first_event_id)) first_token_str = self.get_success(first_token.to_string(store)) # Send a second message in the room, which won't be removed, and which we'll # use as the marker to purge events before. second_event_id = self.helper.send(self.room_id, "message 2")["event_id"] - second_token = self.get_success( - store.get_topological_token_for_event(second_event_id) - ) + second_token = self.get_success(store.get_event_ordering(second_event_id)) second_token_str = self.get_success(second_token.to_string(store)) # Send a third event in the room to ensure we don't fall under any edge case diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index cc1f3c53c51c..7b9971b12f83 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -47,9 +47,7 @@ def test_purge(self): storage = self.hs.get_storage() # Get the topological token - token = self.get_success( - store.get_topological_token_for_event(last["event_id"]) - ) + token = self.get_success(store.get_event_ordering(last["event_id"])) token_str = self.get_success(token.to_string(self.hs.get_datastore())) # Purge everything before this topological token @@ -77,9 +75,7 @@ def test_purge_wont_delete_extrems(self): storage = self.hs.get_datastore() # Set the topological token higher than it should be - token = self.get_success( - storage.get_topological_token_for_event(last["event_id"]) - ) + token = self.get_success(storage.get_event_ordering(last["event_id"])) event = "t{}-{}".format(token.topological + 1, token.stream + 1) # Purge everything before this topological token