Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Batch fetch bundled references #14508

Merged
merged 4 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14508.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.4/client-server-api/#aggregations) which return bundled aggregations.
128 changes: 59 additions & 69 deletions synapse/handlers/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,7 @@
# limitations under the License.
import enum
import logging
from typing import (
TYPE_CHECKING,
Collection,
Dict,
FrozenSet,
Iterable,
List,
Optional,
Tuple,
)
from typing import TYPE_CHECKING, Collection, Dict, FrozenSet, Iterable, List, Optional

import attr

Copy link
Contributor

@MadLittleMods MadLittleMods Nov 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing! 😍

Using the artificial Complement test from matrix-org/complement#443 which has zero relations, get_bundled_aggregations is down from ~1000ms -> ~50ms!


Reproduction steps in #14491 (comment)

Expand All @@ -32,7 +23,7 @@
from synapse.logging.opentracing import trace
from synapse.storage.databases.main.relations import ThreadsNextBatch, _RelatedEvent
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamToken, UserID
from synapse.types import JsonDict, Requester, UserID
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
Expand Down Expand Up @@ -181,40 +172,6 @@ async def get_relations(

return return_value

async def get_relations_for_event(
self,
event_id: str,
event: EventBase,
room_id: str,
relation_type: str,
ignored_users: FrozenSet[str] = frozenset(),
) -> Tuple[List[_RelatedEvent], Optional[StreamToken]]:
"""Get a list of events which relate to an event, ordered by topological ordering.

Args:
event_id: Fetch events that relate to this event ID.
event: The matching EventBase to event_id.
room_id: The room the event belongs to.
relation_type: The type of relation.
ignored_users: The users ignored by the requesting user.

Returns:
List of event IDs that match relations requested. The rows are of
the form `{"event_id": "..."}`.
"""

# Call the underlying storage method, which is cached.
related_events, next_token = await self._main_store.get_relations_for_event(
event_id, event, room_id, relation_type, direction="f"
)

# Filter out ignored users and convert to the expected format.
related_events = [
event for event in related_events if event.sender not in ignored_users
]

return related_events, next_token

async def redact_events_related_to(
self,
requester: Requester,
Expand Down Expand Up @@ -329,6 +286,46 @@ async def get_annotations_for_events(

return filtered_results

async def get_references_for_events(
self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset()
) -> Dict[str, List[_RelatedEvent]]:
"""Get a list of references to the given events.

Args:
event_ids: Fetch events that relate to this event ID.
ignored_users: The users ignored by the requesting user.

Returns:
A map of event IDs to a list related events.
"""

related_events = await self._main_store.get_references_for_events(event_ids)

# Avoid additional logic if there are no ignored users.
if not ignored_users:
return {
event_id: results
for event_id, results in related_events.items()
if results
}

# Filter out ignored users.
results = {}
for event_id, events in related_events.items():
# If no references, skip.
if not events:
continue

# Filter ignored users out.
events = [event for event in events if event.sender not in ignored_users]
# If there are no events left, skip this event.
if not events:
continue

results[event_id] = events

return results

async def _get_threads_for_events(
self,
events_by_id: Dict[str, EventBase],
Expand Down Expand Up @@ -412,14 +409,18 @@ async def _get_threads_for_events(
if event is None:
continue

potential_events, _ = await self.get_relations_for_event(
event_id,
event,
room_id,
RelationTypes.THREAD,
ignored_users,
# Attempt to find another event to use as the latest event.
potential_events, _ = await self._main_store.get_relations_for_event(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused, haven't we deleted this function get_relations_for_event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We deleted the one in the handler, not the one in the store.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OH ha

event_id, event, room_id, RelationTypes.THREAD, direction="f"
)

# Filter out ignored users.
potential_events = [
event
for event in potential_events
if event.sender not in ignored_users
]

# If all found events are from ignored users, do not include
# a summary of the thread.
if not potential_events:
Expand Down Expand Up @@ -534,27 +535,16 @@ async def get_bundled_aggregations(
"chunk": annotations
}

# Fetch other relations per event.
for event in events_by_id.values():
# Fetch any references to bundle with this event.
references, next_token = await self.get_relations_for_event(
event.event_id,
event,
event.room_id,
RelationTypes.REFERENCE,
ignored_users=ignored_users,
)
# Fetch any references to bundle with this event.
references_by_event_id = await self.get_references_for_events(
events_by_id.keys(), ignored_users=ignored_users
)
for event_id, references in references_by_event_id.items():
if references:
aggregations = results.setdefault(event.event_id, BundledAggregations())
aggregations.references = {
results.setdefault(event_id, BundledAggregations()).references = {
"chunk": [{"event_id": ev.event_id} for ev in references]
}

if next_token:
aggregations.references["next_batch"] = await next_token.to_string(
self._main_store
)
Comment on lines -553 to -556
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unspecced field, see #14486.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we checked with the client teams that this is Definitely Safe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err, no. Not at all. Oops. 😢

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


# Fetch any edits (but not for redacted events).
#
# Note that there is no use in limiting edits by ignored users since the
Expand Down Expand Up @@ -600,7 +590,7 @@ async def get_threads(
room_id, requester, allow_departed_users=True
)

# Note that ignored users are not passed into get_relations_for_event
# Note that ignored users are not passed into get_threads
# below. Ignored users are handled in filter_events_for_client (and by
# not passing them in here we should get a better cache hit rate).
thread_roots, next_batch = await self._main_store.get_threads(
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def _invalidate_caches_for_event(

if relates_to:
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,))
self._attempt_to_invalidate_cache(
"get_aggregation_groups_for_event", (relates_to,)
)
Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2049,6 +2049,10 @@ def _handle_redact_relations(
self.store._invalidate_cache_and_stream(
txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,)
)
if rel_type == RelationTypes.REFERENCE:
self.store._invalidate_cache_and_stream(
txn, self.store.get_references_for_event, (redacted_relates_to,)
)
if rel_type == RelationTypes.REPLACE:
self.store._invalidate_cache_and_stream(
txn, self.store.get_applicable_edit, (redacted_relates_to,)
Expand Down
74 changes: 66 additions & 8 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ class _RelatedEvent:
event_id: str
# The sender of the related event.
sender: str
topological_ordering: Optional[int]
stream_ordering: int


class RelationsWorkerStore(SQLBaseStore):
Expand Down Expand Up @@ -246,13 +244,17 @@ def _get_recent_references_for_event_txn(
txn.execute(sql, where_args + [limit + 1])

events = []
for event_id, relation_type, sender, topo_ordering, stream_ordering in txn:
topo_orderings: List[int] = []
stream_orderings: List[int] = []
for event_id, relation_type, sender, topo_ordering, stream_ordering in cast(
List[Tuple[str, str, str, int, int]], txn
):
# Do not include edits for redacted events as they leak event
# content.
if not is_redacted or relation_type != RelationTypes.REPLACE:
events.append(
_RelatedEvent(event_id, sender, topo_ordering, stream_ordering)
)
events.append(_RelatedEvent(event_id, sender))
topo_orderings.append(topo_ordering)
stream_orderings.append(stream_ordering)

# If there are more events, generate the next pagination key from the
# last event returned.
Expand All @@ -261,9 +263,11 @@ def _get_recent_references_for_event_txn(
# Instead of using the last row (which tells us there is more
# data), use the last row to be returned.
events = events[:limit]
topo_orderings = topo_orderings[:limit]
stream_orderings = stream_orderings[:limit]

topo = events[-1].topological_ordering
token = events[-1].stream_ordering
topo = topo_orderings[-1]
token = stream_orderings[-1]
if direction == "b":
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
Expand Down Expand Up @@ -530,6 +534,60 @@ def _get_aggregation_groups_for_users_txn(
"get_aggregation_groups_for_users", _get_aggregation_groups_for_users_txn
)

@cached()
async def get_references_for_event(self, event_id: str) -> List[JsonDict]:
raise NotImplementedError()

@cachedList(cached_method_name="get_references_for_event", list_name="event_ids")
async def get_references_for_events(
self, event_ids: Collection[str]
) -> Mapping[str, Optional[List[_RelatedEvent]]]:
Comment on lines +541 to +544
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't apply a limit for the number of references to include, but this actually matches the spec:

The aggregation format of m.reference relations consists of a single chunk property, which lists all the events which m.reference the event (the parent).

🤷

"""Get a list of references to the given events.

Args:
event_ids: Fetch events that relate to these event IDs.

Returns:
A map of event IDs to a list of related event IDs (and their senders).
"""

clause, args = make_in_list_sql_clause(
self.database_engine, "relates_to_id", event_ids
)
args.append(RelationTypes.REFERENCE)

sql = f"""
SELECT relates_to_id, ref.event_id, ref.sender
FROM events AS ref
INNER JOIN event_relations USING (event_id)
INNER JOIN events AS parent ON
parent.event_id = relates_to_id
AND parent.room_id = ref.room_id
WHERE
{clause}
AND relation_type = ?
ORDER BY ref.topological_ordering, ref.stream_ordering
"""

def _get_references_for_events_txn(
txn: LoggingTransaction,
) -> Mapping[str, List[_RelatedEvent]]:
txn.execute(sql, args)

result: Dict[str, List[_RelatedEvent]] = {}
for relates_to_id, event_id, sender in cast(
List[Tuple[str, str, str]], txn
):
result.setdefault(relates_to_id, []).append(
_RelatedEvent(event_id, sender)
)

return result

return await self.db_pool.runInteraction(
"_get_references_for_events_txn", _get_references_for_events_txn
)

@cached()
def get_applicable_edit(self, event_id: str) -> Optional[EventBase]:
raise NotImplementedError()
Expand Down
4 changes: 2 additions & 2 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None:

# The "user" sent the root event and is making queries for the bundled
# aggregations: they have participated.
self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 8)
self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 7)
# The "user2" sent replies in the thread and is making queries for the
# bundled aggregations: they have participated.
#
Expand Down Expand Up @@ -1170,7 +1170,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None:
bundled_aggregations["latest_event"].get("unsigned"),
)

self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 8)
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 7)

def test_nested_thread(self) -> None:
"""
Expand Down