From e0c1444e569851a25cfa4b353cfa185fc65b5f10 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 13:52:46 +0100 Subject: [PATCH 01/17] Add docstrings to `Linearizer` test cases Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c4a3917b2301..19ecd72339fb 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -26,6 +26,7 @@ class LinearizerTestCase(unittest.TestCase): @defer.inlineCallbacks def test_linearizer(self): + """Tests that a task is queued up behind an earlier task.""" linearizer = Linearizer() key = object() @@ -44,6 +45,10 @@ def test_linearizer(self): @defer.inlineCallbacks def test_linearizer_is_queued(self): + """Tests `Linearizer.is_queued`. + + Runs through the same scenario as `test_linearizer`. + """ linearizer = Linearizer() key = object() @@ -75,8 +80,10 @@ def test_linearizer_is_queued(self): self.assertFalse(linearizer.is_queued(key)) def test_lots_of_queued_things(self): - # we have one slow thing, and lots of fast things queued up behind it. - # it should *not* explode the stack. + """Tests lots of fast things queued up behind a slow thing. + + The stack should *not* explode when the fast thing completes. + """ linearizer = Linearizer() @defer.inlineCallbacks @@ -97,6 +104,7 @@ def func(i, sleep=False): @defer.inlineCallbacks def test_multiple_entries(self): + """Tests a `Linearizer` with a concurrency above 1.""" limiter = Linearizer(max_count=3) key = object() @@ -143,6 +151,7 @@ def test_multiple_entries(self): @defer.inlineCallbacks def test_cancellation(self): + """Tests cancellation while waiting for a `Linearizer`.""" linearizer = Linearizer() key = object() From 071618db91a6a73a1c9252d1646257e120cd4f14 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 13:58:39 +0100 Subject: [PATCH 02/17] Add comments to `Linearizer` tests Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 19ecd72339fb..896cafa29e71 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -37,6 +37,7 @@ def test_linearizer(self): d2 = linearizer.queue(key) self.assertFalse(d2.called) + # Once the first task is done, the second task can continue. with cm1: self.assertFalse(d2.called) @@ -56,13 +57,14 @@ def test_linearizer_is_queued(self): d1 = linearizer.queue(key) cm1 = yield d1 - # Since d1 gets called immediately, "is_queued" should return false. + # Since the first task acquires the lock immediately, "is_queued" should return + # false. self.assertFalse(linearizer.is_queued(key)) d2 = linearizer.queue(key) self.assertFalse(d2.called) - # Now d2 is queued up behind successful completion of cm1 + # Now the second task is queued up behind the first. self.assertTrue(linearizer.is_queued(key)) with cm1: @@ -71,7 +73,7 @@ def test_linearizer_is_queued(self): # cm1 still not done, so d2 still queued. self.assertTrue(linearizer.is_queued(key)) - # And now d2 is called and nothing is in the queue again + # And now the second task acquires the lock and nothing is in the queue again. self.assertFalse(linearizer.is_queued(key)) with (yield d2): @@ -118,12 +120,14 @@ def test_multiple_entries(self): d3 = limiter.queue(key) cm3 = yield d3 + # These next two tasks have to wait. 
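+        # (This limiter was created with `max_count=3` and three tasks
+        # already hold the lock, so the fourth and fifth must queue.)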
d4 = limiter.queue(key) self.assertFalse(d4.called) d5 = limiter.queue(key) self.assertFalse(d5.called) + # Once the first task completes, the fourth task can continue. with cm1: self.assertFalse(d4.called) self.assertFalse(d5.called) @@ -131,11 +135,13 @@ def test_multiple_entries(self): cm4 = yield d4 self.assertFalse(d5.called) + # Once the third task completes, the fifth task can continue. with cm3: self.assertFalse(d5.called) cm5 = yield d5 + # Make all tasks finish. with cm2: pass @@ -145,6 +151,7 @@ def test_multiple_entries(self): with cm5: pass + # The next task shouldn't have to wait. d6 = limiter.queue(key) with (yield d6): pass @@ -159,12 +166,15 @@ def test_cancellation(self): d1 = linearizer.queue(key) cm1 = yield d1 + # Create a second task, waiting for the first task. d2 = linearizer.queue(key) self.assertFalse(d2.called) + # Create a third task, waiting for the second task. d3 = linearizer.queue(key) self.assertFalse(d3.called) + # Cancel the waiting second task. d2.cancel() with cm1: @@ -177,5 +187,6 @@ def test_cancellation(self): except CancelledError: pass + # The third task should continue running. with (yield d3): pass From 45ce571012904a4ca00942061c79c11ffe3909b7 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 14:01:41 +0100 Subject: [PATCH 03/17] Add helper methods to create a linearized task and pump the reactor Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 45 ++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 896cafa29e71..c32101c9f1f3 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -13,8 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import Callable, Hashable, Tuple + from twisted.internet import defer, reactor -from twisted.internet.defer import CancelledError +from twisted.internet.base import ReactorBase +from twisted.internet.defer import CancelledError, Deferred from synapse.logging.context import LoggingContext, current_context from synapse.util import Clock @@ -24,6 +27,46 @@ class LinearizerTestCase(unittest.TestCase): + def _start_task( + self, linearizer: Linearizer, key: Hashable + ) -> Tuple["Deferred[None]", "Deferred[None]", Callable[[], None]]: + """Starts a task which acquires the linearizer lock, blocks, then completes. + + Args: + linearizer: The `Linearizer`. + key: The `Linearizer` key. + + Returns: + A tuple containing: + * A cancellable `Deferred` for the entire task. + * A `Deferred` that resolves once the task acquires the lock. + * A function that unblocks the task. Must be called by the caller + to allow the task to release the lock and complete. + """ + acquired_d: "Deferred[None]" = Deferred() + unblock_d: "Deferred[None]" = Deferred() + + async def task() -> None: + with await linearizer.queue(key): + acquired_d.callback(None) + await unblock_d + + d = defer.ensureDeferred(task()) + + def unblock() -> None: + unblock_d.callback(None) + # The next task, if it exists, will acquire the lock and require a kick of + # the reactor to advance. 
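+            # (Handing over the lock goes via a `sleep(0)` inside the
+            # `Linearizer`, so the next task only resumes once the reactor
+            # runs; `_pump` below provides that reactor turn.)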
+ self._pump() + + return d, acquired_d, unblock + + def _pump(self) -> None: + """Pump the reactor to advance `Linearizer`s.""" + assert isinstance(reactor, ReactorBase) + while reactor.getDelayedCalls(): + reactor.runUntilCurrent() + @defer.inlineCallbacks def test_linearizer(self): """Tests that a task is queued up behind an earlier task.""" From 7842add1052f81193b154547ecd55ec5a78de09f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 14:05:24 +0100 Subject: [PATCH 04/17] Convert `Linearizer` tests from `inlineCallbacks` to async Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 142 +++++++++++++++------------------- 1 file changed, 62 insertions(+), 80 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c32101c9f1f3..077ce3bc8784 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -19,8 +19,11 @@ from twisted.internet.base import ReactorBase from twisted.internet.defer import CancelledError, Deferred -from synapse.logging.context import LoggingContext, current_context -from synapse.util import Clock +from synapse.logging.context import ( + LoggingContext, + current_context, + make_deferred_yieldable, +) from synapse.util.async_helpers import Linearizer from tests import unittest @@ -67,27 +70,24 @@ def _pump(self) -> None: while reactor.getDelayedCalls(): reactor.runUntilCurrent() - @defer.inlineCallbacks def test_linearizer(self): """Tests that a task is queued up behind an earlier task.""" linearizer = Linearizer() key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + _, acquired_d2, unblock2 = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) # Once the first task is done, the second task can continue. - with cm1: - self.assertFalse(d2.called) + unblock1() + self.assertTrue(acquired_d2.called) - with (yield d2): - pass + unblock2() - @defer.inlineCallbacks def test_linearizer_is_queued(self): """Tests `Linearizer.is_queued`. @@ -97,31 +97,26 @@ def test_linearizer_is_queued(self): key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) # Since the first task acquires the lock immediately, "is_queued" should return # false. self.assertFalse(linearizer.is_queued(key)) - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + _, acquired_d2, unblock2 = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) # Now the second task is queued up behind the first. self.assertTrue(linearizer.is_queued(key)) - with cm1: - self.assertFalse(d2.called) - - # cm1 still not done, so d2 still queued. - self.assertTrue(linearizer.is_queued(key)) + unblock1() # And now the second task acquires the lock and nothing is in the queue again. + self.assertTrue(acquired_d2.called) self.assertFalse(linearizer.is_queued(key)) - with (yield d2): - self.assertFalse(linearizer.is_queued(key)) - + unblock2() self.assertFalse(linearizer.is_queued(key)) def test_lots_of_queued_things(self): @@ -130,106 +125,93 @@ def test_lots_of_queued_things(self): The stack should *not* explode when the fast thing completes. 
""" linearizer = Linearizer() + key = "" - @defer.inlineCallbacks - def func(i, sleep=False): + async def func(i, wait_for=None) -> None: with LoggingContext("func(%s)" % i) as lc: - with (yield linearizer.queue("")): + with (await linearizer.queue(key)): self.assertEqual(current_context(), lc) - if sleep: - yield Clock(reactor).sleep(0) + if wait_for: + await make_deferred_yieldable(wait_for) self.assertEqual(current_context(), lc) - func(0, sleep=True) + _, _, unblock = self._start_task(linearizer, key) for i in range(1, 100): - func(i) + defer.ensureDeferred(func(i)) - return func(1000) + d = defer.ensureDeferred(func(1000)) + unblock() + self.successResultOf(d) - @defer.inlineCallbacks def test_multiple_entries(self): """Tests a `Linearizer` with a concurrency above 1.""" limiter = Linearizer(max_count=3) key = object() - d1 = limiter.queue(key) - cm1 = yield d1 + _, acquired_d1, unblock1 = self._start_task(limiter, key) + self.assertTrue(acquired_d1.called) - d2 = limiter.queue(key) - cm2 = yield d2 + _, acquired_d2, unblock2 = self._start_task(limiter, key) + self.assertTrue(acquired_d2.called) - d3 = limiter.queue(key) - cm3 = yield d3 + _, acquired_d3, unblock3 = self._start_task(limiter, key) + self.assertTrue(acquired_d3.called) # These next two tasks have to wait. - d4 = limiter.queue(key) - self.assertFalse(d4.called) + _, acquired_d4, unblock4 = self._start_task(limiter, key) + self.assertFalse(acquired_d4.called) - d5 = limiter.queue(key) - self.assertFalse(d5.called) + _, acquired_d5, unblock5 = self._start_task(limiter, key) + self.assertFalse(acquired_d5.called) # Once the first task completes, the fourth task can continue. - with cm1: - self.assertFalse(d4.called) - self.assertFalse(d5.called) - - cm4 = yield d4 - self.assertFalse(d5.called) + unblock1() + self.assertTrue(acquired_d4.called) + self.assertFalse(acquired_d5.called) # Once the third task completes, the fifth task can continue. - with cm3: - self.assertFalse(d5.called) - - cm5 = yield d5 + unblock3() + self.assertTrue(acquired_d5.called) # Make all tasks finish. - with cm2: - pass - - with cm4: - pass - - with cm5: - pass + unblock2() + unblock4() + unblock5() # The next task shouldn't have to wait. - d6 = limiter.queue(key) - with (yield d6): - pass + _, acquired_d6, unblock6 = self._start_task(limiter, key) + self.assertTrue(acquired_d6) + unblock6() - @defer.inlineCallbacks def test_cancellation(self): """Tests cancellation while waiting for a `Linearizer`.""" linearizer = Linearizer() key = object() - d1 = linearizer.queue(key) - cm1 = yield d1 + d1, acquired_d1, unblock1 = self._start_task(linearizer, key) + self.assertTrue(acquired_d1.called) # Create a second task, waiting for the first task. - d2 = linearizer.queue(key) - self.assertFalse(d2.called) + d2, acquired_d2, _ = self._start_task(linearizer, key) + self.assertFalse(acquired_d2.called) # Create a third task, waiting for the second task. - d3 = linearizer.queue(key) - self.assertFalse(d3.called) + d3, acquired_d3, unblock3 = self._start_task(linearizer, key) + self.assertFalse(acquired_d3.called) # Cancel the waiting second task. d2.cancel() - with cm1: - pass + unblock1() + self.successResultOf(d1) self.assertTrue(d2.called) - try: - yield d2 - self.fail("Expected d2 to raise CancelledError") - except CancelledError: - pass + self.failureResultOf(d2, CancelledError) # The third task should continue running. 
- with (yield d3): - pass + self.assertTrue(acquired_d3.called) + unblock3() + self.successResultOf(d3) From 4cca457fe86d66cce0ec6b12b5728ce7ffba5240 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 14:07:41 +0100 Subject: [PATCH 05/17] Add missing type hints to `Linearizer` tests Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 077ce3bc8784..bd0259ef0d4b 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Hashable, Tuple +from typing import Callable, Hashable, Optional, Tuple from twisted.internet import defer, reactor from twisted.internet.base import ReactorBase @@ -70,7 +70,7 @@ def _pump(self) -> None: while reactor.getDelayedCalls(): reactor.runUntilCurrent() - def test_linearizer(self): + def test_linearizer(self) -> None: """Tests that a task is queued up behind an earlier task.""" linearizer = Linearizer() @@ -88,7 +88,7 @@ def test_linearizer(self): unblock2() - def test_linearizer_is_queued(self): + def test_linearizer_is_queued(self) -> None: """Tests `Linearizer.is_queued`. Runs through the same scenario as `test_linearizer`. @@ -119,7 +119,7 @@ def test_linearizer_is_queued(self): unblock2() self.assertFalse(linearizer.is_queued(key)) - def test_lots_of_queued_things(self): + def test_lots_of_queued_things(self) -> None: """Tests lots of fast things queued up behind a slow thing. The stack should *not* explode when the fast thing completes. @@ -127,7 +127,7 @@ def test_lots_of_queued_things(self): linearizer = Linearizer() key = "" - async def func(i, wait_for=None) -> None: + async def func(i: int, wait_for: Optional["Deferred[None]"] = None) -> None: with LoggingContext("func(%s)" % i) as lc: with (await linearizer.queue(key)): self.assertEqual(current_context(), lc) @@ -144,7 +144,7 @@ async def func(i, wait_for=None) -> None: unblock() self.successResultOf(d) - def test_multiple_entries(self): + def test_multiple_entries(self) -> None: """Tests a `Linearizer` with a concurrency above 1.""" limiter = Linearizer(max_count=3) @@ -185,7 +185,7 @@ def test_multiple_entries(self): self.assertTrue(acquired_d6) unblock6() - def test_cancellation(self): + def test_cancellation(self) -> None: """Tests cancellation while waiting for a `Linearizer`.""" linearizer = Linearizer() From 5bfb04d043eedae1c926499c4e834ecf13e006db Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 14:17:21 +0100 Subject: [PATCH 06/17] Add newsfile --- changelog.d/12353.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12353.misc diff --git a/changelog.d/12353.misc b/changelog.d/12353.misc new file mode 100644 index 000000000000..1d681fb0e3a1 --- /dev/null +++ b/changelog.d/12353.misc @@ -0,0 +1 @@ +Convert `Linearizer` tests from `inlineCallbacks` to async. 
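The test-side conversion above mirrors the caller-side change made later in the series. As a minimal sketch of the two calling conventions (the `do_work` wrappers below are illustrative and not part of the patches):

    from twisted.internet import defer

    from synapse.util.async_helpers import Linearizer

    # Before: `Linearizer.queue` returns a Deferred that resolves to a
    # synchronous context manager guarding the critical section.
    @defer.inlineCallbacks
    def do_work(linearizer: Linearizer, key: str):
        with (yield linearizer.queue(key)):
            ...  # critical section

    # After (patch 12 onwards): `Linearizer.queue` returns an async context
    # manager directly, so the lock cannot be acquired and then forgotten.
    async def do_work_async(linearizer: Linearizer, key: str) -> None:
        async with linearizer.queue(key):
            ...  # critical section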
From ba5839c23a490684b10546c13bfb9219974aec76 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:54:37 +0100 Subject: [PATCH 07/17] fixup typo in `test_lots_of_queued_things` docstring Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index bd0259ef0d4b..93d576c8a459 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -122,7 +122,7 @@ def test_linearizer_is_queued(self) -> None: def test_lots_of_queued_things(self) -> None: """Tests lots of fast things queued up behind a slow thing. - The stack should *not* explode when the fast thing completes. + The stack should *not* explode when the slow thing completes. """ linearizer = Linearizer() key = "" From 632bd38a004107cebdbafea3293334a61d573e1b Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:06:30 +0100 Subject: [PATCH 08/17] fixup: add helpful message to assert Signed-off-by: Sean Quah --- tests/util/test_linearizer.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index 93d576c8a459..b6bc8a39e0f0 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -212,6 +212,9 @@ def test_cancellation(self) -> None: self.failureResultOf(d2, CancelledError) # The third task should continue running. - self.assertTrue(acquired_d3.called) + self.assertTrue( + acquired_d3.called, + "Third task did not get the lock after the second task was cancelled", + ) unblock3() self.successResultOf(d3) From ac802343b1ad8e881b4d71971fe3d416dbbdeaff Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:00:59 +0100 Subject: [PATCH 09/17] Convert `Linearizer` methods to async Signed-off-by: Sean Quah --- synapse/util/async_helpers.py | 61 ++++++++++++++++------------------- 1 file changed, 27 insertions(+), 34 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 6a8e844d6365..663451ba34a9 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -25,6 +25,7 @@ Awaitable, Callable, Collection, + ContextManager, Dict, Generic, Hashable, @@ -383,10 +384,7 @@ def is_queued(self, key: Hashable) -> bool: # non-empty. return bool(entry.deferreds) - def queue(self, key: Hashable) -> defer.Deferred: - # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly. - # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not - # propagated inside inlineCallbacks until Twisted 18.7) + async def queue(self, key: Hashable) -> ContextManager[None]: entry = self.key_to_defer.setdefault( key, _LinearizerEntry(0, collections.OrderedDict()) ) @@ -396,19 +394,18 @@ def queue(self, key: Hashable) -> defer.Deferred: # When one of the things currently executing finishes it will callback # this item so that it can continue executing. if entry.count >= self.max_count: - res = self._await_lock(key) + await self._await_lock(key) else: logger.debug( "Acquired uncontended linearizer lock %r for key %r", self.name, key ) entry.count += 1 - res = defer.succeed(None) - # once we successfully get the lock, we need to return a context manager which - # will release the lock. + # now that we have the lock, we need to return a context manager which will + # release the lock. 
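+        # (The try/finally in `_ctx_manager` ensures the lock is released
+        # even if the caller's critical section raises.)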
@contextmanager - def _ctx_manager(_: None) -> Iterator[None]: + def _ctx_manager() -> Iterator[None]: try: yield finally: @@ -429,18 +426,16 @@ def _ctx_manager(_: None) -> Iterator[None]: # map. del self.key_to_defer[key] - res.addCallback(_ctx_manager) - return res + return _ctx_manager() - def _await_lock(self, key: Hashable) -> defer.Deferred: + async def _await_lock(self, key: Hashable) -> None: """Helper for queue: adds a deferred to the queue Assumes that we've already checked that we've reached the limit of the number of lock-holders we allow. Creates a new deferred which is added to the list, and adds some management around cancellations. - Returns the deferred, which will callback once we have secured the lock. - + Returns once we have secured the lock. """ entry = self.key_to_defer[key] @@ -449,23 +444,9 @@ def _await_lock(self, key: Hashable) -> defer.Deferred: new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred()) entry.deferreds[new_defer] = 1 - def cb(_r: None) -> "defer.Deferred[None]": - logger.debug("Acquired linearizer lock %r for key %r", self.name, key) - entry.count += 1 - - # if the code holding the lock completes synchronously, then it - # will recursively run the next claimant on the list. That can - # relatively rapidly lead to stack exhaustion. This is essentially - # the same problem as http://twistedmatrix.com/trac/ticket/9304. - # - # In order to break the cycle, we add a cheeky sleep(0) here to - # ensure that we fall back to the reactor between each iteration. - # - # (This needs to happen while we hold the lock, and the context manager's exit - # code must be synchronous, so this is the only sensible place.) - return self._clock.sleep(0) - - def eb(e: Failure) -> Failure: + try: + await new_defer + except Exception as e: logger.info("defer %r got err %r", new_defer, e) if isinstance(e, CancelledError): logger.debug( @@ -481,10 +462,22 @@ def eb(e: Failure) -> Failure: # we just have to take ourselves back out of the queue. del entry.deferreds[new_defer] - return e + raise + + logger.debug("Acquired linearizer lock %r for key %r", self.name, key) + entry.count += 1 - new_defer.addCallbacks(cb, eb) - return new_defer + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (This needs to happen while we hold the lock, and the context manager's exit + # code must be synchronous, so this is the only sensible place.) 
+ await self._clock.sleep(0) class ReadWriteLock: From f96583dd8d6a2db976423c6eb7db93cd308ae5c5 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:22:39 +0100 Subject: [PATCH 10/17] Refactor `Linearizer._await_lock`, moving preconditions inside Signed-off-by: Sean Quah --- synapse/util/async_helpers.py | 118 +++++++++++++++++----------------- 1 file changed, 60 insertions(+), 58 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 663451ba34a9..2c47bb07a6ae 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -385,21 +385,7 @@ def is_queued(self, key: Hashable) -> bool: return bool(entry.deferreds) async def queue(self, key: Hashable) -> ContextManager[None]: - entry = self.key_to_defer.setdefault( - key, _LinearizerEntry(0, collections.OrderedDict()) - ) - - # If the number of things executing is greater than the maximum - # then add a deferred to the list of blocked items - # When one of the things currently executing finishes it will callback - # this item so that it can continue executing. - if entry.count >= self.max_count: - await self._await_lock(key) - else: - logger.debug( - "Acquired uncontended linearizer lock %r for key %r", self.name, key - ) - entry.count += 1 + entry = await self._acquire_lock(key) # now that we have the lock, we need to return a context manager which will # release the lock. @@ -428,56 +414,72 @@ def _ctx_manager() -> Iterator[None]: return _ctx_manager() - async def _await_lock(self, key: Hashable) -> None: - """Helper for queue: adds a deferred to the queue - - Assumes that we've already checked that we've reached the limit of the number - of lock-holders we allow. Creates a new deferred which is added to the list, and - adds some management around cancellations. + async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry: + """Acquires a linearizer lock, waiting if necessary. Returns once we have secured the lock. """ - entry = self.key_to_defer[key] + entry = self.key_to_defer.setdefault( + key, _LinearizerEntry(0, collections.OrderedDict()) + ) + + # If the number of things executing is greater than the maximum + # then add a deferred to the list of blocked items + # When one of the things currently executing finishes it will callback + # this item so that it can continue executing. + if entry.count >= self.max_count: + logger.debug( + "Waiting to acquire linearizer lock %r for key %r", self.name, key + ) - logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key) + new_defer: "defer.Deferred[None]" = make_deferred_yieldable( + defer.Deferred() + ) + entry.deferreds[new_defer] = 1 - new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred()) - entry.deferreds[new_defer] = 1 + try: + await new_defer + except Exception as e: + logger.info("defer %r got err %r", new_defer, e) + if isinstance(e, CancelledError): + logger.debug( + "Cancelling wait for linearizer lock %r for key %r", + self.name, + key, + ) + else: + logger.warning( + "Unexpected exception waiting for linearizer lock %r for key " + "%r", + self.name, + key, + ) - try: - await new_defer - except Exception as e: - logger.info("defer %r got err %r", new_defer, e) - if isinstance(e, CancelledError): - logger.debug( - "Cancelling wait for linearizer lock %r for key %r", self.name, key - ) + # we just have to take ourselves back out of the queue. 
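+                # (`entry.count` was never incremented for this waiter, so
+                # there is no other state to roll back.)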
+ del entry.deferreds[new_defer] + raise - else: - logger.warning( - "Unexpected exception waiting for linearizer lock %r for key %r", - self.name, - key, - ) - - # we just have to take ourselves back out of the queue. - del entry.deferreds[new_defer] - raise - - logger.debug("Acquired linearizer lock %r for key %r", self.name, key) - entry.count += 1 - - # if the code holding the lock completes synchronously, then it - # will recursively run the next claimant on the list. That can - # relatively rapidly lead to stack exhaustion. This is essentially - # the same problem as http://twistedmatrix.com/trac/ticket/9304. - # - # In order to break the cycle, we add a cheeky sleep(0) here to - # ensure that we fall back to the reactor between each iteration. - # - # (This needs to happen while we hold the lock, and the context manager's exit - # code must be synchronous, so this is the only sensible place.) - await self._clock.sleep(0) + logger.debug("Acquired linearizer lock %r for key %r", self.name, key) + entry.count += 1 + + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (This needs to happen while we hold the lock, and the context manager's + # exit code must be synchronous, so this is the only sensible place.) + await self._clock.sleep(0) + else: + logger.debug( + "Acquired uncontended linearizer lock %r for key %r", self.name, key + ) + entry.count += 1 + + return entry class ReadWriteLock: From adf5d141519ffed5968fd04aa9879191ec4033f7 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:26:31 +0100 Subject: [PATCH 11/17] Factor out a `Linearizer._release_lock()` method. Signed-off-by: Sean Quah --- synapse/util/async_helpers.py | 36 +++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 2c47bb07a6ae..ece702ccf6d6 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -395,22 +395,7 @@ def _ctx_manager() -> Iterator[None]: try: yield finally: - logger.debug("Releasing linearizer lock %r for key %r", self.name, key) - - # We've finished executing so check if there are any things - # blocked waiting to execute and start one of them - entry.count -= 1 - - if entry.deferreds: - (next_def, _) = entry.deferreds.popitem(last=False) - - # we need to run the next thing in the sentinel context. - with PreserveLoggingContext(): - next_def.callback(None) - elif entry.count == 0: - # We were the last thing for this key: remove it from the - # map. - del self.key_to_defer[key] + self._release_lock(key, entry) return _ctx_manager() @@ -481,6 +466,25 @@ async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry: return entry + def _release_lock(self, key: Hashable, entry: _LinearizerEntry) -> None: + """Releases a held linearizer lock.""" + logger.debug("Releasing linearizer lock %r for key %r", self.name, key) + + # We've finished executing so check if there are any things + # blocked waiting to execute and start one of them + entry.count -= 1 + + if entry.deferreds: + (next_def, _) = entry.deferreds.popitem(last=False) + + # we need to run the next thing in the sentinel context. 
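+            # (Waiters wrap their Deferreds in `make_deferred_yieldable`, so
+            # each restores its own logcontext when woken; firing the callback
+            # from the sentinel context avoids leaking the releasing task's
+            # context into the next one.)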
+ with PreserveLoggingContext(): + next_def.callback(None) + elif entry.count == 0: + # We were the last thing for this key: remove it from the + # map. + del self.key_to_defer[key] + class ReadWriteLock: """An async read write lock. From 744cbbe97179df4d2b8f2f2c317bde81672fa903 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:46:08 +0100 Subject: [PATCH 12/17] Convert `Linearizer` to use an async context manager Eliminates an unlikely footgun where code that doesn't immediately use the context manager could forget to release the lock. Signed-off-by: Sean Quah --- synapse/util/async_helpers.py | 18 ++++++------------ tests/util/test_linearizer.py | 4 ++-- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index ece702ccf6d6..c3042ee55e03 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -18,19 +18,17 @@ import inspect import itertools import logging -from contextlib import asynccontextmanager, contextmanager +from contextlib import asynccontextmanager from typing import ( Any, AsyncIterator, Awaitable, Callable, Collection, - ContextManager, Dict, Generic, Hashable, Iterable, - Iterator, List, Optional, Set, @@ -343,7 +341,7 @@ class Linearizer: Example: - with await limiter.queue("test_key"): + async with limiter.queue("test_key"): # do some work. """ @@ -384,14 +382,10 @@ def is_queued(self, key: Hashable) -> bool: # non-empty. return bool(entry.deferreds) - async def queue(self, key: Hashable) -> ContextManager[None]: - entry = await self._acquire_lock(key) - - # now that we have the lock, we need to return a context manager which will - # release the lock. - - @contextmanager - def _ctx_manager() -> Iterator[None]: + def queue(self, key: Hashable) -> AsyncContextManager[None]: + @asynccontextmanager + async def _ctx_manager() -> AsyncIterator[None]: + entry = await self._acquire_lock(key) try: yield finally: diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index b6bc8a39e0f0..06f3468b2db8 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -50,7 +50,7 @@ def _start_task( unblock_d: "Deferred[None]" = Deferred() async def task() -> None: - with await linearizer.queue(key): + async with linearizer.queue(key): acquired_d.callback(None) await unblock_d @@ -129,7 +129,7 @@ def test_lots_of_queued_things(self) -> None: async def func(i: int, wait_for: Optional["Deferred[None]"] = None) -> None: with LoggingContext("func(%s)" % i) as lc: - with (await linearizer.queue(key)): + async with linearizer.queue(key): self.assertEqual(current_context(), lc) if wait_for: await make_deferred_yieldable(wait_for) From 3fcd05bc5c3c5acc96ea1104a7af324a81f962fa Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 18:48:53 +0100 Subject: [PATCH 13/17] Convert `Linearizer` to use an async context manager Signed-off-by: Sean Quah --- synapse/federation/federation_server.py | 10 +++++----- synapse/handlers/appservice.py | 6 ++---- synapse/handlers/device.py | 2 +- synapse/handlers/e2e_keys.py | 4 ++-- synapse/handlers/e2e_room_keys.py | 14 +++++++------- synapse/handlers/federation.py | 2 +- synapse/handlers/federation_event.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/presence.py | 4 ++-- synapse/handlers/read_marker.py | 2 +- synapse/handlers/room.py | 2 +- synapse/handlers/room_member.py | 4 ++-- synapse/handlers/sso.py | 2 +- synapse/push/bulk_push_rule_evaluator.py | 2 +- 
synapse/replication/tcp/client.py | 2 +- synapse/rest/media/v1/media_repository.py | 6 +++--- synapse/state/__init__.py | 2 +- synapse/storage/databases/main/roommember.py | 2 +- 18 files changed, 34 insertions(+), 36 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c7400c737bd7..69d833585f2a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -188,7 +188,7 @@ async def _handle_old_staged_events(self) -> None: async def on_backfill_request( self, origin: str, room_id: str, versions: List[str], limit: int ) -> Tuple[int, Dict[str, Any]]: - with (await self._server_linearizer.queue((origin, room_id))): + async with self._server_linearizer.queue((origin, room_id)): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -218,7 +218,7 @@ async def on_timestamp_to_event_request( Tuple indicating the response status code and dictionary response body including `event_id`. """ - with (await self._server_linearizer.queue((origin, room_id))): + async with self._server_linearizer.queue((origin, room_id)): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -529,7 +529,7 @@ async def on_room_state_request( # in the cache so we could return it without waiting for the linearizer # - but that's non-trivial to get right, and anyway somewhat defeats # the point of the linearizer. - with (await self._server_linearizer.queue((origin, room_id))): + async with self._server_linearizer.queue((origin, room_id)): resp: JsonDict = dict( await self._state_resp_cache.wrap( (room_id, event_id), @@ -883,7 +883,7 @@ async def _on_send_membership_event( async def on_event_auth( self, origin: str, room_id: str, event_id: str ) -> Tuple[int, Dict[str, Any]]: - with (await self._server_linearizer.queue((origin, room_id))): + async with self._server_linearizer.queue((origin, room_id)): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -945,7 +945,7 @@ async def on_get_missing_events( latest_events: List[str], limit: int, ) -> Dict[str, list]: - with (await self._server_linearizer.queue((origin, room_id))): + async with self._server_linearizer.queue((origin, room_id)): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 316c4b677ce1..1b5784050621 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -330,10 +330,8 @@ async def _notify_interested_services_ephemeral( continue # Since we read/update the stream position for this AS/stream - with ( - await self._ephemeral_events_linearizer.queue( - (service.id, stream_key) - ) + async with self._ephemeral_events_linearizer.queue( + (service.id, stream_key) ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d5ccaa0c37cc..64e9f5a6dff4 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -725,7 +725,7 @@ async def incoming_device_list_update( async def _handle_device_updates(self, user_id: str) -> None: "Actually handle pending updates." 
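+        # (The per-user linearizer below ensures that only one batch of
+        # updates is processed for a given user at a time.)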
- with (await self._remote_edu_linearizer.queue(user_id)): + async with self._remote_edu_linearizer.queue(user_id): pending_updates = self._pending_updates.pop(user_id, []) if not pending_updates: # This can happen since we batch updates diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index d96456cd406c..d6714228ef41 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -118,7 +118,7 @@ async def query_devices( from_device_id: the device making the query. This is used to limit the number of in-flight queries at a time. """ - with await self._query_devices_linearizer.queue((from_user_id, from_device_id)): + async with self._query_devices_linearizer.queue((from_user_id, from_device_id)): device_keys_query: Dict[str, Iterable[str]] = query_body.get( "device_keys", {} ) @@ -1386,7 +1386,7 @@ async def _handle_signing_key_updates(self, user_id: str) -> None: device_handler = self.e2e_keys_handler.device_handler device_list_updater = device_handler.device_list_updater - with (await self._remote_edu_linearizer.queue(user_id)): + async with self._remote_edu_linearizer.queue(user_id): pending_updates = self._pending_updates.pop(user_id, []) if not pending_updates: # This can happen since we batch updates diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 52e44a2d426f..446f509bdc3b 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -83,7 +83,7 @@ async def get_room_keys( # we deliberately take the lock to get keys so that changing the version # works atomically - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): # make sure the backup version exists try: await self.store.get_e2e_room_keys_version_info(user_id, version) @@ -126,7 +126,7 @@ async def delete_room_keys( """ # lock for consistency with uploading - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): # make sure the backup version exists try: version_info = await self.store.get_e2e_room_keys_version_info( @@ -187,7 +187,7 @@ async def upload_room_keys( # TODO: Validate the JSON to make sure it has the right keys. # XXX: perhaps we should use a finer grained lock here? - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): # Check that the version we're trying to upload is the current version try: @@ -332,7 +332,7 @@ async def create_version(self, user_id: str, version_info: JsonDict) -> str: # TODO: Validate the JSON to make sure it has the right keys. 
# lock everyone out until we've switched version - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): new_version = await self.store.create_e2e_room_keys_version( user_id, version_info ) @@ -359,7 +359,7 @@ async def get_version_info( } """ - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): try: res = await self.store.get_e2e_room_keys_version_info(user_id, version) except StoreError as e: @@ -383,7 +383,7 @@ async def delete_version(self, user_id: str, version: Optional[str] = None) -> N NotFoundError: if this backup version doesn't exist """ - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): try: await self.store.delete_e2e_room_keys_version(user_id, version) except StoreError as e: @@ -413,7 +413,7 @@ async def update_version( raise SynapseError( 400, "Version in body does not match", Codes.INVALID_PARAM ) - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): try: old_info = await self.store.get_e2e_room_keys_version_info( user_id, version diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 350ec9c03af1..78d149905f52 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -151,7 +151,7 @@ async def maybe_backfill( return. This is used as part of the heuristic to decide if we should back paginate. """ - with (await self._room_backfill.queue(room_id)): + async with self._room_backfill.queue(room_id): return await self._maybe_backfill_inner(room_id, current_depth, limit) async def _maybe_backfill_inner( diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 567afc910f27..468cb8e96452 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -224,7 +224,7 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None: len(missing_prevs), shortstr(missing_prevs), ) - with (await self._room_pdu_linearizer.queue(pdu.room_id)): + async with self._room_pdu_linearizer.queue(pdu.room_id): logger.info( "Acquired room lock to fetch %d missing prev_events", len(missing_prevs), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 766f597a55bd..7db6905c6165 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -851,7 +851,7 @@ async def create_and_send_nonmember_event( # a situation where event persistence can't keep up, causing # extremities to pile up, which in turn leads to state resolution # taking longer. 
- with (await self.limiter.queue(event_dict["room_id"])): + async with self.limiter.queue(event_dict["room_id"]): if txn_id and requester.access_token_id: existing_event_id = await self.store.get_event_id_from_transaction_id( event_dict["room_id"], diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index dace31d87e17..209a4b0e521f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1030,7 +1030,7 @@ async def update_external_syncs_row( is_syncing: Whether or not the user is now syncing sync_time_msec: Time in ms when the user was last syncing """ - with (await self.external_sync_linearizer.queue(process_id)): + async with self.external_sync_linearizer.queue(process_id): prev_state = await self.current_state_for_user(user_id) process_presence = self.external_process_to_current_syncs.setdefault( @@ -1071,7 +1071,7 @@ async def update_external_syncs_clear(self, process_id: str) -> None: Used when the process has stopped/disappeared. """ - with (await self.external_sync_linearizer.queue(process_id)): + async with self.external_sync_linearizer.queue(process_id): process_presence = self.external_process_to_current_syncs.pop( process_id, set() ) diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index bad1acc6344d..05122fd5a6b4 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -40,7 +40,7 @@ async def received_client_read_marker( the read marker has changed. """ - with await self.read_marker_linearizer.queue((room_id, user_id)): + async with self.read_marker_linearizer.queue((room_id, user_id)): existing_read_marker = await self.store.get_account_data_for_room_and_type( user_id, room_id, "m.fully_read" ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 092e185c9950..ad9cc23c1ed5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -881,7 +881,7 @@ async def create_room( # # we also don't need to check the requester's shadow-ban here, as we # have already done so above (and potentially emptied invite_list). - with (await self.room_member_handler.member_linearizer.queue((room_id,))): + async with self.room_member_handler.member_linearizer.queue((room_id,)): content = {} is_direct = config.get("is_direct", None) if is_direct: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0785e3111417..802e57c4d0cc 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -515,8 +515,8 @@ async def update_membership( # We first linearise by the application service (to try to limit concurrent joins # by application services), and then by room ID. - with (await self.member_as_limiter.queue(as_id)): - with (await self.member_linearizer.queue(key)): + async with self.member_as_limiter.queue(as_id): + async with self.member_linearizer.queue(key): result = await self.update_membership_locked( requester, target, diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 4f02a060d953..e4fe94e557ad 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -430,7 +430,7 @@ async def complete_sso_login_request( # grab a lock while we try to find a mapping for this user. This seems... # optimistic, especially for implementations that end up redirecting to # interstitial pages. 
- with await self._mapping_lock.queue(auth_provider_id): + async with self._mapping_lock.queue(auth_provider_id): # first of all, check if we already have a mapping for this user user_id = await self.get_sso_user_by_remote_user_id( auth_provider_id, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index a402a3e40374..b07cf2eee705 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -397,7 +397,7 @@ async def get_rules( self.room_push_rule_cache_metrics.inc_hits() return self.data.rules_by_user - with (await self.linearizer.queue(self.room_id)): + async with self.linearizer.queue(self.room_id): if state_group and self.data.state_group == state_group: logger.debug("Using cached rules for %r", self.room_id) self.room_push_rule_cache_metrics.inc_hits() diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index deeaaec4e66c..122892c7bca2 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -451,7 +451,7 @@ async def _save_and_send_ack(self) -> None: # service for robustness? Or could we replace it with an assertion that # we're not being re-entered? - with (await self._fed_position_linearizer.queue(None)): + async with self._fed_position_linearizer.queue(None): # We persist and ack the same position, so we take a copy of it # here as otherwise it can get modified from underneath us. current_position = self.federation_position diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 6c414402bd46..3e5d6c629418 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -258,7 +258,7 @@ async def get_remote_media( # We linearize here to ensure that we don't try and download remote # media multiple times concurrently key = (server_name, media_id) - with (await self.remote_media_linearizer.queue(key)): + async with self.remote_media_linearizer.queue(key): responder, media_info = await self._get_remote_media_impl( server_name, media_id ) @@ -294,7 +294,7 @@ async def get_remote_media_info(self, server_name: str, media_id: str) -> dict: # We linearize here to ensure that we don't try and download remote # media multiple times concurrently key = (server_name, media_id) - with (await self.remote_media_linearizer.queue(key)): + async with self.remote_media_linearizer.queue(key): responder, media_info = await self._get_remote_media_impl( server_name, media_id ) @@ -850,7 +850,7 @@ async def delete_old_remote_media(self, before_ts: int) -> Dict[str, int]: # TODO: Should we delete from the backup store - with (await self.remote_media_linearizer.queue(key)): + async with self.remote_media_linearizer.queue(key): full_path = self.filepaths.remote_media_filepath(origin, file_id) try: os.remove(full_path) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 21888cc8c561..fbf7ba4600f8 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -573,7 +573,7 @@ async def resolve_state_groups( """ group_names = frozenset(state_groups_ids.keys()) - with (await self.resolve_linearizer.queue(group_names)): + async with self.resolve_linearizer.queue(group_names): cache = self._state_cache.get(group_names, None) if cache: return cache diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 98d09b37366b..48e83592e728 100644 --- a/synapse/storage/databases/main/roommember.py +++ 
b/synapse/storage/databases/main/roommember.py @@ -888,7 +888,7 @@ async def _get_joined_hosts( return frozenset(cache.hosts_to_joined_users) # Since we'll mutate the cache we need to lock. - with (await self._joined_host_linearizer.queue(room_id)): + async with self._joined_host_linearizer.queue(room_id): if state_entry.state_group == cache.state_group: # Same state group, so nothing to do. We've already checked for # this above, but the cache may have changed while waiting on From 31fda92a5db7ec4205b1f40210d203f771dd938e Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 1 Apr 2022 19:05:25 +0100 Subject: [PATCH 14/17] Add newsfile --- changelog.d/12357.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/12357.misc diff --git a/changelog.d/12357.misc b/changelog.d/12357.misc new file mode 100644 index 000000000000..d571ae034ca0 --- /dev/null +++ b/changelog.d/12357.misc @@ -0,0 +1 @@ +Refactor `Linearizer`, convert methods to async and use an async context manager. From 9de8d18d5c2158759b75d018521a08baef2487b2 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 4 Apr 2022 17:57:39 +0100 Subject: [PATCH 15/17] Early return and outdent the slow path --- synapse/util/async_helpers.py | 97 +++++++++++++++++------------------ 1 file changed, 47 insertions(+), 50 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index c3042ee55e03..db82c8d8cc7d 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -402,61 +402,58 @@ async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry: key, _LinearizerEntry(0, collections.OrderedDict()) ) - # If the number of things executing is greater than the maximum - # then add a deferred to the list of blocked items - # When one of the things currently executing finishes it will callback - # this item so that it can continue executing. - if entry.count >= self.max_count: + if entry.count < self.max_count: + # The number of things executing is less than the maximum. logger.debug( - "Waiting to acquire linearizer lock %r for key %r", self.name, key - ) - - new_defer: "defer.Deferred[None]" = make_deferred_yieldable( - defer.Deferred() + "Acquired uncontended linearizer lock %r for key %r", self.name, key ) - entry.deferreds[new_defer] = 1 - - try: - await new_defer - except Exception as e: - logger.info("defer %r got err %r", new_defer, e) - if isinstance(e, CancelledError): - logger.debug( - "Cancelling wait for linearizer lock %r for key %r", - self.name, - key, - ) - else: - logger.warning( - "Unexpected exception waiting for linearizer lock %r for key " - "%r", - self.name, - key, - ) + entry.count += 1 + return entry - # we just have to take ourselves back out of the queue. - del entry.deferreds[new_defer] - raise + # Otherwise, the number of things executing is at the maximum and we have to + # add a deferred to the list of blocked items. + # When one of the things currently executing finishes it will callback + # this item so that it can continue executing. + logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key) - logger.debug("Acquired linearizer lock %r for key %r", self.name, key) - entry.count += 1 + new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred()) + entry.deferreds[new_defer] = 1 - # if the code holding the lock completes synchronously, then it - # will recursively run the next claimant on the list. That can - # relatively rapidly lead to stack exhaustion. 
This is essentially - # the same problem as http://twistedmatrix.com/trac/ticket/9304. - # - # In order to break the cycle, we add a cheeky sleep(0) here to - # ensure that we fall back to the reactor between each iteration. - # - # (This needs to happen while we hold the lock, and the context manager's - # exit code must be synchronous, so this is the only sensible place.) - await self._clock.sleep(0) - else: - logger.debug( - "Acquired uncontended linearizer lock %r for key %r", self.name, key - ) - entry.count += 1 + try: + await new_defer + except Exception as e: + logger.info("defer %r got err %r", new_defer, e) + if isinstance(e, CancelledError): + logger.debug( + "Cancelling wait for linearizer lock %r for key %r", + self.name, + key, + ) + else: + logger.warning( + "Unexpected exception waiting for linearizer lock %r for key " "%r", + self.name, + key, + ) + + # we just have to take ourselves back out of the queue. + del entry.deferreds[new_defer] + raise + + logger.debug("Acquired linearizer lock %r for key %r", self.name, key) + entry.count += 1 + + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (This needs to happen while we hold the lock, and the context manager's + # exit code must be synchronous, so this is the only sensible place.) + await self._clock.sleep(0) return entry From 039872e7bfbd274da6021cd6fb2fc04c42546cf5 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 4 Apr 2022 17:58:00 +0100 Subject: [PATCH 16/17] Update out-of-date comment --- synapse/util/async_helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index db82c8d8cc7d..79e43159ce46 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -451,8 +451,8 @@ async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry: # In order to break the cycle, we add a cheeky sleep(0) here to # ensure that we fall back to the reactor between each iteration. # - # (This needs to happen while we hold the lock, and the context manager's - # exit code must be synchronous, so this is the only sensible place.) + # This needs to happen while we hold the lock. We could put it on the + # exit path, but that would slow down the uncontended case. await self._clock.sleep(0) return entry From bb7a8e3bd62281eab79d5e73318c846517d89654 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Tue, 5 Apr 2022 14:58:07 +0100 Subject: [PATCH 17/17] Update synapse/util/async_helpers.py Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/util/async_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 79e43159ce46..4b2a16a6a902 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -431,7 +431,7 @@ async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry: ) else: logger.warning( - "Unexpected exception waiting for linearizer lock %r for key " "%r", + "Unexpected exception waiting for linearizer lock %r for key %r", self.name, key, )
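With the full series applied, `Linearizer` exposes a purely async interface. A minimal usage sketch of the end state (`handle_request` is a hypothetical caller, not part of the patches):

    from synapse.util.async_helpers import Linearizer

    linearizer = Linearizer()

    async def handle_request(room_id: str) -> None:
        # At most one task per key runs at a time (or `max_count` tasks if a
        # higher limit was passed); waiters are woken in FIFO order, and a
        # waiter that is cancelled is removed from the queue without ever
        # acquiring the lock.
        async with linearizer.queue(room_id):
            ...  # critical section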