Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add receipts event stream ordering (#13703)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fizzadar authored Sep 13, 2022
1 parent fa2f3d8 commit cdbb641
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 1 deletion.
1 change: 1 addition & 0 deletions changelog.d/13703.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add & populate `event_stream_ordering` column on receipts table for future optimisation of push action processing. Contributed by Nick @ Beeper (@fizzadar).
2 changes: 2 additions & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
)
from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
from synapse.storage.databases.main.pusher import PusherWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
find_max_generated_user_id_localpart,
Expand Down Expand Up @@ -203,6 +204,7 @@ class Store(
PushRuleStore,
PusherWorkerStore,
PresenceBackgroundUpdateStore,
ReceiptsBackgroundUpdateStore,
):
def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
Expand Down
74 changes: 73 additions & 1 deletion synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ def _insert_linearized_receipt_txn(
values={
"stream_id": stream_id,
"event_id": event_id,
"event_stream_ordering": stream_ordering,
"data": json_encoder.encode(data),
},
# receipts_linearized has a unique constraint on
Expand Down Expand Up @@ -830,5 +831,76 @@ def _insert_graph_receipt_txn(
)


class ReceiptsStore(ReceiptsWorkerStore):
class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"

def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

self.db_pool.updates.register_background_update_handler(
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
self._populate_receipt_event_stream_ordering,
)

async def _populate_receipt_event_stream_ordering(
self, progress: JsonDict, batch_size: int
) -> int:
def _populate_receipt_event_stream_ordering_txn(
txn: LoggingTransaction,
) -> bool:

if "max_stream_id" in progress:
max_stream_id = progress["max_stream_id"]
else:
txn.execute("SELECT max(stream_id) FROM receipts_linearized")
res = txn.fetchone()
if res is None or res[0] is None:
return True
else:
max_stream_id = res[0]

start = progress.get("stream_id", 0)
stop = start + batch_size

sql = """
UPDATE receipts_linearized
SET event_stream_ordering = (
SELECT stream_ordering
FROM events
WHERE event_id = receipts_linearized.event_id
)
WHERE stream_id >= ? AND stream_id < ?
"""
txn.execute(sql, (start, stop))

self.db_pool.updates._background_update_progress_txn(
txn,
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
{
"stream_id": stop,
"max_stream_id": max_stream_id,
},
)

return stop > max_stream_id

finished = await self.db_pool.runInteraction(
"_remove_devices_from_device_inbox_txn",
_populate_receipt_event_stream_ordering_txn,
)

if finished:
await self.db_pool.updates._end_background_update(
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
)

return batch_size


class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/* Copyright 2022 Beeper
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;

INSERT INTO background_updates (update_name, progress_json) VALUES
('populate_event_stream_ordering', '{}');

0 comments on commit cdbb641

Please sign in to comment.