-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Use a chain cover index to efficiently calculate auth chain difference #8868
Changes from 1 commit
49e888d
8c760ff
85348e1
02d1198
61ab47e
6141825
c7e2ce5
66e779d
cf2243f
bd30c9e
55f03b9
3e98fb7
9087033
21b3ef0
fdaf4da
7f5ac13
afb7f80
dec1f74
9279940
6a74e21
654eff1
dbecefd
123b431
883e922
988f25a
08ec78b
024c802
4cc769f
7d75efb
92b5e4b
a9552c2
7cc6d7e
5fa05f2
8dac80c
e3d0be4
cdb88c2
0f91c86
888450a
c9422b6
c8758af
b2ac553
d96264d
d64f5f8
6071eff
368d3b8
bea2c47
03dd636
8c1e32c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
from synapse.storage.databases.main.events_worker import EventsWorkerStore | ||
from synapse.storage.databases.main.signatures import SignatureWorkerStore | ||
from synapse.storage.engines import PostgresEngine | ||
from synapse.storage.types import Cursor | ||
from synapse.types import Collection | ||
from synapse.util.caches.descriptors import cached | ||
from synapse.util.caches.lrucache import LruCache | ||
|
@@ -33,6 +34,11 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
class _NoChainCoverIndex(Exception): | ||
def __init__(self, room_id: str): | ||
super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,)) | ||
|
||
|
||
class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore): | ||
def __init__(self, database: DatabasePool, db_conn, hs): | ||
super().__init__(database, db_conn, hs) | ||
|
@@ -156,20 +162,26 @@ async def get_auth_chain_difference( | |
# algorithm. | ||
room = await self.get_room(room_id) | ||
if room["has_auth_chain_index"]: | ||
return await self.db_pool.runInteraction( | ||
"get_auth_chain_difference_chains", | ||
self._get_auth_chain_difference_using_cover_index_txn, | ||
state_sets, | ||
) | ||
else: | ||
return await self.db_pool.runInteraction( | ||
"get_auth_chain_difference", | ||
self._get_auth_chain_difference_txn, | ||
state_sets, | ||
) | ||
try: | ||
return await self.db_pool.runInteraction( | ||
"get_auth_chain_difference_chains", | ||
self._get_auth_chain_difference_using_cover_index_txn, | ||
room_id, | ||
state_sets, | ||
) | ||
except _NoChainCoverIndex: | ||
# For whatever reason we don't actually have a chain cover index | ||
# for the events in question, so we fall back to the old method. | ||
pass | ||
|
||
return await self.db_pool.runInteraction( | ||
"get_auth_chain_difference", | ||
self._get_auth_chain_difference_txn, | ||
state_sets, | ||
) | ||
|
||
def _get_auth_chain_difference_using_cover_index_txn( | ||
self, txn, state_sets: List[Set[str]] | ||
self, txn: Cursor, room_id: str, state_sets: List[Set[str]] | ||
) -> Set[str]: | ||
"""Calculates the auth chain difference using the chain index. | ||
|
||
|
@@ -207,6 +219,18 @@ def _get_auth_chain_difference_using_cover_index_txn( | |
seen_chains.add(chain_id) | ||
chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id | ||
|
||
# Check that we actually have a chain ID for all the events. | ||
events_missing_chain_info = initial_events.difference(chain_info) | ||
if events_missing_chain_info: | ||
# This can happen due to e.g. downgrade/upgrade of the server. We | ||
# raise an exception and fall back to the previous algorithm. | ||
logger.info( | ||
"Unexpectedly found that events don't have chain IDs in room %s: %s", | ||
room_id, | ||
events_missing_chain_info, | ||
) | ||
Comment on lines +227 to +231
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there any way to recover from this? Will it happen automatically? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It should mostly happen automatically, unless there are events we don't have the full auth chain for (I have a couple of those on jki.re somehow). There's not really a way to recover from that failure mode, so I'm tempted to just leave it as is and revisit if it turns out that it happens a lot. |
||
raise _NoChainCoverIndex(room_id) | ||
|
||
# Corresponds to `state_sets`, except as a map from chain ID to max | ||
# sequence number reachable from the state set. | ||
set_to_chain = [] # type: List[Dict[int, int]] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -259,6 +259,169 @@ def insert_event(txn): | |
) | ||
self.assertSetEqual(difference, set()) | ||
|
||
def test_auth_difference_partial_cover(self): | ||
"""Test that we correctly handle rooms where not all events have a chain | ||
cover calculated. This can happen due to a downgrade/upgrade. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If we move these to schema 59 is the downgrade really a potential cause of this? It is likely worth keeping the code to handle this case regardless. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I've updated it |
||
""" | ||
|
||
room_id = "@ROOM:local" | ||
|
||
# The silly auth graph we use to test the auth difference algorithm, | ||
# where the top are the most recent events. | ||
# | ||
# A B | ||
# \ / | ||
# D E | ||
# \ | | ||
# ` F C | ||
# | /| | ||
# G ´ | | ||
# | \ | | ||
# H I | ||
# | | | ||
# K J | ||
|
||
auth_graph = { | ||
"a": ["e"], | ||
"b": ["e"], | ||
"c": ["g", "i"], | ||
"d": ["f"], | ||
"e": ["f"], | ||
"f": ["g"], | ||
"g": ["h", "i"], | ||
"h": ["k"], | ||
"i": ["j"], | ||
"k": [], | ||
"j": [], | ||
} | ||
|
||
depth_map = { | ||
"a": 7, | ||
"b": 7, | ||
"c": 4, | ||
"d": 6, | ||
"e": 6, | ||
"f": 5, | ||
"g": 3, | ||
"h": 2, | ||
"i": 2, | ||
"k": 1, | ||
"j": 1, | ||
} | ||
|
||
# We rudely fiddle with the appropriate tables directly, as that's much | ||
# easier than constructing events properly. | ||
|
||
def insert_event(txn): | ||
# First insert the room and mark it as having a chain cover. | ||
erikjohnston marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.store.db_pool.simple_insert_txn( | ||
txn, | ||
"rooms", | ||
{ | ||
"room_id": room_id, | ||
"creator": "room_creator_user_id", | ||
"is_public": True, | ||
"room_version": "6", | ||
"has_auth_chain_index": True, | ||
}, | ||
) | ||
|
||
stream_ordering = 0 | ||
|
||
for event_id in auth_graph: | ||
stream_ordering += 1 | ||
depth = depth_map[event_id] | ||
|
||
self.store.db_pool.simple_insert_txn( | ||
txn, | ||
table="events", | ||
values={ | ||
"event_id": event_id, | ||
"room_id": room_id, | ||
"depth": depth, | ||
"topological_ordering": depth, | ||
"type": "m.test", | ||
"processed": True, | ||
"outlier": False, | ||
"stream_ordering": stream_ordering, | ||
}, | ||
) | ||
|
||
# Insert all events apart from 'B' | ||
self.hs.datastores.persist_events._persist_event_auth_chain_txn( | ||
txn, | ||
[ | ||
FakeEvent(event_id, room_id, auth_graph[event_id]) | ||
for event_id in auth_graph | ||
if event_id != "b" | ||
], | ||
) | ||
|
||
# Now we insert the event 'B' without a chain cover, by temporarily | ||
# pretending the room doesn't have a chain cover. | ||
|
||
self.store.db_pool.simple_update_txn( | ||
txn, | ||
table="rooms", | ||
keyvalues={"room_id": room_id}, | ||
updatevalues={"has_auth_chain_index": False}, | ||
) | ||
|
||
self.hs.datastores.persist_events._persist_event_auth_chain_txn( | ||
txn, [FakeEvent("b", room_id, auth_graph["b"])], | ||
) | ||
|
||
self.store.db_pool.simple_update_txn( | ||
txn, | ||
table="rooms", | ||
keyvalues={"room_id": room_id}, | ||
updatevalues={"has_auth_chain_index": True}, | ||
) | ||
|
||
self.get_success(self.store.db_pool.runInteraction("insert", insert_event,)) | ||
|
||
# Now actually test that various combinations give the right result: | ||
|
||
difference = self.get_success( | ||
self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}]) | ||
) | ||
self.assertSetEqual(difference, {"a", "b"}) | ||
|
||
difference = self.get_success( | ||
self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}]) | ||
) | ||
self.assertSetEqual(difference, {"a", "b", "c", "e", "f"}) | ||
|
||
difference = self.get_success( | ||
self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b"}]) | ||
) | ||
self.assertSetEqual(difference, {"a", "b", "c"}) | ||
|
||
difference = self.get_success( | ||
self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b", "c"}]) | ||
) | ||
self.assertSetEqual(difference, {"a", "b"}) | ||
|
||
difference = self.get_success( | ||
self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"d"}]) | ||
) | ||
self.assertSetEqual(difference, {"a", "b", "d", "e"}) | ||
|
||
difference = self.get_success( | ||
self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}, {"d"}]) | ||
) | ||
self.assertSetEqual(difference, {"a", "b", "c", "d", "e", "f"}) | ||
|
||
difference = self.get_success( | ||
self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"e"}]) | ||
) | ||
self.assertSetEqual(difference, {"a", "b"}) | ||
|
||
difference = self.get_success( | ||
self.store.get_auth_chain_difference(room_id, [{"a"}]) | ||
) | ||
self.assertSetEqual(difference, set()) | ||
|
||
|
||
@attr.s | ||
class FakeEvent: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was going to suggest using `initial_events >= chain_info` as we don't care about the difference, just if there is one. But `chain_info` is a `dict` and it would probably be more expensive to convert it to a set first. 🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤷