Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Store arbitrary relations from events. (#11391)
Browse files Browse the repository at this point in the history
Instead of only known relation types. This also reworks the background
update for thread relations to crawl events and search for any relation
type, not just threaded relations.
  • Loading branch information
clokep authored Nov 22, 2021
1 parent d9e9771 commit 3d893b8
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 45 deletions.
1 change: 1 addition & 0 deletions changelog.d/11391.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Store and allow querying of arbitrary event relations.
29 changes: 14 additions & 15 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1696,34 +1696,33 @@ def non_null_str_or_none(val: Any) -> Optional[str]:
},
)

def _handle_event_relations(self, txn, event):
"""Handles inserting relation data during peristence of events
def _handle_event_relations(
self, txn: LoggingTransaction, event: EventBase
) -> None:
"""Handles inserting relation data during persistence of events
Args:
txn
event (EventBase)
txn: The current database transaction.
event: The event which might have relations.
"""
relation = event.content.get("m.relates_to")
if not relation:
# No relations
return

# Relations must have a type and parent event ID.
rel_type = relation.get("rel_type")
if rel_type not in (
RelationTypes.ANNOTATION,
RelationTypes.REFERENCE,
RelationTypes.REPLACE,
RelationTypes.THREAD,
):
# Unknown relation type
if not isinstance(rel_type, str):
return

parent_id = relation.get("event_id")
if not parent_id:
# Invalid relation
if not isinstance(parent_id, str):
return

aggregation_key = relation.get("key")
# Annotations have a key field.
aggregation_key = None
if rel_type == RelationTypes.ANNOTATION:
aggregation_key = relation.get("key")

self.db_pool.simple_insert_txn(
txn,
Expand Down
88 changes: 60 additions & 28 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -171,8 +171,14 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
self._purged_chain_cover_index,
)

# The event_thread_relation background update was replaced with the
# event_arbitrary_relations one, which handles any relation to avoid
# needing to potentially crawl the entire events table in the future.
self.db_pool.updates.register_noop_background_update("event_thread_relation")

self.db_pool.updates.register_background_update_handler(
"event_thread_relation", self._event_thread_relation
"event_arbitrary_relations",
self._event_arbitrary_relations,
)

################################################################################
Expand Down Expand Up @@ -1099,23 +1105,27 @@ def purged_chain_cover_txn(txn) -> int:

return result

async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int:
"""Background update handler which will store thread relations for existing events."""
async def _event_arbitrary_relations(
self, progress: JsonDict, batch_size: int
) -> int:
"""Background update handler which will store previously unknown relations for existing events."""
last_event_id = progress.get("last_event_id", "")

def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
# Fetch events and then filter based on whether the event has a
# relation or not.
txn.execute(
"""
SELECT event_id, json FROM event_json
LEFT JOIN event_relations USING (event_id)
WHERE event_id > ? AND event_relations.event_id IS NULL
WHERE event_id > ?
ORDER BY event_id LIMIT ?
""",
(last_event_id, batch_size),
)

results = list(txn)
missing_thread_relations = []
# (event_id, parent_id, rel_type) for each relation
relations_to_insert: List[Tuple[str, str, str]] = []
for (event_id, event_json_raw) in results:
try:
event_json = db_to_json(event_json_raw)
Expand All @@ -1127,48 +1137,70 @@ def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
)
continue

# If there's no relation (or it is not a thread), skip!
# If there's no relation, skip!
relates_to = event_json["content"].get("m.relates_to")
if not relates_to or not isinstance(relates_to, dict):
continue
if relates_to.get("rel_type") != RelationTypes.THREAD:

# If the relation type or parent event ID is not a string, skip it.
#
# Do not consider relation types that have existed for a long time,
# since they will already be listed in the `event_relations` table.
rel_type = relates_to.get("rel_type")
if not isinstance(rel_type, str) or rel_type in (
RelationTypes.ANNOTATION,
RelationTypes.REFERENCE,
RelationTypes.REPLACE,
):
continue

# Get the parent ID.
parent_id = relates_to.get("event_id")
if not isinstance(parent_id, str):
continue

missing_thread_relations.append((event_id, parent_id))
relations_to_insert.append((event_id, parent_id, rel_type))

# Insert the missing data, note that we upsert here in case the event
# has already been processed.
if relations_to_insert:
self.db_pool.simple_upsert_many_txn(
txn=txn,
table="event_relations",
key_names=("event_id",),
key_values=[(r[0],) for r in relations_to_insert],
value_names=("relates_to_id", "relation_type"),
value_values=[r[1:] for r in relations_to_insert],
)

# Insert the missing data.
self.db_pool.simple_insert_many_txn(
txn=txn,
table="event_relations",
values=[
{
"event_id": event_id,
"relates_to_Id": parent_id,
"relation_type": RelationTypes.THREAD,
}
for event_id, parent_id in missing_thread_relations
],
)
# Iterate the parent IDs and invalidate caches.
for parent_id in {r[1] for r in relations_to_insert}:
cache_tuple = (parent_id,)
self._invalidate_cache_and_stream(
txn, self.get_relations_for_event, cache_tuple
)
self._invalidate_cache_and_stream(
txn, self.get_aggregation_groups_for_event, cache_tuple
)
self._invalidate_cache_and_stream(
txn, self.get_thread_summary, cache_tuple
)

if results:
latest_event_id = results[-1][0]
self.db_pool.updates._background_update_progress_txn(
txn, "event_thread_relation", {"last_event_id": latest_event_id}
txn, "event_arbitrary_relations", {"last_event_id": latest_event_id}
)

return len(results)

num_rows = await self.db_pool.runInteraction(
desc="event_thread_relation", func=_event_thread_relation_txn
desc="event_arbitrary_relations", func=_event_arbitrary_relations_txn
)

if not num_rows:
await self.db_pool.updates._end_background_update("event_thread_relation")
await self.db_pool.updates._end_background_update(
"event_arbitrary_relations"
)

return num_rows

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

-- Check old events for relations.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6502, 'event_thread_relation', '{}');
(6507, 'event_arbitrary_relations', '{}');
111 changes: 111 additions & 0 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2019 New Vector Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +47,8 @@ def default_config(self) -> dict:
return config

def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()

self.user_id, self.user_token = self._create_user("alice")
self.user2_id, self.user2_token = self._create_user("bob")

Expand Down Expand Up @@ -765,6 +768,52 @@ def test_aggregations_redaction_prevents_access_to_aggregations(self):
self.assertIn("chunk", channel.json_body)
self.assertEquals(channel.json_body["chunk"], [])

def test_unknown_relations(self):
    """An unrecognised rel_type is stored and queryable, but not bundled."""
    # Send an event that relates to the parent with a made-up relation type.
    send_channel = self._send_relation("m.relation.test", "m.room.test")
    self.assertEquals(200, send_channel.code, send_channel.json_body)
    relation_event_id = send_channel.json_body["event_id"]

    relations_url = "/_matrix/client/unstable/rooms/%s/relations/%s?limit=1" % (
        self.room,
        self.parent_id,
    )
    list_channel = self.make_request(
        "GET", relations_url, access_token=self.user_token
    )
    self.assertEquals(200, list_channel.code, list_channel.json_body)

    # The pagination endpoint should return exactly one result: the full
    # relation event sent above.
    chunk = list_channel.json_body["chunk"]
    self.assertEquals(len(chunk), 1, list_channel.json_body)
    self.assert_dict(
        {
            "event_id": relation_event_id,
            "sender": self.user_id,
            "type": "m.room.test",
        },
        chunk[0],
    )

    # The original (parent) event is returned alongside the chunk.
    self.assertEquals(
        list_channel.json_body["original_event"]["event_id"], self.parent_id
    )

    # Fetching the parent event directly must not bundle the unknown relation.
    event_channel = self.make_request(
        "GET",
        "/rooms/%s/event/%s" % (self.room, self.parent_id),
        access_token=self.user_token,
    )
    self.assertEquals(200, event_channel.code, event_channel.json_body)
    self.assertNotIn("m.relations", event_channel.json_body["unsigned"])

    # Aggregations for the unknown relation can still be queried directly
    # (and are empty, since it is not an annotation).
    agg_channel = self.make_request(
        "GET",
        "/_matrix/client/unstable/rooms/%s/aggregations/%s?limit=1"
        % (self.room, self.parent_id),
        access_token=self.user_token,
    )
    self.assertEquals(200, agg_channel.code, agg_channel.json_body)
    self.assertEquals(agg_channel.json_body["chunk"], [])

def _send_relation(
self,
relation_type: str,
Expand Down Expand Up @@ -811,3 +860,65 @@ def _create_user(self, localpart: str) -> Tuple[str, str]:
access_token = self.login(localpart, "abc123")

return user_id, access_token

def test_background_update(self):
    """The event_arbitrary_relations background update back-fills missing rows."""

    def _send_and_get_id(rel_type, event_type, **kwargs):
        # Send one relation event and return its event ID, asserting success.
        channel = self._send_relation(rel_type, event_type, **kwargs)
        self.assertEquals(200, channel.code, channel.json_body)
        return channel.json_body["event_id"]

    annotation_event_id_good = _send_and_get_id(
        RelationTypes.ANNOTATION, "m.reaction", key="👍"
    )
    annotation_event_id_bad = _send_and_get_id(
        RelationTypes.ANNOTATION, "m.reaction", key="A"
    )
    thread_event_id = _send_and_get_id(RelationTypes.THREAD, "m.room.test")

    # Delete two of the rows, as if the inserts during event persistence
    # had never happened.
    self.get_success(
        self.store.db_pool.simple_delete_many(
            table="event_relations",
            column="event_id",
            iterable=(annotation_event_id_bad, thread_event_id),
            keyvalues={},
            desc="RelationsTestCase.test_background_update",
        )
    )

    relations_url = f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10"

    # Before the update runs, only the surviving annotation is visible.
    channel = self.make_request(
        "GET",
        relations_url,
        access_token=self.user_token,
    )
    self.assertEquals(200, channel.code, channel.json_body)
    self.assertEquals(
        [ev["event_id"] for ev in channel.json_body["chunk"]],
        [annotation_event_id_good],
    )

    # Register the background update and let it run to completion.
    self.get_success(
        self.store.db_pool.simple_insert(
            "background_updates",
            {"update_name": "event_arbitrary_relations", "progress_json": "{}"},
        )
    )

    # The updater caches that everything is done; reset the flag so the
    # freshly inserted row is picked up.
    self.store.db_pool.updates._all_done = False
    self.wait_for_background_updates()

    # Afterwards the good annotation and the thread are both found again,
    # but not the "bad" annotation — the update skips relation types that
    # were already recorded at event-persistence time.
    channel = self.make_request(
        "GET",
        relations_url,
        access_token=self.user_token,
    )
    self.assertEquals(200, channel.code, channel.json_body)
    self.assertCountEqual(
        [ev["event_id"] for ev in channel.json_body["chunk"]],
        [annotation_event_id_good, thread_event_id],
    )
7 changes: 6 additions & 1 deletion tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ def wait_on_thread(self, deferred, timeout=10):
time.sleep(0.01)

def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""
"""
Block until all background database updates have completed.
Note that callers must ensure that's a store property created on the
testcase.
"""
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
Expand Down

0 comments on commit 3d893b8

Please sign in to comment.