Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix historical messages backfilling in random order on remote homeservers (MSC2716) #11114

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
f30302d
Scratch debugging why events appear out of order on remote homeservers
MadLittleMods Oct 18, 2021
438e222
Use OrderedDict to gurantee order returned is the same as we were bui…
MadLittleMods Oct 18, 2021
4983739
Avoid constant missing prev_event fetching while backfilling
MadLittleMods Oct 19, 2021
a64bb2e
Add changelog
MadLittleMods Oct 19, 2021
260ca06
Some more trials of trying to get many many events to backfill in ord…
MadLittleMods Oct 19, 2021
886071b
Fix backfill not picking up batch events connected to non-base insert…
MadLittleMods Oct 20, 2021
477c15d
Some more debug logging
MadLittleMods Oct 21, 2021
4191f56
Remove fake prev events from historical state chain
MadLittleMods Oct 21, 2021
f39c1da
Remove debug logging
MadLittleMods Oct 21, 2021
7da8012
Remove extra event info
MadLittleMods Oct 21, 2021
69dfa16
Move to sorting the backfill events in the existing sorted
MadLittleMods Oct 21, 2021
83474d9
Put MSC2716 backfill logic behind experimental feature flag
MadLittleMods Oct 21, 2021
1263c7e
Remove unused import
MadLittleMods Oct 21, 2021
ee47878
Fix mypy lints
MadLittleMods Oct 21, 2021
5bfde7b
Merge branch 'master' into madlittlemods/return-historical-events-in-…
MadLittleMods Oct 21, 2021
2fbe3f1
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Oct 21, 2021
1d3f417
Revert back to string interpolation for SQL boolean value
MadLittleMods Oct 21, 2021
4a12304
Put empty prev_events behind new room version
MadLittleMods Oct 28, 2021
9a6d8fa
WIP: Don't include the event we branch from
MadLittleMods Oct 29, 2021
3e09d49
Revert "WIP: Don't include the event we branch from"
MadLittleMods Oct 29, 2021
5afc264
WIP: Sort events topologically when we receive them over backfill
MadLittleMods Oct 29, 2021
6ea263b
Revert "WIP: Sort events topologically when we receive them over back…
MadLittleMods Oct 29, 2021
3d387f9
WIP: Sort events topologically when we receive them over backfill
MadLittleMods Oct 29, 2021
fb8e281
Fix direction of fake edges
MadLittleMods Oct 29, 2021
c772b35
Implement backfill in handler so we can do fetching later
MadLittleMods Oct 29, 2021
e0ff66d
Fix backfill being able to cleanly branch into history and back to "l…
MadLittleMods Oct 29, 2021
76d454f
Some backfill receive sorting fixes but not using it yet
MadLittleMods Oct 30, 2021
3529449
Fix lints
MadLittleMods Oct 30, 2021
321f9ea
Move back to the old get_backfill_events and simplify backfill.
MadLittleMods Nov 2, 2021
15c3282
Remove the new backfill implementation and pull some good parts of th…
MadLittleMods Nov 2, 2021
5db717a
Always process marker events regardless if backfilled
MadLittleMods Nov 3, 2021
e96fd5c
Add comment docs
MadLittleMods Nov 3, 2021
f3b7b3e
Add better explanatory comment
MadLittleMods Nov 3, 2021
7f2105a
Remove topological sort when receiving backfill events
MadLittleMods Nov 3, 2021
246278e
Fix lints
MadLittleMods Nov 3, 2021
ec35be5
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Nov 3, 2021
bc0ba8c
Protect from no auth events for non-existent provided prev_event
MadLittleMods Nov 3, 2021
363aed6
Revert unused refactor to get PDU raw
MadLittleMods Nov 3, 2021
d771fbd
Only run the tests package to get streaming Complement output
MadLittleMods Nov 11, 2021
b559e23
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Nov 11, 2021
6b64184
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Dec 9, 2021
1d00043
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Dec 16, 2021
b071426
Plumb allow_no_prev_events through for MSC2716
MadLittleMods Dec 16, 2021
ec33a40
Make the historical events float separately from the state chain
MadLittleMods Dec 16, 2021
b99efa8
Plumb allow_no_prev_events through create_and_send_nonmember_event
MadLittleMods Dec 16, 2021
3810ae1
Clarify comments
MadLittleMods Dec 16, 2021
df2a152
Fix NPE when trying to grab event from wrong roomId (fix sytest)
MadLittleMods Dec 16, 2021
cc4eb72
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Jan 14, 2022
47590bb
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Feb 4, 2022
a38befa
Some review optimizations
MadLittleMods Feb 4, 2022
033360a
Fix lints
MadLittleMods Feb 4, 2022
3f22e42
Fix unused lint
MadLittleMods Feb 4, 2022
e5670ff
Fix lints
MadLittleMods Feb 4, 2022
023bd3e
Don't run MSC2716 complement tests for everyone
MadLittleMods Feb 7, 2022
b3fcffb
Use same txn iteration optimization
MadLittleMods Feb 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11114.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
fi

