Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up persisting large number of outliers #16649

Merged
merged 2 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16649.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up persisting large number of outliers.
18 changes: 7 additions & 11 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter, partition
from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr

Expand Down Expand Up @@ -1669,14 +1669,13 @@ async def _auth_and_persist_outliers(

# XXX: it might be possible to kick this process off in parallel with fetching
# the events.
while event_map:
# build a list of events whose auth events are not in the queue.
roots = tuple(
ev
for ev in event_map.values()
if not any(aid in event_map for aid in ev.auth_event_ids())
)

# We need to persist an event's auth events before the event.
auth_graph = {
ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map]
clokep marked this conversation as resolved.
Show resolved Hide resolved
for ev in event_map.values()
}
for roots in sorted_topologically_batched(event_map.values(), auth_graph):
if not roots:
# if *none* of the remaining events are ready, that means
# we have a loop. This either means a bug in our logic, or that
Expand All @@ -1698,9 +1697,6 @@ async def _auth_and_persist_outliers(

await self._auth_and_persist_outliers_inner(room_id, roots)

for ev in roots:
del event_map[ev.event_id]

async def _auth_and_persist_outliers_inner(
self, room_id: str, fetched_events: Collection[EventBase]
) -> None:
Expand Down
51 changes: 51 additions & 0 deletions synapse/util/iterutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,54 @@ def sorted_topologically(
degree_map[edge] -= 1
if degree_map[edge] == 0:
heapq.heappush(zero_degree, edge)


def sorted_topologically_batched(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I think sorted_topologically could become

def sorted_topologically(nodes, graph):
	for batch in sorted_topologically_batched(nodes, graph):
        yield from batch

It would probably create a bunch of intermediate lists and such though, which is unnecessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ish. I think sorted_topologically gives a lexicographical topological sort, which could return a slightly different order (though I don't think we really rely on that fact anywhere, so)

nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
) -> Generator[Collection[T], None, None]:
r"""Walk the graph topologically, returning batches of nodes where all nodes
that references it have been previously returned.

For example, given the following graph:

A
/ \
B C
\ /
D

This function will return: `[[A], [B, C], [D]]`.

This function is useful for e.g. batch persisting events in an auth chain,
where we can only persist an event if all its auth events have already been
persisted.
"""

degree_map = {node: 0 for node in nodes}
reverse_graph: Dict[T, Set[T]] = {}

for node, edges in graph.items():
if node not in degree_map:
continue

for edge in set(edges):
if edge in degree_map:
degree_map[node] += 1

reverse_graph.setdefault(edge, set()).add(node)
reverse_graph.setdefault(node, set())

zero_degree = [node for node, degree in degree_map.items() if degree == 0]

while zero_degree:
new_zero_degree = []
for node in zero_degree:
for edge in reverse_graph.get(node, []):
if edge in degree_map:
degree_map[edge] -= 1
if degree_map[edge] == 0:
new_zero_degree.append(edge)

yield zero_degree
zero_degree = new_zero_degree
76 changes: 75 additions & 1 deletion tests/util/test_itertools.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
# limitations under the License.
from typing import Dict, Iterable, List, Sequence

from synapse.util.iterutils import chunk_seq, sorted_topologically
from synapse.util.iterutils import (
chunk_seq,
sorted_topologically,
sorted_topologically_batched,
)

from tests.unittest import TestCase

Expand Down Expand Up @@ -107,3 +111,73 @@ def test_multiple_paths(self) -> None:
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}

self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])


class SortTopologicallyBatched(TestCase):
"Test cases for `sorted_topologically_batched`"

def test_empty(self) -> None:
"Test that an empty graph works correctly"

graph: Dict[int, List[int]] = {}
self.assertEqual(list(sorted_topologically_batched([], graph)), [])

def test_handle_empty_graph(self) -> None:
"Test that a graph where a node doesn't have an entry is treated as empty"

graph: Dict[int, List[int]] = {}

# For disconnected nodes the output is simply sorted.
self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])

def test_disconnected(self) -> None:
"Test that a graph with no edges work"

graph: Dict[int, List[int]] = {1: [], 2: []}

# For disconnected nodes the output is simply sorted.
self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])

def test_linear(self) -> None:
"Test that a simple `4 -> 3 -> 2 -> 1` graph works"

graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}

self.assertEqual(
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
[[1], [2], [3], [4]],
)

def test_subset(self) -> None:
"Test that only sorting a subset of the graph works"
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}

self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]])

def test_fork(self) -> None:
"Test that a forked graph works"
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}

# Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
# always get the same one.
self.assertEqual(
list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]]
)

def test_duplicates(self) -> None:
"Test that a graph with duplicate edges work"
graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}

self.assertEqual(
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
[[1], [2], [3], [4]],
)

def test_multiple_paths(self) -> None:
"Test that a graph with multiple paths between two nodes work"
graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}

self.assertEqual(
list(sorted_topologically_batched([4, 3, 2, 1], graph)),
[[1], [2], [3], [4]],
)
Loading