From 9278725e922ae513a53a258b48f827ef323b1552 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Nov 2021 13:44:48 +0000 Subject: [PATCH 1/3] Make lock better handle process being killed If the process gets killed and restarted (so that it didn't have a chance to drop its locks gracefully) then there may still be locks in the DB that are for the same instance that haven't yet timed out but are safe to delete. We handle this case by a) checking if the current instance already has taken out the lock, and b) if not then ignoring locks that are for the same instance. --- synapse/storage/databases/main/lock.py | 31 +++++++++++++++++--------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 3d1dff660bd9..3d0df0cbd430 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -14,6 +14,7 @@ import logging from types import TracebackType from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type +from weakref import WeakValueDictionary from twisted.internet.interfaces import IReactorCore @@ -61,7 +62,7 @@ def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer" # A map from `(lock_name, lock_key)` to the token of any locks that we # think we currently hold. - self._live_tokens: Dict[Tuple[str, str], str] = {} + self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary() # When we shut down we want to remove the locks. Technically this can # lead to a race, as we may drop the lock while we are still processing. @@ -80,10 +81,10 @@ async def _on_shutdown(self) -> None: # We need to take a copy of the tokens dict as dropping the locks will # cause the dictionary to change. - tokens = dict(self._live_tokens) + locks = dict(self._live_tokens) - for (lock_name, lock_key), token in tokens.items(): - await self._drop_lock(lock_name, lock_key, token) + for lock in locks.values(): + await lock.release() logger.info("Dropped locks due to shutdown") @@ -93,6 +94,11 @@ async def try_acquire_lock(self, lock_name: str, lock_key: str) -> Optional["Loc used (otherwise the lock will leak). """ + # Check if this process has taken out a lock and if it's still valid. + lock = self._live_tokens.get((lock_name, lock_key)) + if lock and await lock.is_still_valid(): + return None + now = self._clock.time_msec() token = random_string(6) @@ -100,7 +106,9 @@ async def try_acquire_lock(self, lock_name: str, lock_key: str) -> Optional["Loc def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: # We take out the lock if either a) there is no row for the lock - # already or b) the existing row has timed out. + # already, b) the existing row has timed out, or c) the row is + # for this instance (which means the process got killed and + # restarted) sql = """ INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) VALUES (?, ?, ?, ?, ?) @@ -112,6 +120,7 @@ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: last_renewed_ts = EXCLUDED.last_renewed_ts WHERE worker_locks.last_renewed_ts < ? + OR worker_locks.instance_name = EXCLUDED.instance_name """ txn.execute( sql, @@ -148,11 +157,11 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool: WHERE lock_name = ? AND lock_key = ? - AND last_renewed_ts < ? + AND (last_renewed_ts < ? OR instance_name = ?) """ txn.execute( sql, - (lock_name, lock_key, now - _LOCK_TIMEOUT_MS), + (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name), ) inserted = self.db_pool.simple_upsert_txn_emulated( @@ -179,9 +188,7 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool: if not did_lock: return None - self._live_tokens[(lock_name, lock_key)] = token - - return Lock( + lock = Lock( self._reactor, self._clock, self, @@ -190,6 +197,10 @@ def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool: token=token, ) + self._live_tokens[(lock_name, lock_key)] = lock + + return lock + async def _is_lock_still_valid( self, lock_name: str, lock_key: str, token: str ) -> bool: From 26f55fbf04244cdedcef54d078e9923087fed5d9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Nov 2021 13:56:10 +0000 Subject: [PATCH 2/3] Periodically check for old staged events This is to protect against other instances dying and their locks timing out. --- synapse/federation/federation_server.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 42e3acecb442..9a8758e9a6d3 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -213,6 +213,11 @@ async def on_incoming_transaction( self._started_handling_of_staged_events = True self._handle_old_staged_events() + # Start a periodic check for old staged events. This is to handle + # the case where locks time out, e.g. if another process gets killed + # without dropping its locks. + self._clock.looping_call(self._handle_old_staged_events, 60 * 1000) + # keep this as early as possible to make the calculated origin ts as # accurate as possible. request_time = self._clock.time_msec() From 6a3f45e7793cc5d1012ce5bda433ad855c07656e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Nov 2021 14:03:37 +0000 Subject: [PATCH 3/3] Newsfile --- changelog.d/11262.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11262.bugfix diff --git a/changelog.d/11262.bugfix b/changelog.d/11262.bugfix new file mode 100644 index 000000000000..768fbb897380 --- /dev/null +++ b/changelog.d/11262.bugfix @@ -0,0 +1 @@ +Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1.