# Run the tests!
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/main_test.go ./tests/msc2716_test.go
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 2 additions & 1 deletion synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,12 @@ async def _maybe_backfill_inner(
]

logger.debug(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
limit,
max_depth,
len(sorted_extremeties_tuple),
sorted_extremeties_tuple,
filtered_sorted_extremeties_tuple,
)
Expand Down
13 changes: 11 additions & 2 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,11 @@ async def backfill(
f"room {ev.room_id}, when we were backfilling in {room_id}"
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
)

await self._process_pulled_events(dest, events, backfilled=True)
await self._process_pulled_events(
dest,
events,
backfilled=True,
)

async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
Expand Down Expand Up @@ -1229,7 +1233,12 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
return event, context

events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
await self.persist_events_and_notify(room_id, tuple(events_to_persist))
await self.persist_events_and_notify(
room_id,
tuple(events_to_persist),
# Events we fetch during backfill should be marked as backfilled as well
backfilled=True,
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
)

async def _check_event_auth(
self,
Expand Down
6 changes: 4 additions & 2 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,8 +955,10 @@ async def create_new_client_event(
# event and then try to auth it (which fails with a somewhat confusing "No
# create event in auth events")
assert (
builder.type == EventTypes.Create or len(prev_event_ids) > 0
), "Attempting to create an event with no prev_events"
builder.type == EventTypes.Create
or len(prev_event_ids) > 0
or len(auth_event_ids) > 0
), "Attempting to create an event with no prev_events or auth_event_ids"
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

event = await builder.build(
prev_event_ids=prev_event_ids,
Expand Down
12 changes: 4 additions & 8 deletions synapse/handlers/room_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
logger = logging.getLogger(__name__)


def generate_fake_event_id() -> str:
return "$fake_" + random_string(43)


class RoomBatchHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
Expand Down Expand Up @@ -184,7 +180,7 @@ async def persist_state_events_at_start(

# Make the state events float off on their own so we don't have a
# bunch of `@mxid joined the room` noise between each batch
prev_event_id_for_state_chain = generate_fake_event_id()
prev_event_ids_for_state_chain: List[str] = []
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using fake prev_events to make the historical events float, I've moved to just making an event with an empty array of prev_events. Which also stops the backfill mechanism from getting clogged with unresolvable backward extremities, see #11091

This is to allow prev_event_ids = [] if it has auth_event_ids present

-- #11114 (comment)


for state_event in state_events_at_start:
assert_params_in_dict(
Expand Down Expand Up @@ -221,7 +217,7 @@ async def persist_state_events_at_start(
action=membership,
content=event_dict["content"],
outlier=True,
prev_event_ids=[prev_event_id_for_state_chain],
prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
Expand All @@ -240,7 +236,7 @@ async def persist_state_events_at_start(
),
event_dict,
outlier=True,
prev_event_ids=[prev_event_id_for_state_chain],
prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
Expand All @@ -251,7 +247,7 @@ async def persist_state_events_at_start(
state_event_ids_at_start.append(event_id)
auth_event_ids.append(event_id)
# Connect all the state in a floating chain
prev_event_id_for_state_chain = event_id
prev_event_ids_for_state_chain = [event_id]

return state_event_ids_at_start

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ async def update_membership_locked(
if block_invite:
raise SynapseError(403, "Invites have been disabled on this server")

if prev_event_ids:
if prev_event_ids is not None:
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
return await self._local_membership_update(
requester=requester,
target=target,
Expand Down
92 changes: 64 additions & 28 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import itertools
import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from typing import Collection, Dict, Iterable, List, Optional, Set, OrderedDict, Tuple

from prometheus_client import Counter, Gauge

from synapse.api.constants import MAX_DEPTH
from synapse.api.constants import MAX_DEPTH, EventTypes
from synapse.api.errors import StoreError
from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
Expand Down Expand Up @@ -1007,26 +1007,29 @@ async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)

event_results = set()
event_results = OrderedDict()
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

# We want to make sure that we do a breadth-first, "depth" ordered
# search.

# Look for the prev_event_id connected to the given event_id
query = """
SELECT depth, prev_event_id FROM event_edges
/* Get the depth of the prev_event_id from the events table */
connected_prev_event_query = """
SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
/* Get the depth and stream_ordering of the prev_event_id from the events table */
INNER JOIN events
ON prev_event_id = events.event_id
/* Find an event which matches the given event_id */
/* Look for an edge which matches the given event_id */
WHERE event_edges.event_id = ?
AND event_edges.is_state = ?
/* Because we can have many events at the same depth,
* we want to also tie-break and sort on stream_ordering */
ORDER BY depth DESC, stream_ordering DESC
LIMIT ?
"""

# Look for the "insertion" events connected to the given event_id
connected_insertion_event_query = """
SELECT e.depth, i.event_id FROM insertion_event_edges AS i
SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which points via prev_events to the given event_id */
Expand All @@ -1036,7 +1039,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

# Find any batch connections of a given insertion event
batch_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
/* Find the batch that connects to the given insertion event */
INNER JOIN batch_events AS c
ON i.next_batch_id = c.batch_id
Expand All @@ -1055,35 +1058,45 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
queue = PriorityQueue()
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

for event_id in event_list:
depth = self.db_pool.simple_select_one_onecol_txn(
event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
keyvalues={"event_id": event_id, "room_id": room_id},
retcol="depth",
retcols=(
"type",
"depth",
"stream_ordering",
),
allow_none=True,
)

if depth:
queue.put((-depth, event_id))
if event_lookup_result["depth"]:
queue.put(
(
-event_lookup_result["depth"],
-event_lookup_result["stream_ordering"],
event_id,
event_lookup_result["type"],
)
)

while not queue.empty() and len(event_results) < limit:
try:
_, event_id = queue.get_nowait()
_, _, event_id, event_type = queue.get_nowait()
except Empty:
break

if event_id in event_results:
continue

event_results.add(event_id)
event_results[event_id] = event_id

# Try and find any potential historical batches of message history.
#
# First we look for an insertion event connected to the current
# event (by prev_event). If we find any, we need to go and try to
# find any batch events connected to the insertion event (by
# batch_id). If we find any, we'll add them to the queue and
# navigate up the DAG like normal in the next iteration of the loop.
# event (by prev_event). If we find any, we'll add them to the queue
# and navigate up the DAG like normal in the next iteration of the
# loop.
txn.execute(
connected_insertion_event_query, (event_id, limit - len(event_results))
)
Expand All @@ -1094,34 +1107,57 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
)
for row in connected_insertion_event_id_results:
connected_insertion_event_depth = row[0]
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))
connected_insertion_event_stream_ordering = row[1]
connected_insertion_event_id = row[2]
connected_insertion_event_type = row[3]
if connected_insertion_event_id not in event_results:
queue.put(
(
-connected_insertion_event_depth,
-connected_insertion_event_stream_ordering,
connected_insertion_event_id,
connected_insertion_event_type,
)
)

# Second, we need to go and try to find any batch events connected
# to a given insertion event (by batch_id). If we find any, we'll
# add them to the queue and navigate up the DAG like normal in the
# next iteration of the loop.
if event_type == EventTypes.MSC2716_INSERTION:
# Find any batch connections for the given insertion event
txn.execute(
batch_connection_query,
(connected_insertion_event, limit - len(event_results)),
(event_id, limit - len(event_results)),
)
batch_start_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: batch_start_event_id_results %s",
batch_start_event_id_results,
)
for row in batch_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2], row[3]))
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

