Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_encrypted filtering to Sliding Sync /sync #17281

Merged
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
76ce7a9
Add `is_dm` filtering to Sliding Sync `/sync`
MadLittleMods Jun 6, 2024
360f05c
Move changelog
MadLittleMods Jun 6, 2024
d8e2b1d
Add docstring
MadLittleMods Jun 6, 2024
dd43938
Reference actual filter code
MadLittleMods Jun 6, 2024
88fe201
Condense true/false tests
MadLittleMods Jun 6, 2024
44088bd
Add `is_encrypted` filtering to Sliding Sync `/sync`
MadLittleMods Jun 6, 2024
4412dbd
Update changelog number
MadLittleMods Jun 6, 2024
35b18be
Fix lints
MadLittleMods Jun 6, 2024
61f86e0
Add future todo
MadLittleMods Jun 10, 2024
578b44a
Move get_state_at() to area we can share from
MadLittleMods Jun 10, 2024
7dec930
Filter based on state at to_token
MadLittleMods Jun 10, 2024
945197b
Update docstring
MadLittleMods Jun 10, 2024
48eca7d
Less test bulk
MadLittleMods Jun 10, 2024
7aa0519
Incorporate `to_token` to filters
MadLittleMods Jun 10, 2024
a6e5798
Explain why no `to_token` for global account data
MadLittleMods Jun 10, 2024
5dd6d37
Add docstring
MadLittleMods Jun 10, 2024
d0d198f
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 10, 2024
271ae6f
Remove import workaround
MadLittleMods Jun 10, 2024
2e4627b
Merge branch 'madlittlemods/msc3575-sliding-sync-filter-dms' into mad…
MadLittleMods Jun 10, 2024
355de36
Remove import workaround
MadLittleMods Jun 10, 2024
f69d1c5
Remove sneaky log
MadLittleMods Jun 10, 2024
bb5dfc3
Merge branch 'madlittlemods/msc3575-sliding-sync-filter-dms' into mad…
MadLittleMods Jun 10, 2024
d752b8a
Comment no longer as useful
MadLittleMods Jun 10, 2024
9896478
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 12, 2024
eaaf408
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 13, 2024
aff2e82
Merge branch 'madlittlemods/msc3575-sliding-sync-filter-dms' into mad…
MadLittleMods Jun 13, 2024
0ea4fdd
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 13, 2024
810e9af
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 17, 2024
8965f3b
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-filter…
MadLittleMods Jun 17, 2024
c73391d
Fix tests
MadLittleMods Jun 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17281.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `is_encrypted` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
26 changes: 23 additions & 3 deletions synapse/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@

from immutabledict import immutabledict

from synapse.api.constants import AccountDataTypes, Membership
from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.events import EventBase
from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.state import StateFilter

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -57,6 +58,7 @@ class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self.storage_controllers = hs.get_storage_controllers()
self.auth_blocking = hs.get_auth_blocking()
self.notifier = hs.get_notifier()
self.event_sources = hs.get_event_sources()
Expand Down Expand Up @@ -523,8 +525,26 @@ async def filter_rooms(
if filters.spaces:
raise NotImplementedError()

if filters.is_encrypted:
raise NotImplementedError()
# Filter for encrypted rooms
if filters.is_encrypted is not None:
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
for room_id in list(filtered_room_id_set):
state_at_to_token = await self.storage_controllers.state.get_state_at(
room_id,
to_token,
state_filter=StateFilter.from_types(
[(EventTypes.RoomEncryption, "")]
),
)
is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, ""))

# If we're looking for encrypted rooms, filter out rooms that are not
# encrypted and vice versa
if (filters.is_encrypted and not is_encrypted) or (
not filters.is_encrypted and is_encrypted
):
filtered_room_id_set.remove(room_id)

MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
if filters.is_invite:
raise NotImplementedError()
Expand Down
107 changes: 13 additions & 94 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,89 +979,6 @@ async def _load_filtered_recents(
bundled_aggregations=bundled_aggregations,
)

async def get_state_after_event(
    self,
    event_id: str,
    state_filter: Optional[StateFilter] = None,
    await_full_state: bool = True,
) -> StateMap[str]:
    """
    Return the room state that applies once the given event has taken effect.

    Args:
        event_id: The event whose resulting state is wanted.
        state_filter: Limits which state entries are fetched from the database.
            When omitted, `StateFilter.all()` is used.
        await_full_state: When `True` (the default), block if complete state at
            the event is not yet available and `state_filter` cannot be
            satisfied by partial state.

    Returns:
        A map from `(event type, state key)` to event ID.
    """
    effective_filter = state_filter or StateFilter.all()
    fetched_state_ids = await self._state_storage_controller.get_state_ids_for_event(
        event_id,
        state_filter=effective_filter,
        await_full_state=await_full_state,
    )

    # We deliberately go through get_metadata_for_events rather than get_event.
    # If `event_id` is a redaction whose target we no longer have (e.g. it was
    # purged), get_event refuses to hand back the redaction, which is no use to
    # us here.
    #
    # (Arguably such an event could simply be assumed *not* to be a state event
    # and ignored, but pulling the metadata is the cleaner option.)
    metadata_by_event_id = await self.store.get_metadata_for_events([event_id])
    metadata = metadata_by_event_id[event_id]

    # Only a non-rejected state event changes the state map.
    if metadata.state_key is None or metadata.rejection_reason is not None:
        return fetched_state_ids

    # Work on a copy so the map returned by the storage layer is not mutated.
    resulting_state_ids = dict(fetched_state_ids)
    resulting_state_ids[(metadata.event_type, metadata.state_key)] = event_id
    return resulting_state_ids

