Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Avoid fetching unused account data in sync #14973

Merged
merged 8 commits into from
Feb 10, 2023
1 change: 1 addition & 0 deletions changelog.d/14973.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of `/sync` in a few situations.
30 changes: 25 additions & 5 deletions synapse/api/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,13 @@ def __init__(self, hs: "HomeServer", filter_json: JsonDict):
self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {}))
self._room_state_filter = Filter(hs, room_filter_json.get("state", {}))
self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {}))
self._room_account_data = Filter(hs, room_filter_json.get("account_data", {}))
self._room_account_data_filter = Filter(
hs, room_filter_json.get("account_data", {})
)
self._presence_filter = Filter(hs, filter_json.get("presence", {}))
self._account_data = Filter(hs, filter_json.get("account_data", {}))
self._global_account_data_filter = Filter(
hs, filter_json.get("account_data", {})
)

self.include_leave = filter_json.get("room", {}).get("include_leave", False)
self.event_fields = filter_json.get("event_fields", [])
Expand Down Expand Up @@ -256,8 +260,10 @@ async def filter_presence(
) -> List[UserPresenceState]:
return await self._presence_filter.filter(presence_states)

async def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
return await self._account_data.filter(events)
async def filter_global_account_data(
self, events: Iterable[JsonDict]
) -> List[JsonDict]:
return await self._global_account_data_filter.filter(events)

