Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix sync bug when accepting invites
Browse files Browse the repository at this point in the history
Hopefully this time we really will fix #4422.

We need to make sure that the cache on
`get_rooms_for_user_with_stream_ordering` is invalidated *before* the
SyncHandler is notified for the new events, and we can now do so reliably via
the `events` stream.
  • Loading branch information
richvdh committed Mar 28, 2019
1 parent 902cdc6 commit dcde05d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 16 deletions.
29 changes: 20 additions & 9 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
import logging

from synapse.api.constants import EventTypes
from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.replication.tcp.streams.events import (
EventsStreamCurrentStateRow,
EventsStreamEventRow,
)
from synapse.storage.event_federation import EventFederationWorkerStore
from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.events_worker import EventsWorkerStore
Expand Down Expand Up @@ -80,14 +83,7 @@ def process_replication_rows(self, stream_name, token, rows):
if stream_name == "events":
self._stream_id_gen.advance(token)
for row in rows:
if row.type != EventsStreamEventRow.TypeId:
continue
data = row.data
self.invalidate_caches_for_event(
token, data.event_id, data.room_id, data.type, data.state_key,
data.redacts,
backfilled=False,
)
self._process_event_stream_row(token, row)
elif stream_name == "backfill":
self._backfill_id_gen.advance(-token)
for row in rows:
Expand All @@ -100,6 +96,21 @@ def process_replication_rows(self, stream_name, token, rows):
stream_name, token, rows
)

def _process_event_stream_row(self, token, row):
data = row.data

if row.type == EventsStreamEventRow.TypeId:
self.invalidate_caches_for_event(
token, data.event_id, data.room_id, data.type, data.state_key,
data.redacts,
backfilled=False,
)
elif row.type == EventsStreamCurrentStateRow.TypeId:
if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key, ),
)

def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
etype, state_key, redacts, backfilled):
self._invalidate_get_event_cache(event_id)
Expand Down
5 changes: 0 additions & 5 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1355,11 +1355,6 @@ def _invalidate_state_caches(self, room_id, members_changed):
members_changed (iterable[str]): The user_ids of members that have
changed
"""
for member in members_changed:
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (member,),
)

for host in set(get_domain_from_id(u) for u in members_changed):
self._attempt_to_invalidate_cache(
"is_host_joined", (room_id, host,),
Expand Down
6 changes: 6 additions & 0 deletions synapse/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,12 @@ def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
if ev_type == EventTypes.Member
)

for member in members_changed:
txn.call_after(
self.get_rooms_for_user_with_stream_ordering.invalidate,
(member,),
)

self._invalidate_state_caches_and_stream(txn, room_id, members_changed)

def _update_forward_extremities_txn(
Expand Down
22 changes: 20 additions & 2 deletions tests/replication/slave/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,17 +172,35 @@ def test_push_actions_for_user(self):
{"highlight_count": 1, "notify_count": 2},
)

def test_get_rooms_for_user_with_stream_ordering(self):
"""Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
by rows in the events stream
"""
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
self.replicate()
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())

j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
self.replicate()
self.check(
"get_rooms_for_user_with_stream_ordering",
(USER_ID_2,),
{(ROOM_ID, j2.internal_metadata.stream_ordering)},
)

event_id = 0

def persist(
self,
sender=USER_ID,
room_id=ROOM_ID,
type={},
type="m.room.message",
key=None,
internal={},
state=None,
reset_state=False,
backfill=False,
depth=None,
prev_events=[],
Expand Down

0 comments on commit dcde05d

Please sign in to comment.