async def get_state_at(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving get_state_at() to a shared storage controller so that we can use it in Sync v2 and Sliding sync

self,
room_id: str,
stream_position: StreamToken,
state_filter: Optional[StateFilter] = None,
await_full_state: bool = True,
) -> StateMap[str]:
"""Get the room state at a particular stream position

Args:
room_id: room for which to get state
stream_position: point at which to get state
state_filter: The state filter used to fetch state from the database.
await_full_state: if `True`, will block if we do not yet have complete state
at the last event in the room before `stream_position` and
`state_filter` is not satisfied by partial state. Defaults to `True`.
"""
# FIXME: This gets the state at the latest event before the stream ordering,
# which might not be the same as the "current state" of the room at the time
# of the stream token if there were multiple forward extremities at the time.
last_event_id = await self.store.get_last_event_in_room_before_stream_ordering(
room_id,
end_token=stream_position.room_key,
)

if last_event_id:
state = await self.get_state_after_event(
last_event_id,
state_filter=state_filter or StateFilter.all(),
await_full_state=await_full_state,
)

else:
# no events in this room - so presumably no state
state = {}

# (erikj) This should be rarely hit, but we've had some reports that
# we get more state down gappy syncs than we should, so let's add
# some logging.
logger.info(
"Failed to find any events in room %s at %s",
room_id,
stream_position.room_key,
)
return state

