From 0e0085c9a7d2509a81a72e9427a29457d397272d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 17 Sep 2021 12:21:53 +0100 Subject: [PATCH] STASH --- synapse/storage/databases/state/store.py | 45 ++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index f839c0c24f1c..c3181e4a3952 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -24,6 +24,7 @@ from synapse.storage.types import Cursor from synapse.storage.util.sequence import build_sequence_generator from synapse.types import MutableStateMap, StateMap +from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached from synapse.util.caches.dictionary_cache import DictionaryCache @@ -91,6 +92,12 @@ def __init__(self, database: DatabasePool, db_conn, hs): 500000, ) + # Current ongoing get_state_for_groups in-flight requests + # {group ID -> {StateFilter -> ObservableDeferred}} + self._state_group_inflight_requests: Dict[ + int, Dict[StateFilter, ObservableDeferred[StateMap[str]]] + ] = {} + def get_max_state_group_txn(txn: Cursor): txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups") return txn.fetchone()[0] @@ -249,6 +256,44 @@ async def _get_state_for_groups( if not incomplete_groups: return state + # try and rely on in-flight requests to complete this request without + # needing to spawn additional queries. + + # (group ID, ObservableDeferred of request result) + reusable_inflight_requests: List[ + Tuple[int, ObservableDeferred[StateMap[str]]] + ] = [] + # group ID -> left over StateFilter to request + requests_to_spawn: Dict[int, StateFilter] = {} + + for group in incomplete_groups: + requests_in_flight_for_group = self._state_group_inflight_requests.get( + group + ) + if requests_in_flight_for_group is None: + continue + + state_filter_left_over = state_filter + for ( + request_state_filter, + request_deferred, + ) in requests_in_flight_for_group.items(): + new_state_filter_left_over = state_filter_left_over.approx_difference( + request_state_filter + ) + if new_state_filter_left_over != state_filter_left_over: + # reusing this request narrows our StateFilter down a bit. + reusable_inflight_requests.append((group, request_deferred)) + state_filter_left_over = new_state_filter_left_over + if state_filter_left_over == StateFilter.none(): + # we have managed to collect enough of the in-flight requests + # to cover our StateFilter and give us the state we need. + break + else: + # we have some of the state filter left over, so need to spawn + # a request + requests_to_spawn[group] = state_filter_left_over + cache_sequence_nm = self._state_group_cache.sequence cache_sequence_m = self._state_group_members_cache.sequence