async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
return await self._room_state_filter.filter(
Expand All @@ -279,7 +285,7 @@ async def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDi
async def filter_room_account_data(
self, events: Iterable[JsonDict]
) -> List[JsonDict]:
return await self._room_account_data.filter(
return await self._room_account_data_filter.filter(
await self._room_filter.filter(events)
)

Expand All @@ -292,13 +298,27 @@ def blocks_all_presence(self) -> bool:
or self._presence_filter.filters_all_senders()
)

def blocks_all_global_account_data(self) -> bool:
"""True if all global acount data will be filtered out."""
return (
self._global_account_data_filter.filters_all_types()
or self._global_account_data_filter.filters_all_senders()
)

def blocks_all_room_ephemeral(self) -> bool:
return (
self._room_ephemeral_filter.filters_all_types()
or self._room_ephemeral_filter.filters_all_senders()
or self._room_ephemeral_filter.filters_all_rooms()
)

def blocks_all_room_account_data(self) -> bool:
return (
self._room_account_data_filter.filters_all_types()
or self._room_account_data_filter.filters_all_senders()
or self._room_account_data_filter.filters_all_rooms()
)

def blocks_all_room_timeline(self) -> bool:
return (
self._room_timeline_filter.filters_all_types()
Expand Down
10 changes: 6 additions & 4 deletions synapse/handlers/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,12 @@ async def get_new_events(
}
)

(
account_data,
room_account_data,
) = await self.store.get_updated_account_data_for_user(user_id, last_stream_id)
account_data = await self.store.get_updated_global_account_data_for_user(
user_id, last_stream_id
)
room_account_data = await self.store.get_updated_room_account_data_for_user(
user_id, last_stream_id
)

for account_data_type, content in account_data.items():
results.append({"type": account_data_type, "content": content})
Expand Down
5 changes: 2 additions & 3 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,8 @@ async def _snapshot_all_rooms(

tags_by_room = await self.store.get_tags_for_user(user_id)

account_data, account_data_by_room = await self.store.get_account_data_for_user(
user_id
)
account_data = await self.store.get_global_account_data_for_user(user_id)
account_data_by_room = await self.store.get_room_account_data_for_user(user_id)

public_room_ids = await self.store.get_public_room_ids()

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ async def copy_room_tags_and_direct_to_room(
user_id: The user's ID.
"""
# Retrieve user account data for predecessor room
user_account_data, _ = await self.store.get_account_data_for_user(user_id)
user_account_data = await self.store.get_global_account_data_for_user(user_id)

# Copy direct message state if applicable
direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {})
Expand Down
88 changes: 48 additions & 40 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1444,9 +1444,9 @@ async def generate_sync_result(

logger.debug("Fetching account data")

account_data_by_room = await self._generate_sync_entry_for_account_data(
sync_result_builder
)
# Global account data is included if it is not filtered out.
if not sync_config.filter_collection.blocks_all_global_account_data():
await self._generate_sync_entry_for_account_data(sync_result_builder)

# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
Expand All @@ -1472,9 +1472,7 @@ async def generate_sync_result(
(
newly_joined_rooms,
newly_left_rooms,
) = await self._generate_sync_entry_for_rooms(
sync_result_builder, account_data_by_room
)
) = await self._generate_sync_entry_for_rooms(sync_result_builder)

# Work out which users have joined or left rooms we're in. We use this
# to build the presence and device_list parts of the sync response in
Expand Down Expand Up @@ -1717,35 +1715,29 @@ async def _generate_sync_entry_for_to_device(

async def _generate_sync_entry_for_account_data(
self, sync_result_builder: "SyncResultBuilder"
) -> Dict[str, Dict[str, JsonDict]]:
"""Generates the account data portion of the sync response.
) -> None:
clokep marked this conversation as resolved.
Show resolved Hide resolved
"""Generates the global account data portion of the sync response.

Account data (called "Client Config" in the spec) can be set either globally
or for a specific room. Account data consists of a list of events which
accumulate state, much like a room.

This function retrieves global and per-room account data. The former is written
to the given `sync_result_builder`. The latter is returned directly, to be
later written to the `sync_result_builder` on a room-by-room basis.
This function retrieves global account data and writes it to the given
`sync_result_builder`. See `_generate_sync_entry_for_rooms` for handling
of per-room account data.

Args:
sync_result_builder

Returns:
A dictionary whose keys (room ids) map to the per room account data for that
room.
"""
sync_config = sync_result_builder.sync_config
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token

if since_token and not sync_result_builder.full_state:
# TODO Do not fetch room account data if it will be unused.
(
global_account_data,
account_data_by_room,
) = await self.store.get_updated_account_data_for_user(
user_id, since_token.account_data_key
global_account_data = (
await self.store.get_updated_global_account_data_for_user(
user_id, since_token.account_data_key
)
)

push_rules_changed = await self.store.have_push_rules_changed_for_user(
Expand All @@ -1758,28 +1750,26 @@ async def _generate_sync_entry_for_account_data(
sync_config.user
)
else:
# TODO Do not fetch room account data if it will be unused.
(
global_account_data,
account_data_by_room,
) = await self.store.get_account_data_for_user(sync_config.user.to_string())
all_global_account_data = await self.store.get_global_account_data_for_user(
user_id
)

global_account_data = dict(global_account_data)
global_account_data = dict(all_global_account_data)
global_account_data["m.push_rules"] = await self.push_rules_for_user(
sync_config.user
)

account_data_for_user = await sync_config.filter_collection.filter_account_data(
[
{"type": account_data_type, "content": content}
for account_data_type, content in global_account_data.items()
]
account_data_for_user = (
await sync_config.filter_collection.filter_global_account_data(
[
{"type": account_data_type, "content": content}
for account_data_type, content in global_account_data.items()
]
)
)

sync_result_builder.account_data = account_data_for_user

return account_data_by_room

async def _generate_sync_entry_for_presence(
self,
sync_result_builder: "SyncResultBuilder",
Expand Down Expand Up @@ -1839,9 +1829,7 @@ async def _generate_sync_entry_for_presence(
sync_result_builder.presence = presence

async def _generate_sync_entry_for_rooms(
self,
sync_result_builder: "SyncResultBuilder",
account_data_by_room: Dict[str, Dict[str, JsonDict]],
self, sync_result_builder: "SyncResultBuilder"
) -> Tuple[AbstractSet[str], AbstractSet[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
Expand All @@ -1852,7 +1840,6 @@ async def _generate_sync_entry_for_rooms(

Args:
sync_result_builder
account_data_by_room: Dictionary of per room account data

Returns:
Returns a 2-tuple describing rooms the user has joined or left.
Expand All @@ -1865,9 +1852,30 @@ async def _generate_sync_entry_for_rooms(
since_token = sync_result_builder.since_token
user_id = sync_result_builder.sync_config.user.to_string()

blocks_all_rooms = (
sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
)

# 0. Start by fetching room account data (if required).
if (
blocks_all_rooms
or sync_result_builder.sync_config.filter_collection.blocks_all_room_account_data()
):
account_data_by_room: Mapping[str, Mapping[str, JsonDict]] = {}
elif since_token and not sync_result_builder.full_state:
account_data_by_room = (
await self.store.get_updated_room_account_data_for_user(
user_id, since_token.account_data_key
)
)
else:
account_data_by_room = await self.store.get_room_account_data_for_user(
user_id
)

# 1. Start by fetching all ephemeral events in rooms we've joined (if required).
block_all_room_ephemeral = (
sync_result_builder.sync_config.filter_collection.blocks_all_rooms()
blocks_all_rooms
or sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
)
if block_all_room_ephemeral:
Expand Down Expand Up @@ -2294,7 +2302,7 @@ async def _generate_room_entry(
room_builder: "RoomSyncResultBuilder",
ephemeral: List[JsonDict],
tags: Optional[Dict[str, Dict[str, Any]]],
account_data: Dict[str, JsonDict],
account_data: Mapping[str, JsonDict],
always_include: bool = False,
) -> None:
"""Populates the `joined` and `archived` section of `sync_result_builder`
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/admin/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,8 @@ async def on_GET(
if not await self._store.get_user_by_id(user_id):
raise NotFoundError("User not found")

global_data, by_room_data = await self._store.get_account_data_for_user(user_id)
global_data = await self._store.get_global_account_data_for_user(user_id)
by_room_data = await self._store.get_room_account_data_for_user(user_id)
return HTTPStatus.OK, {
"account_data": {
"global": global_data,
Expand Down
Loading