Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use a sequence to generate AS transaction IDs, drop last_txn AS state #12209

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 22 additions & 60 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import _CacheContext, cached
Expand Down Expand Up @@ -72,6 +74,21 @@ def __init__(
)
self.exclusive_user_regex = _make_exclusive_regex(self.services_cache)

def get_max_as_txn_id(txn: Cursor) -> int:
txn.execute(
"SELECT COALESCE(max(txn_id), 0) FROM application_services_txns"
)
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved
return txn.fetchone()[0] # type: ignore

self._as_txn_seq_gen = build_sequence_generator(
db_conn,
self.database_engine,
get_max_as_txn_id,
"application_services_txn_id_seq",
table="application_services_txns",
id_column="txn_id",
)

super().__init__(database, db_conn, hs)

def get_app_services(self):
Expand Down Expand Up @@ -237,21 +254,7 @@ async def create_appservice_txn(
"""

def _create_appservice_txn(txn):
# work out new txn id (highest txn id for this service += 1)
# The highest id may be the last one sent (in which case it is last_txn)
# or it may be the highest in the txns list (which are waiting to be/are
# being sent)
last_txn_id = self._get_last_txn(txn, service.id)

txn.execute(
"SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
(service.id,),
)
highest_txn_id = txn.fetchone()[0]
if highest_txn_id is None:
highest_txn_id = 0

new_txn_id = max(highest_txn_id, last_txn_id) + 1
new_txn_id = self._as_txn_seq_gen.get_next_id_txn(txn)

# Insert new txn into txn table
event_ids = json_encoder.encode([e.event_id for e in events])
Expand Down Expand Up @@ -285,40 +288,10 @@ async def complete_appservice_txn(
"""
txn_id = int(txn_id)

def _complete_appservice_txn(txn):
# Debugging query: Make sure the txn being completed is EXACTLY +1 from
# what was there before. If it isn't, we've got problems (e.g. the AS
# has probably missed some events), so whine loudly but still continue,
# since it shouldn't fail completion of the transaction.
last_txn_id = self._get_last_txn(txn, service.id)
if (last_txn_id + 1) != txn_id:
logger.error(
"appservice: Completing a transaction which has an ID > 1 from "
"the last ID sent to this AS. We've either dropped events or "
"sent it to the AS out of order. FIX ME. last_txn=%s "
"completing_txn=%s service_id=%s",
last_txn_id,
txn_id,
service.id,
)

# Set current txn_id for AS to 'txn_id'
self.db_pool.simple_upsert_txn(
txn,
"application_services_state",
{"as_id": service.id},
{"last_txn": txn_id},
)
Fizzadar marked this conversation as resolved.
Show resolved Hide resolved

# Delete txn
self.db_pool.simple_delete_txn(
txn,
"application_services_txns",
{"txn_id": txn_id, "as_id": service.id},
)

await self.db_pool.runInteraction(
"complete_appservice_txn", _complete_appservice_txn
await self.db_pool.simple_delete(
"application_services_txns",
{"txn_id": txn_id, "as_id": service.id},
"delete_completed_as_txn",
)

async def get_oldest_unsent_txn(
Expand Down Expand Up @@ -372,17 +345,6 @@ def _get_oldest_unsent_txn(txn):
unused_fallback_keys={},
)

def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
txn.execute(
"SELECT last_txn FROM application_services_state WHERE as_id=?",
(service_id,),
)
last_txn_id = txn.fetchone()
if last_txn_id is None or last_txn_id[0] is None: # no row exists
return 0
else:
return int(last_txn_id[0]) # select 'last_txn' col

async def set_appservice_last_pos(self, pos: int) -> None:
def set_appservice_last_pos_txn(txn):
txn.execute(
Expand Down