async def compute_summary(
self,
room_id: str,
Expand Down Expand Up @@ -1435,7 +1352,7 @@ async def _compute_state_delta_for_full_sync(
await_full_state = True
lazy_load_members = False

state_at_timeline_end = await self.get_state_at(
state_at_timeline_end = await self._state_storage_controller.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
Expand Down Expand Up @@ -1563,7 +1480,7 @@ async def _compute_state_delta_for_incremental_sync(
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
state_at_timeline_start = await self._state_storage_controller.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
Expand All @@ -1585,14 +1502,14 @@ async def _compute_state_delta_for_incremental_sync(
# about them).
state_filter = StateFilter.all()

state_at_previous_sync = await self.get_state_at(
state_at_previous_sync = await self._state_storage_controller.get_state_at(
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)

state_at_timeline_end = await self.get_state_at(
state_at_timeline_end = await self._state_storage_controller.get_state_at(
room_id,
stream_position=end_token,
state_filter=state_filter,
Expand Down Expand Up @@ -2591,7 +2508,7 @@ async def _get_room_changes_for_incremental_sync(
continue

if room_id in sync_result_builder.joined_room_ids or has_join:
old_state_ids = await self.get_state_at(
old_state_ids = await self._state_storage_controller.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
Expand Down Expand Up @@ -2621,12 +2538,14 @@ async def _get_room_changes_for_incremental_sync(
newly_left_rooms.append(room_id)
else:
if not old_state_ids:
old_state_ids = await self.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
old_state_ids = (
await self._state_storage_controller.get_state_at(
room_id,
since_token,
state_filter=StateFilter.from_types(
[(EventTypes.Member, user_id)]
),
)
)
old_mem_ev_id = old_state_ids.get(
(EventTypes.Member, user_id), None
Expand Down
87 changes: 86 additions & 1 deletion synapse/storage/controllers/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
PartialStateEventsTracker,
)
from synapse.synapse_rust.acl import ServerAclEvaluator
from synapse.types import MutableStateMap, StateMap, get_domain_from_id
from synapse.types import MutableStateMap, StateMap, StreamToken, get_domain_from_id
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
Expand Down Expand Up @@ -372,6 +372,91 @@ async def get_state_ids_for_event(
)
return state_map[event_id]

async def get_state_after_event(
    self,
    event_id: str,
    state_filter: Optional[StateFilter] = None,
    await_full_state: bool = True,
) -> StateMap[str]:
    """
    Fetch the room state as it stands after `event_id`.

    The state is looked up via `get_state_ids_for_event`; if the event is itself
    a non-rejected state event, its own `(type, state_key)` entry is then pointed
    at `event_id` in the returned map.

    Args:
        event_id: The event of interest.
        state_filter: The state filter used when fetching state from the
            database. Falls back to `StateFilter.all()` when not given.
        await_full_state: If `True` (the default), block when we do not yet have
            complete state at the event and `state_filter` is not satisfied by
            partial state.

    Returns:
        A map from `(event type, state key)` to event ID.
    """
    fetched_state_ids = await self.get_state_ids_for_event(
        event_id,
        state_filter=state_filter or StateFilter.all(),
        await_full_state=await_full_state,
    )

    # get_metadata_for_events is used in place of get_event to dodge a problem
    # with redactions: when `event_id` is a redaction and the original event is
    # missing (perhaps purged), get_event will not return the redaction at all,
    # which does not help us here.
    #
    # (In fairness, we could treat that case as "not a state event" and skip it,
    # but fetching the metadata keeps things tidier.)
    metadata_by_event_id = await self.stores.main.get_metadata_for_events(
        [event_id]
    )
    metadata = metadata_by_event_id[event_id]

    is_accepted_state_event = (
        metadata.state_key is not None and metadata.rejection_reason is None
    )
    if not is_accepted_state_event:
        return fetched_state_ids

    # Copy before writing so the fetched map itself is left untouched.
    state_after_event = dict(fetched_state_ids)
    state_after_event[(metadata.event_type, metadata.state_key)] = event_id
    return state_after_event

async def get_state_at(
    self,
    room_id: str,
    stream_position: StreamToken,
    state_filter: Optional[StateFilter] = None,
    await_full_state: bool = True,
) -> StateMap[str]:
    """Fetch the room state as of a given position in the stream.

    Args:
        room_id: The room whose state is wanted.
        stream_position: The point at which to read the state; only its
            `room_key` is used.
        state_filter: The state filter used when fetching state from the
            database. Falls back to `StateFilter.all()` when not given.
        await_full_state: If `True` (the default), block when we do not yet have
            complete state at the last event in the room before
            `stream_position` and `state_filter` is not satisfied by partial
            state.

    Returns:
        The state after the last event in the room before `stream_position`, or
        an empty map when no such event is found.
    """
    # FIXME: This gets the state at the latest event before the stream ordering,
    # which might not be the same as the "current state" of the room at the time
    # of the stream token if there were multiple forward extremities at the time.
    main_store = self.stores.main
    latest_event_id = (
        await main_store.get_last_event_in_room_before_stream_ordering(
            room_id,
            end_token=stream_position.room_key,
        )
    )

    if not latest_event_id:
        # No events in this room, so presumably no state either.
        #
        # (erikj) This should be rarely hit, but we've had some reports that
        # we get more state down gappy syncs than we should, so let's add
        # some logging.
        logger.info(
            "Failed to find any events in room %s at %s",
            room_id,
            stream_position.room_key,
        )
        return {}

    return await self.get_state_after_event(
        latest_event_id,
        state_filter=state_filter or StateFilter.all(),
        await_full_state=await_full_state,
    )

@trace
@tag_args
async def get_state_for_groups(
Expand Down
57 changes: 57 additions & 0 deletions tests/handlers/test_sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1244,3 +1244,60 @@ def test_filter_dm_rooms(self) -> None:
)

self.assertEqual(falsy_filtered_room_ids, {room_id})

def test_filter_encrypted_rooms(self) -> None:
    """
    Check that `filters.is_encrypted=True` keeps only the encrypted room and
    `filters.is_encrypted=False` keeps only the unencrypted one.
    """
    user1_id = self.register_user("user1", "pass")
    user1_tok = self.login(user1_id, "pass")

    # A plain room with no encryption state event
    plain_room_id = self.helper.create_room_as(
        user1_id,
        is_public=False,
        tok=user1_tok,
    )

    # A second room that we then turn encryption on for
    encrypted_room_id = self.helper.create_room_as(
        user1_id,
        is_public=False,
        tok=user1_tok,
    )
    self.helper.send_state(
        encrypted_room_id,
        EventTypes.RoomEncryption,
        {"algorithm": "m.megolm.v1.aes-sha2"},
        tok=user1_tok,
    )

    # Token taken after both rooms (and the encryption event) exist
    after_rooms_token = self.event_sources.get_current_token()

    # Each entry pairs a filter value with the rooms that should survive it;
    # the truthy case runs first, then the falsy case.
    expectations = (
        (True, {encrypted_room_id}),
        (False, {plain_room_id}),
    )
    for is_encrypted, expected_room_ids in expectations:
        filtered_room_ids = self.get_success(
            self.sliding_sync_handler.filter_rooms(
                UserID.from_string(user1_id),
                {plain_room_id, encrypted_room_id},
                SlidingSyncConfig.SlidingSyncList.Filters(
                    is_encrypted=is_encrypted,
                ),
                after_rooms_token,
            )
        )

        self.assertEqual(filtered_room_ids, expected_room_ids)
Loading