Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Batch fetch bundled annotations (#14491)
Browse files Browse the repository at this point in the history
Avoid an n+1 query problem and fetch the bundled aggregations for
m.annotation relations in a single query instead of a query per event.

This applies similar logic for as was previously done for edits in
8b309ad (#11660) and threads
in b65acea (#11752).
  • Loading branch information
clokep authored and H-Shay committed Dec 13, 2022
1 parent ce8877b commit c801245
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 141 deletions.
1 change: 1 addition & 0 deletions changelog.d/14491.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.4/client-server-api/#aggregations) which return bundled aggregations.
197 changes: 113 additions & 84 deletions synapse/handlers/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,16 @@
# limitations under the License.
import enum
import logging
from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Tuple
from typing import (
TYPE_CHECKING,
Collection,
Dict,
FrozenSet,
Iterable,
List,
Optional,
Tuple,
)

import attr

Expand Down Expand Up @@ -259,48 +268,64 @@ async def redact_events_related_to(
e.msg,
)

async def get_annotations_for_event(
self,
event_id: str,
room_id: str,
limit: int = 5,
ignored_users: FrozenSet[str] = frozenset(),
) -> List[JsonDict]:
"""Get a list of annotations on the event, grouped by event type and
async def get_annotations_for_events(
self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset()
) -> Dict[str, List[JsonDict]]:
"""Get a list of annotations to the given events, grouped by event type and
aggregation key, sorted by count.
This is used e.g. to get the what and how many reactions have happend
This is used e.g. to get the what and how many reactions have happened
on an event.
Args:
event_id: Fetch events that relate to this event ID.
room_id: The room the event belongs to.
limit: Only fetch the `limit` groups.
event_ids: Fetch events that relate to these event IDs.
ignored_users: The users ignored by the requesting user.
Returns:
List of groups of annotations that match. Each row is a dict with
`type`, `key` and `count` fields.
A map of event IDs to a list of groups of annotations that match.
Each entry is a dict with `type`, `key` and `count` fields.
"""
# Get the base results for all users.
full_results = await self._main_store.get_aggregation_groups_for_event(
event_id, room_id, limit
full_results = await self._main_store.get_aggregation_groups_for_events(
event_ids
)

# Avoid additional logic if there are no ignored users.
if not ignored_users:
return {
event_id: results
for event_id, results in full_results.items()
if results
}

# Then subtract off the results for any ignored users.
ignored_results = await self._main_store.get_aggregation_groups_for_users(
event_id, room_id, limit, ignored_users
[event_id for event_id, results in full_results.items() if results],
ignored_users,
)

filtered_results = []
for result in full_results:
key = (result["type"], result["key"])
if key in ignored_results:
result = result.copy()
result["count"] -= ignored_results[key]
if result["count"] <= 0:
continue
filtered_results.append(result)
filtered_results = {}
for event_id, results in full_results.items():
# If no annotations, skip.
if not results:
continue

# If there are not ignored results for this event, copy verbatim.
if event_id not in ignored_results:
filtered_results[event_id] = results
continue

# Otherwise, subtract out the ignored results.
event_ignored_results = ignored_results[event_id]
for result in results:
key = (result["type"], result["key"])
if key in event_ignored_results:
# Ensure to not modify the cache.
result = result.copy()
result["count"] -= event_ignored_results[key]
if result["count"] <= 0:
continue
filtered_results.setdefault(event_id, []).append(result)

return filtered_results

Expand Down Expand Up @@ -366,59 +391,62 @@ async def _get_threads_for_events(
results = {}

for event_id, summary in summaries.items():
if summary:
thread_count, latest_thread_event = summary

# Subtract off the count of any ignored users.
for ignored_user in ignored_users:
thread_count -= ignored_results.get((event_id, ignored_user), 0)

# This is gnarly, but if the latest event is from an ignored user,
# attempt to find one that isn't from an ignored user.
if latest_thread_event.sender in ignored_users:
room_id = latest_thread_event.room_id

# If the root event is not found, something went wrong, do
# not include a summary of the thread.
event = await self._event_handler.get_event(user, room_id, event_id)
if event is None:
continue
# If no thread, skip.
if not summary:
continue

potential_events, _ = await self.get_relations_for_event(
event_id,
event,
room_id,
RelationTypes.THREAD,
ignored_users,
)
thread_count, latest_thread_event = summary

# If all found events are from ignored users, do not include
# a summary of the thread.
if not potential_events:
continue
# Subtract off the count of any ignored users.
for ignored_user in ignored_users:
thread_count -= ignored_results.get((event_id, ignored_user), 0)

# The *last* event returned is the one that is cared about.
event = await self._event_handler.get_event(
user, room_id, potential_events[-1].event_id
)
# It is unexpected that the event will not exist.
if event is None:
logger.warning(
"Unable to fetch latest event in a thread with event ID: %s",
potential_events[-1].event_id,
)
continue
latest_thread_event = event

results[event_id] = _ThreadAggregation(
latest_event=latest_thread_event,
count=thread_count,
# If there's a thread summary it must also exist in the
# participated dictionary.
current_user_participated=events_by_id[event_id].sender == user_id
or participated[event_id],
# This is gnarly, but if the latest event is from an ignored user,
# attempt to find one that isn't from an ignored user.
if latest_thread_event.sender in ignored_users:
room_id = latest_thread_event.room_id

# If the root event is not found, something went wrong, do
# not include a summary of the thread.
event = await self._event_handler.get_event(user, room_id, event_id)
if event is None:
continue

potential_events, _ = await self.get_relations_for_event(
event_id,
event,
room_id,
RelationTypes.THREAD,
ignored_users,
)

# If all found events are from ignored users, do not include
# a summary of the thread.
if not potential_events:
continue

# The *last* event returned is the one that is cared about.
event = await self._event_handler.get_event(
user, room_id, potential_events[-1].event_id
)
# It is unexpected that the event will not exist.
if event is None:
logger.warning(
"Unable to fetch latest event in a thread with event ID: %s",
potential_events[-1].event_id,
)
continue
latest_thread_event = event

results[event_id] = _ThreadAggregation(
latest_event=latest_thread_event,
count=thread_count,
# If there's a thread summary it must also exist in the
# participated dictionary.
current_user_participated=events_by_id[event_id].sender == user_id
or participated[event_id],
)

return results

@trace
Expand Down Expand Up @@ -496,17 +524,18 @@ async def get_bundled_aggregations(
# (as that is what makes it part of the thread).
relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD

# Fetch other relations per event.
for event in events_by_id.values():
# Fetch any annotations (ie, reactions) to bundle with this event.
annotations = await self.get_annotations_for_event(
event.event_id, event.room_id, ignored_users=ignored_users
)
# Fetch any annotations (ie, reactions) to bundle with this event.
annotations_by_event_id = await self.get_annotations_for_events(
events_by_id.keys(), ignored_users=ignored_users
)
for event_id, annotations in annotations_by_event_id.items():
if annotations:
results.setdefault(
event.event_id, BundledAggregations()
).annotations = {"chunk": annotations}
results.setdefault(event_id, BundledAggregations()).annotations = {
"chunk": annotations
}

# Fetch other relations per event.
for event in events_by_id.values():
# Fetch any references to bundle with this event.
references, next_token = await self.get_relations_for_event(
event.event_id,
Expand Down
Loading

0 comments on commit c801245

Please sign in to comment.