Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in sliding sync when using old DB. (default instance_name to "master") #17398

Merged
merged 10 commits into from
Jul 8, 2024
1 change: 1 addition & 0 deletions changelog.d/17398.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using an old database.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for trying out Sliding Sync and jumping on a fix for this!

6 changes: 0 additions & 6 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (user_id,)
)
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))

# Purge other caches based on room state.
Expand All @@ -148,9 +145,6 @@ def _invalidate_state_caches_all(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))

Expand Down
10 changes: 0 additions & 10 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,12 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]

if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined]
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
Expand Down Expand Up @@ -334,9 +330,6 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (state_key,)
)
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))

self._attempt_to_invalidate_cache(
Expand Down Expand Up @@ -399,9 +392,6 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,7 @@ def _fetch_event_rows(
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
instance_name=row[2],
instance_name=row[2] or "master",
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
internal_metadata=row[3],
json=row[4],
format_version=row[5],
Expand Down
67 changes: 6 additions & 61 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.roommember import (
GetRoomsForUserWithStreamOrdering,
MemberSummary,
ProfileInfo,
RoomsForUser,
)
from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
from synapse.types import (
JsonDict,
PersistedEventPosition,
Expand Down Expand Up @@ -494,7 +489,11 @@ def _get_rooms_for_local_user_where_membership_is_txn(
sender=sender,
membership=membership,
event_id=event_id,
event_pos=PersistedEventPosition(instance_name, stream_ordering),
event_pos=PersistedEventPosition(
# If instance_name is null we default to "master"
instance_name or "master",
Copy link
Collaborator

@MadLittleMods MadLittleMods Jul 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have some constant to represent "master" to avoid typos (subtle bugs)

(can iterate in another PR)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loose typing of the rows returned from SQL queries makes this kind of mistake too easy. Kinda wish we had to duck-type all of it to narrow things down (forced by the linter, instead of letting it slide because the values are typed as `Any`).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no really easy way of annotating the types of the returned rows that isn't error-prone, e.g. here we'd probably have assumed the `instance_name` column was `NOT NULL`.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we just need to use the disallow-any-xxx options in mypy so we can't just assign Any to something that expects a str.

Copy link
Contributor

@reivilibre reivilibre Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have thought about this in the past; the best idea I came up with is this: during the trial tests, have Postgres return the type information of the SQL statements (and combine this with an API change on the fetch functions, where you tell it what type you're expecting and this is used as the return type). Then it could assert that the Postgres type information matches what is expected. (You'd disable this at real runtime to avoid any performance hits.)
Ultimately this is loosely inspired by the sqlx library in Rust (where type information flows out from the database at or ahead of compile-time) but I think for us it'd be too much work for too little benefit now.

stream_ordering,
),
room_version_id=room_version,
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
Expand Down Expand Up @@ -606,53 +605,6 @@ async def get_local_current_membership_for_user_in_room(

return results

@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user_with_stream_ordering(
self, user_id: str
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
"""Returns a set of room_ids the user is currently joined to.

If a remote user only returns rooms this server is currently
participating in.

Args:
user_id

Returns:
Returns the rooms the user is in currently, along with the stream
ordering of the most recent join for that user and room, along with
the room version of the room.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_user_with_stream_ordering",
self._get_rooms_for_user_with_stream_ordering_txn,
user_id,
)

def _get_rooms_for_user_with_stream_ordering_txn(
self, txn: LoggingTransaction, user_id: str
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
# We use `current_state_events` here and not `local_current_membership`
# as a) this gets called with remote users and b) this only gets called
# for rooms the server is participating in.
sql = """
SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.state_key = ?
AND c.membership = ?
"""

txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(
GetRoomsForUserWithStreamOrdering(
room_id, PersistedEventPosition(instance, stream_id)
)
for room_id, instance, stream_id in txn
)

async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
Expand Down Expand Up @@ -701,13 +653,6 @@ async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
If a remote user only returns rooms this server is currently
participating in.
"""
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
(user_id,),
None,
update_metrics=False,
)
if rooms:
return frozenset(r.room_id for r in rooms)

room_ids = await self.db_pool.simple_select_onecol(
table="current_state_events",
Expand Down
10 changes: 7 additions & 3 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,8 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
# Event
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
event_id=event_id,
event_pos=PersistedEventPosition(
instance_name=instance_name,
# If instance_name is null we default to "master"
instance_name=instance_name or "master",
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
stream=stream_ordering,
),
# When `s.event_id = null`, we won't be able to get respective
Expand All @@ -952,7 +953,8 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
prev_event_id=prev_event_id,
prev_event_pos=(
PersistedEventPosition(
instance_name=prev_instance_name,
# If instance_name is null we default to "master"
instance_name=prev_instance_name or "master",
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
stream=prev_stream_ordering,
)
if (
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1257,7 +1259,9 @@ def get_last_event_pos_in_room_before_stream_ordering_txn(
stream_ordering=stream_ordering,
):
return event_id, PersistedEventPosition(
instance_name, stream_ordering
# If instance_name is null we default to "master"
instance_name or "master",
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
stream_ordering,
)

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return None
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
1 change: 0 additions & 1 deletion tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ def test_unknown_room_version(self) -> None:
)

# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.store._get_event_cache.clear()
self.store._event_ref.clear()
Expand Down
124 changes: 1 addition & 123 deletions tests/replication/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,16 @@
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.handlers.room import RoomEventSource
from synapse.server import HomeServer
from synapse.storage.databases.main.event_push_actions import (
NotifCounts,
RoomNotifCounts,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.roommember import GetRoomsForUserWithStreamOrdering, RoomsForUser
from synapse.storage.roommember import RoomsForUser
from synapse.types import PersistedEventPosition
from synapse.util import Clock

from tests.server import FakeTransport

from ._base import BaseWorkerStoreTestCase

USER_ID = "@feeling:test"
Expand Down Expand Up @@ -221,125 +218,6 @@ def test_push_actions_for_user(self, send_receipt: bool) -> None:
),
)

def test_get_rooms_for_user_with_stream_ordering(self) -> None:
"""Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated
by rows in the events stream
"""
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
self.replicate()
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())

j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
self.replicate()

expected_pos = PersistedEventPosition(
j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
)
self.check(
"get_rooms_for_user_with_stream_ordering",
(USER_ID_2,),
{GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
)

def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
self,
) -> None:
"""Check that current_state invalidation happens correctly with multiple events
in the persistence batch.

This test attempts to reproduce a race condition between the event persistence
loop and a worker-based Sync handler.

The problem occurred when the master persisted several events in one batch. It
only updates the current_state at the end of each batch, so the obvious thing
to do is then to issue a current_state_delta stream update corresponding to the
last stream_id in the batch.

However, that raises the possibility that a worker will see the replication
notification for a join event before the current_state caches are invalidated.

The test involves:
* creating a join and a message event for a user, and persisting them in the
same batch

* controlling the replication stream so that updates are sent gradually

* between each bunch of replication updates, check that we see a consistent
snapshot of the state.
"""
self.persist(type="m.room.create", key="", creator=USER_ID)
self.persist(type="m.room.member", key=USER_ID, membership="join")
self.replicate()
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())

# limit the replication rate
repl_transport = self._server_transport
assert isinstance(repl_transport, FakeTransport)
repl_transport.autoflush = False

# build the join and message events and persist them in the same batch.
logger.info("----- build test events ------")
j2, j2ctx = self.build_event(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
msg, msgctx = self.build_event()
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
self.replicate()
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None

event_source = RoomEventSource(self.hs)
event_source.store = self.worker_store
current_token = event_source.get_current_key()

# gradually stream out the replication
while repl_transport.buffer:
logger.info("------ flush ------")
repl_transport.flush(30)
self.pump(0)

prev_token = current_token
current_token = event_source.get_current_key()

# attempt to replicate the behaviour of the sync handler.
#
# First, we get a list of the rooms we are joined to
joined_rooms = self.get_success(
self.worker_store.get_rooms_for_user_with_stream_ordering(USER_ID_2)
)

# Then, we get a list of the events since the last sync
membership_changes = self.get_success(
self.worker_store.get_membership_changes_for_user(
USER_ID_2, prev_token, current_token
)
)

logger.info(
"%s->%s: joined_rooms=%r membership_changes=%r",
prev_token,
current_token,
joined_rooms,
membership_changes,
)

# the membership change is only any use to us if the room is in the
# joined_rooms list.
if membership_changes:
expected_pos = PersistedEventPosition(
j2.internal_metadata.instance_name,
j2.internal_metadata.stream_ordering,
)
self.assertEqual(
joined_rooms,
{GetRoomsForUserWithStreamOrdering(ROOM_ID, expected_pos)},
)

event_id = 0

def persist(self, backfill: bool = False, **kwargs: Any) -> EventBase:
Expand Down
Loading