Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Be smarter about which hosts to send presence to when processing room joins #9402

Merged
merged 6 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9402.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug where a lot of unnecessary presence updates were sent when joining a room.
2 changes: 1 addition & 1 deletion synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ async def send_presence(self, states: List[UserPresenceState]):
self._processing_pending_presence = False

def send_presence_to_destinations(
self, states: List[UserPresenceState], destinations: List[str]
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
"""Send the given presence states to the given destinations.
destinations (list[str])
Expand Down
56 changes: 42 additions & 14 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,9 @@ async def _handle_state_delta(self, deltas):
"""Process current state deltas to find new joins that need to be
handled.
"""
# A map of destination to a set of user state that they should receive
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]

for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
Expand All @@ -858,6 +861,7 @@ async def _handle_state_delta(self, deltas):

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

# Drop any event that isn't a membership join
if typ != EventTypes.Member:
continue

Expand All @@ -880,29 +884,54 @@ async def _handle_state_delta(self, deltas):
# Ignore changes to join events.
continue

await self._on_user_joined_room(room_id, state_key)
# Retrieve any user presence state updates that need to be sent as a result,
# and the destinations that need to receive it
destinations, user_presence_states = await self._on_user_joined_room(
room_id, state_key
)

# Insert the destinations and respective updates into our destinations dict
for destination in destinations:
presence_destinations.setdefault(destination, set()).update(
user_presence_states
)

# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self.federation.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)

async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
async def _on_user_joined_room(
self, room_id: str, user_id: str
) -> Tuple[List[str], List[UserPresenceState]]:
"""Called when we detect a user joining the room via the current state
delta stream.
"""
delta stream. Returns the destinations that need to be updated and the
presence updates to send to them.

Args:
room_id: The ID of the room that the user has joined.
user_id: The ID of the user that has joined the room.

Returns:
A tuple of destinations and presence updates to send to them.
"""
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)

# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved

state = await self.current_state_for_user(user_id)
hosts = await self.state.get_current_hosts_in_room(room_id)
remote_hosts = await self.state.get_current_hosts_in_room(room_id)

# Filter out ourselves.
hosts = {host for host in hosts if host != self.server_name}
filtered_remote_hosts = [
host for host in remote_hosts if host != self.server_name
]

self.federation.send_presence_to_destinations(
states=[state], destinations=hosts
)
state = await self.current_state_for_user(user_id)
return filtered_remote_hosts, [state]
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
Expand All @@ -915,6 +944,8 @@ async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
# TODO: Check that this is actually a new server joining the
# room.

remote_host = get_domain_from_id(user_id)

users = await self.state.get_current_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))

Expand All @@ -934,10 +965,7 @@ async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
or state.status_msg is not None
]

if states:
self.federation.send_presence_to_destinations(
states=states, destinations=[get_domain_from_id(user_id)]
)
return [remote_host], states


def should_notify(old_state, new_state):
Expand Down
14 changes: 10 additions & 4 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def test_remote_joins(self):
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server2"], states=[expected_state]
destinations=["server2"], states={expected_state}
)

#
Expand All @@ -533,7 +533,7 @@ def test_remote_joins(self):

self.federation_sender.send_presence.assert_not_called()
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations=["server3"], states=[expected_state]
destinations=["server3"], states={expected_state}
)

def test_remote_gets_presence_when_local_user_joins(self):
Expand Down Expand Up @@ -584,8 +584,14 @@ def test_remote_gets_presence_when_local_user_joins(self):
self.presence_handler.current_state_for_user("@test2:server")
)
self.assertEqual(expected_state.state, PresenceState.ONLINE)
self.federation_sender.send_presence_to_destinations.assert_called_once_with(
destinations={"server2", "server3"}, states=[expected_state]
self.assertEqual(
self.federation_sender.send_presence_to_destinations.call_count, 2
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server3"], states={expected_state}
)
self.federation_sender.send_presence_to_destinations.assert_any_call(
destinations=["server2"], states={expected_state}
)

def _add_new_user(self, room_id, user_id):
Expand Down