-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Don't keep old stream_ordering_to_exterm around #15382
Changes from 4 commits
fe0f38e
81a27da
6665a7f
d5ccd2d
c9ee761
4b3c536
527812c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Improve DB performance of clearing out old data from `stream_ordering_to_exterm`. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1171,6 +1171,35 @@ def _get_min_depth_interaction( | |
|
||
return int(min_depth) if min_depth is not None else None | ||
|
||
async def has_room_extremities_changed_since(
    self,
    room_id: str,
    stream_ordering: int,
) -> bool:
    """Report whether a room's forward extremities changed after a given
    stream ordering.

    Args:
        room_id: The room whose `stream_ordering_to_exterm` rows are checked.
        stream_ordering: The stream ordering to compare against.

    Returns:
        True if at least one `stream_ordering_to_exterm` row exists for the
        room with a stream ordering strictly greater than `stream_ordering`,
        False otherwise.

    Raises:
        StoreError: (400) if `stream_ordering` is not newer than
            `self.stream_ordering_month_ago`, i.e. the index for stream
            orderings from that point has since been purged.
    """

    cutoff = self.stream_ordering_month_ago  # type: ignore[attr-defined]
    if stream_ordering <= cutoff:
        raise StoreError(400, f"stream_ordering too old {stream_ordering}")

    # We only need to know whether any matching row exists, hence the
    # `SELECT 1 ... LIMIT 1`.
    query = """
        SELECT 1 FROM stream_ordering_to_exterm
        WHERE stream_ordering > ? AND room_id = ?
        LIMIT 1
    """

    def _has_changed_txn(txn: LoggingTransaction) -> bool:
        txn.execute(query, (stream_ordering, room_id))
        row = txn.fetchone()
        return row is not None

    changed = await self.db_pool.runInteraction(
        "has_room_extremities_changed_since", _has_changed_txn
    )
    return changed
|
||
@cancellable | ||
async def get_forward_extremities_for_room_at_stream_ordering( | ||
self, room_id: str, stream_ordering: int | ||
|
@@ -1232,10 +1261,17 @@ def get_forward_extremeties_for_room_txn(txn: LoggingTransaction) -> List[str]: | |
txn.execute(sql, (stream_ordering, room_id)) | ||
return [event_id for event_id, in txn] | ||
|
||
return await self.db_pool.runInteraction( | ||
event_ids = await self.db_pool.runInteraction( | ||
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn | ||
) | ||
|
||
# If we didn't find any IDs, then we must have cleared out the | ||
# associated `stream_ordering_to_exterm`. | ||
if not event_ids: | ||
raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,)) | ||
|
||
return event_ids | ||
|
||
Comment on lines
+1271
to
+1277
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this a new error condition, i.e. was it impossible for the interaction to return a falsey value before? I think this could fail before, but we'd try to keep it working for the last month's worth of stream orderings? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's not, and note that the one caller treats an empty list and a `StoreError` the same. This is, and was, hit when we asked for the extremities at a recent-ish stream ordering where we had purged all entries in the room before that stream ordering (we'd keep around one entry per room, but that might be after that). |
||
def _get_connected_batch_event_backfill_results_txn( | ||
self, txn: LoggingTransaction, insertion_event_id: str, limit: int | ||
) -> List[BackfillQueueNavigationItem]: | ||
|
@@ -1664,19 +1700,12 @@ async def get_successor_events(self, event_id: str) -> List[str]: | |
@wrap_as_background_process("delete_old_forward_extrem_cache") | ||
async def _delete_old_forward_extrem_cache(self) -> None: | ||
def _delete_old_forward_extrem_cache_txn(txn: LoggingTransaction) -> None: | ||
# Delete entries older than a month, while making sure we don't delete | ||
# the only entries for a room. | ||
Comment on lines
-1667
to
-1668
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Chesterton's Fence: do we know why this was here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've been trying to figure this out, but couldn't see any reason for it until last night. I think it may have been to protect against the case of a room that hasn't been updated in the last month, where you want to return the most recent extremities, so that the caller then becomes a no-op. Let me think about how best to keep that optimisation. |
||
sql = """ | ||
DELETE FROM stream_ordering_to_exterm | ||
WHERE | ||
room_id IN ( | ||
SELECT room_id | ||
FROM stream_ordering_to_exterm | ||
WHERE stream_ordering > ? | ||
) AND stream_ordering < ? | ||
WHERE stream_ordering < ? | ||
""" | ||
txn.execute( | ||
sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago) # type: ignore[attr-defined] | ||
sql, (self.stream_ordering_month_ago) # type: ignore[attr-defined] | ||
) | ||
|
||
await self.db_pool.runInteraction( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest the name `have_room_forward_extremities_changed_since` to make it clear this doesn't care about backwards extremities.