diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cf08737d115a..1799174c2f1a 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1764,14 +1764,14 @@ async def _filter_all_presence_updates_for_user( Returns: A list of presence states for the given user to receive. """ + updated_users = None if from_key: # Only return updates since the last sync updated_users = self.store.presence_stream_cache.get_all_entities_changed( from_key ) - if not updated_users: - updated_users = [] + if updated_users is not None: # Get the actual presence update for each change users_to_state = await self.get_presence_handler().current_state_for_users( updated_users diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 1e83c62753a9..edbd1f9f2292 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -842,12 +842,11 @@ async def get_users_whose_devices_changed( user_ids, from_key ) - if not user_ids_to_check: + # If an empty set was returned, there's nothing to do. + if user_ids_to_check is not None and not user_ids_to_check: return set() def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]: - changes: Set[str] = set() - stream_id_where_clause = "stream_id > ?" sql_args = [from_key] @@ -858,19 +857,25 @@ def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]: sql = f""" SELECT DISTINCT user_id FROM device_lists_stream WHERE {stream_id_where_clause} - AND """ - # Query device changes with a batch of users at a time - # Assertion for mypy's benefit; see also - # https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions - assert user_ids_to_check is not None - for chunk in batch_iter(user_ids_to_check, 100): - clause, args = make_in_list_sql_clause( - txn.database_engine, "user_id", chunk - ) - txn.execute(sql + clause, sql_args + args) - changes.update(user_id for user_id, in txn) + # If the stream change cache gave us no information, fetch *all* + # users between the stream IDs. + if user_ids_to_check is None: + txn.execute(sql, sql_args) + return {user_id for user_id, in txn} + + # Otherwise, fetch changes for the given users. + else: + changes: Set[str] = set() + + # Query device changes with a batch of users at a time + for chunk in batch_iter(user_ids_to_check, 100): + clause, args = make_in_list_sql_clause( + txn.database_engine, "user_id", chunk + ) + txn.execute(sql + " AND " + clause, sql_args + args) + changes.update(user_id for user_id, in txn) return changes