txn.execute(query, (event_id, False, limit - len(event_results)))
txn.execute(
connected_prev_event_query,
(event_id, False, limit - len(event_results)),
)
prev_event_id_results = txn.fetchall()
logger.debug(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)

# TODO: We should probably skip adding the event itself if we
# branched off onto the insertion event first above. Need to make this a
# bit smart so it doesn't skip over the event altogether if we're at
# the end of the historical messages.
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2], row[3]))

return event_results
return event_results.values()

async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = await self.db_pool.runInteraction(
Expand Down
15 changes: 12 additions & 3 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2139,16 +2139,21 @@ def _update_backward_extremeties(self, txn, events):
" SELECT 1 FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
" )"
# 1. Don't add an event as a extremity again if we already persisted it
# as a non-outlier.
# 2. Don't add an outlier as an extremity if it has no prev_events
" AND NOT EXISTS ("
" SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
" AND outlier = ?"
" SELECT 1 FROM events"
" LEFT JOIN event_edges edge"
" ON edge.event_id = events.event_id"
" WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
" )"
)

txn.execute_batch(
query,
[
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
Expand All @@ -2167,6 +2172,10 @@ def _update_backward_extremeties(self, txn, events):
(ev.event_id, ev.room_id)
for ev in events
if not ev.internal_metadata.is_outlier()
# If we encountered an event with no prev_events, then we might
# as well remove it now because it won't ever have anything else
# to backfill from.
or len(ev.prev_event_ids()) == 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the floating historical state are outliers, we want to still try to remove them if we know they don't have any prev_events to backfill from.

],
)

Expand Down