Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use stream cache in get_linearized_receipts_for_room #3505

Merged
merged 4 commits into from
Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3505.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce database consumption when processing large numbers of receipts
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def stream_positions(self):

def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
self.get_receipts_for_user.invalidate((user_id, receipt_type))
self.get_linearized_receipts_for_room.invalidate_many((room_id,))
self._get_linearized_receipts_for_room.invalidate_many((room_id,))
self.get_last_receipt_event_id_for_user.invalidate(
(user_id, room_id, receipt_type)
)
Expand Down
25 changes: 19 additions & 6 deletions synapse/storage/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
"""
room_ids = set(room_ids)

if from_key:
if from_key is not None:
# Only ask the database about rooms where there have been new
# receipts added since `from_key`
room_ids = yield self._receipts_stream_cache.get_entities_changed(
room_ids, from_key
)
Expand All @@ -151,7 +153,6 @@ def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):

defer.returnValue([ev for res in results.values() for ev in res])

@cachedInlineCallbacks(num_args=3, tree=True)
def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""Get receipts for a single room for sending to clients.

Expand All @@ -162,7 +163,19 @@ def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
from the start.

Returns:
list: A list of receipts.
Deferred[list]: A list of receipts.
"""
if from_key is not None:
# Check the cache first to see if any new receipts have been added
# since`from_key`. If not we can no-op.
if not self._receipts_stream_cache.has_entity_changed(room_id, from_key):
defer.succeed([])

return self._get_linearized_receipts_for_room(room_id, to_key, from_key)

@cachedInlineCallbacks(num_args=3, tree=True)
def _get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
"""See get_linearized_receipts_for_room
"""
def f(txn):
if from_key:
Expand Down Expand Up @@ -211,7 +224,7 @@ def f(txn):
"content": content,
}])

@cachedList(cached_method_name="get_linearized_receipts_for_room",
@cachedList(cached_method_name="_get_linearized_receipts_for_room",
list_name="room_ids", num_args=3, inlineCallbacks=True)
def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
if not room_ids:
Expand Down Expand Up @@ -373,7 +386,7 @@ def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))

txn.call_after(
self._receipts_stream_cache.entity_has_changed,
Expand Down Expand Up @@ -493,7 +506,7 @@ def insert_graph_receipt_txn(self, txn, room_id, receipt_type,
self.get_receipts_for_user.invalidate, (user_id, receipt_type)
)
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self.get_linearized_receipts_for_room.invalidate_many, (room_id,))
txn.call_after(self._get_linearized_receipts_for_room.invalidate_many, (room_id,))

self._simple_delete_txn(
txn,
Expand Down