Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Always close _all_ ijson coroutines, even if doing so raises Except…
Browse files Browse the repository at this point in the history
…ions (#14065)
  • Loading branch information
David Robertson authored Oct 6, 2022
1 parent 44741aa commit cb20b88
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 5 deletions.
1 change: 1 addition & 0 deletions changelog.d/14065.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.35.0 where errors parsing a `/send_join` or `/state` response would produce excessive, low-quality Sentry events.
29 changes: 25 additions & 4 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from synapse.http.matrixfederationclient import ByteParser
from synapse.http.types import QueryParams
from synapse.types import JsonDict
from synapse.util import ExceptionBundle

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -926,8 +927,7 @@ def write(self, data: bytes) -> int:
return len(data)

def finish(self) -> SendJoinResponse:
for c in self._coros:
c.close()
_close_coros(self._coros)

if self._response.event_dict:
self._response.event = make_event_from_dict(
Expand Down Expand Up @@ -970,6 +970,27 @@ def write(self, data: bytes) -> int:
return len(data)

def finish(self) -> StateRequestResponse:
for c in self._coros:
c.close()
_close_coros(self._coros)
return self._response


def _close_coros(coros: Iterable[Generator[None, bytes, None]]) -> None:
"""Close each of the given coroutines.
Always calls .close() on each coroutine, even if doing so raises an exception.
Any exceptions raised are aggregated into an ExceptionBundle.
:raises ExceptionBundle: if at least one coroutine fails to close.
"""
exceptions = []
for c in coros:
try:
c.close()
except Exception as e:
exceptions.append(e)

if exceptions:
# raise from the first exception so that the traceback has slightly more context
raise ExceptionBundle(
f"There were {len(exceptions)} errors closing coroutines", exceptions
) from exceptions[0]
14 changes: 13 additions & 1 deletion synapse/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import json
import logging
import typing
from typing import Any, Callable, Dict, Generator, Optional
from typing import Any, Callable, Dict, Generator, Optional, Sequence

import attr
from frozendict import frozendict
Expand Down Expand Up @@ -193,3 +193,15 @@ def log_failure(
# Version string with git info. Computed here once so that we don't invoke git multiple
# times.
SYNAPSE_VERSION = get_distribution_version_string("matrix-synapse", __file__)


class ExceptionBundle(Exception):
# A poor stand-in for something like Python 3.11's ExceptionGroup.
# (A backport called `exceptiongroup` exists but seems overkill: we just want a
# container type here.)
def __init__(self, message: str, exceptions: Sequence[Exception]):
parts = [message]
for e in exceptions:
parts.append(str(e))
super().__init__("\n - ".join(parts))
self.exceptions = exceptions
37 changes: 37 additions & 0 deletions tests/federation/transport/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import json
from unittest.mock import Mock

from synapse.api.room_versions import RoomVersions
from synapse.federation.transport.client import SendJoinParser
Expand Down Expand Up @@ -94,3 +95,39 @@ def test_servers_in_room(self) -> None:
# Retrieve and check the parsed SendJoinResponse
parsed_response = parser.finish()
self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"])

def test_errors_closing_coroutines(self) -> None:
"""Check we close all coroutines, even if closing the first raises an Exception.
We also check that an Exception of some kind is raised, but we don't make any
assertions about its attributes or type.
"""
parser = SendJoinParser(RoomVersions.V1, False)
response = {"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}
serialisation = json.dumps(response).encode()

# Mock the coroutines managed by this parser.
# The first one will error when we try to close it.
coro_1 = Mock()
coro_1.close = Mock(side_effect=RuntimeError("Couldn't close coro 1"))

coro_2 = Mock()

coro_3 = Mock()
coro_3.close = Mock(side_effect=RuntimeError("Couldn't close coro 3"))

parser._coros = [coro_1, coro_2, coro_3]

# Send half of the data to the parser
parser.write(serialisation[: len(serialisation) // 2])

# Close the parser. There should be _some_ kind of exception, but it need not
# be that RuntimeError directly. E.g. we might want to raise a wrapper
# encompassing multiple errors from multiple coroutines.
with self.assertRaises(Exception):
parser.finish()

# In any case, we should have tried to close both coros.
coro_1.close.assert_called()
coro_2.close.assert_called()
coro_3.close.assert_called()

0 comments on commit cb20b88

Please sign in to comment.