Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Yield during large v2 state res. #7735

Merged
merged 5 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/7735.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent large state resolutions from stalling Synapse for seconds at a time.
1 change: 1 addition & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ async def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False) -> None:

room_version = await self.store.get_room_version_id(room_id)
state_map = await resolve_events_with_store(
self.clock,
room_id,
room_version,
state_maps,
Expand Down
6 changes: 5 additions & 1 deletion synapse/state/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from synapse.state import v1, v2
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
from synapse.types import StateMap
from synapse.util import Clock
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
Expand Down Expand Up @@ -414,6 +415,7 @@ def resolve_events(self, room_version, state_sets, event):

with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
self.clock,
event.room_id,
room_version,
state_set_ids,
Expand Down Expand Up @@ -516,6 +518,7 @@ def resolve_state_groups(
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_store(
self.clock,
room_id,
room_version,
list(state_groups_ids.values()),
Expand Down Expand Up @@ -589,6 +592,7 @@ def _make_state_cache_entry(new_state, state_groups_ids):


def resolve_events_with_store(
clock: Clock,
room_id: str,
room_version: str,
state_sets: List[StateMap[str]],
Expand Down Expand Up @@ -625,7 +629,7 @@ def resolve_events_with_store(
)
else:
return v2.resolve_events_with_store(
room_id, room_version, state_sets, event_map, state_res_store
clock, room_id, room_version, state_sets, event_map, state_res_store
)


Expand Down
56 changes: 46 additions & 10 deletions synapse/state/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.types import StateMap
from synapse.util import Clock

logger = logging.getLogger(__name__)


# We want to yield to the reactor occasionally during state res when dealing
# with large data sets, so that we don't block the reactor loop for long
# stretches. This is done by yielding to the reactor every N iterations of
# each loop.
_YIELD_AFTER_ITERATIONS = 100


@defer.inlineCallbacks
def resolve_events_with_store(
clock: Clock,
room_id: str,
room_version: str,
state_sets: List[StateMap[str]],
Expand All @@ -42,13 +50,11 @@ def resolve_events_with_store(
"""Resolves the state using the v2 state resolution algorithm

Args:
clock
room_id: the room we are working in

room_version: The room version

state_sets: List of dicts of (type, state_key) -> event_id,
which are the different state groups to resolve.

event_map:
a dict from event_id to event, for any events that we happen to
have in flight (eg, those currently being persisted). This will be
Expand Down Expand Up @@ -113,7 +119,7 @@ def resolve_events_with_store(
)

sorted_power_events = yield _reverse_topological_power_sort(
room_id, power_events, event_map, state_res_store, full_conflicted_set
clock, room_id, power_events, event_map, state_res_store, full_conflicted_set
)

logger.debug("sorted %d power events", len(sorted_power_events))
Expand Down Expand Up @@ -142,7 +148,7 @@ def resolve_events_with_store(

pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
leftover_events = yield _mainline_sort(
room_id, leftover_events, pl, event_map, state_res_store
clock, room_id, leftover_events, pl, event_map, state_res_store
)

logger.debug("resolving remaining events")
Expand Down Expand Up @@ -317,12 +323,13 @@ def _add_event_and_auth_chain_to_graph(

@defer.inlineCallbacks
def _reverse_topological_power_sort(
room_id, event_ids, event_map, state_res_store, auth_diff
clock, room_id, event_ids, event_map, state_res_store, auth_diff
):
"""Returns a list of the event_ids sorted by reverse topological ordering,
and then by power level and origin_server_ts

Args:
clock (Clock)
room_id (str): the room we are working in
event_ids (list[str]): The events to sort
event_map (dict[str,FrozenEvent])
Expand All @@ -334,18 +341,28 @@ def _reverse_topological_power_sort(
"""

graph = {}
for event_id in event_ids:
for idx, event_id in enumerate(event_ids):
yield _add_event_and_auth_chain_to_graph(
graph, room_id, event_id, event_map, state_res_store, auth_diff
)

# We yield occasionally when we're working with large data sets to
# ensure that we don't block the reactor loop for too long.
if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
yield clock.sleep(0)

event_to_pl = {}
for event_id in graph:
for idx, event_id in enumerate(graph):
pl = yield _get_power_level_for_sender(
room_id, event_id, event_map, state_res_store
)
event_to_pl[event_id] = pl

# We yield occasionally when we're working with large data sets to
# ensure that we don't block the reactor loop for too long.
if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
yield clock.sleep(0)

def _get_power_order(event_id):
ev = event_map[event_id]
pl = event_to_pl[event_id]
Expand Down Expand Up @@ -423,12 +440,13 @@ def _iterative_auth_checks(

@defer.inlineCallbacks
def _mainline_sort(
room_id, event_ids, resolved_power_event_id, event_map, state_res_store
clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store
):
"""Returns a sorted list of event_ids sorted by mainline ordering based on
the given event resolved_power_event_id

Args:
clock (Clock)
room_id (str): room we're working in
event_ids (list[str]): Events to sort
resolved_power_event_id (str): The final resolved power level event ID
Expand All @@ -438,8 +456,14 @@ def _mainline_sort(
Returns:
Deferred[list[str]]: The sorted list
"""
if not event_ids:
# It's possible for there to be no event IDs here to sort, so we can
# skip calculating the mainline in that case.
return []

mainline = []
pl = resolved_power_event_id
idx = 0
while pl:
mainline.append(pl)
pl_ev = yield _get_event(room_id, pl, event_map, state_res_store)
Expand All @@ -453,17 +477,29 @@ def _mainline_sort(
pl = aid
break

# We yield occasionally when we're working with large data sets to
# ensure that we don't block the reactor loop for too long.
if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
yield clock.sleep(0)

idx += 1

mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}

event_ids = list(event_ids)

order_map = {}
for ev_id in event_ids:
for idx, ev_id in enumerate(event_ids):
depth = yield _get_mainline_depth_for_event(
event_map[ev_id], mainline_map, event_map, state_res_store
)
order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)

# We yield occasionally when we're working with large data sets to
# ensure that we don't block the reactor loop for too long.
if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
yield clock.sleep(0)

event_ids.sort(key=lambda ev_id: order_map[ev_id])

return event_ids
Expand Down
9 changes: 9 additions & 0 deletions tests/state/test_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import attr

from twisted.internet import defer

from synapse.api.constants import EventTypes, JoinRules, Membership
from synapse.api.room_versions import RoomVersions
from synapse.event_auth import auth_types_for_event
Expand All @@ -41,6 +43,11 @@
ORIGIN_SERVER_TS = 0


class FakeClock:
    """Stub for ``synapse.util.Clock`` used by the state-resolution tests.

    ``sleep`` returns an already-fired Deferred so that code which
    cooperatively yields to the reactor proceeds immediately, with no
    real delay.
    """

    def sleep(self, msec):
        # Build a Deferred and fire it straight away; callers only ever
        # yield/await the result, so this is equivalent to defer.succeed(None).
        d = defer.Deferred()
        d.callback(None)
        return d


class FakeEvent(object):
"""A fake event we use as a convenience.

Expand Down Expand Up @@ -417,6 +424,7 @@ def do_check(self, events, edges, expected_state_ids):
state_before = dict(state_at_event[prev_events[0]])
else:
state_d = resolve_events_with_store(
FakeClock(),
ROOM_ID,
RoomVersions.V2.identifier,
[state_at_event[n] for n in prev_events],
Expand Down Expand Up @@ -565,6 +573,7 @@ def test_event_map_none(self):
# Test that we correctly handle passing `None` as the event_map

state_d = resolve_events_with_store(
FakeClock(),
ROOM_ID,
RoomVersions.V2.identifier,
[self.state_at_bob, self.state_at_charlie],
Expand Down