Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix rare bug that broke looping calls
Browse files Browse the repository at this point in the history
We can't interact with the reactor from the main thread via looping
call.

Introduced in v1.90.0 / #15791.
  • Loading branch information
erikjohnston committed Aug 30, 2023
1 parent 62a1a9b commit 103fb1a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 14 deletions.
36 changes: 22 additions & 14 deletions synapse/storage/databases/main/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
from weakref import WeakValueDictionary

from twisted.internet.interfaces import IReactorCore
from twisted.internet.task import LoopingCall

from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
Expand All @@ -26,6 +26,7 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -358,7 +359,7 @@ class Lock:

def __init__(
self,
reactor: IReactorCore,
reactor: ISynapseReactor,
clock: Clock,
store: LockStore,
read_write: bool,
Expand All @@ -377,19 +378,25 @@ def __init__(

self._table = "worker_read_write_locks" if read_write else "worker_locks"

self._looping_call = clock.looping_call(
# We might be called from a non-main thread, so we defer setting up the
# looping call.
self._looping_call: Optional[LoopingCall] = None
reactor.callFromThread(self._setup_looping_call)

self._dropped = False

def _setup_looping_call(self) -> None:
self._looping_call = self._clock.looping_call(
self._renew,
_RENEWAL_INTERVAL_MS,
store,
clock,
read_write,
lock_name,
lock_key,
token,
self._store,
self._clock,
self._read_write,
self._lock_name,
self._lock_key,
self._token,
)

self._dropped = False

@staticmethod
@wrap_as_background_process("Lock._renew")
async def _renew(
Expand Down Expand Up @@ -459,7 +466,7 @@ async def release(self) -> None:
if self._dropped:
return

if self._looping_call.running:
if self._looping_call and self._looping_call.running:
self._looping_call.stop()

await self._store.db_pool.simple_delete(
Expand All @@ -486,8 +493,9 @@ def __del__(self) -> None:
# We should not be dropped without the lock being released (unless
# we're shutting down), but if we are then let's at least stop
# renewing the lock.
if self._looping_call.running:
self._looping_call.stop()
if self._looping_call and self._looping_call.running:
# We might be called from a non-main thread.
self._reactor.callFromThread(self._looping_call.stop)

if self._reactor.running:
logger.error(
Expand Down
2 changes: 2 additions & 0 deletions tests/storage/databases/main/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def test_timeout_lock(self) -> None:

# We simulate the process getting stuck by cancelling the looping call
# that keeps the lock active.
assert lock._looping_call
lock._looping_call.stop()

# Wait for the lock to timeout.
Expand Down Expand Up @@ -403,6 +404,7 @@ def test_timeout_lock(self) -> None:

# We simulate the process getting stuck by cancelling the looping call
# that keeps the lock active.
assert lock._looping_call
lock._looping_call.stop()

# Wait for the lock to timeout.
Expand Down

0 comments on commit 103fb1a

Please sign in to comment.