This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix historical messages backfilling in random order on remote homeservers (MSC2716) #11114

Merged
Changes from 5 commits
55 commits
f30302d
Scratch debugging why events appear out of order on remote homeservers
MadLittleMods Oct 18, 2021
438e222
Use OrderedDict to guarantee order returned is the same as we were bui…
MadLittleMods Oct 18, 2021
4983739
Avoid constant missing prev_event fetching while backfilling
MadLittleMods Oct 19, 2021
a64bb2e
Add changelog
MadLittleMods Oct 19, 2021
260ca06
Some more trials of trying to get many many events to backfill in ord…
MadLittleMods Oct 19, 2021
886071b
Fix backfill not picking up batch events connected to non-base insert…
MadLittleMods Oct 20, 2021
477c15d
Some more debug logging
MadLittleMods Oct 21, 2021
4191f56
Remove fake prev events from historical state chain
MadLittleMods Oct 21, 2021
f39c1da
Remove debug logging
MadLittleMods Oct 21, 2021
7da8012
Remove extra event info
MadLittleMods Oct 21, 2021
69dfa16
Move to sorting the backfill events in the existing sorted
MadLittleMods Oct 21, 2021
83474d9
Put MSC2716 backfill logic behind experimental feature flag
MadLittleMods Oct 21, 2021
1263c7e
Remove unused import
MadLittleMods Oct 21, 2021
ee47878
Fix mypy lints
MadLittleMods Oct 21, 2021
5bfde7b
Merge branch 'master' into madlittlemods/return-historical-events-in-…
MadLittleMods Oct 21, 2021
2fbe3f1
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Oct 21, 2021
1d3f417
Revert back to string interpolation for SQL boolean value
MadLittleMods Oct 21, 2021
4a12304
Put empty prev_events behind new room version
MadLittleMods Oct 28, 2021
9a6d8fa
WIP: Don't include the event we branch from
MadLittleMods Oct 29, 2021
3e09d49
Revert "WIP: Don't include the event we branch from"
MadLittleMods Oct 29, 2021
5afc264
WIP: Sort events topologically when we receive them over backfill
MadLittleMods Oct 29, 2021
6ea263b
Revert "WIP: Sort events topologically when we receive them over back…
MadLittleMods Oct 29, 2021
3d387f9
WIP: Sort events topologically when we receive them over backfill
MadLittleMods Oct 29, 2021
fb8e281
Fix direction of fake edges
MadLittleMods Oct 29, 2021
c772b35
Implement backfill in handler so we can do fetching later
MadLittleMods Oct 29, 2021
e0ff66d
Fix backfill being able to cleanly branch into history and back to "l…
MadLittleMods Oct 29, 2021
76d454f
Some backfill receive sorting fixes but not using it yet
MadLittleMods Oct 30, 2021
3529449
Fix lints
MadLittleMods Oct 30, 2021
321f9ea
Move back to the old get_backfill_events and simplify backfill.
MadLittleMods Nov 2, 2021
15c3282
Remove the new backfill implementation and pull some good parts of th…
MadLittleMods Nov 2, 2021
5db717a
Always process marker events regardless if backfilled
MadLittleMods Nov 3, 2021
e96fd5c
Add comment docs
MadLittleMods Nov 3, 2021
f3b7b3e
Add better explanatory comment
MadLittleMods Nov 3, 2021
7f2105a
Remove topological sort when receiving backfill events
MadLittleMods Nov 3, 2021
246278e
Fix lints
MadLittleMods Nov 3, 2021
ec35be5
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Nov 3, 2021
bc0ba8c
Protect from no auth events for non-existent provided prev_event
MadLittleMods Nov 3, 2021
363aed6
Revert unused refactor to get PDU raw
MadLittleMods Nov 3, 2021
d771fbd
Only run the tests package to get streaming Complement output
MadLittleMods Nov 11, 2021
b559e23
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Nov 11, 2021
6b64184
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Dec 9, 2021
1d00043
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Dec 16, 2021
b071426
Plumb allow_no_prev_events through for MSC2716
MadLittleMods Dec 16, 2021
ec33a40
Make the historical events float separately from the state chain
MadLittleMods Dec 16, 2021
b99efa8
Plumb allow_no_prev_events through create_and_send_nonmember_event
MadLittleMods Dec 16, 2021
3810ae1
Clarify comments
MadLittleMods Dec 16, 2021
df2a152
Fix NPE when trying to grab event from wrong roomId (fix sytest)
MadLittleMods Dec 16, 2021
cc4eb72
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Jan 14, 2022
47590bb
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Feb 4, 2022
a38befa
Some review optimizations
MadLittleMods Feb 4, 2022
033360a
Fix lints
MadLittleMods Feb 4, 2022
3f22e42
Fix unused lint
MadLittleMods Feb 4, 2022
e5670ff
Fix lints
MadLittleMods Feb 4, 2022
023bd3e
Don't run MSC2716 complement tests for everyone
MadLittleMods Feb 7, 2022
b3fcffb
Use same txn iteration optimization
MadLittleMods Feb 7, 2022
1 change: 1 addition & 0 deletions changelog.d/11114.bugfix
@@ -0,0 +1 @@
Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.
17 changes: 10 additions & 7 deletions synapse/events/utils.py
@@ -283,13 +283,13 @@ def format_event_for_client_v1(d: JsonDict) -> JsonDict:

def format_event_for_client_v2(d: JsonDict) -> JsonDict:
drop_keys = (
"auth_events",
"prev_events",
"hashes",
"signatures",
"depth",
"origin",
"prev_state",
# "auth_events",
# "prev_events",
# "hashes",
# "signatures",
# "depth",
# "origin",
# "prev_state",
)
for key in drop_keys:
d.pop(key, None)
@@ -340,6 +340,9 @@ def serialize_event(

d["event_id"] = e.event_id

# TODO: Remove
d["stream_ordering"] = e.internal_metadata.stream_ordering

if "age_ts" in d["unsigned"]:
d["unsigned"]["age"] = time_now_ms - d["unsigned"]["age_ts"]
del d["unsigned"]["age_ts"]
10 changes: 5 additions & 5 deletions synapse/handlers/federation.py
@@ -148,14 +148,14 @@ async def _maybe_backfill_inner(
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
)
logger.debug(
logger.info(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
insertion_events_to_be_backfilled,
)

if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
logger.debug("Not backfilling as no extremeties found.")
logger.info("Not backfilling as no extremeties found.")
return False

# We only want to paginate if we can actually see the events we'll get,
@@ -203,7 +203,7 @@ async def _maybe_backfill_inner(
redact=False,
check_history_visibility_only=True,
)
logger.debug(
logger.info(
"_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
)

@@ -230,7 +230,7 @@ async def _maybe_backfill_inner(
# much larger factor will result in triggering a backfill request much
# earlier than necessary.
if current_depth - 2 * limit > max_depth:
logger.debug(
logger.info(
"Not backfilling as we don't need to. %d < %d - 2 * %d",
max_depth,
current_depth,
@@ -249,7 +249,7 @@ async def _maybe_backfill_inner(
t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
]

logger.debug(
logger.info(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
53 changes: 51 additions & 2 deletions synapse/handlers/federation_event.py
@@ -416,6 +416,18 @@ async def backfill(
events = await self._federation_client.backfill(
MadLittleMods (Contributor, Author) commented on Oct 19, 2021

@erikjohnston After experimenting with bigger datasets using the Gitter import script and writing bigger Complement tests, I'm seeing problems with our whole stream_ordering approach to sorting historical messages that share a depth. depth is the same on every homeserver, but stream_ordering differs across homeservers; in Synapse it just reflects whenever the event happened to be processed.

With all of the massaging I'm trying to do in this PR to make it work for the basic scrollback case, it won't solve all of the problems. Technically it shouldn't really matter how /backfill returns things, but I'm just trying to make the stream_ordering a little more consistent from the origin to the remote homeservers, so that the order of messages from /messages stays consistent (sorted by (topological_ordering, stream_ordering)).

Even if we can backfill messages in order, it still doesn't guarantee the same stream_ordering (and, more importantly, the same /messages order) on the other server. For example, if a room has a bunch of history imported and someone visits a permalink to a historical message far back in time, their homeserver will skip over the historical messages in between and insert the permalinked event as the next message in the stream_ordering, totally throwing off the sort.

This will be an even more common case once we add the MSC3030 jump-to-date API endpoint, so that the static archives can navigate and jump to a certain date.

The real solution to event ordering looks related to these issues, where we don't rely on depth or stream_ordering at all and ordering is determined purely by the shape of the DAG itself. Any thoughts on making this work?
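
A minimal sketch of the divergence described above, with made-up event IDs, depths, and stream_ordering values (not Synapse code): /messages sorts by (topological_ordering, stream_ordering), so two servers that agree on depth but assign different stream_orderings will return different orders.

```python
events = ["$hist1", "$hist2", "$hist3"]          # three historical events at the same depth
depth = {"$hist1": 5, "$hist2": 5, "$hist3": 5}

# The origin server persisted them oldest -> newest, so its (negative)
# stream_orderings decrease toward older history.
origin_stream = {"$hist1": -3, "$hist2": -2, "$hist3": -1}

# A remote server happened to process them in a different order over /backfill,
# so its stream_orderings disagree.
remote_stream = {"$hist1": -1, "$hist2": -3, "$hist3": -2}

def messages_order(stream_ordering):
    # /messages sorts by (topological_ordering, stream_ordering); today the
    # topological ordering is just the depth.
    return sorted(events, key=lambda e: (depth[e], stream_ordering[e]))

print(messages_order(origin_stream))  # ['$hist1', '$hist2', '$hist3']
print(messages_order(remote_stream))  # ['$hist2', '$hist3', '$hist1'] -- diverged
```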

Member commented

> The real solution to event ordering looks related to these issues, where we don't rely on depth or stream_ordering at all and ordering is determined purely by the shape of the DAG itself. Any thoughts on making this work?

Can we sort the events returned by /backfill before persisting them based on the DAG? We have a sorted_topologically function that can be used to sort a set of events somewhere. Or is that not enough?

MadLittleMods (Contributor, Author) commented on Oct 19, 2021

> Can we sort the events returned by /backfill before persisting them based on the DAG? We have a sorted_topologically function that can be used to sort a set of events somewhere. Or is that not enough?

That may help for the specific backfill case I'm sorta optimizing for here in this PR, where the user is scrolling back in time sequentially.

But it doesn't help the case where you jump back in time via a permalink or the jump-to-date endpoint and start the stream_ordering from the middle of the historical messages. It's currently important that stream_ordering starts from our newest historical messages at -1 and decrements down to our oldest historical messages at -939394, etc. If the middle of the historical messages ends up at -1, then the sorting is all off.

That's the most obvious case of it going wrong, but it could also go wrong over /backfill if some other homeserver backfills in its own, non-Synapse way, even though we try to provide events in perfect order (e.g. an older Synapse or another homeserver implementation).
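
A quick illustration of that permalink case, with made-up numbers (not Synapse code): if a middle historical event gets pulled in first, it takes stream_ordering -1 and the (depth, stream_ordering) sort goes wrong even though the depths are right.

```python
same_depth = {"$hist1": 5, "$hist2": 5, "$hist3": 5}

# Sequential backfill: newest historical message gets -1, older ones get -2, -3, ...
sequential = {"$hist3": -1, "$hist2": -2, "$hist1": -3}

# Permalink to the middle message first: it grabs -1, and its neighbours get
# whatever comes next when they are eventually backfilled.
permalink_first = {"$hist2": -1, "$hist3": -2, "$hist1": -3}

def order(stream_ordering):
    return sorted(same_depth, key=lambda e: (same_depth[e], stream_ordering[e]))

print(order(sequential))       # ['$hist1', '$hist2', '$hist3']
print(order(permalink_first))  # ['$hist1', '$hist3', '$hist2'] -- hist2 sorts last
```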

Member commented

I was thinking of doing the sorted_topologically on the receiving side? That way it doesn't matter what order the other server returns them in.

Ideally ideally, we wouldn't use depths and instead use an "online topological sorting" algorithm (e.g. Katriel–Bodlaender algorithm) to ensure that we correctly handle disjoint chunks of the graph becoming connected. But that is fiddly to do and I'm not quite sure how things like pagination tokens would work if the ordering could change.
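
As a rough, standalone sketch of what sorting on the receiving side could look like (a plain Kahn's-algorithm walk over prev_events; illustrative only, not the sorted_topologically helper mentioned above):

```python
from collections import deque

def sort_backfilled_events(prev_events_by_id):
    """Order events so each one comes after its prev_events.

    `prev_events_by_id` maps event_id -> list of prev_event_ids; prev_events
    that point outside the batch are ignored.
    """
    in_batch = set(prev_events_by_id)
    # How many in-batch prev_events each event is still waiting on.
    pending = {
        ev: sum(1 for p in prevs if p in in_batch)
        for ev, prevs in prev_events_by_id.items()
    }
    # Map each event to the in-batch events that point at it.
    children = {ev: [] for ev in prev_events_by_id}
    for ev, prevs in prev_events_by_id.items():
        for p in prevs:
            if p in in_batch:
                children[p].append(ev)

    queue = deque(sorted(ev for ev, n in pending.items() if n == 0))
    ordered = []
    while queue:
        ev = queue.popleft()
        ordered.append(ev)
        for child in children[ev]:
            pending[child] -= 1
            if pending[child] == 0:
                queue.append(child)
    return ordered  # oldest -> newest, regardless of the order the server sent them

# e.g. a /backfill response listed newest-first:
received = {"$c": ["$b"], "$b": ["$a"], "$a": ["$not_in_this_batch"]}
print(sort_backfilled_events(received))  # ['$a', '$b', '$c']
```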

MadLittleMods (Contributor, Author) commented on Oct 27, 2021

> I was thinking of doing the sorted_topologically on the receiving side? That way it doesn't matter what order the other server returns them in.

I think sorting on both the sending and receiving sides would be good, to give it the best chance of working (just to cover differences between the sending and receiving servers). But it's still not bullet-proof in the permalink case, where stream_ordering ends up out of order.


> Ideally ideally, we wouldn't use depths and instead use an "online topological sorting" algorithm (e.g. Katriel–Bodlaender algorithm) to ensure that we correctly handle disjoint chunks of the graph becoming connected. But that is fiddly to do and I'm not quite sure how things like pagination tokens would work if the ordering could change.

Does the chunking you worked on achieve this? #3785 Worth reviving?

Any good references for the Katriel–Bodlaender algorithm? Something more grokkable than a text PDF 😵 -- maybe how that algorithm works doesn't matter, and the aspect I'm curious about is how we store this topological order and how it's updated if we insert an event in the middle. I assume it's more than just depth: 9, where we'd have to update every single row before or after it when we insert something in the middle?

I see matrix-org/gomatrixserverlib#187 which discusses the "online topological sorting" and additionally the chunking stuff.

> I'm not quite sure how things like pagination tokens would work if the ordering could change.

For reference: current pagination tokens, https://github.com/matrix-org/matrix-doc/issues/574

t8-8_0_0_0_0_0_0_0_0 seems to represent a single event already, i.e. the combo of depth and stream_ordering only points to a single event.

Could pagination tokens just become the event ID itself and then you just use the direction to paginate from there?

Member commented

> Any good references for the Katriel–Bodlaender algorithm? Something more grokkable than a text PDF 😵 -- maybe how that algorithm works doesn't matter, and the aspect I'm curious about is how we store this topological order and how it's updated if we insert an event in the middle. I assume it's more than just depth: 9, where we'd have to update every single row before or after it when we insert something in the middle?

I have not found any helpful references for that algorithm beyond compsci papers. The actual algorithm is relatively fine; the major problem is picking a data structure for the labels that satisfies the necessary properties and is efficient to store in the DB:

> The ord labels of the nodes are maintained by an Ordered List data structure ORD, which is a data structure that allows to maintain a total order over a list of items and to perform the following operations in constant amortized time [2, 4]: InsertAfter(x, y) (InsertBefore(x, y)) inserts the item x immediately after (before) the item y in the total order, Delete(x) removes the item x, the query Order(x, y) determines whether x precedes y or y precedes x in the total order and Next(x) (Prev(x)) returns the item that appears immediately after (before) x in the total order.

While they suggest just using integers, that is a bit problematic for us as it would potentially require rewriting a lot of rows in the DB. There's no reason they need to be integers, though: the previous work used fractions to allow inserting nodes in between existing nodes without renumbering everything (and also operated on chunks rather than individual events, iirc).
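
A tiny illustration of the fractional-label idea (made-up event IDs; just a sketch of the data-structure property, not anything Synapse implements):

```python
from fractions import Fraction

# Each event gets an ordering label; sorting by label gives the topological order.
labels = {"$a": Fraction(1), "$b": Fraction(2), "$c": Fraction(3)}

def insert_between(labels, new_event, before, after):
    # Midpoint of the neighbouring labels: no other row needs renumbering,
    # and there is always room for further insertions.
    labels[new_event] = (labels[before] + labels[after]) / 2

insert_between(labels, "$historical", "$a", "$b")
print(sorted(labels, key=labels.get))
# ['$a', '$historical', '$b', '$c'] -- existing labels are untouched
```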

> Could pagination tokens just become the event ID itself and then you just use the direction to paginate from there?

It would need to be a set of events to handle forks in the graph, annoyingly, as you basically need to serialise the state of a breadth-first search algorithm, iyswim. I.e. if you have a DAG A -> B -> D, A -> C -> D, then with a token at B you don't know whether you still need to send down C or not.
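
A small sketch of that, with the A/B/C/D DAG above expressed as event -> prev_events (made-up structure, not Synapse's token format): the pagination token has to carry the whole unexpanded frontier of the backwards walk, not a single event ID.

```python
from collections import deque

prev_events = {"$D": ["$B", "$C"], "$B": ["$A"], "$C": ["$A"], "$A": []}

def paginate_backwards(frontier, limit):
    """Walk backwards breadth-first from `frontier`, returning up to `limit`
    events plus the new frontier (the state a resumable token must capture)."""
    queue = deque(sorted(frontier))
    seen, sent = set(frontier), []
    while queue and len(sent) < limit:
        ev = queue.popleft()
        sent.append(ev)
        for prev in prev_events.get(ev, []):
            if prev not in seen:
                seen.add(prev)
                queue.append(prev)
    return sent, sorted(queue)

sent, frontier = paginate_backwards({"$D"}, limit=2)
print(sent)      # ['$D', '$B']
print(frontier)  # ['$A', '$C'] -- a token of just '$B' would lose track of '$C'
```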

Member commented

I think it would be valuable to resurrect the chunking project fwiw; it's just a faff without concrete benefits to end users, so it's never been prioritised.

MadLittleMods (Contributor, Author) commented on Oct 29, 2021

The conversation about topologically sorting backfill events when we receive them continues in #11114 (comment)


For chunking and online topological sorting, we discussed this in an out-of-band call:

Plan forward:

  1. Go forward with [this PR] so it fixes the scrollback case over federation
  2. For Gitter and the static archives, we can have a script manually scroll back across all of the rooms on the archive server before anyone else (or the Google spider) crawls them in some weird way. This will lock the sort order in place for all of the historical messages
  3. Online topological ordering can happen in the future and, by its nature, will apply retroactively to fix any inconsistencies introduced by people permalinking

-- meeting summary notes 2021-10-28

dest, room_id, limit=limit, extremities=extremities
)
logger.info(
"from remote server: got backfill response events=%s",
[
{
"event_id": ev.event_id,
"type": ev["type"],
"depth": ev["depth"],
"content": ev["content"].get("body", None),
}
for ev in events
],
)

if not events:
return
Expand All @@ -429,7 +441,39 @@ async def backfill(
f"room {ev.room_id}, when we were backfilling in {room_id}"
MadLittleMods marked this conversation as resolved.
)

await self._process_pulled_events(dest, events, backfilled=True)
await self._process_pulled_events(
dest,
# The /backfill response should start from `?v` and include the
# events that preceded it (so the list will be newest -> oldest). We
# reverse that order so the messages are oldest -> newest and we can
# persist the backfilled events without constantly having to go fetch
# missing prev_events which are probably included in the same
# backfill chunk.
# TODO: If we try to reverse this list, the stream_ordering will be backwards
# reversed(events),
MadLittleMods marked this conversation as resolved.
events,
backfilled=True,
)

for ev in events:
event_after_persisted = await self._store.get_event(
ev.event_id, allow_none=True
)

if event_after_persisted:
logger.info(
"from remote server: processed backfilled event_id=%s type=%s depth=%s stream_ordering=%s content=%s",
ev.event_id,
event_after_persisted["type"],
event_after_persisted["depth"],
event_after_persisted.internal_metadata.stream_ordering,
event_after_persisted["content"].get("body", None),
)
else:
logger.info(
"from remote server: processed backfilled event_id=%s failed to lookup",
ev.event_id,
)

async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@@ -1229,7 +1273,12 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
return event, context

events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
await self.persist_events_and_notify(room_id, tuple(events_to_persist))
await self.persist_events_and_notify(
room_id,
tuple(events_to_persist),
# TODO: Maybe this to get fetched missing events during backfill as backfill also :/
backfilled=True,
MadLittleMods marked this conversation as resolved.
)

async def _check_event_auth(
self,
9 changes: 8 additions & 1 deletion synapse/handlers/room_batch.py
@@ -353,13 +353,20 @@ async def persist_historical_events(
# Events are sorted by (topological_ordering, stream_ordering)
# where topological_ordering is just depth.
for (event, context) in reversed(events_to_persist):
await self.event_creation_handler.handle_new_client_event(
result_event = await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
event["sender"], app_service_requester.app_service
),
event=event,
context=context,
)
logger.info(
"result_event depth=%s stream_ordering=%s event_id=%s body=%s",
result_event.depth,
result_event.internal_metadata.stream_ordering,
result_event.event_id,
result_event.content.get("body", None),
)

return event_ids

1 change: 1 addition & 0 deletions synapse/rest/client/room.py
@@ -561,6 +561,7 @@ async def on_GET(
pagination_config = await PaginationConfig.from_request(
self.store, request, default_limit=10
)
logger.info("/messages rest start pagination_config=%s", pagination_config)
# Twisted will have processed the args by now.
assert request.args is not None
as_client_event = b"raw" not in request.args
100 changes: 79 additions & 21 deletions synapse/storage/databases/main/event_federation.py
@@ -14,7 +14,7 @@
import itertools
import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from typing import Collection, Dict, Iterable, List, Optional, Set, OrderedDict, Tuple

from prometheus_client import Counter, Gauge

@@ -1007,26 +1007,29 @@ async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)

event_results = set()
event_results = OrderedDict()
MadLittleMods marked this conversation as resolved.

# We want to make sure that we do a breadth-first, "depth" ordered
# search.

# Look for the prev_event_id connected to the given event_id
query = """
SELECT depth, prev_event_id FROM event_edges
/* Get the depth of the prev_event_id from the events table */
SELECT depth, stream_ordering, prev_event_id FROM event_edges
/* Get the depth and stream_ordering of the prev_event_id from the events table */
INNER JOIN events
ON prev_event_id = events.event_id
/* Find an event which matches the given event_id */
/* Look for an edge which matches the given event_id */
WHERE event_edges.event_id = ?
AND event_edges.is_state = ?
/* Because we can have many events at the same depth,
* we want to also tie-break and sort on stream_ordering */
ORDER BY depth DESC, stream_ordering DESC
LIMIT ?
"""

# Look for the "insertion" events connected to the given event_id
connected_insertion_event_query = """
SELECT e.depth, i.event_id FROM insertion_event_edges AS i
SELECT e.depth, e.stream_ordering, i.event_id FROM insertion_event_edges AS i
/* Get the depth of the insertion event from the events table */
INNER JOIN events AS e USING (event_id)
/* Find an insertion event which points via prev_events to the given event_id */
@@ -1036,7 +1039,7 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):

# Find any batch connections of a given insertion event
batch_connection_query = """
SELECT e.depth, c.event_id FROM insertion_events AS i
SELECT e.depth, e.stream_ordering, c.event_id FROM insertion_events AS i
/* Find the batch that connects to the given insertion event */
INNER JOIN batch_events AS c
ON i.next_batch_id = c.batch_id
@@ -1055,27 +1058,69 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
queue = PriorityQueue()
MadLittleMods marked this conversation as resolved.

for event_id in event_list:
depth = self.db_pool.simple_select_one_onecol_txn(
event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
keyvalues={"event_id": event_id, "room_id": room_id},
retcol="depth",
retcols=(
"depth",
"stream_ordering",
),
allow_none=True,
)

if depth:
queue.put((-depth, event_id))
if event_lookup_result["depth"]:
queue.put(
(
-event_lookup_result["depth"],
-event_lookup_result["stream_ordering"],
event_id,
)
)

while not queue.empty() and len(event_results) < limit:
try:
_, event_id = queue.get_nowait()
_, _, event_id = queue.get_nowait()
except Empty:
break

if event_id in event_results:
continue

event_results.add(event_id)
event_lookup_result = self.db_pool.simple_select_one_txn(
txn,
table="events",
keyvalues={"event_id": event_id},
retcols=["type", "depth", "stream_ordering", "content"],
allow_none=True,
)

event_json_lookup_result = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_json",
keyvalues={"event_id": event_id},
retcol="json",
allow_none=True,
)

ev = db_to_json(event_json_lookup_result)

if event_lookup_result:
logger.info(
"_get_backfill_events: event_results add event_id=%s type=%s depth=%s stream_ordering=%s content=%s",
event_id,
ev["type"],
ev["depth"],
event_lookup_result["stream_ordering"],
ev["content"].get("body", None),
)
else:
logger.info(
"_get_backfill_events: event_results event_id=%s failed to lookup",
event_id,
)

event_results[event_id] = event_id

# Try and find any potential historical batches of message history.
#
@@ -1094,8 +1139,16 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
)
for row in connected_insertion_event_id_results:
connected_insertion_event_depth = row[0]
connected_insertion_event = row[1]
queue.put((-connected_insertion_event_depth, connected_insertion_event))
connected_insertion_event_stream_ordering = row[1]
connected_insertion_event = row[2]
if connected_insertion_event not in event_results:
queue.put(
(
-connected_insertion_event_depth,
-connected_insertion_event_stream_ordering,
connected_insertion_event,
)
)

# Find any batch connections for the given insertion event
txn.execute(
@@ -1108,20 +1161,25 @@ def _get_backfill_events(self, txn, room_id, event_list, limit):
batch_start_event_id_results,
)
for row in batch_start_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2]))

txn.execute(query, (event_id, False, limit - len(event_results)))
prev_event_id_results = txn.fetchall()
logger.debug(
logger.info(
"_get_backfill_events: prev_event_ids %s", prev_event_id_results
)

# TODO: We should probably skip adding the event itself if we
# branched off onto the insertion event first above. Need to make this a
# bit smart so it doesn't skip over the event altogether if we're at
# the end of the historical messages.
MadLittleMods marked this conversation as resolved.

for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
if row[2] not in event_results:
queue.put((-row[0], -row[1], row[2]))

return event_results
return event_results.values()

async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
ids = await self.db_pool.runInteraction(
8 changes: 8 additions & 0 deletions synapse/storage/databases/main/events.py
@@ -169,6 +169,14 @@ async def _persist_events_and_state_updates(

async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
logger.info(
"_persist_events_and_state_updates backfilled=%s event_id=%s depth=%s stream_ordering=%s content=%s",
backfilled,
event.event_id,
event.depth,
stream,
event["content"].get("body", None),
)
event.internal_metadata.stream_ordering = stream

await self.db_pool.runInteraction(