Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't drop user dir deltas when server leaves room #10982

Merged
merged 5 commits into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10982.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where the remainder of a batch of user directory changes would be silently dropped if the server left a room early in the batch.
2 changes: 1 addition & 1 deletion synapse/handlers/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:

for user_id in user_ids:
await self._handle_remove_user(room_id, user_id)
return
continue
else:
logger.debug("Server is still in room: %r", room_id)

Expand Down
21 changes: 3 additions & 18 deletions tests/handlers/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,7 @@ def _perform_background_initial_update(self):
# Do the initial population of the stats via the background update
self._add_background_updates()

while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

def test_initial_room(self):
"""
Expand Down Expand Up @@ -140,12 +135,7 @@ def test_initial_room(self):
# Do the initial population of the user directory via the background update
self._add_background_updates()

while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

r = self.get_success(self.get_all_room_state())

Expand Down Expand Up @@ -568,12 +558,7 @@ def test_incomplete_stats(self):
)
)

while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

r1stats_complete = self._get_current_stats("room", r1)
u1stats_complete = self._get_current_stats("user", u1)
Expand Down
39 changes: 39 additions & 0 deletions tests/handlers/test_user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,45 @@ def test_reactivation_makes_regular_user_searchable(self) -> None:
self.assertEqual(len(s["results"]), 1)
self.assertEqual(s["results"][0]["user_id"], user)

def test_process_join_after_server_leaves_room(self) -> None:
alice = self.register_user("alice", "pass")
alice_token = self.login(alice, "pass")
bob = self.register_user("bob", "pass")
bob_token = self.login(bob, "pass")

# Alice makes two rooms. Bob joins one of them.
room1 = self.helper.create_room_as(alice, tok=alice_token)
room2 = self.helper.create_room_as(alice, tok=alice_token)
print("room1=", room1)
print("room2=", room2)
self.helper.join(room1, bob, tok=bob_token)

# The user sharing tables should have been updated.
public1 = self.get_success(self.user_dir_helper.get_users_in_public_rooms())
self.assertEqual(set(public1), {(alice, room1), (alice, room2), (bob, room1)})

# Alice leaves room1. The user sharing tables should be updated.
self.helper.leave(room1, alice, tok=alice_token)
public2 = self.get_success(self.user_dir_helper.get_users_in_public_rooms())
self.assertEqual(set(public2), {(alice, room2), (bob, room1)})

# Pause the processing of new events.
dir_handler = self.hs.get_user_directory_handler()
dir_handler.update_user_directory = False

# Bob leaves one room and joins the other.
self.helper.leave(room1, bob, tok=bob_token)
self.helper.join(room2, bob, tok=bob_token)

# Process the leave and join in one go.
dir_handler.update_user_directory = True
dir_handler.notify_new_event()
self.wait_for_background_updates()

# The user sharing tables should have been updated.
public3 = self.get_success(self.user_dir_helper.get_users_in_public_rooms())
self.assertEqual(set(public3), {(alice, room2), (bob, room2)})

def test_private_room(self) -> None:
"""
A user can be searched for only by people that are either in a public
Expand Down
7 changes: 1 addition & 6 deletions tests/storage/databases/main/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,7 @@ def test_background_populate_rooms_creator_column(self):
self.store.db_pool.updates._all_done = False

# Now let's actually drive the updates to completion
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

# Make sure the background update filled in the room creator
room_creator_after = self.get_success(
Expand Down
7 changes: 1 addition & 6 deletions tests/storage/test_cleanup_extrems.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,7 @@ def run_delta_file(txn):
# Ugh, have to reset this flag
self.store.db_pool.updates._all_done = False

while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

def test_soft_failed_extremities_handled_correctly(self):
"""Test that extremities are correctly calculated in the presence of
Expand Down
21 changes: 3 additions & 18 deletions tests/storage/test_client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,7 @@ def test_updating_monthly_active_user_when_space(self):

def test_devices_last_seen_bg_update(self):
# First make sure we have completed all updates.
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

user_id = "@user:id"
device_id = "MY_DEVICE"
Expand Down Expand Up @@ -277,12 +272,7 @@ def test_devices_last_seen_bg_update(self):
self.store.db_pool.updates._all_done = False

# Now let's actually drive the updates to completion
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

# We should now get the correct result again
result = self.get_success(
Expand All @@ -303,12 +293,7 @@ def test_devices_last_seen_bg_update(self):

def test_old_user_ips_pruned(self):
# First make sure we have completed all updates.
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

user_id = "@user:id"
device_id = "MY_DEVICE"
Expand Down
14 changes: 2 additions & 12 deletions tests/storage/test_event_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,12 +578,7 @@ def test_background_update_single_room(self):
# Ugh, have to reset this flag
self.store.db_pool.updates._all_done = False

while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

# Test that the `has_auth_chain_index` has been set
self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id)))
Expand Down Expand Up @@ -619,12 +614,7 @@ def test_background_update_multiple_rooms(self):
# Ugh, have to reset this flag
self.store.db_pool.updates._all_done = False

while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

# Test that the `has_auth_chain_index` has been set
self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id1)))
Expand Down
14 changes: 2 additions & 12 deletions tests/storage/test_roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,7 @@ def prepare(self, reactor, clock, homeserver):

def test_can_rerun_update(self):
# First make sure we have completed all updates.
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

# Now let's create a room, which will insert a membership
user = UserID("alice", "test")
Expand All @@ -197,9 +192,4 @@ def test_can_rerun_update(self):
self.store.db_pool.updates._all_done = False
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did have one concern here. Some of these explicitly reset the _all_done flag, and others don't. I haven't made the situation worse---apart from an extra stack frame the behaviour should be identical. But I wondered if it was was more consistent to reset the flag in the helper function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤷‍♂️


# Now let's actually drive the updates to completion
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()
7 changes: 1 addition & 6 deletions tests/storage/test_user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,7 @@ def _purge_and_rebuild_user_dir(self) -> None:
)
)

while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)
self.wait_for_background_updates()

def test_initial(self) -> None:
"""
Expand Down
9 changes: 9 additions & 0 deletions tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,15 @@ def wait_on_thread(self, deferred, timeout=10):
self.reactor.advance(0.01)
time.sleep(0.01)

def wait_for_background_updates(self) -> None:
"""Block until all background updates have completed."""
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
while not self.get_success(
self.store.db_pool.updates.has_completed_background_updates()
):
self.get_success(
self.store.db_pool.updates.do_next_background_update(100), by=0.1
)

def make_homeserver(self, reactor, clock):
"""
Make and return a homeserver.
Expand Down