Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor Linearizer, convert methods to async and use an async context manager #12357

Merged
merged 18 commits into from
Apr 5, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 47 additions & 50 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,61 +402,58 @@ async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry:
key, _LinearizerEntry(0, collections.OrderedDict())
)

# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When one of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry.count >= self.max_count:
if entry.count < self.max_count:
# The number of things executing is less than the maximum.
logger.debug(
"Waiting to acquire linearizer lock %r for key %r", self.name, key
)

new_defer: "defer.Deferred[None]" = make_deferred_yieldable(
defer.Deferred()
"Acquired uncontended linearizer lock %r for key %r", self.name, key
)
entry.deferreds[new_defer] = 1

try:
await new_defer
except Exception as e:
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
logger.debug(
"Cancelling wait for linearizer lock %r for key %r",
self.name,
key,
)
else:
logger.warning(
"Unexpected exception waiting for linearizer lock %r for key "
"%r",
self.name,
key,
)
entry.count += 1
return entry

# we just have to take ourselves back out of the queue.
del entry.deferreds[new_defer]
raise
# Otherwise, the number of things executing is at the maximum and we have to
# add a deferred to the list of blocked items.
# When one of the things currently executing finishes it will callback
# this item so that it can continue executing.
logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key)

logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
entry.count += 1
new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred())
entry.deferreds[new_defer] = 1

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (This needs to happen while we hold the lock, and the context manager's
# exit code must be synchronous, so this is the only sensible place.)
await self._clock.sleep(0)
else:
logger.debug(
"Acquired uncontended linearizer lock %r for key %r", self.name, key
)
entry.count += 1
try:
await new_defer
except Exception as e:
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
logger.debug(
"Cancelling wait for linearizer lock %r for key %r",
self.name,
key,
)
else:
logger.warning(
"Unexpected exception waiting for linearizer lock %r for key " "%r",
squahtx marked this conversation as resolved.
Show resolved Hide resolved
self.name,
key,
)

# we just have to take ourselves back out of the queue.
del entry.deferreds[new_defer]
raise

logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
entry.count += 1

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# This needs to happen while we hold the lock. We could put it on the
# exit path, but that would slow down the uncontended case.
await self._clock.sleep(0)

return entry

Expand Down