Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
STASH
Browse files Browse the repository at this point in the history
  • Loading branch information
reivilibre committed Sep 17, 2021
1 parent a5fdd46 commit 0e0085c
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions synapse/storage/databases/state/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import MutableStateMap, StateMap
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.descriptors import cached
from synapse.util.caches.dictionary_cache import DictionaryCache

Expand Down Expand Up @@ -91,6 +92,12 @@ def __init__(self, database: DatabasePool, db_conn, hs):
500000,
)

# Current ongoing get_state_for_groups in-flight requests
# {group ID -> {StateFilter -> ObservableDeferred}}
self._state_group_inflight_requests: Dict[
int, Dict[StateFilter, ObservableDeferred[StateMap[str]]]
] = {}

def get_max_state_group_txn(txn: Cursor):
txn.execute("SELECT COALESCE(max(id), 0) FROM state_groups")
return txn.fetchone()[0]
Expand Down Expand Up @@ -249,6 +256,44 @@ async def _get_state_for_groups(
if not incomplete_groups:
return state

# try and rely on in-flight requests to complete this request without
# needing to spawn additional queries.

# (group ID, ObservableDeferred of request result)
reusable_inflight_requests: List[
Tuple[int, ObservableDeferred[StateMap[str]]]
] = []
# group ID -> left over StateFilter to request
requests_to_spawn: Dict[int, StateFilter] = {}

for group in incomplete_groups:
requests_in_flight_for_group = self._state_group_inflight_requests.get(
group
)
if requests_in_flight_for_group is None:
continue

state_filter_left_over = state_filter
for (
request_state_filter,
request_deferred,
) in requests_in_flight_for_group.items():
new_state_filter_left_over = state_filter_left_over.approx_difference(
request_state_filter
)
if new_state_filter_left_over != state_filter_left_over:
# reusing this request narrows our StateFilter down a bit.
reusable_inflight_requests.append((group, request_deferred))
state_filter_left_over = new_state_filter_left_over
if state_filter_left_over == StateFilter.none():
# we have managed to collect enough of the in-flight requests
# to cover our StateFilter and give us the state we need.
break
else:
# we have some of the state filter left over, so need to spawn
# a request
requests_to_spawn[group] = state_filter_left_over

cache_sequence_nm = self._state_group_cache.sequence
cache_sequence_m = self._state_group_members_cache.sequence

Expand Down

0 comments on commit 0e0085c

Please sign in to comment.