Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Don't keep old stream_ordering_to_exterm around (#15382)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Apr 6, 2023
1 parent 72b43be commit 485b9fd
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.
1 change: 1 addition & 0 deletions changelog.d/15382.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve DB performance of clearing out old data from `stream_ordering_to_exterm`.
10 changes: 10 additions & 0 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,16 @@ async def get_user_ids_changed(
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
# Check if the forward extremities have changed. If not then we know
# the current state won't have changed, and so we can skip this room.
try:
if not await self.store.have_room_forward_extremities_changed_since(
room_id, stream_ordering
):
continue
except errors.StoreError:
pass

current_state_ids = await self._state_storage.get_current_state_ids(
room_id, await_full_state=False
)
Expand Down
52 changes: 42 additions & 10 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,38 @@ def _get_min_depth_interaction(

return int(min_depth) if min_depth is not None else None

async def have_room_forward_extremities_changed_since(
self,
room_id: str,
stream_ordering: int,
) -> bool:
"""Check if the forward extremities in a room have changed since the
given stream ordering
Throws a StoreError if we have since purged the index for
stream_orderings from that point.
"""

if stream_ordering <= self.stream_ordering_month_ago: # type: ignore[attr-defined]
raise StoreError(400, f"stream_ordering too old {stream_ordering}")

sql = """
SELECT 1 FROM stream_ordering_to_exterm
WHERE stream_ordering > ? AND room_id = ?
LIMIT 1
"""

def have_room_forward_extremities_changed_since_txn(
txn: LoggingTransaction,
) -> bool:
txn.execute(sql, (stream_ordering, room_id))
return txn.fetchone() is not None

return await self.db_pool.runInteraction(
"have_room_forward_extremities_changed_since",
have_room_forward_extremities_changed_since_txn,
)

@cancellable
async def get_forward_extremities_for_room_at_stream_ordering(
self, room_id: str, stream_ordering: int
Expand Down Expand Up @@ -1232,10 +1264,17 @@ def get_forward_extremeties_for_room_txn(txn: LoggingTransaction) -> List[str]:
txn.execute(sql, (stream_ordering, room_id))
return [event_id for event_id, in txn]

return await self.db_pool.runInteraction(
event_ids = await self.db_pool.runInteraction(
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
)

# If we didn't find any IDs, then we must have cleared out the
# associated `stream_ordering_to_exterm`.
if not event_ids:
raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))

return event_ids

def _get_connected_batch_event_backfill_results_txn(
self, txn: LoggingTransaction, insertion_event_id: str, limit: int
) -> List[BackfillQueueNavigationItem]:
Expand Down Expand Up @@ -1664,19 +1703,12 @@ async def get_successor_events(self, event_id: str) -> List[str]:
@wrap_as_background_process("delete_old_forward_extrem_cache")
async def _delete_old_forward_extrem_cache(self) -> None:
def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None:
# Delete entries older than a month, while making sure we don't delete
# the only entries for a room.
sql = """
DELETE FROM stream_ordering_to_exterm
WHERE
room_id IN (
SELECT room_id
FROM stream_ordering_to_exterm
WHERE stream_ordering > ?
) AND stream_ordering < ?
WHERE stream_ordering < ?
"""
txn.execute(
sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) # type: ignore[attr-defined]
sql, (self.stream_ordering_month_ago) # type: ignore[attr-defined]
)

await self.db_pool.runInteraction(
Expand Down

0 comments on commit 485b9fd

Please sign in to comment.