This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor Linearizer, convert methods to async and use an async context manager #12357

Merged: 18 commits, Apr 5, 2022
Changes from 1 commit
118 changes: 60 additions & 58 deletions synapse/util/async_helpers.py
@@ -385,21 +385,7 @@ def is_queued(self, key: Hashable) -> bool:
         return bool(entry.deferreds)
 
     async def queue(self, key: Hashable) -> ContextManager[None]:
-        entry = self.key_to_defer.setdefault(
-            key, _LinearizerEntry(0, collections.OrderedDict())
-        )
-
-        # If the number of things executing is greater than the maximum
-        # then add a deferred to the list of blocked items
-        # When one of the things currently executing finishes it will callback
-        # this item so that it can continue executing.
-        if entry.count >= self.max_count:
-            await self._await_lock(key)
-        else:
-            logger.debug(
-                "Acquired uncontended linearizer lock %r for key %r", self.name, key
-            )
-            entry.count += 1
+        entry = await self._acquire_lock(key)
 
         # now that we have the lock, we need to return a context manager which will
         # release the lock.
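At this point in the series, `queue` is still a coroutine that resolves to a plain context manager, so callers await it and then enter the result with `with`. A minimal usage sketch (the `handle_event` caller and `room_id` key are illustrative, not part of this diff):

```python
from synapse.util.async_helpers import Linearizer

async def handle_event(linearizer: Linearizer, room_id: str) -> None:
    # `queue` resolves once the lock for `room_id` is available; at most
    # `max_count` callers run the body concurrently for a given key.
    with await linearizer.queue(room_id):
        ...
```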
@@ -428,56 +414,72 @@ def _ctx_manager() -> Iterator[None]:
 
         return _ctx_manager()
 
-    async def _await_lock(self, key: Hashable) -> None:
-        """Helper for queue: adds a deferred to the queue
-
-        Assumes that we've already checked that we've reached the limit of the number
-        of lock-holders we allow. Creates a new deferred which is added to the list, and
-        adds some management around cancellations.
+    async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry:
+        """Acquires a linearizer lock, waiting if necessary.
 
         Returns once we have secured the lock.
         """
-        entry = self.key_to_defer[key]
+        entry = self.key_to_defer.setdefault(
+            key, _LinearizerEntry(0, collections.OrderedDict())
+        )
 
-        logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key)
+        # If the number of things executing is greater than the maximum
+        # then add a deferred to the list of blocked items
+        # When one of the things currently executing finishes it will callback
+        # this item so that it can continue executing.
+        if entry.count >= self.max_count:
+            logger.debug(
+                "Waiting to acquire linearizer lock %r for key %r", self.name, key
+            )
 
-        new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred())
-        entry.deferreds[new_defer] = 1
+            new_defer: "defer.Deferred[None]" = make_deferred_yieldable(
+                defer.Deferred()
+            )
+            entry.deferreds[new_defer] = 1
 
-        try:
-            await new_defer
-        except Exception as e:
-            logger.info("defer %r got err %r", new_defer, e)
-            if isinstance(e, CancelledError):
-                logger.debug(
-                    "Cancelling wait for linearizer lock %r for key %r", self.name, key
-                )
-
-            else:
-                logger.warning(
-                    "Unexpected exception waiting for linearizer lock %r for key %r",
-                    self.name,
-                    key,
-                )
+            try:
+                await new_defer
+            except Exception as e:
+                logger.info("defer %r got err %r", new_defer, e)
+                if isinstance(e, CancelledError):
+                    logger.debug(
+                        "Cancelling wait for linearizer lock %r for key %r",
+                        self.name,
+                        key,
+                    )
+                else:
+                    logger.warning(
+                        "Unexpected exception waiting for linearizer lock %r for key "
+                        "%r",
+                        self.name,
+                        key,
+                    )
 
-            # we just have to take ourselves back out of the queue.
-            del entry.deferreds[new_defer]
-            raise
+                # we just have to take ourselves back out of the queue.
+                del entry.deferreds[new_defer]
+                raise
 
-        logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
-        entry.count += 1
+            logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
+            entry.count += 1
 
-        # if the code holding the lock completes synchronously, then it
-        # will recursively run the next claimant on the list. That can
-        # relatively rapidly lead to stack exhaustion. This is essentially
-        # the same problem as http://twistedmatrix.com/trac/ticket/9304.
-        #
-        # In order to break the cycle, we add a cheeky sleep(0) here to
-        # ensure that we fall back to the reactor between each iteration.
-        #
-        # (This needs to happen while we hold the lock, and the context manager's exit
-        # code must be synchronous, so this is the only sensible place.)
-        await self._clock.sleep(0)
+            # if the code holding the lock completes synchronously, then it
+            # will recursively run the next claimant on the list. That can
+            # relatively rapidly lead to stack exhaustion. This is essentially
+            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+            #
+            # In order to break the cycle, we add a cheeky sleep(0) here to
+            # ensure that we fall back to the reactor between each iteration.
+            #
+            # (This needs to happen while we hold the lock, and the context manager's
+            # exit code must be synchronous, so this is the only sensible place.)
Member:

It's worth noting this is no longer strictly true - the context manager's __aexit__ doesn't have to be synchronous, so the sleep could happen there instead.

Not sure it's worth changing, just noting that the comment is incorrect.

Contributor (author):

Good spot! I'll update the comment.

Contributor (author):

I suspect that the linearizer sleep uses a different reactor to the homeserver, but only during testing. So tests would start failing if we did a sleep on every __aexit__.
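The comment block under discussion explains why the `sleep(0)` exists: without a trip through the reactor, each lock release can synchronously wake the next waiter, nesting one call frame per queued claimant. A toy sketch of that effect using bare Twisted Deferreds (illustrative only, not Synapse or Linearizer code):

```python
from twisted.internet import defer

def chained_wakeups(n: int) -> int:
    """Fire `n` queued waiters synchronously; return the deepest nesting seen."""
    waiters = [defer.Deferred() for _ in range(n)]
    depth = deepest = 0

    def wake_next(_result: object, i: int) -> None:
        nonlocal depth, deepest
        depth += 1
        deepest = max(deepest, depth)
        if i + 1 < n:
            # Waking the next waiter from inside this callback nests another
            # call frame: the pattern the Linearizer comment warns about.
            waiters[i + 1].callback(None)
        depth -= 1

    for i, d in enumerate(waiters):
        d.addCallback(wake_next, i)

    waiters[0].callback(None)
    return deepest

print(chained_wakeups(50))  # prints 50: nesting grows with the queue length
```

Awaiting `sleep(0)` between wake-ups hands control back to the reactor, so each claimant resumes from a fresh stack.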

+            await self._clock.sleep(0)
+        else:
+            logger.debug(
+                "Acquired uncontended linearizer lock %r for key %r", self.name, key
+            )
+            entry.count += 1
Member:

It'd be nice to do an early return here, rather than have a big if block which is hard to follow.
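A sketch of the early-return shape being suggested (illustrative only, written against the names in this diff; not necessarily what was eventually merged). The `...` stands for the contended branch shown above:

```python
async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry:
    """Acquires a linearizer lock, waiting if necessary."""
    entry = self.key_to_defer.setdefault(
        key, _LinearizerEntry(0, collections.OrderedDict())
    )

    if entry.count < self.max_count:
        # Uncontended: take the lock and return straight away.
        logger.debug(
            "Acquired uncontended linearizer lock %r for key %r", self.name, key
        )
        entry.count += 1
        return entry

    # Contended: queue up behind the current holders (the big branch above),
    # then take the lock.
    ...
    return entry
```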


+
+        return entry
 
 
 class ReadWriteLock:
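For context, the PR title says the end state of this series is an async context manager. A hedged sketch of that calling convention (assumed from the title; the exact merged API may differ), again using the illustrative `handle_event` caller:

```python
from synapse.util.async_helpers import Linearizer

async def handle_event(linearizer: Linearizer, room_id: str) -> None:
    # The eventual API enters and releases the lock with `async with`
    # rather than the `with await linearizer.queue(...)` pattern above.
    async with linearizer.queue(room_id):
        ...
```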