Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Reduce DB load of /sync when using presence #12885

Merged
merged 6 commits into from
May 31, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12885.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database load of `/sync` when presence is enabled.
70 changes: 43 additions & 27 deletions synapse/storage/databases/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Tuple, cast
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, cast

from synapse.api.presence import PresenceState, UserPresenceState
from synapse.replication.tcp.streams import PresenceStream
Expand All @@ -22,6 +22,7 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection
from synapse.storage.util.id_generators import (
Expand Down Expand Up @@ -56,7 +57,7 @@ def __init__(
)


class PresenceStore(PresenceBackgroundUpdateStore):
class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore):
def __init__(
self,
database: DatabasePool,
Expand Down Expand Up @@ -281,20 +282,25 @@ async def should_user_receive_full_presence_with_token(
True if the user should have full presence sent to them, False otherwise.
"""

def _should_user_receive_full_presence_with_token_txn(
txn: LoggingTransaction,
) -> bool:
sql = """
SELECT 1 FROM users_to_send_full_presence_to
WHERE user_id = ?
AND presence_stream_id >= ?
"""
txn.execute(sql, (user_id, from_token))
return bool(txn.fetchone())
token = await self._get_when_user_should_receive_full_presence(user_id)
if token is None:
return False

return await self.db_pool.runInteraction(
"should_user_receive_full_presence_with_token",
_should_user_receive_full_presence_with_token_txn,
return from_token <= token

@cached()
async def _get_when_user_should_receive_full_presence(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_get_full_presence_stream_token_for_user, maybe?

self, user_id: str
) -> Optional[int]:
"""Return the presence stream token, if any, which should trigger the
user to receive full presence.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""
return await self.db_pool.simple_select_one_onecol(
table="users_to_send_full_presence_to",
keyvalues={"user_id": user_id},
retcol="presence_stream_id",
allow_none=True,
desc="_get_when_user_should_receive_full_presence",
)

async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> None:
Expand All @@ -307,18 +313,28 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> N
# Add user entries to the table, updating the presence_stream_id column if the user already
# exists in the table.
presence_stream_id = self._presence_id_gen.get_current_token()
await self.db_pool.simple_upsert_many(
table="users_to_send_full_presence_to",
key_names=("user_id",),
key_values=[(user_id,) for user_id in user_ids],
value_names=("presence_stream_id",),
# We save the current presence stream ID token along with the user ID entry so
# that when a user /sync's, even if they syncing multiple times across separate
# devices at different times, each device will receive full presence once - when
# the presence stream ID in their sync token is less than the one in the table
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
desc="add_users_to_send_full_presence_to",

def _add_users_to_send_full_presence_to(txn: LoggingTransaction) -> None:
self.db_pool.simple_upsert_many_txn(
txn,
table="users_to_send_full_presence_to",
key_names=("user_id",),
key_values=[(user_id,) for user_id in user_ids],
value_names=("presence_stream_id",),
# We save the current presence stream ID token along with the user ID entry so
# that when a user /sync's, even if they syncing multiple times across separate
# devices at different times, each device will receive full presence once - when
# the presence stream ID in their sync token is less than the one in the table
# for their user ID.
value_values=[(presence_stream_id,) for _ in user_ids],
)
for user_id in user_ids:
self._invalidate_cache_and_stream(
txn, self._get_when_user_should_receive_full_presence, (user_id,)
)

return await self.db_pool.runInteraction(
"add_users_to_send_full_presence_to", _add_users_to_send_full_presence_to
)

async def get_presence_for_all_users(
Expand Down