Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Include bundled aggregations for the latest event in a thread #12273

Merged
merged 37 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
797dd9e
The latest event in a thread should include bundled aggregations.
clokep Mar 23, 2022
5065c10
Move the thread edit test into BundledAggregationsTestCase.
clokep Mar 23, 2022
f0ab01f
Support including other bundled aggregations for the latest event.
clokep Mar 23, 2022
d78ca4c
Do not recurse into threads indefinitely.
clokep Mar 23, 2022
5a1c218
Rejigger to avoid passing state around.
clokep Mar 23, 2022
560a4ac
Newsfragment
clokep Mar 23, 2022
3f8bd1f
Clarify comment.
clokep Mar 30, 2022
60ac6b4
Review comments.
clokep Mar 30, 2022
af4c814
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Mar 30, 2022
a63d62a
Lint
clokep Mar 30, 2022
cf96c0a
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 11, 2022
d247e72
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 12, 2022
2a64c41
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 19, 2022
14ff585
Add more checks to ensure that threads don't return bundled aggregati…
clokep Apr 19, 2022
a8c02b4
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 19, 2022
5e18d24
Fix parameter ordering in docstring.
clokep Apr 19, 2022
c89d227
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 19, 2022
8fe3029
Fix frozendicts on Python 3.10.
clokep Apr 19, 2022
823a6b1
Clean-up and rename test_thread_loop.
clokep Apr 20, 2022
db0ccce
Reduce duplicated code.
clokep Apr 20, 2022
87dc36a
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 20, 2022
d87dcba
Lint
clokep Apr 20, 2022
8c65ace
Clarify docstrings.
clokep Apr 20, 2022
1ccde00
Remove useless helper method.
clokep Apr 20, 2022
6951c83
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 25, 2022
566b66c
Avoid generating bundled thread aggregations for events with a relation.
clokep Apr 26, 2022
c48b220
Clarify comment.
clokep Apr 26, 2022
586943c
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 26, 2022
36ec434
Clarify comments.
clokep Apr 26, 2022
c499757
Keep data in sync.
clokep Apr 26, 2022
d8d2879
Merge remote-tracking branch 'origin/develop' into clokep/bundled-agg…
clokep Apr 27, 2022
34ee419
Clarify comment.
clokep Apr 27, 2022
53fda7f
Merge remote-tracking branch 'refs/remotes/origin/clokep/bundled-aggs…
clokep Apr 27, 2022
bd886bb
Add comments.
clokep Apr 27, 2022
4f7132c
Clarify comment.
clokep Apr 27, 2022
98d73e9
References != replies.
clokep Apr 27, 2022
6bcb2dc
Better checking of relation types.
clokep Apr 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12273.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse v1.48.0 where the latest thread reply failed to include the proper bundled aggregations.
60 changes: 37 additions & 23 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,13 +425,12 @@ def serialize_event(

# Check if there are any bundled aggregations to include with the event.
if bundle_aggregations:
event_aggregations = bundle_aggregations.get(event.event_id)
if event_aggregations:
if event.event_id in bundle_aggregations:
self._inject_bundled_aggregations(
event,
time_now,
config,
event_aggregations,
bundle_aggregations,
serialized_event,
apply_edits=apply_edits,
)
Expand Down Expand Up @@ -470,7 +469,7 @@ def _inject_bundled_aggregations(
event: EventBase,
time_now: int,
config: SerializeEventConfig,
aggregations: "BundledAggregations",
bundled_aggregations: Dict[str, "BundledAggregations"],
clokep marked this conversation as resolved.
Show resolved Hide resolved
clokep marked this conversation as resolved.
Show resolved Hide resolved
serialized_event: JsonDict,
apply_edits: bool,
) -> None:
Expand All @@ -479,23 +478,33 @@ def _inject_bundled_aggregations(
Args:
event: The event being serialized.
time_now: The current time in milliseconds
aggregations: The bundled aggregation to serialize.
serialized_event: The serialized event which may be modified.
config: Event serialization config
bundled_aggregations: A map of event ID to the bundled aggregations to serialize for that event.
clokep marked this conversation as resolved.
Show resolved Hide resolved
serialized_event: The serialized event which may be modified.
apply_edits: Whether the content of the event should be modified to reflect
any replacement in the event's bundled aggregations (its `replace` field).
"""

# We have already checked that aggregations exist for this event.
event_aggregations = bundled_aggregations[event.event_id]

# The JSON dictionary to be added under the unsigned property of the event
# being serialized.
serialized_aggregations = {}

if aggregations.annotations:
serialized_aggregations[RelationTypes.ANNOTATION] = aggregations.annotations
if event_aggregations.annotations:
serialized_aggregations[
RelationTypes.ANNOTATION
] = event_aggregations.annotations

if aggregations.references:
serialized_aggregations[RelationTypes.REFERENCE] = aggregations.references
if event_aggregations.references:
serialized_aggregations[
RelationTypes.REFERENCE
] = event_aggregations.references

if aggregations.replace:
if event_aggregations.replace:
# If there is an edit, optionally apply it to the event.
edit = aggregations.replace
edit = event_aggregations.replace
if apply_edits:
self._apply_edit(event, serialized_event, edit)

Expand All @@ -506,19 +515,24 @@ def _inject_bundled_aggregations(
"sender": edit.sender,
}

# If this event is the start of a thread, include a summary of the replies.
if aggregations.thread:
thread = aggregations.thread
# Include any threaded replies to this event.
#
# Note that it is not valid to start a thread on an event which already
# has a relation. Doing so would cause an infinite loop.
clokep marked this conversation as resolved.
Show resolved Hide resolved
event_relation = event.content.get("m.relates_to")
include_thread = (
not isinstance(event_relation, dict)
or event_relation.get("rel_type") is None
)
clokep marked this conversation as resolved.
Show resolved Hide resolved
if include_thread and event_aggregations.thread:
thread = event_aggregations.thread

# Don't bundle aggregations as this could recurse forever.
serialized_latest_event = serialize_event(
thread.latest_event, time_now, config=config
serialized_latest_event = self.serialize_event(
thread.latest_event,
time_now,
config=config,
bundle_aggregations=bundled_aggregations,
)
# Manually apply an edit, if one exists.
if thread.latest_edit:
self._apply_edit(
thread.latest_event, serialized_latest_event, thread.latest_edit
)

thread_summary = {
"latest_event": serialized_latest_event,
Expand Down
80 changes: 50 additions & 30 deletions synapse/handlers/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
class _ThreadAggregation:
# The latest event in the thread.
latest_event: EventBase
# The latest edit to the latest event in the thread.
latest_edit: Optional[EventBase]
# The total number of events in the thread.
count: int
# True if the current user has sent an event to the thread.
Expand Down Expand Up @@ -256,10 +254,11 @@ async def get_annotations_for_event(

return filtered_results

async def _get_bundled_aggregation_for_event(
async def _get_bundled_annotations_and_references_for_event(
clokep marked this conversation as resolved.
Show resolved Hide resolved
clokep marked this conversation as resolved.
Show resolved Hide resolved
self, event: EventBase, ignored_users: FrozenSet[str]
) -> Optional[BundledAggregations]:
"""Generate bundled aggregations for an event.
) -> Tuple[Optional[JsonDict], Optional[JsonDict]]:
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""
Generate bundled aggregations for annotation and reference relations for an event.
clokep marked this conversation as resolved.
Show resolved Hide resolved

Note that this does not use a cache, but depends on cached methods.

Expand All @@ -268,8 +267,9 @@ async def _get_bundled_aggregation_for_event(
ignored_users: The users ignored by the requesting user.

Returns:
The bundled aggregations for an event, if bundled aggregations are
enabled and the event can have bundled aggregations.
A tuple of the bundled aggregations for annotation and reference relations.
Either or both entries in the tuple might be None if no relations
of that type exist.
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""

# Do not bundle aggregations for an event which represents an edit or an
Expand All @@ -278,21 +278,17 @@ async def _get_bundled_aggregation_for_event(
if isinstance(relates_to, (dict, frozendict)):
relation_type = relates_to.get("rel_type")
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
return None
return None, None

event_id = event.event_id
room_id = event.room_id

# The bundled aggregations to include, a mapping of relation type to a
# type-specific value. Some types include the direct return type here
# while others need more processing during serialization.
aggregations = BundledAggregations()

annotations = await self.get_annotations_for_event(
event_id, room_id, ignored_users=ignored_users
)
serialized_annotations = None
if annotations:
aggregations.annotations = {"chunk": annotations}
serialized_annotations = {"chunk": annotations}

references, next_token = await self.get_relations_for_event(
event_id,
Expand All @@ -301,18 +297,19 @@ async def _get_bundled_aggregation_for_event(
RelationTypes.REFERENCE,
ignored_users=ignored_users,
)
serialized_references: Optional[JsonDict] = None
if references:
aggregations.references = {
serialized_references = {
"chunk": [{"event_id": event.event_id} for event in references]
}

if next_token:
aggregations.references["next_batch"] = await next_token.to_string(
serialized_references["next_batch"] = await next_token.to_string(
self._main_store
)

# Store the bundled aggregations in the event metadata for later use.
clokep marked this conversation as resolved.
Show resolved Hide resolved
return aggregations
return serialized_annotations, serialized_references

async def get_threads_for_events(
self, event_ids: Collection[str], user_id: str, ignored_users: FrozenSet[str]
Expand Down Expand Up @@ -353,7 +350,7 @@ async def get_threads_for_events(

for event_id, summary in summaries.items():
if summary:
thread_count, latest_thread_event, edit = summary
thread_count, latest_thread_event = summary

# Subtract off the count of any ignored users.
for ignored_user in ignored_users:
Expand Down Expand Up @@ -398,7 +395,6 @@ async def get_threads_for_events(

results[event_id] = _ThreadAggregation(
latest_event=latest_thread_event,
latest_edit=edit,
count=thread_count,
# If there's a thread summary it must also exist in the
# participated dictionary.
Expand All @@ -417,8 +413,13 @@ async def get_bundled_aggregations(
user_id: The user requesting the bundled aggregations.

Returns:
A map of event ID to the bundled aggregation for the event. Not all
events may have bundled aggregations in the results.
A map of event ID to the bundled aggregations for the event.

Not all requested events may exist in the results (if they don't have
bundled aggregations).

The results may include additional events which are related to the
requested events.
"""
# De-duplicate events by ID to handle the same event requested multiple times.
#
Expand All @@ -433,13 +434,38 @@ async def get_bundled_aggregations(
# Fetch any ignored users of the requesting user.
ignored_users = await self._main_store.ignored_users(user_id)

# Threads are special as the latest event of a thread might cause additional
# events to be fetched. Thus, we check those first!

# Fetch thread summaries (but only for the directly requested events).
#
# Note that you can't have threads off of other related events, but it is
# possible for a malicious homeserver to inject them anyway.
threads = await self.get_threads_for_events(
events_by_id.keys(), user_id, ignored_users
)
for event_id, thread in threads.items():
results.setdefault(event_id, BundledAggregations()).thread = thread

# If the latest event in a thread is not already being fetched,
# add it. This ensures that the bundled aggregations for the
# latest thread event are correct.
latest_thread_event = thread.latest_event
if latest_thread_event and latest_thread_event.event_id not in events_by_id:
events_by_id[latest_thread_event.event_id] = latest_thread_event

# Fetch other relations per event.
for event in events_by_id.values():
event_result = await self._get_bundled_aggregation_for_event(
(
annotations,
references,
clokep marked this conversation as resolved.
Show resolved Hide resolved
) = await self._get_bundled_annotations_and_references_for_event(
event, ignored_users
)
if event_result:
results[event.event_id] = event_result
if annotations or references:
aggregations = results.setdefault(event.event_id, BundledAggregations())
aggregations.annotations = annotations
aggregations.references = references

# Fetch any edits (but not for redacted events).
#
Expand All @@ -455,10 +481,4 @@ async def get_bundled_aggregations(
for event_id, edit in edits.items():
results.setdefault(event_id, BundledAggregations()).replace = edit

threads = await self.get_threads_for_events(
events_by_id.keys(), user_id, ignored_users
)
for event_id, thread in threads.items():
results.setdefault(event_id, BundledAggregations()).thread = thread

return results
11 changes: 3 additions & 8 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ def get_thread_summary(self, event_id: str) -> Optional[Tuple[int, EventBase]]:
@cachedList(cached_method_name="get_thread_summary", list_name="event_ids")
async def get_thread_summaries(
self, event_ids: Collection[str]
) -> Dict[str, Optional[Tuple[int, EventBase, Optional[EventBase]]]]:
"""Get the number of threaded replies, the latest reply (if any), and the latest edit for that reply for the given event.
) -> Dict[str, Optional[Tuple[int, EventBase]]]:
"""Get the number of threaded replies and the latest reply (if any) for the given events.

Args:
event_ids: Summarize the thread related to this event ID.
Expand All @@ -458,7 +458,6 @@ async def get_thread_summaries(
Each summary is a tuple of:
The number of events in the thread.
The most recent event in the thread.
The most recent edit to the most recent event in the thread, if applicable.
"""

def _get_thread_summaries_txn(
Expand Down Expand Up @@ -544,9 +543,6 @@ def _get_thread_summaries_txn(

latest_events = await self.get_events(latest_event_ids.values()) # type: ignore[attr-defined]

# Check to see if any of those events are edited.
latest_edits = await self.get_applicable_edits(latest_event_ids.values())

# Map the event IDs to the thread summary.
#
# There might not be a summary due to there not being a thread or
Expand All @@ -557,8 +553,7 @@ def _get_thread_summaries_txn(

summary = None
if latest_event:
latest_edit = latest_edits.get(latest_event_id)
summary = (counts[parent_event_id], latest_event, latest_edit)
summary = (counts[parent_event_id], latest_event)
summaries[parent_event_id] = summary

return summaries
Expand Down
Loading