Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Store arbitrary relations from events #11391

Merged
merged 12 commits into from
Nov 22, 2021
1 change: 1 addition & 0 deletions changelog.d/11391.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Store and allow querying of arbitrary event relations.
29 changes: 14 additions & 15 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
richvdh marked this conversation as resolved.
Show resolved Hide resolved
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1696,34 +1696,33 @@ def non_null_str_or_none(val: Any) -> Optional[str]:
},
)

def _handle_event_relations(self, txn, event):
"""Handles inserting relation data during peristence of events
def _handle_event_relations(
self, txn: LoggingTransaction, event: EventBase
) -> None:
"""Handles inserting relation data during persistence of events

Args:
txn
event (EventBase)
txn: The current database transaction.
event: The event which might have relations.
"""
relation = event.content.get("m.relates_to")
if not relation:
# No relations
return

# Relations must have a type and parent event ID.
rel_type = relation.get("rel_type")
if rel_type not in (
RelationTypes.ANNOTATION,
RelationTypes.REFERENCE,
RelationTypes.REPLACE,
RelationTypes.THREAD,
):
# Unknown relation type
if not isinstance(rel_type, str):
return

parent_id = relation.get("event_id")
if not parent_id:
# Invalid relation
if not isinstance(parent_id, str):
return

aggregation_key = relation.get("key")
# Annotations have a key field.
aggregation_key = None
if rel_type == RelationTypes.ANNOTATION:
aggregation_key = relation.get("key")

self.db_pool.simple_insert_txn(
txn,
Expand Down
88 changes: 60 additions & 28 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 The Matrix.org Foundation C.I.C.
# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -171,8 +171,14 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
self._purged_chain_cover_index,
)

# The event_thread_relation background update was replaced with the
# event_arbitrary_relations one, which handles any relation to avoid
# needed to potentially crawl the entire events table in the future.
self.db_pool.updates.register_noop_background_update("event_thread_relation")
clokep marked this conversation as resolved.
Show resolved Hide resolved

self.db_pool.updates.register_background_update_handler(
"event_thread_relation", self._event_thread_relation
"event_arbitrary_relations",
self._event_arbitrary_relations,
)

################################################################################
Expand Down Expand Up @@ -1099,23 +1105,27 @@ def purged_chain_cover_txn(txn) -> int:

return result

async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int:
"""Background update handler which will store thread relations for existing events."""
async def _event_arbitrary_relations(
self, progress: JsonDict, batch_size: int
) -> int:
"""Background update handler which will store previously unknown relations for existing events."""
last_event_id = progress.get("last_event_id", "")

def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
def _event_arbitrary_relations_txn(txn: LoggingTransaction) -> int:
# Fetch events and then filter based on whether the event has a
# relation or not.
txn.execute(
"""
SELECT event_id, json FROM event_json
LEFT JOIN event_relations USING (event_id)
WHERE event_id > ? AND event_relations.event_id IS NULL
WHERE event_id > ?
ORDER BY event_id LIMIT ?
""",
(last_event_id, batch_size),
)

results = list(txn)
missing_thread_relations = []
# (event_id, parent_id, rel_type) for each relation
relations_to_insert: List[Tuple[str, str, str]] = []
for (event_id, event_json_raw) in results:
try:
event_json = db_to_json(event_json_raw)
Expand All @@ -1127,48 +1137,70 @@ def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
)
continue

# If there's no relation (or it is not a thread), skip!
# If there's no relation, skip!
relates_to = event_json["content"].get("m.relates_to")
if not relates_to or not isinstance(relates_to, dict):
continue
if relates_to.get("rel_type") != RelationTypes.THREAD:

# If the relation type or parent event ID is not a string, skip it.
#
# Do not consider relation types that have existed for a long time,
# since they will already be listed in the `event_relations` table.
rel_type = relates_to.get("rel_type")
if not isinstance(rel_type, str) or rel_type in (
RelationTypes.ANNOTATION,
RelationTypes.REFERENCE,
RelationTypes.REPLACE,
):
continue

# Get the parent ID.
parent_id = relates_to.get("event_id")
if not isinstance(parent_id, str):
continue

missing_thread_relations.append((event_id, parent_id))
relations_to_insert.append((event_id, parent_id, rel_type))

# Insert the missing data, note that we upsert here in case the event
# has already been processed.
if relations_to_insert:
self.db_pool.simple_upsert_many_txn(
txn=txn,
table="event_relations",
key_names=("event_id",),
key_values=[(r[0],) for r in relations_to_insert],
value_names=("relates_to_id", "relation_type"),
value_values=[r[1:] for r in relations_to_insert],
)

# Insert the missing data.
self.db_pool.simple_insert_many_txn(
txn=txn,
table="event_relations",
values=[
{
"event_id": event_id,
"relates_to_Id": parent_id,
"relation_type": RelationTypes.THREAD,
}
for event_id, parent_id in missing_thread_relations
],
)
# Iterate the parent IDs and invalidate caches.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if we usually clear caches as part of background updates, but in this case I think it could be a bit broken without doing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems sensible

for parent_id in {r[1] for r in relations_to_insert}:
cache_tuple = (parent_id,)
self._invalidate_cache_and_stream(
txn, self.get_relations_for_event, cache_tuple
)
self._invalidate_cache_and_stream(
txn, self.get_aggregation_groups_for_event, cache_tuple
)
self._invalidate_cache_and_stream(
txn, self.get_thread_summary, cache_tuple
)

if results:
latest_event_id = results[-1][0]
self.db_pool.updates._background_update_progress_txn(
txn, "event_thread_relation", {"last_event_id": latest_event_id}
txn, "event_arbitrary_relations", {"last_event_id": latest_event_id}
)

return len(results)

num_rows = await self.db_pool.runInteraction(
desc="event_thread_relation", func=_event_thread_relation_txn
desc="event_arbitrary_relations", func=_event_arbitrary_relations_txn
)

if not num_rows:
await self.db_pool.updates._end_background_update("event_thread_relation")
await self.db_pool.updates._end_background_update(
"event_arbitrary_relations"
)

return num_rows

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@

-- Check old events for thread relations.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(6502, 'event_thread_relation', '{}');
(6507, 'event_arbitrary_relations', '{}');
111 changes: 111 additions & 0 deletions tests/rest/client/test_relations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2019 New Vector Ltd
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,6 +47,8 @@ def default_config(self) -> dict:
return config

def prepare(self, reactor, clock, hs):
self.store = hs.get_datastore()

self.user_id, self.user_token = self._create_user("alice")
self.user2_id, self.user2_token = self._create_user("bob")

Expand Down Expand Up @@ -765,6 +768,52 @@ def test_aggregations_redaction_prevents_access_to_aggregations(self):
self.assertIn("chunk", channel.json_body)
self.assertEquals(channel.json_body["chunk"], [])

def test_unknown_relations(self):
"""Unknown relations should be accepted."""
channel = self._send_relation("m.relation.test", "m.room.test")
self.assertEquals(200, channel.code, channel.json_body)
event_id = channel.json_body["event_id"]

channel = self.make_request(
"GET",
"/_matrix/client/unstable/rooms/%s/relations/%s?limit=1"
% (self.room, self.parent_id),
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)

# We expect to get back a single pagination result, which is the full
# relation event we sent above.
self.assertEquals(len(channel.json_body["chunk"]), 1, channel.json_body)
self.assert_dict(
{"event_id": event_id, "sender": self.user_id, "type": "m.room.test"},
channel.json_body["chunk"][0],
)

# We also expect to get the original event (the id of which is self.parent_id)
self.assertEquals(
channel.json_body["original_event"]["event_id"], self.parent_id
)

# When bundling the unknown relation is not included.
channel = self.make_request(
"GET",
"/rooms/%s/event/%s" % (self.room, self.parent_id),
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
self.assertNotIn("m.relations", channel.json_body["unsigned"])

# But unknown relations can be directly queried.
channel = self.make_request(
"GET",
"/_matrix/client/unstable/rooms/%s/aggregations/%s?limit=1"
% (self.room, self.parent_id),
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
self.assertEquals(channel.json_body["chunk"], [])

def _send_relation(
self,
relation_type: str,
Expand Down Expand Up @@ -811,3 +860,65 @@ def _create_user(self, localpart: str) -> Tuple[str, str]:
access_token = self.login(localpart, "abc123")

return user_id, access_token

def test_background_update(self):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting this in this file seems dirty, but it seems best to keep relations tests together? Maybe it would be better to start a storage test file and we can add to it in the future when fleshing out some unit tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's best here.

"""Test the event_arbitrary_relations background update."""
channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="👍")
self.assertEquals(200, channel.code, channel.json_body)
annotation_event_id_good = channel.json_body["event_id"]

channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", key="A")
self.assertEquals(200, channel.code, channel.json_body)
annotation_event_id_bad = channel.json_body["event_id"]

channel = self._send_relation(RelationTypes.THREAD, "m.room.test")
self.assertEquals(200, channel.code, channel.json_body)
thread_event_id = channel.json_body["event_id"]

# Clean-up the table as if the inserts did not happen during event creation.
self.get_success(
self.store.db_pool.simple_delete_many(
table="event_relations",
column="event_id",
iterable=(annotation_event_id_bad, thread_event_id),
keyvalues={},
desc="RelationsTestCase.test_background_update",
)
)

# Only the "good" annotation should be found.
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10",
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
self.assertEquals(
[ev["event_id"] for ev in channel.json_body["chunk"]],
[annotation_event_id_good],
)

# Insert and run the background update.
self.get_success(
self.store.db_pool.simple_insert(
"background_updates",
{"update_name": "event_arbitrary_relations", "progress_json": "{}"},
)
)

# Ugh, have to reset this flag
self.store.db_pool.updates._all_done = False
self.wait_for_background_updates()

# The "good" annotation and the thread should be found, but not the "bad"
# annotation.
channel = self.make_request(
"GET",
f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=10",
access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
self.assertCountEqual(
[ev["event_id"] for ev in channel.json_body["chunk"]],
[annotation_event_id_good, thread_event_id],
)
7 changes: 6 additions & 1 deletion tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ def wait_on_thread(self, deferred, timeout=10):
time.sleep(0.01)

def wait_for_background_updates(self) -> None:
"""Block until all background database updates have completed."""
"""
Block until all background database updates have completed.

Note that callers must ensure that's a store property created on the
testcase.
"""
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
Expand Down