From 797dd9e7d89debcf850dac1145e27962e02b8a7a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Mar 2022 08:05:04 -0400 Subject: [PATCH 01/26] The latest event in a thread should include bundled aggregations. --- synapse/events/utils.py | 10 +++++++--- tests/rest/client/test_relations.py | 8 +++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 71200621277e..87f5b1f09293 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -32,13 +32,13 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersion +from synapse.handlers.relations import BundledAggregations from synapse.types import JsonDict from synapse.util.frozenutils import unfreeze from . import EventBase if TYPE_CHECKING: - from synapse.handlers.relations import BundledAggregations from synapse.server import HomeServer @@ -515,8 +515,12 @@ def _inject_bundled_aggregations( ) # Manually apply an edit, if one exists. 
if thread.latest_edit: - self._apply_edit( - thread.latest_event, serialized_latest_event, thread.latest_edit + self._inject_bundled_aggregations( + thread.latest_event, + time_now, + config, + BundledAggregations(replace=thread.latest_edit), + serialized_latest_event, ) thread_summary = { diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 329690f8f7c8..231717afc2cc 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -610,12 +610,13 @@ def test_edit_thread(self) -> None: threaded_event_id = channel.json_body["event_id"] new_body = {"msgtype": "m.text", "body": "I've been edited!"} - self._send_relation( + channel = self._send_relation( RelationTypes.REPLACE, "m.room.message", content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body}, parent_id=threaded_event_id, ) + edit_event_id = channel.json_body["event_id"] # Fetch the thread root, to get the bundled aggregation for the thread. channel = self.make_request( @@ -634,6 +635,11 @@ def test_edit_thread(self) -> None: self.assertIn("latest_event", thread_summary) latest_event_in_thread = thread_summary["latest_event"] self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!") + # The event should also have edit appear under the bundled aggregations. + self.assertDictContainsSubset( + {"event_id": edit_event_id, "sender": "@alice:test"}, + latest_event_in_thread["unsigned"]["m.relations"][RelationTypes.REPLACE], + ) def test_edit_edit(self) -> None: """Test that an edit cannot be edited.""" From 5065c1040b285f902fca01aa4991b63cce42acda Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Mar 2022 08:22:45 -0400 Subject: [PATCH 02/26] Move the thread edit test into BundledAggregationsTestCase. 
--- tests/rest/client/test_relations.py | 86 ++++++++++++++--------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 231717afc2cc..475cdab21b93 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -598,49 +598,6 @@ def test_edit_reply(self) -> None: {"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict ) - def test_edit_thread(self) -> None: - """Test that editing a thread works.""" - - # Create a thread and edit the last event. - channel = self._send_relation( - RelationTypes.THREAD, - "m.room.message", - content={"msgtype": "m.text", "body": "A threaded reply!"}, - ) - threaded_event_id = channel.json_body["event_id"] - - new_body = {"msgtype": "m.text", "body": "I've been edited!"} - channel = self._send_relation( - RelationTypes.REPLACE, - "m.room.message", - content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body}, - parent_id=threaded_event_id, - ) - edit_event_id = channel.json_body["event_id"] - - # Fetch the thread root, to get the bundled aggregation for the thread. - channel = self.make_request( - "GET", - f"/rooms/{self.room}/event/{self.parent_id}", - access_token=self.user_token, - ) - self.assertEqual(200, channel.code, channel.json_body) - - # We expect that the edit message appears in the thread summary in the - # unsigned relations section. - relations_dict = channel.json_body["unsigned"].get("m.relations") - self.assertIn(RelationTypes.THREAD, relations_dict) - - thread_summary = relations_dict[RelationTypes.THREAD] - self.assertIn("latest_event", thread_summary) - latest_event_in_thread = thread_summary["latest_event"] - self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!") - # The event should also have edit appear under the bundled aggregations. 
- self.assertDictContainsSubset( - {"event_id": edit_event_id, "sender": "@alice:test"}, - latest_event_in_thread["unsigned"]["m.relations"][RelationTypes.REPLACE], - ) - def test_edit_edit(self) -> None: """Test that an edit cannot be edited.""" new_body = {"msgtype": "m.text", "body": "Initial edit"} @@ -1237,6 +1194,49 @@ def assert_annotations(bundled_aggregations: JsonDict) -> None: self._test_bundled_aggregations(RelationTypes.THREAD, assert_annotations, 9) + def test_edit_thread(self) -> None: + """Test that editing the latest event in a thread works.""" + + # Create a thread and edit the last event. + channel = self._send_relation( + RelationTypes.THREAD, + "m.room.message", + content={"msgtype": "m.text", "body": "A threaded reply!"}, + ) + threaded_event_id = channel.json_body["event_id"] + + new_body = {"msgtype": "m.text", "body": "I've been edited!"} + channel = self._send_relation( + RelationTypes.REPLACE, + "m.room.message", + content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body}, + parent_id=threaded_event_id, + ) + edit_event_id = channel.json_body["event_id"] + + # Fetch the thread root, to get the bundled aggregation for the thread. + channel = self.make_request( + "GET", + f"/rooms/{self.room}/event/{self.parent_id}", + access_token=self.user_token, + ) + self.assertEqual(200, channel.code, channel.json_body) + + # We expect that the edit message appears in the thread summary in the + # unsigned relations section. + relations_dict = channel.json_body["unsigned"].get("m.relations") + self.assertIn(RelationTypes.THREAD, relations_dict) + + thread_summary = relations_dict[RelationTypes.THREAD] + self.assertIn("latest_event", thread_summary) + latest_event_in_thread = thread_summary["latest_event"] + self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!") + # The event should also have edit appear under the bundled aggregations. 
+ self.assertDictContainsSubset( + {"event_id": edit_event_id, "sender": "@alice:test"}, + latest_event_in_thread["unsigned"]["m.relations"][RelationTypes.REPLACE], + ) + def test_aggregation_get_event_for_annotation(self) -> None: """Test that annotations do not get bundled aggregations included when directly requested. From f0ab01f62d44623cf5b7e1a32fb5fcfa31518004 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Mar 2022 08:47:52 -0400 Subject: [PATCH 03/26] Support including other bundled aggregations for the latest event. --- synapse/events/utils.py | 46 ++++++++--------- synapse/handlers/relations.py | 29 ++++++++--- synapse/storage/databases/main/relations.py | 9 +--- tests/rest/client/test_relations.py | 55 +++++++++++++++++++-- 4 files changed, 99 insertions(+), 40 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 87f5b1f09293..637e04aa0313 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -428,13 +428,12 @@ def serialize_event( # Check if there are any bundled aggregations to include with the event. if bundle_aggregations: - event_aggregations = bundle_aggregations.get(event.event_id) - if event_aggregations: + if event.event_id in bundle_aggregations: self._inject_bundled_aggregations( event, time_now, config, - bundle_aggregations[event.event_id], + bundle_aggregations, serialized_event, ) @@ -472,7 +471,7 @@ def _inject_bundled_aggregations( event: EventBase, time_now: int, config: SerializeEventConfig, - aggregations: "BundledAggregations", + bundled_aggregations: Dict[str, "BundledAggregations"], serialized_event: JsonDict, ) -> None: """Potentially injects bundled aggregations into the unsigned portion of the serialized event. @@ -487,15 +486,22 @@ def _inject_bundled_aggregations( """ serialized_aggregations = {} - if aggregations.annotations: - serialized_aggregations[RelationTypes.ANNOTATION] = aggregations.annotations + # The aggregations must exist for this event. 
+ event_aggregations = bundled_aggregations[event.event_id] - if aggregations.references: - serialized_aggregations[RelationTypes.REFERENCE] = aggregations.references + if event_aggregations.annotations: + serialized_aggregations[ + RelationTypes.ANNOTATION + ] = event_aggregations.annotations - if aggregations.replace: + if event_aggregations.references: + serialized_aggregations[ + RelationTypes.REFERENCE + ] = event_aggregations.references + + if event_aggregations.replace: # If there is an edit, apply it to the event. - edit = aggregations.replace + edit = event_aggregations.replace self._apply_edit(event, serialized_event, edit) # Include information about it in the relations dict. @@ -506,22 +512,16 @@ def _inject_bundled_aggregations( } # If this event is the start of a thread, include a summary of the replies. - if aggregations.thread: - thread = aggregations.thread + if event_aggregations.thread: + thread = event_aggregations.thread # Don't bundle aggregations as this could recurse forever. - serialized_latest_event = serialize_event( - thread.latest_event, time_now, config=config + serialized_latest_event = self.serialize_event( + thread.latest_event, + time_now, + config=config, + bundle_aggregations=bundled_aggregations, ) - # Manually apply an edit, if one exists. - if thread.latest_edit: - self._inject_bundled_aggregations( - thread.latest_event, - time_now, - config, - BundledAggregations(replace=thread.latest_edit), - serialized_latest_event, - ) thread_summary = { "latest_event": serialized_latest_event, diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 57135d45197b..67d7e83ea34f 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -34,8 +34,6 @@ class _ThreadAggregation: # The latest event in the thread. latest_event: EventBase - # The latest edit to the latest event in the thread. - latest_edit: Optional[EventBase] # The total number of events in the thread. 
count: int # True if the current user has sent an event to the thread. @@ -210,8 +208,13 @@ async def get_bundled_aggregations( user_id: The user requesting the bundled aggregations. Returns: - A map of event ID to the bundled aggregation for the event. Not all - events may have bundled aggregations in the results. + A map of event ID to the bundled aggregation for the event. + + Not all requested events may exist in the results (if they don't have + bundled aggregations). + + The results may include additional events which are related to the + requested events. """ # De-duplicate events by ID to handle the same event requested multiple times. # @@ -247,18 +250,32 @@ async def get_bundled_aggregations( participated = await self._main_store.get_threads_participated( [event_id for event_id, summary in summaries.items() if summary], user_id ) + additional_events = set() for event_id, summary in summaries.items(): if summary: - thread_count, latest_thread_event, edit = summary + thread_count, latest_thread_event = summary + + # If the latest event in a thread is not already being fetched, + # add it to the events. + if ( + latest_thread_event + and latest_thread_event.event_id not in events_by_id + ): + additional_events.add(latest_thread_event) + results.setdefault( event_id, BundledAggregations() ).thread = _ThreadAggregation( latest_event=latest_thread_event, - latest_edit=edit, count=thread_count, # If there's a thread summary it must also exist in the # participated dictionary. 
current_user_participated=participated[event_id], ) + if additional_events: + results.update( + await self.get_bundled_aggregations(additional_events, user_id) + ) + return results diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index b2295fd51f60..5054f238811b 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -444,7 +444,7 @@ def get_thread_summary(self, event_id: str) -> Optional[Tuple[int, EventBase]]: @cachedList(cached_method_name="get_thread_summary", list_name="event_ids") async def get_thread_summaries( self, event_ids: Collection[str] - ) -> Dict[str, Optional[Tuple[int, EventBase, Optional[EventBase]]]]: + ) -> Dict[str, Optional[Tuple[int, EventBase]]]: """Get the number of threaded replies, the latest reply (if any), and the latest edit for that reply for the given event. Args: @@ -457,7 +457,6 @@ async def get_thread_summaries( Each summary is a tuple of: The number of events in the thread. The most recent event in the thread. - The most recent edit to the most recent event in the thread, if applicable. """ def _get_thread_summaries_txn( @@ -555,9 +554,6 @@ def _get_thread_summaries_txn( latest_events = await self.get_events(latest_event_ids.values()) # type: ignore[attr-defined] - # Check to see if any of those events are edited. - latest_edits = await self.get_applicable_edits(latest_event_ids.values()) - # Map to the event IDs to the thread summary. 
# # There might not be a summary due to there not being a thread or @@ -568,8 +564,7 @@ def _get_thread_summaries_txn( summary = None if latest_event: - latest_edit = latest_edits.get(latest_event_id) - summary = (counts[parent_event_id], latest_event, latest_edit) + summary = (counts[parent_event_id], latest_event) summaries[parent_event_id] = summary return summaries diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 475cdab21b93..7f37431f6ecd 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1173,7 +1173,7 @@ def test_thread(self) -> None: channel = self._send_relation(RelationTypes.THREAD, "m.room.test") thread_2 = channel.json_body["event_id"] - def assert_annotations(bundled_aggregations: JsonDict) -> None: + def assert_thread(bundled_aggregations: JsonDict) -> None: self.assertEqual(2, bundled_aggregations.get("count")) self.assertTrue(bundled_aggregations.get("current_user_participated")) # The latest thread event has some fields that don't matter. @@ -1192,9 +1192,56 @@ def assert_annotations(bundled_aggregations: JsonDict) -> None: bundled_aggregations.get("latest_event"), ) - self._test_bundled_aggregations(RelationTypes.THREAD, assert_annotations, 9) + self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 12) + + @unittest.override_config({"experimental_features": {"msc3666_enabled": True}}) + def test_thread_with_bundled_aggregations_for_latest(self) -> None: + """ + Bundled aggregations should get applied to the latest thread event. 
+ """ + self._send_relation(RelationTypes.THREAD, "m.room.test") + channel = self._send_relation(RelationTypes.THREAD, "m.room.test") + thread_2 = channel.json_body["event_id"] - def test_edit_thread(self) -> None: + self._send_relation( + RelationTypes.ANNOTATION, "m.reaction", "a", parent_id=thread_2 + ) + + def assert_thread(bundled_aggregations: JsonDict) -> None: + self.assertEqual(2, bundled_aggregations.get("count")) + self.assertTrue(bundled_aggregations.get("current_user_participated")) + # The latest thread event has some fields that don't matter. + self.assert_dict( + { + "content": { + "m.relates_to": { + "event_id": self.parent_id, + "rel_type": RelationTypes.THREAD, + } + }, + "event_id": thread_2, + "sender": self.user_id, + "type": "m.room.test", + }, + bundled_aggregations.get("latest_event"), + ) + # Check the unsigned field on the latest event. + self.assert_dict( + { + "m.relations": { + RelationTypes.ANNOTATION: { + "chunk": [ + {"type": "m.reaction", "key": "a", "count": 1}, + ] + }, + } + }, + bundled_aggregations["latest_event"].get("unsigned"), + ) + + self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 12) + + def test_thread_edit_latest_event(self) -> None: """Test that editing the latest event in a thread works.""" # Create a thread and edit the last event. @@ -1262,7 +1309,7 @@ def test_aggregation_get_event_for_thread(self) -> None: channel = self._send_relation(RelationTypes.THREAD, "m.room.test") thread_id = channel.json_body["event_id"] - # Annotate the annotation. + # Annotate the thread. self._send_relation( RelationTypes.ANNOTATION, "m.reaction", "a", parent_id=thread_id ) From d78ca4c333f9555b87104fef8a66460e1811b32e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Mar 2022 09:20:44 -0400 Subject: [PATCH 04/26] Do not recurse into threads indefinitely. 
--- synapse/handlers/relations.py | 84 +++++++++++++++++------------ tests/rest/client/test_relations.py | 31 ++++++++++- 2 files changed, 79 insertions(+), 36 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 67d7e83ea34f..2a310b4c6e1b 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -199,13 +199,18 @@ async def _get_bundled_aggregation_for_event( return aggregations async def get_bundled_aggregations( - self, events: Iterable[EventBase], user_id: str + self, + events: Iterable[EventBase], + user_id: str, + fetch_bundled_aggregations_for_threads: bool = True, ) -> Dict[str, BundledAggregations]: """Generate bundled aggregations for events. Args: events: The iterable of events to calculate bundled aggregations for. user_id: The user requesting the bundled aggregations. + fetch_bundled_aggregations_for_threads: True to recurse to fetch the + bundled aggregations for the latest event in threads. Returns: A map of event ID to the bundled aggregation for the event. @@ -243,39 +248,50 @@ async def get_bundled_aggregations( for event_id, edit in edits.items(): results.setdefault(event_id, BundledAggregations()).replace = edit - # Fetch thread summaries. - summaries = await self._main_store.get_thread_summaries(events_by_id.keys()) - # Only fetch participated for a limited selection based on what had - # summaries. - participated = await self._main_store.get_threads_participated( - [event_id for event_id, summary in summaries.items() if summary], user_id - ) - additional_events = set() - for event_id, summary in summaries.items(): - if summary: - thread_count, latest_thread_event = summary - - # If the latest event in a thread is not already being fetched, - # add it to the events. 
- if ( - latest_thread_event - and latest_thread_event.event_id not in events_by_id - ): - additional_events.add(latest_thread_event) - - results.setdefault( - event_id, BundledAggregations() - ).thread = _ThreadAggregation( - latest_event=latest_thread_event, - count=thread_count, - # If there's a thread summary it must also exist in the - # participated dictionary. - current_user_participated=participated[event_id], - ) - - if additional_events: - results.update( - await self.get_bundled_aggregations(additional_events, user_id) + # Fetch thread summaries (but only for the directly requested events). + # + # Note that you can't have threads off of other related events, but it is + # possible for a malicious homeserver to inject them anyway. + if fetch_bundled_aggregations_for_threads: + summaries = await self._main_store.get_thread_summaries(events_by_id.keys()) + # Only fetch participated for a limited selection based on what had + # summaries. + participated = await self._main_store.get_threads_participated( + [event_id for event_id, summary in summaries.items() if summary], + user_id, ) + # Additional events to check for bundled aggregations (i.e. the + # latest events in the threads). + additional_events = set() + for event_id, summary in summaries.items(): + if summary: + thread_count, latest_thread_event = summary + + # If the latest event in a thread is not already being fetched, + # add it to the events. + if ( + latest_thread_event + and latest_thread_event.event_id not in events_by_id + ): + additional_events.add(latest_thread_event) + + results.setdefault( + event_id, BundledAggregations() + ).thread = _ThreadAggregation( + latest_event=latest_thread_event, + count=thread_count, + # If there's a thread summary it must also exist in the + # participated dictionary. 
+ current_user_participated=participated[event_id], + ) + + if additional_events: + results.update( + await self.get_bundled_aggregations( + additional_events, + user_id, + fetch_bundled_aggregations_for_threads=False, + ) + ) return results diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 7f37431f6ecd..da9b223affa4 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1192,7 +1192,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None: bundled_aggregations.get("latest_event"), ) - self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 12) + self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 11) @unittest.override_config({"experimental_features": {"msc3666_enabled": True}}) def test_thread_with_bundled_aggregations_for_latest(self) -> None: @@ -1239,7 +1239,34 @@ def assert_thread(bundled_aggregations: JsonDict) -> None: bundled_aggregations["latest_event"].get("unsigned"), ) - self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 12) + self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 11) + + def test_thread_loop(self) -> None: + """Ensure that bogus events do not cause the bundled aggregations code to iterate forever.""" + last_event = self.parent_id + + # Disable the validation to pretend this came over federation. + with patch( + "synapse.handlers.message.EventCreationHandler._validate_event_relation", + new=lambda self, event: make_awaitable(None), + ): + for _ in range(2): + channel = self._send_relation( + RelationTypes.THREAD, "m.room.test", parent_id=last_event + ) + last_event = channel.json_body["event_id"] + + # Fetch the thread root, to get the bundled aggregation for the thread. + relations = self._get_bundled_aggregations() + + # The latest event should have bundled aggregations. 
+ self.assertIn(RelationTypes.THREAD, relations) + thread_summary = relations[RelationTypes.THREAD] + self.assertIn("latest_event", thread_summary) + + # The latest event should not have any bundled aggregations (since it + # only has a thread attached. + self.assertNotIn("m.relations", thread_summary["latest_event"]["unsigned"]) def test_thread_edit_latest_event(self) -> None: """Test that editing the latest event in a thread works.""" From 5a1c218fc3c5a765b5c4c6082a474ea242d892f4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Mar 2022 09:50:36 -0400 Subject: [PATCH 05/26] Rejigger to avoid passing state around. --- synapse/handlers/relations.py | 120 ++++++++++++---------------- tests/rest/client/test_relations.py | 4 +- 2 files changed, 54 insertions(+), 70 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 2a310b4c6e1b..f8dcf40dd6b9 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, Iterable, Optional, cast +from typing import TYPE_CHECKING, Dict, Iterable, Optional, Tuple, cast import attr from frozendict import frozendict @@ -151,7 +151,7 @@ async def get_relations( async def _get_bundled_aggregation_for_event( self, event: EventBase, user_id: str - ) -> Optional[BundledAggregations]: + ) -> Tuple[Optional[JsonDict], Optional[JsonDict]]: """Generate bundled aggregations for an event. Note that this does not use a cache, but depends on cached methods. 
@@ -171,46 +171,36 @@ async def _get_bundled_aggregation_for_event( if isinstance(relates_to, (dict, frozendict)): relation_type = relates_to.get("rel_type") if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): - return None + return None, None event_id = event.event_id room_id = event.room_id - # The bundled aggregations to include, a mapping of relation type to a - # type-specific value. Some types include the direct return type here - # while others need more processing during serialization. - aggregations = BundledAggregations() - annotations = await self._main_store.get_aggregation_groups_for_event( event_id, room_id ) + serialized_annotations = None if annotations.chunk: - aggregations.annotations = await annotations.to_dict( - cast("DataStore", self) - ) + serialized_annotations = await annotations.to_dict(cast("DataStore", self)) references = await self._main_store.get_relations_for_event( event_id, event, room_id, RelationTypes.REFERENCE, direction="f" ) + serialized_references = None if references.chunk: - aggregations.references = await references.to_dict(cast("DataStore", self)) + serialized_references = await references.to_dict(cast("DataStore", self)) # Store the bundled aggregations in the event metadata for later use. - return aggregations + return serialized_annotations, serialized_references async def get_bundled_aggregations( - self, - events: Iterable[EventBase], - user_id: str, - fetch_bundled_aggregations_for_threads: bool = True, + self, events: Iterable[EventBase], user_id: str ) -> Dict[str, BundledAggregations]: """Generate bundled aggregations for events. Args: events: The iterable of events to calculate bundled aggregations for. user_id: The user requesting the bundled aggregations. - fetch_bundled_aggregations_for_threads: True to recurse to fetch the - bundled aggregations for the latest event in threads. Returns: A map of event ID to the bundled aggregation for the event. 
@@ -231,11 +221,51 @@ async def get_bundled_aggregations( # event ID -> bundled aggregation in non-serialized form. results: Dict[str, BundledAggregations] = {} + # Threads are special as the latest event of a thread might cause additional + # events to be fetched. Thus, we check those first! + + # Fetch thread summaries (but only for the directly requested events). + # + # Note that you can't have threads off of other related events, but it is + # possible for a malicious homeserver to inject them anyway. + summaries = await self._main_store.get_thread_summaries(events_by_id.keys()) + # Only fetch participated for a limited selection based on what had + # summaries. + participated = await self._main_store.get_threads_participated( + [event_id for event_id, summary in summaries.items() if summary], + user_id, + ) + for event_id, summary in summaries.items(): + if summary: + thread_count, latest_thread_event = summary + + # If the latest event in a thread is not already being fetched, + # add it to the events. (Note that we don't mo + if ( + latest_thread_event + and latest_thread_event.event_id not in events_by_id + ): + events_by_id[latest_thread_event.event_id] = latest_thread_event + + results.setdefault( + event_id, BundledAggregations() + ).thread = _ThreadAggregation( + latest_event=latest_thread_event, + count=thread_count, + # If there's a thread summary it must also exist in the + # participated dictionary. + current_user_participated=participated[event_id], + ) + # Fetch other relations per event. 
for event in events_by_id.values(): - event_result = await self._get_bundled_aggregation_for_event(event, user_id) - if event_result: - results[event.event_id] = event_result + annotations, references = await self._get_bundled_aggregation_for_event( + event, user_id + ) + if annotations or references: + aggregations = results.setdefault(event.event_id, BundledAggregations()) + aggregations.annotations = annotations + aggregations.references = references # Fetch any edits (but not for redacted events). edits = await self._main_store.get_applicable_edits( @@ -248,50 +278,4 @@ async def get_bundled_aggregations( for event_id, edit in edits.items(): results.setdefault(event_id, BundledAggregations()).replace = edit - # Fetch thread summaries (but only for the directly requested events). - # - # Note that you can't have threads off of other related events, but it is - # possible for a malicious homeserver to inject them anyway. - if fetch_bundled_aggregations_for_threads: - summaries = await self._main_store.get_thread_summaries(events_by_id.keys()) - # Only fetch participated for a limited selection based on what had - # summaries. - participated = await self._main_store.get_threads_participated( - [event_id for event_id, summary in summaries.items() if summary], - user_id, - ) - # Additional events to check for bundled aggregations (i.e. the - # latest events in the threads). - additional_events = set() - for event_id, summary in summaries.items(): - if summary: - thread_count, latest_thread_event = summary - - # If the latest event in a thread is not already being fetched, - # add it to the events. - if ( - latest_thread_event - and latest_thread_event.event_id not in events_by_id - ): - additional_events.add(latest_thread_event) - - results.setdefault( - event_id, BundledAggregations() - ).thread = _ThreadAggregation( - latest_event=latest_thread_event, - count=thread_count, - # If there's a thread summary it must also exist in the - # participated dictionary. 
- current_user_participated=participated[event_id], - ) - - if additional_events: - results.update( - await self.get_bundled_aggregations( - additional_events, - user_id, - fetch_bundled_aggregations_for_threads=False, - ) - ) - return results diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index da9b223affa4..a51d670b75dd 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1192,7 +1192,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None: bundled_aggregations.get("latest_event"), ) - self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 11) + self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10) @unittest.override_config({"experimental_features": {"msc3666_enabled": True}}) def test_thread_with_bundled_aggregations_for_latest(self) -> None: @@ -1239,7 +1239,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None: bundled_aggregations["latest_event"].get("unsigned"), ) - self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 11) + self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10) def test_thread_loop(self) -> None: """Ensure that bogus events do not cause the bundled aggregations code to iterate forever.""" From 560a4ac727e45dadf7c176bba8f8f89a1b6fae4d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 23 Mar 2022 10:27:37 -0400 Subject: [PATCH 06/26] Newsfragment --- changelog.d/12273.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12273.bugfix diff --git a/changelog.d/12273.bugfix b/changelog.d/12273.bugfix new file mode 100644 index 000000000000..2a902bd3be0f --- /dev/null +++ b/changelog.d/12273.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse v1.48.0 the latest event provided in bundled aggregations where it did not include bundled aggregations. 
From 3f8bd1f75f78aac56c558780cd8420be174a1f7a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 30 Mar 2022 10:25:01 -0400 Subject: [PATCH 07/26] Clarify comment. Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/events/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 637e04aa0313..48e41abc314b 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -486,7 +486,7 @@ def _inject_bundled_aggregations( """ serialized_aggregations = {} - # The aggregations must exist for this event. + # We have already checked that aggregations exist for this event. event_aggregations = bundled_aggregations[event.event_id] if event_aggregations.annotations: From 60ac6b42b0e06ee46aa19630b39b5c33d03d2287 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 30 Mar 2022 10:34:58 -0400 Subject: [PATCH 08/26] Review comments. --- changelog.d/12273.bugfix | 2 +- synapse/events/utils.py | 9 ++++++--- synapse/handlers/relations.py | 17 ++++++++++------- synapse/storage/databases/main/relations.py | 2 +- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/changelog.d/12273.bugfix b/changelog.d/12273.bugfix index 2a902bd3be0f..f8d7b6c88956 100644 --- a/changelog.d/12273.bugfix +++ b/changelog.d/12273.bugfix @@ -1 +1 @@ -Fix a bug introduced in Synapse v1.48.0 the latest event provided in bundled aggregations where it did not include bundled aggregations. +Fix a bug introduced in Synapse v1.48.0 where latest thread reply provided failed to include the proper bundled aggregations. 
diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 48e41abc314b..81140100ed88 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -32,13 +32,13 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersion -from synapse.handlers.relations import BundledAggregations from synapse.types import JsonDict from synapse.util.frozenutils import unfreeze from . import EventBase if TYPE_CHECKING: + from synapse.handlers.relations import BundledAggregations from synapse.server import HomeServer @@ -479,16 +479,19 @@ def _inject_bundled_aggregations( Args: event: The event being serialized. time_now: The current time in milliseconds - aggregations: The bundled aggregation to serialize. + bundled_aggregations: The bundled aggregation to serialize. serialized_event: The serialized event which may be modified. config: Event serialization config """ - serialized_aggregations = {} # We have already checked that aggregations exist for this event. event_aggregations = bundled_aggregations[event.event_id] + # The JSON dictionary to be added under the unsigned property of the event + # being serialized. + serialized_aggregations = {} + if event_aggregations.annotations: serialized_aggregations[ RelationTypes.ANNOTATION diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index f8dcf40dd6b9..d568fa0d03e3 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -149,10 +149,11 @@ async def get_relations( return return_value - async def _get_bundled_aggregation_for_event( + async def _get_bundled_annotations_and_references_for_event( self, event: EventBase, user_id: str ) -> Tuple[Optional[JsonDict], Optional[JsonDict]]: - """Generate bundled aggregations for an event. + """ + Generate bundled aggregations for annotation and reference relations for an event. 
Note that this does not use a cache, but depends on cached methods. @@ -161,8 +162,9 @@ async def _get_bundled_aggregation_for_event( user_id: The user requesting the bundled aggregations. Returns: - The bundled aggregations for an event, if bundled aggregations are - enabled and the event can have bundled aggregations. + A tuple of the bundled aggregations for annotation and reference relations. + Either or both entries in the tuple might be None if no relations + of that type exist. """ # Do not bundle aggregations for an event which represents an edit or an @@ -203,7 +205,7 @@ async def get_bundled_aggregations( user_id: The user requesting the bundled aggregations. Returns: - A map of event ID to the bundled aggregation for the event. + A map of event ID to the bundled aggregations for the event. Not all requested events may exist in the results (if they don't have bundled aggregations). @@ -240,7 +242,8 @@ async def get_bundled_aggregations( thread_count, latest_thread_event = summary # If the latest event in a thread is not already being fetched, - # add it to the events. (Note that we don't mo + # add it. This ensures that the bundled aggregations for the + # latest thread event is correct. if ( latest_thread_event and latest_thread_event.event_id not in events_by_id @@ -259,7 +262,7 @@ async def get_bundled_aggregations( # Fetch other relations per event. 
for event in events_by_id.values(): - annotations, references = await self._get_bundled_aggregation_for_event( + annotations, references = await self._get_bundled_annotations_and_references_for_event( event, user_id ) if annotations or references: diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 5054f238811b..b87bd0ec8119 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -445,7 +445,7 @@ def get_thread_summary(self, event_id: str) -> Optional[Tuple[int, EventBase]]: async def get_thread_summaries( self, event_ids: Collection[str] ) -> Dict[str, Optional[Tuple[int, EventBase]]]: - """Get the number of threaded replies, the latest reply (if any), and the latest edit for that reply for the given event. + """Get the number of threaded replies and the latest reply (if any) for the given events. Args: event_ids: Summarize the thread related to this event ID. From a63d62af1defcb10b16a8bcfb3b3973d073fae9d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 30 Mar 2022 10:37:33 -0400 Subject: [PATCH 09/26] Lint --- synapse/handlers/relations.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 28e902fa04ac..0a3b0ff8bf05 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -269,7 +269,10 @@ async def get_bundled_aggregations( # Fetch other relations per event. 
for event in events_by_id.values(): - annotations, references = await self._get_bundled_annotations_and_references_for_event( + ( + annotations, + references, + ) = await self._get_bundled_annotations_and_references_for_event( event, user_id ) if annotations or references: From 14ff58578035bc3737147ce7d509888d28f18962 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 19 Apr 2022 11:10:15 -0400 Subject: [PATCH 10/26] Add more checks to ensure that threads don't return bundled aggregations for threads. --- synapse/events/utils.py | 13 ++++++++++--- tests/rest/client/test_relations.py | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 529131b968b8..2d2601c9b481 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -510,11 +510,18 @@ def _inject_bundled_aggregations( "sender": edit.sender, } - # If this event is the start of a thread, include a summary of the replies. - if event_aggregations.thread: + # Include any threaded replies to this event. + # + # Note that is it not valid to start a thread on an event which already + # has a relation. Doing so would cause an infinite loop. + event_relation = event.content.get("m.relates_to") + include_thread = ( + not isinstance(event_relation, dict) + or event_relation.get("rel_type") is None + ) + if include_thread and event_aggregations.thread: thread = event_aggregations.thread - # Don't bundle aggregations as this could recurse forever. serialized_latest_event = self.serialize_event( thread.latest_event, time_now, diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 94a7fa69c674..343c36d97060 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1077,6 +1077,25 @@ def test_thread_loop(self) -> None: # only has a thread attached. 
self.assertNotIn("m.relations", thread_summary["latest_event"]["unsigned"]) + # Ensure that requesting the room messages also does not return the sub-thread. + channel = self.make_request( + "GET", + f"/rooms/{self.room}/messages?dir=b", + access_token=self.user_token, + ) + self.assertEqual(200, channel.code, channel.json_body) + event = self._find_event_in_chunk(channel.json_body["chunk"]) + relations = event["unsigned"]["m.relations"] + + # The latest event should have bundled aggregations. + self.assertIn(RelationTypes.THREAD, relations) + thread_summary = relations[RelationTypes.THREAD] + self.assertIn("latest_event", thread_summary) + + # The latest event should not have any bundled aggregations (since it + # only has a thread attached. + self.assertNotIn("m.relations", thread_summary["latest_event"]["unsigned"]) + def test_thread_edit_latest_event(self) -> None: """Test that editing the latest event in a thread works.""" From 5e18d24a4e2abd166156a12c06d19846da2ae1e4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 19 Apr 2022 11:49:09 -0400 Subject: [PATCH 11/26] Fix parameter ordering in docstring. --- synapse/events/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 9cb27f2a0637..9dec5bb54c37 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -478,9 +478,9 @@ def _inject_bundled_aggregations( Args: event: The event being serialized. time_now: The current time in milliseconds + config: Event serialization config bundled_aggregations: The bundled aggregation to serialize. serialized_event: The serialized event which may be modified. - config: Event serialization config apply_edits: Whether the content of the event should be modified to reflect any replacement in `aggregations.replace`. 
""" From 8fe302976495d504ed0db208685da70a16129492 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 19 Apr 2022 13:18:56 -0400 Subject: [PATCH 12/26] Fix frozendicts on Python 3.10. --- synapse/events/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 9dec5bb54c37..9fde0ca12c60 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -521,7 +521,7 @@ def _inject_bundled_aggregations( # has a relation. Doing so would cause an infinite loop. event_relation = event.content.get("m.relates_to") include_thread = ( - not isinstance(event_relation, dict) + not isinstance(event_relation, (dict, frozendict)) or event_relation.get("rel_type") is None ) if include_thread and event_aggregations.thread: From 823a6b11c05dae9d2fde70045cb7e4e63b6766d4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 20 Apr 2022 08:46:27 -0400 Subject: [PATCH 13/26] Clean-up and rename test_thread_loop. --- tests/rest/client/test_relations.py | 34 ++++++++++++++++++----------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 09b5c93fe648..8779dbc72ead 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1082,20 +1082,26 @@ def assert_thread(bundled_aggregations: JsonDict) -> None: self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10) - def test_thread_loop(self) -> None: - """Ensure that bogus events do not cause the bundled aggregations code to iterate forever.""" - last_event = self.parent_id + def test_nested_thread(self) -> None: + """ + Ensure that a nested thread gets ignored by bundled aggregations, as + those are forbidden. + """ - # Disable the validation to pretend this came over federation. + # Start a thread. 
+ channel = self._send_relation(RelationTypes.THREAD, "m.room.test") + reply_event_id = channel.json_body["event_id"] + + # Disable the validation to pretend this came over federation, since it is + # not an event the Client-Server API will allow.. with patch( "synapse.handlers.message.EventCreationHandler._validate_event_relation", new=lambda self, event: make_awaitable(None), ): - for _ in range(2): - channel = self._send_relation( - RelationTypes.THREAD, "m.room.test", parent_id=last_event - ) - last_event = channel.json_body["event_id"] + # Create a sub-thread off the thread, which is not allowed. + self._send_relation( + RelationTypes.THREAD, "m.room.test", parent_id=reply_event_id + ) # Fetch the thread root, to get the bundled aggregation for the thread. relations = self._get_bundled_aggregations() @@ -1104,9 +1110,10 @@ def test_thread_loop(self) -> None: self.assertIn(RelationTypes.THREAD, relations) thread_summary = relations[RelationTypes.THREAD] self.assertIn("latest_event", thread_summary) + self.assertEqual(thread_summary["latest_event"]["event_id"], reply_event_id) - # The latest event should not have any bundled aggregations (since it - # only has a thread attached. + # The latest event should not have any bundled aggregations (since the + # only relation to it is another thread, which is invalid). self.assertNotIn("m.relations", thread_summary["latest_event"]["unsigned"]) # Ensure that requesting the room messages also does not return the sub-thread. @@ -1123,9 +1130,10 @@ def test_thread_loop(self) -> None: self.assertIn(RelationTypes.THREAD, relations) thread_summary = relations[RelationTypes.THREAD] self.assertIn("latest_event", thread_summary) + self.assertEqual(thread_summary["latest_event"]["event_id"], reply_event_id) - # The latest event should not have any bundled aggregations (since it - # only has a thread attached. 
+ # The latest event should not have any bundled aggregations (since the + # only relation to it is another thread, which is invalid). self.assertNotIn("m.relations", thread_summary["latest_event"]["unsigned"]) def test_thread_edit_latest_event(self) -> None: From db0ccce295b0777409dac5084a14e8538d1841ee Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 20 Apr 2022 08:51:05 -0400 Subject: [PATCH 14/26] Reduce duplicated code. --- tests/rest/client/test_relations.py | 39 ++++++++++++++--------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 8779dbc72ead..b3f334cae164 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1104,17 +1104,7 @@ def test_nested_thread(self) -> None: ) # Fetch the thread root, to get the bundled aggregation for the thread. - relations = self._get_bundled_aggregations() - - # The latest event should have bundled aggregations. - self.assertIn(RelationTypes.THREAD, relations) - thread_summary = relations[RelationTypes.THREAD] - self.assertIn("latest_event", thread_summary) - self.assertEqual(thread_summary["latest_event"]["event_id"], reply_event_id) - - # The latest event should not have any bundled aggregations (since the - # only relation to it is another thread, which is invalid). - self.assertNotIn("m.relations", thread_summary["latest_event"]["unsigned"]) + relations_from_event = self._get_bundled_aggregations() # Ensure that requesting the room messages also does not return the sub-thread. channel = self.make_request( @@ -1124,17 +1114,26 @@ def test_nested_thread(self) -> None: ) self.assertEqual(200, channel.code, channel.json_body) event = self._find_event_in_chunk(channel.json_body["chunk"]) - relations = event["unsigned"]["m.relations"] + relations_from_messages = event["unsigned"]["m.relations"] - # The latest event should have bundled aggregations. 
- self.assertIn(RelationTypes.THREAD, relations) - thread_summary = relations[RelationTypes.THREAD] - self.assertIn("latest_event", thread_summary) - self.assertEqual(thread_summary["latest_event"]["event_id"], reply_event_id) + # Check the bundled aggregations from each point. + for relations, desc in ( + (relations_from_event, "/event"), + (relations_from_messages, "/messages"), + ): + # The latest event should have bundled aggregations. + self.assertIn(RelationTypes.THREAD, relations, desc) + thread_summary = relations[RelationTypes.THREAD] + self.assertIn("latest_event", thread_summary, desc) + self.assertEqual( + thread_summary["latest_event"]["event_id"], reply_event_id, desc + ) - # The latest event should not have any bundled aggregations (since the - # only relation to it is another thread, which is invalid). - self.assertNotIn("m.relations", thread_summary["latest_event"]["unsigned"]) + # The latest event should not have any bundled aggregations (since the + # only relation to it is another thread, which is invalid). + self.assertNotIn( + "m.relations", thread_summary["latest_event"]["unsigned"], desc + ) def test_thread_edit_latest_event(self) -> None: """Test that editing the latest event in a thread works.""" From d87dcba42dface4217253e15a91aef050a883b4b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 20 Apr 2022 10:04:13 -0400 Subject: [PATCH 15/26] Lint --- tests/rest/client/test_relations.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index e23b69369fe0..37d58963355a 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1112,13 +1112,13 @@ def test_nested_thread(self) -> None: relations_from_messages = event["unsigned"]["m.relations"] # Check the bundled aggregations from each point. 
- for relations, desc in ( + for aggregations, desc in ( (relations_from_event, "/event"), (relations_from_messages, "/messages"), ): # The latest event should have bundled aggregations. - self.assertIn(RelationTypes.THREAD, relations, desc) - thread_summary = relations[RelationTypes.THREAD] + self.assertIn(RelationTypes.THREAD, aggregations, desc) + thread_summary = aggregations[RelationTypes.THREAD] self.assertIn("latest_event", thread_summary, desc) self.assertEqual( thread_summary["latest_event"]["event_id"], reply_event_id, desc From 8c65acec012c761770868ff2134236f6d85c1c95 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 20 Apr 2022 10:06:18 -0400 Subject: [PATCH 16/26] Clarify docstrings. Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/events/utils.py | 4 +++- synapse/handlers/relations.py | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 9fde0ca12c60..c3105279a17d 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -479,7 +479,9 @@ def _inject_bundled_aggregations( event: The event being serialized. time_now: The current time in milliseconds config: Event serialization config - bundled_aggregations: The bundled aggregation to serialize. + bundled_aggregations: Bundled aggregations to be injected. + A map from event_id to aggregation data. Must contain at least an + entry for `event`. serialized_event: The serialized event which may be modified. apply_edits: Whether the content of the event should be modified to reflect any replacement in `aggregations.replace`. 
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index e7cdf6fde446..3d5b8e4b93c6 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -258,7 +258,7 @@ async def _get_bundled_annotations_and_references_for_event( self, event: EventBase, ignored_users: FrozenSet[str] ) -> Tuple[Optional[JsonDict], Optional[JsonDict]]: """ - Generate bundled aggregations for annotation and reference relations for an event. + Generate aggregations for annotation (ie, reaction) and reference (ie, reply) relations for an event. Note that this does not use a cache, but depends on cached methods. @@ -267,7 +267,9 @@ async def _get_bundled_annotations_and_references_for_event( ignored_users: The users ignored by the requesting user. Returns: - A tuple of the bundled aggregations for annotation and reference relations. + A 2-tuple consisting of the aggregations of: + - events with `m.annotation` relations to this event. + - events with `m.reference` relations to this event. Either or both entries in the tuple might be None if no relations of that type exist. """ From 1ccde00c1124982baff9c425bb519f7516ab21a2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 20 Apr 2022 12:48:53 -0400 Subject: [PATCH 17/26] Remove useless helper method. --- synapse/handlers/relations.py | 99 +++++++++++------------------------ 1 file changed, 32 insertions(+), 67 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 3d5b8e4b93c6..5de2586f3857 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -254,65 +254,6 @@ async def get_annotations_for_event( return filtered_results - async def _get_bundled_annotations_and_references_for_event( - self, event: EventBase, ignored_users: FrozenSet[str] - ) -> Tuple[Optional[JsonDict], Optional[JsonDict]]: - """ - Generate aggregations for annotation (ie, reaction) and reference (ie, reply) relations for an event. 
- - Note that this does not use a cache, but depends on cached methods. - - Args: - event: The event to calculate bundled aggregations for. - ignored_users: The users ignored by the requesting user. - - Returns: - A 2-tuple consisting of the aggregations of: - - events with `m.annotation` relations to this event. - - events with `m.reference` relations to this event. - Either or both entries in the tuple might be None if no relations - of that type exist. - """ - - # Do not bundle aggregations for an event which represents an edit or an - # annotation. It does not make sense for them to have related events. - relates_to = event.content.get("m.relates_to") - if isinstance(relates_to, (dict, frozendict)): - relation_type = relates_to.get("rel_type") - if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): - return None, None - - event_id = event.event_id - room_id = event.room_id - - annotations = await self.get_annotations_for_event( - event_id, room_id, ignored_users=ignored_users - ) - serialized_annotations = None - if annotations: - serialized_annotations = {"chunk": annotations} - - references, next_token = await self.get_relations_for_event( - event_id, - event, - room_id, - RelationTypes.REFERENCE, - ignored_users=ignored_users, - ) - serialized_references: Optional[JsonDict] = None - if references: - serialized_references = { - "chunk": [{"event_id": event.event_id} for event in references] - } - - if next_token: - serialized_references["next_batch"] = await next_token.to_string( - self._main_store - ) - - # Store the bundled aggregations in the event metadata for later use. - return serialized_annotations, serialized_references - async def get_threads_for_events( self, event_ids: Collection[str], user_id: str, ignored_users: FrozenSet[str] ) -> Dict[str, _ThreadAggregation]: @@ -458,16 +399,40 @@ async def get_bundled_aggregations( # Fetch other relations per event. 
for event in events_by_id.values(): - ( - annotations, - references, - ) = await self._get_bundled_annotations_and_references_for_event( - event, ignored_users + # Do not bundle aggregations for an event which represents an edit or an + # annotation. It does not make sense for them to have related events. + relates_to = event.content.get("m.relates_to") + if isinstance(relates_to, (dict, frozendict)): + relation_type = relates_to.get("rel_type") + if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): + continue + + annotations = await self.get_annotations_for_event( + event.event_id, event.room_id, ignored_users=ignored_users + ) + if annotations: + aggregations = results.setdefault(event.event_id, BundledAggregations()) + aggregations.annotations = {"chunk": annotations} + + references, next_token = await self.get_relations_for_event( + event.event_id, + event, + event.room_id, + RelationTypes.REFERENCE, + ignored_users=ignored_users, ) - if annotations or references: + if references: + serialized_references: JsonDict = { + "chunk": [{"event_id": event.event_id} for event in references] + } + + if next_token: + serialized_references["next_batch"] = await next_token.to_string( + self._main_store + ) + aggregations = results.setdefault(event.event_id, BundledAggregations()) - aggregations.annotations = annotations - aggregations.references = references + aggregations.references = serialized_references # Fetch any edits (but not for redacted events). # From 566b66ccd783095807f67b46c66da093ceb23550 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 26 Apr 2022 14:22:34 -0400 Subject: [PATCH 18/26] Avoid generating bundled thread aggregations for events with a relation. 
--- synapse/events/utils.py | 10 +--------- synapse/handlers/relations.py | 30 +++++++++++++++++++----------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index c3105279a17d..25247c237efd 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -518,15 +518,7 @@ def _inject_bundled_aggregations( } # Include any threaded replies to this event. - # - # Note that is it not valid to start a thread on an event which already - # has a relation. Doing so would cause an infinite loop. - event_relation = event.content.get("m.relates_to") - include_thread = ( - not isinstance(event_relation, (dict, frozendict)) - or event_relation.get("rel_type") is None - ) - if include_thread and event_aggregations.thread: + if event_aggregations.thread: thread = event_aggregations.thread serialized_latest_event = self.serialize_event( diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 52c646e83a30..2156647b4787 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -371,6 +371,15 @@ async def get_bundled_aggregations( event.event_id: event for event in events if not event.is_state() } + # A map of event ID to the relation in that event, if there is one. + relations_by_id = {} + for event_id, event in events_by_id.items(): + relates_to = event.content.get("m.relates_to") + if isinstance(relates_to, (dict, frozendict)): + relation_type = relates_to.get("rel_type") + if relation_type is not None: + relations_by_id[event_id] = relation_type + # event ID -> bundled aggregation in non-serialized form. results: Dict[str, BundledAggregations] = {} @@ -381,11 +390,11 @@ async def get_bundled_aggregations( # events to be fetched. Thus, we check those first! # Fetch thread summaries (but only for the directly requested events). 
- # - # Note that you can't have threads off of other related events, but it is - # possible for a malicious homeserver to inject them anyway. threads = await self.get_threads_for_events( - events_by_id.keys(), user_id, ignored_users + # It is not valid to start a thread on an event which already has a relation. + [eid for eid in events_by_id.keys() if eid not in relations_by_id], + user_id, + ignored_users, ) for event_id, thread in threads.items(): results.setdefault(event_id, BundledAggregations()).thread = thread @@ -399,13 +408,12 @@ async def get_bundled_aggregations( # Fetch other relations per event. for event in events_by_id.values(): - # Do not bundle aggregations for an event which represents an edit or an - # annotation. It does not make sense for them to have related events. - relates_to = event.content.get("m.relates_to") - if isinstance(relates_to, (dict, frozendict)): - relation_type = relates_to.get("rel_type") - if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): - continue + # Edits and annotations may not have related annotations or references. + if relations_by_id.get(event.event_id) in ( + RelationTypes.ANNOTATION, + RelationTypes.REPLACE, + ): + continue annotations = await self.get_annotations_for_event( event.event_id, event.room_id, ignored_users=ignored_users From c48b22056e428bee9458ad473a1fb5acb8bf73ab Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 26 Apr 2022 14:27:08 -0400 Subject: [PATCH 19/26] Clarify comment. 
--- tests/rest/client/test_relations.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index 10c1ee707560..33ce9471b3a8 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1161,7 +1161,8 @@ def test_thread_edit_latest_event(self) -> None: self.assertIn("latest_event", thread_summary) latest_event_in_thread = thread_summary["latest_event"] self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!") - # The event should also have edit appear under the bundled aggregations. + # The latest event in the thread should have the edit appear under the + # bundled aggregations. self.assertDictContainsSubset( {"event_id": edit_event_id, "sender": "@alice:test"}, latest_event_in_thread["unsigned"]["m.relations"][RelationTypes.REPLACE], From 36ec434341cdf80e8b15b2f8ef3619048f86caff Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 26 Apr 2022 14:28:53 -0400 Subject: [PATCH 20/26] Clarify comments. --- synapse/events/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 25247c237efd..beebeb7a6e70 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -482,6 +482,9 @@ def _inject_bundled_aggregations( bundled_aggregations: Bundled aggregations to be injected. A map from event_id to aggregation data. Must contain at least an entry for `event`. + + While serializing the bundled aggregations this map may be searched + again for additional events in a recursive manner. serialized_event: The serialized event which may be modified. apply_edits: Whether the content of the event should be modified to reflect any replacement in `aggregations.replace`. From c4997570e1e00dc29aa493bda99be1a5780c4315 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 26 Apr 2022 14:36:39 -0400 Subject: [PATCH 21/26] Keep data in sync. 
--- synapse/handlers/relations.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 2156647b4787..87896589459e 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -405,6 +405,8 @@ async def get_bundled_aggregations( latest_thread_event = thread.latest_event if latest_thread_event and latest_thread_event.event_id not in events_by_id: events_by_id[latest_thread_event.event_id] = latest_thread_event + # The latest event in the thread must have a thread relation. + relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD # Fetch other relations per event. for event in events_by_id.values(): From 34ee41910db25c4cc5e91586730e19af809915af Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 27 Apr 2022 09:30:34 -0400 Subject: [PATCH 22/26] Clarify comment. Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/relations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 87896589459e..52641e433b19 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -391,7 +391,7 @@ async def get_bundled_aggregations( # Fetch thread summaries (but only for the directly requested events). threads = await self.get_threads_for_events( - # It is not valid to start a thread on an event which already has a relation. + # It is not valid to start a thread on an event which itself relates to another event. [eid for eid in events_by_id.keys() if eid not in relations_by_id], user_id, ignored_users, From bd886bb16636931b62c243f137390c84b4d2c0c0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 27 Apr 2022 09:33:14 -0400 Subject: [PATCH 23/26] Add comments. 
--- synapse/handlers/relations.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index fb5c51edc1e2..69178c9d7930 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -417,6 +417,7 @@ async def get_bundled_aggregations( ): continue + # Fetch any annotations (ie, reactions) to bundle with this event. annotations = await self.get_annotations_for_event( event.event_id, event.room_id, ignored_users=ignored_users ) @@ -425,6 +426,7 @@ async def get_bundled_aggregations( event.event_id, BundledAggregations() ).annotations = {"chunk": annotations} + # Fetch any references (ie, replies) to bundle with this event. references, next_token = await self.get_relations_for_event( event.event_id, event, From 4f7132c63418916553485e2c01b3abacc5327e39 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 27 Apr 2022 10:12:18 -0400 Subject: [PATCH 24/26] Clarify comment. --- synapse/handlers/relations.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 69178c9d7930..169a303a0cf6 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -405,12 +405,18 @@ async def get_bundled_aggregations( latest_thread_event = thread.latest_event if latest_thread_event and latest_thread_event.event_id not in events_by_id: events_by_id[latest_thread_event.event_id] = latest_thread_event - # The latest event in the thread must have a thread relation. + # Keep relations_by_id in sync with events_by_id: + # + # We know that the latest event in a thread has a thread relation + # (as that is what makes it part of the thread). relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD # Fetch other relations per event. for event in events_by_id.values(): - # Edits and annotations may not have related annotations or references. 
+ # An event which is a replacement (ie edit) or annotation (ie, reaction) + # may not have any other event related to it. + # + # XXX This is buggy, see https://github.com/matrix-org/synapse/issues/12566 if relations_by_id.get(event.event_id) in ( RelationTypes.ANNOTATION, RelationTypes.REPLACE, From 98d73e948fb19ae00ea2afb6a58291dc477fcbca Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 27 Apr 2022 10:36:30 -0400 Subject: [PATCH 25/26] References != replies. --- synapse/handlers/relations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 169a303a0cf6..d3a08892cae3 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -432,7 +432,7 @@ async def get_bundled_aggregations( event.event_id, BundledAggregations() ).annotations = {"chunk": annotations} - # Fetch any references (ie, replies) to bundle with this event. + # Fetch any references to bundle with this event. references, next_token = await self.get_relations_for_event( event.event_id, event, From 6bcb2dce95f4434b64378615a3cb4118436a211c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 27 Apr 2022 11:36:40 -0400 Subject: [PATCH 26/26] Better checking of relation types. --- synapse/handlers/relations.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index d3a08892cae3..cec5740fbd26 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -372,12 +372,12 @@ async def get_bundled_aggregations( } # A map of event ID to the relation in that event, if there is one. 
- relations_by_id = {} + relations_by_id: Dict[str, str] = {} for event_id, event in events_by_id.items(): relates_to = event.content.get("m.relates_to") if isinstance(relates_to, collections.abc.Mapping): relation_type = relates_to.get("rel_type") - if relation_type is not None: + if isinstance(relation_type, str): relations_by_id[event_id] = relation_type # event ID -> bundled aggregation in non-serialized form.