From eb6e1321dd1e93fab896eb446b8b5ae2a5007d49 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 31 Mar 2023 11:01:54 +0100 Subject: [PATCH 01/40] Proxy federation requests --- synapse/http/matrixfederationclient.py | 9 +++------ synapse/http/proxyagent.py | 5 +++++ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 634882487c06..e4e6c479e2cf 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -72,6 +72,7 @@ read_body_with_max_size, ) from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent +from synapse.http.proxyagent import ProxyAgent from synapse.http.types import QueryParams from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -388,12 +389,8 @@ def __init__( if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - federation_agent = MatrixFederationAgent( - self.reactor, - tls_client_options_factory, - user_agent.encode("ascii"), - hs.config.server.federation_ip_range_whitelist, - hs.config.server.federation_ip_range_blacklist, + federation_agent = ProxyAgent( + self.reactor, self.reactor, tls_client_options_factory ) # Use a BlacklistingAgentWrapper to prevent circumventing the IP diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 94ef737b9ee0..c9f16b25c0f1 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -214,6 +214,9 @@ def request( parsed_uri.port, self.https_proxy_creds, ) + elif parsed_uri.scheme == b"matrix": + endpoint = HostnameEndpoint(self.proxy_reactor, "127.0.0.1", 3000) + request_path = uri else: # not using a proxy endpoint = HostnameEndpoint( @@ -233,6 +236,8 @@ def request( endpoint = wrapClientTLS(tls_connection_creator, endpoint) elif parsed_uri.scheme == b"http": pass + elif parsed_uri.scheme == b"matrix": + pass else: return defer.fail( Failure( From 6a95e7ad90602969935bac2e044448cf14f203be Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 28 Apr 2023 16:28:22 +0100 Subject: [PATCH 02/40] Make configurable --- synapse/config/federation.py | 19 ++++++++++++ .../federation/matrix_federation_agent.py | 4 +-- synapse/http/matrixfederationclient.py | 29 +++++++++++++++---- synapse/http/proxyagent.py | 13 +++++++-- 4 files changed, 55 insertions(+), 10 deletions(-) diff --git a/synapse/config/federation.py b/synapse/config/federation.py index 336fca578aa1..4c261e2aecc5 100644 --- a/synapse/config/federation.py +++ b/synapse/config/federation.py @@ -13,11 +13,23 @@ # limitations under the License. from typing import Any, Optional +import attr + from synapse.config._base import Config from synapse.config._util import validate_config from synapse.types import JsonDict +@attr.s(frozen=True, auto_attribs=True) +class FederationProxy: + """A proxy server for outbound federation traffic, for URIs with a + `matrix-federation://` scheme. + """ + + host: str + port: int + + class FederationConfig(Config): section = "federation" @@ -49,5 +61,12 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: "allow_device_name_lookup_over_federation", False ) + self.federation_proxy = None + federation_proxy = config.get("federation_proxy") + if federation_proxy: + host = federation_proxy["host"] + port = int(federation_proxy["port"]) + self.federation_proxy = FederationProxy(host, port) + _METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}} diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 8d7d0a387529..7692f9fb6432 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -174,7 +174,7 @@ def request( # the host header with the delegated server name. delegated_server = None if ( - parsed_uri.scheme == b"matrix" + parsed_uri.scheme == b"matrix-federation" and not _is_ip_literal(parsed_uri.hostname) and not parsed_uri.port ): @@ -379,7 +379,7 @@ async def _resolve_server(self) -> List[Server]: connect to. """ - if self._parsed_uri.scheme != b"matrix": + if self._parsed_uri.scheme != b"matrix-federation": return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)] # Note: We don't do well-known lookup as that needs to have happened diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index e4e6c479e2cf..9bf3c2b915a6 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -50,7 +50,7 @@ from twisted.internet.task import Cooperator from twisted.web.client import ResponseFailed from twisted.web.http_headers import Headers -from twisted.web.iweb import IBodyProducer, IResponse +from twisted.web.iweb import IAgent, IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils @@ -175,7 +175,14 @@ def __attrs_post_init__(self) -> None: # The object is frozen so we can pre-compute this. uri = urllib.parse.urlunparse( - (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") + ( + b"matrix-federation", + destination_bytes, + path_bytes, + None, + query_bytes, + b"", + ) ) object.__setattr__(self, "uri", uri) @@ -389,9 +396,21 @@ def __init__( if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - federation_agent = ProxyAgent( - self.reactor, self.reactor, tls_client_options_factory - ) + if hs.config.federation.federation_proxy: + federation_agent: IAgent = ProxyAgent( + self.reactor, + self.reactor, + tls_client_options_factory, + federation_proxy=hs.config.federation.federation_proxy, + ) + else: + federation_agent = MatrixFederationAgent( + self.reactor, + tls_client_options_factory, + user_agent.encode("ascii"), + hs.config.server.federation_ip_range_whitelist, + hs.config.server.federation_ip_range_blacklist, + ) # Use a BlacklistingAgentWrapper to prevent circumventing the IP # blacklist via IP literals in server names diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index c9f16b25c0f1..0ce90fb7c6bd 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -36,6 +36,7 @@ from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse +from synapse.config.federation import FederationProxy from synapse.http import redact_uri from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials @@ -89,6 +90,7 @@ def __init__( bindAddress: Optional[bytes] = None, pool: Optional[HTTPConnectionPool] = None, use_proxy: bool = False, + federation_proxy: Optional[FederationProxy] = None, ): contextFactory = contextFactory or BrowserLikePolicyForHTTPS() @@ -126,6 +128,7 @@ def __init__( self._policy_for_https = contextFactory self._reactor = reactor + self._federation_proxy = federation_proxy def request( self, @@ -214,8 +217,12 @@ def request( parsed_uri.port, self.https_proxy_creds, ) - elif parsed_uri.scheme == b"matrix": - endpoint = HostnameEndpoint(self.proxy_reactor, "127.0.0.1", 3000) + elif parsed_uri.scheme == b"matrix-federation" and self._federation_proxy: + endpoint = HostnameEndpoint( + self.proxy_reactor, + self._federation_proxy.host, + self._federation_proxy.port, + ) request_path = uri else: # not using a proxy @@ -236,7 +243,7 @@ def request( endpoint = wrapClientTLS(tls_connection_creator, endpoint) elif parsed_uri.scheme == b"http": pass - elif parsed_uri.scheme == b"matrix": + elif parsed_uri.scheme == b"matrix-federation" and self._federation_proxy: pass else: return defer.fail( From f0270aad616a04301c2c6f6b7b2ea753f0d44bba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 May 2023 14:37:07 +0100 Subject: [PATCH 03/40] Cache the fed proxy --- synapse/http/proxyagent.py | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 0ce90fb7c6bd..9431aaa872ac 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -126,9 +126,16 @@ def __init__( self.no_proxy = no_proxy + self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None + if federation_proxy: + self._federation_proxy_endpoint = HostnameEndpoint( + self.proxy_reactor, + federation_proxy.host, + federation_proxy.port, + ) + self._policy_for_https = contextFactory self._reactor = reactor - self._federation_proxy = federation_proxy def request( self, @@ -217,12 +224,13 @@ def request( parsed_uri.port, self.https_proxy_creds, ) - elif parsed_uri.scheme == b"matrix-federation" and self._federation_proxy: - endpoint = HostnameEndpoint( - self.proxy_reactor, - self._federation_proxy.host, - self._federation_proxy.port, - ) + elif ( + parsed_uri.scheme == b"matrix-federation" + and self._federation_proxy_endpoint + ): + # Cache *all* connections under the same key, since we are only + # connecting to a single destination, the proxy: + endpoint = self._federation_proxy_endpoint request_path = uri else: # not using a proxy @@ -243,7 +251,10 @@ def request( endpoint = wrapClientTLS(tls_connection_creator, endpoint) elif parsed_uri.scheme == b"http": pass - elif parsed_uri.scheme == b"matrix-federation" and self._federation_proxy: + elif ( + parsed_uri.scheme == b"matrix-federation" + and self._federation_proxy_endpoint + ): pass else: return defer.fail( From 6d98582ed0e11e1efc4cba6ca2b6e824fbb090ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 May 2023 15:15:36 +0100 Subject: [PATCH 04/40] Accept a list of federation proxies --- synapse/http/proxyagent.py | 52 ++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 9431aaa872ac..5e5671b18272 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import random import re -from typing import Any, Dict, Optional, Tuple +from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple from urllib.parse import urlparse from urllib.request import ( # type: ignore[attr-defined] getproxies_environment, @@ -24,7 +25,12 @@ from twisted.internet import defer from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint +from twisted.internet.interfaces import ( + IProtocol, + IProtocolFactory, + IReactorCore, + IStreamClientEndpoint, +) from twisted.python.failure import Failure from twisted.web.client import ( URI, @@ -39,6 +45,7 @@ from synapse.config.federation import FederationProxy from synapse.http import redact_uri from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials +from synapse.logging.context import run_in_background logger = logging.getLogger(__name__) @@ -90,7 +97,7 @@ def __init__( bindAddress: Optional[bytes] = None, pool: Optional[HTTPConnectionPool] = None, use_proxy: bool = False, - federation_proxy: Optional[FederationProxy] = None, + federation_proxies: Collection[FederationProxy] = None, ): contextFactory = contextFactory or BrowserLikePolicyForHTTPS() @@ -127,11 +134,16 @@ def __init__( self.no_proxy = no_proxy self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None - if federation_proxy: - self._federation_proxy_endpoint = HostnameEndpoint( - self.proxy_reactor, - federation_proxy.host, - federation_proxy.port, + if federation_proxies: + self._federation_proxy_endpoint = _ProxyEndpoints( + [ + HostnameEndpoint( + self.proxy_reactor, + federation_proxy.host, + federation_proxy.port, + ) + for federation_proxy in federation_proxies + ] ) self._policy_for_https = contextFactory @@ -360,3 +372,27 @@ def parse_proxy( credentials = ProxyCredentials(b"".join([url.username, b":", url.password])) return url.scheme, url.hostname, url.port or default_port, credentials + + +@implementer(IStreamClientEndpoint) +class _ProxyEndpoints: + def __init__(self, endpoints: Sequence[IStreamClientEndpoint]) -> None: + assert endpoints + self._endpoints = endpoints + + def connect( + self, protocol_factory: IProtocolFactory + ) -> "defer.Deferred[IProtocol]": + """Implements IStreamClientEndpoint interface""" + + return run_in_background(self._do_connect, protocol_factory) + + async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol: + failures: List[Failure] = [] + for endpoint in random.sample(self._endpoints, k=len(self._endpoints)): + try: + return await endpoint.connect(protocol_factory) + except Exception: + failures.append(Failure()) + + failures.pop().raiseException() From 58893960ba993a5bd3b95babadf77f25811cd15c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 May 2023 15:46:41 +0100 Subject: [PATCH 05/40] Make configurable --- synapse/config/federation.py | 7 ----- synapse/config/workers.py | 36 +++++++++++++++++++++++++- synapse/http/matrixfederationclient.py | 10 +++++-- synapse/http/proxyagent.py | 30 +++++++++++++-------- 4 files changed, 62 insertions(+), 21 deletions(-) diff --git a/synapse/config/federation.py b/synapse/config/federation.py index 4c261e2aecc5..cae28d9d0330 100644 --- a/synapse/config/federation.py +++ b/synapse/config/federation.py @@ -61,12 +61,5 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: "allow_device_name_lookup_over_federation", False ) - self.federation_proxy = None - federation_proxy = config.get("federation_proxy") - if federation_proxy: - host = federation_proxy["host"] - port = int(federation_proxy["port"]) - self.federation_proxy = FederationProxy(host, port) - _METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}} diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 95b4047f1d3f..72dbcba16091 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import argparse import logging -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union import attr from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr @@ -135,6 +135,23 @@ class WriterLocations: ) +@attr.s(auto_attribs=True) +class OutboundFederationRestrictedTo: + """Whether we limit outbound federation to a certain set of instances. + + Attributes: + instances: optional list of instances that can make outbound federation + requests. If None then all instances can make federation requests. + locations: list of instance locations to connect to proxy via. + """ + + instances: Optional[List[str]] + locations: List[InstanceLocationConfig] = attr.Factory(list) + + def __contains__(self, instance: str) -> bool: + return self.instances is None or instance in self.instances + + class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. They have their own pid_file and listener configuration. They use the @@ -313,6 +330,23 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: new_option_name="update_user_directory_from_worker", ) + outbound_federation_restricted_to = config.get( + "outbound_federation_restricted_to", None + ) + self.outbound_federation_restricted_to = OutboundFederationRestrictedTo( + outbound_federation_restricted_to + ) + if outbound_federation_restricted_to: + for instance in outbound_federation_restricted_to: + if instance != "master" and instance not in self.instance_map: + raise ConfigError( + "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config." + % (instance,) + ) + self.outbound_federation_restricted_to.locations.append( + self.instance_map[instance] + ) + def _should_this_worker_perform_duty( self, config: Dict[str, Any], diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 9bf3c2b915a6..1b5354f835a1 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -396,12 +396,18 @@ def __init__( if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - if hs.config.federation.federation_proxy: + if ( + hs.get_instance_name() + not in hs.config.worker.outbound_federation_restricted_to + ): + federation_proxies = ( + hs.config.worker.outbound_federation_restricted_to.locations + ) federation_agent: IAgent = ProxyAgent( self.reactor, self.reactor, tls_client_options_factory, - federation_proxy=hs.config.federation.federation_proxy, + federation_proxies=federation_proxies, ) else: federation_agent = MatrixFederationAgent( diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 5e5671b18272..99ab1d15bd38 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -42,7 +42,7 @@ from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse -from synapse.config.federation import FederationProxy +from synapse.config.workers import InstanceLocationConfig from synapse.http import redact_uri from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials from synapse.logging.context import run_in_background @@ -97,7 +97,7 @@ def __init__( bindAddress: Optional[bytes] = None, pool: Optional[HTTPConnectionPool] = None, use_proxy: bool = False, - federation_proxies: Collection[FederationProxy] = None, + federation_proxies: Collection[InstanceLocationConfig] = (), ): contextFactory = contextFactory or BrowserLikePolicyForHTTPS() @@ -133,21 +133,29 @@ def __init__( self.no_proxy = no_proxy + self._policy_for_https = contextFactory + self._reactor = reactor + self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None if federation_proxies: - self._federation_proxy_endpoint = _ProxyEndpoints( - [ - HostnameEndpoint( - self.proxy_reactor, + endpoints = [] + for federation_proxy in federation_proxies: + endpoint = HostnameEndpoint( + self.proxy_reactor, + federation_proxy.host, + federation_proxy.port, + ) + + if federation_proxy.tls: + tls_connection_creator = self._policy_for_https.creatorForNetloc( federation_proxy.host, federation_proxy.port, ) - for federation_proxy in federation_proxies - ] - ) + endpoint = wrapClientTLS(tls_connection_creator, endpoint) - self._policy_for_https = contextFactory - self._reactor = reactor + endpoints.append(endpoint) + + self._federation_proxy_endpoint = _ProxyEndpoints(endpoints) def request( self, From 58fe4daa66b9e72550da29361dadfa1a0a907fef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 May 2023 15:49:14 +0100 Subject: [PATCH 06/40] Comment --- synapse/http/proxyagent.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 99ab1d15bd38..f0f11177740f 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -384,6 +384,10 @@ def parse_proxy( @implementer(IStreamClientEndpoint) class _ProxyEndpoints: + """An endpoint that randomly iterates through a given list of endpoints at + each connection attempt. + """ + def __init__(self, endpoints: Sequence[IStreamClientEndpoint]) -> None: assert endpoints self._endpoints = endpoints From f00fedd7f652853d3c1ab50548f4ddee7c615441 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 May 2023 10:55:31 +0100 Subject: [PATCH 07/40] Remove unused class --- synapse/config/federation.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/synapse/config/federation.py b/synapse/config/federation.py index cae28d9d0330..336fca578aa1 100644 --- a/synapse/config/federation.py +++ b/synapse/config/federation.py @@ -13,23 +13,11 @@ # limitations under the License. from typing import Any, Optional -import attr - from synapse.config._base import Config from synapse.config._util import validate_config from synapse.types import JsonDict -@attr.s(frozen=True, auto_attribs=True) -class FederationProxy: - """A proxy server for outbound federation traffic, for URIs with a - `matrix-federation://` scheme. - """ - - host: str - port: int - - class FederationConfig(Config): section = "federation" From f219f0e822c20e51cae9b843e607c22aca72ea9c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 13 Jun 2023 01:05:37 -0500 Subject: [PATCH 08/40] Add changelog --- changelog.d/15773.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/15773.feature diff --git a/changelog.d/15773.feature b/changelog.d/15773.feature new file mode 100644 index 000000000000..0d77fae2dc68 --- /dev/null +++ b/changelog.d/15773.feature @@ -0,0 +1 @@ +Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`. From c998d280e15a6faa7f3111057bde457dc3989165 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 13 Jun 2023 01:27:34 -0500 Subject: [PATCH 09/40] Avoid negated condition --- synapse/http/matrixfederationclient.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 58f5d7662634..aaf933ef1903 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -394,20 +394,10 @@ def __init__( if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - if ( - hs.get_instance_name() - not in hs.config.worker.outbound_federation_restricted_to - ): - federation_proxies = ( - hs.config.worker.outbound_federation_restricted_to.locations - ) - federation_agent: IAgent = ProxyAgent( - self.reactor, - self.reactor, - tls_client_options_factory, - federation_proxies=federation_proxies, - ) - else: + outbound_federation_restricted_to = ( + hs.config.worker.outbound_federation_restricted_to + ) + if hs.get_instance_name() in outbound_federation_restricted_to: federation_agent = MatrixFederationAgent( self.reactor, tls_client_options_factory, @@ -415,6 +405,14 @@ def __init__( hs.config.server.federation_ip_range_allowlist, hs.config.server.federation_ip_range_blocklist, ) + else: + federation_proxies = outbound_federation_restricted_to.locations + federation_agent: IAgent = ProxyAgent( + self.reactor, + self.reactor, + tls_client_options_factory, + federation_proxies=federation_proxies, + ) # Use a BlocklistingAgentWrapper to prevent circumventing the IP # blocking via IP literals in server names From cc05c97342de9ef7258c1c551b9f3246db837aae Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 13 Jun 2023 01:32:11 -0500 Subject: [PATCH 10/40] Fix tests and align to new `matrix-federation://` schema --- contrib/lnav/synapse-log-format.json | 2 +- scripts-dev/federation_client.py | 4 +- .../federation/matrix_federation_agent.py | 6 +-- synapse/http/matrixfederationclient.py | 1 + tests/federation/test_federation_client.py | 4 +- .../test_matrix_federation_agent.py | 38 ++++++++++--------- 6 files changed, 30 insertions(+), 25 deletions(-) diff --git a/contrib/lnav/synapse-log-format.json b/contrib/lnav/synapse-log-format.json index ad7017ee5ec2..649cd623e8b4 100644 --- a/contrib/lnav/synapse-log-format.json +++ b/contrib/lnav/synapse-log-format.json @@ -29,7 +29,7 @@ "level": "error" }, { - "line": "my-matrix-server-federation-sender-1 | 2023-01-25 20:56:20,995 - synapse.http.matrixfederationclient - 709 - WARNING - federation_transaction_transmission_loop-3 - {PUT-O-3} [example.com] Request failed: PUT matrix://example.com/_matrix/federation/v1/send/1674680155797: HttpResponseException('403: Forbidden')", + "line": "my-matrix-server-federation-sender-1 | 2023-01-25 20:56:20,995 - synapse.http.matrixfederationclient - 709 - WARNING - federation_transaction_transmission_loop-3 - {PUT-O-3} [example.com] Request failed: PUT matrix-federation://example.com/_matrix/federation/v1/send/1674680155797: HttpResponseException('403: Forbidden')", "level": "warning" }, { diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index b1d5e2e61667..63f0b25ddd7f 100755 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -136,11 +136,11 @@ def request( authorization_headers.append(header) print("Authorization: %s" % header, file=sys.stderr) - dest = "matrix://%s%s" % (destination, path) + dest = "matrix-federation://%s%s" % (destination, path) print("Requesting %s" % dest, file=sys.stderr) s = requests.Session() - s.mount("matrix://", MatrixConnectionAdapter()) + s.mount("matrix-federation://", MatrixConnectionAdapter()) headers: Dict[str, str] = { "Authorization": authorization_headers[0], diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 240044e756bd..50a70457d646 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -51,7 +51,7 @@ @implementer(IAgent) class MatrixFederationAgent: """An Agent-like thing which provides a `request` method which correctly - handles resolving matrix server names when using matrix://. Handles standard + handles resolving matrix server names when using matrix-federation://. Handles standard https URIs as normal. Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.) @@ -167,7 +167,7 @@ def request( # There must be a valid hostname. assert parsed_uri.hostname - # If this is a matrix:// URI check if the server has delegated matrix + # If this is a matrix-federation:// URI check if the server has delegated matrix # traffic using well-known delegation. # # We have to do this here and not in the endpoint as we need to rewrite @@ -250,7 +250,7 @@ def endpointForURI(self, parsed_uri: URI) -> "MatrixHostnameEndpoint": @implementer(IStreamClientEndpoint) class MatrixHostnameEndpoint: - """An endpoint that resolves matrix:// URLs using Matrix server name + """An endpoint that resolves matrix-federation:// URLs using Matrix server name resolution (i.e. via SRV). Does not check for well-known delegation. Args: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index aaf933ef1903..335c8958c217 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -398,6 +398,7 @@ def __init__( hs.config.worker.outbound_federation_restricted_to ) if hs.get_instance_name() in outbound_federation_restricted_to: + logger.info("asdf") federation_agent = MatrixFederationAgent( self.reactor, tls_client_options_factory, diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py index 91694e4fcada..a45ab8368347 100644 --- a/tests/federation/test_federation_client.py +++ b/tests/federation/test_federation_client.py @@ -124,7 +124,7 @@ def test_get_room_state(self) -> None: # check the right call got made to the agent self._mock_agent.request.assert_called_once_with( b"GET", - b"matrix://yet.another.server/_matrix/federation/v1/state/%21room_id?event_id=event_id", + b"matrix-federation://yet.another.server/_matrix/federation/v1/state/%21room_id?event_id=event_id", headers=mock.ANY, bodyProducer=None, ) @@ -232,7 +232,7 @@ def _get_pdu_once(self) -> EventBase: # check the right call got made to the agent self._mock_agent.request.assert_called_once_with( b"GET", - b"matrix://yet.another.server/_matrix/federation/v1/event/event_id", + b"matrix-federation://yet.another.server/_matrix/federation/v1/event/event_id", headers=mock.ANY, bodyProducer=None, ) diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 105b4caefa3d..aed2a4c07a46 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -292,7 +292,7 @@ def _do_get(self) -> None: self.agent = self._make_agent() self.reactor.lookups["testserv"] = "1.2.3.4" - test_d = self._make_get_request(b"matrix://testserv:8448/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv:8448/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -393,7 +393,7 @@ def _do_get_via_proxy( self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["proxy.com"] = "9.9.9.9" - test_d = self._make_get_request(b"matrix://testserv:8448/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv:8448/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -532,7 +532,7 @@ def test_get_ip_address(self) -> None: # there will be a getaddrinfo on the IP self.reactor.lookups["1.2.3.4"] = "1.2.3.4" - test_d = self._make_get_request(b"matrix://1.2.3.4/foo/bar") + test_d = self._make_get_request(b"matrix-federation://1.2.3.4/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -568,7 +568,7 @@ def test_get_ipv6_address(self) -> None: # there will be a getaddrinfo on the IP self.reactor.lookups["::1"] = "::1" - test_d = self._make_get_request(b"matrix://[::1]/foo/bar") + test_d = self._make_get_request(b"matrix-federation://[::1]/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -604,7 +604,7 @@ def test_get_ipv6_address_with_port(self) -> None: # there will be a getaddrinfo on the IP self.reactor.lookups["::1"] = "::1" - test_d = self._make_get_request(b"matrix://[::1]:80/foo/bar") + test_d = self._make_get_request(b"matrix-federation://[::1]:80/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -639,7 +639,7 @@ def test_get_hostname_bad_cert(self) -> None: self.mock_resolver.resolve_service.side_effect = generate_resolve_service([]) self.reactor.lookups["testserv1"] = "1.2.3.4" - test_d = self._make_get_request(b"matrix://testserv1/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv1/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -693,7 +693,7 @@ def test_get_ip_address_bad_cert(self) -> None: # there will be a getaddrinfo on the IP self.reactor.lookups["1.2.3.5"] = "1.2.3.5" - test_d = self._make_get_request(b"matrix://1.2.3.5/foo/bar") + test_d = self._make_get_request(b"matrix-federation://1.2.3.5/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -725,7 +725,7 @@ def test_get_no_srv_no_well_known(self) -> None: self.mock_resolver.resolve_service.side_effect = generate_resolve_service([]) self.reactor.lookups["testserv"] = "1.2.3.4" - test_d = self._make_get_request(b"matrix://testserv/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -780,7 +780,7 @@ def test_get_well_known(self) -> None: self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["target-server"] = "1::f" - test_d = self._make_get_request(b"matrix://testserv/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -844,7 +844,7 @@ def test_get_well_known_redirect(self) -> None: self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["target-server"] = "1::f" - test_d = self._make_get_request(b"matrix://testserv/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -933,7 +933,7 @@ def test_get_invalid_well_known(self) -> None: self.mock_resolver.resolve_service.side_effect = generate_resolve_service([]) self.reactor.lookups["testserv"] = "1.2.3.4" - test_d = self._make_get_request(b"matrix://testserv/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -1009,7 +1009,7 @@ def test_get_well_known_unsigned_cert(self) -> None: ), ) - test_d = agent.request(b"GET", b"matrix://testserv/foo/bar") + test_d = agent.request(b"GET", b"matrix-federation://testserv/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -1042,7 +1042,7 @@ def test_get_hostname_srv(self) -> None: ) self.reactor.lookups["srvtarget"] = "1.2.3.4" - test_d = self._make_get_request(b"matrix://testserv/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -1082,7 +1082,7 @@ def test_get_well_known_srv(self) -> None: self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["srvtarget"] = "5.6.7.8" - test_d = self._make_get_request(b"matrix://testserv/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") # Nothing happened yet self.assertNoResult(test_d) @@ -1143,7 +1143,9 @@ def test_idna_servername(self) -> None: self.reactor.lookups["xn--bcher-kva.com"] = "1.2.3.4" # this is idna for bücher.com - test_d = self._make_get_request(b"matrix://xn--bcher-kva.com/foo/bar") + test_d = self._make_get_request( + b"matrix-federation://xn--bcher-kva.com/foo/bar" + ) # Nothing happened yet self.assertNoResult(test_d) @@ -1204,7 +1206,9 @@ def test_idna_srv_target(self) -> None: ) self.reactor.lookups["xn--trget-3qa.com"] = "1.2.3.4" - test_d = self._make_get_request(b"matrix://xn--bcher-kva.com/foo/bar") + test_d = self._make_get_request( + b"matrix-federation://xn--bcher-kva.com/foo/bar" + ) # Nothing happened yet self.assertNoResult(test_d) @@ -1411,7 +1415,7 @@ def test_srv_fallbacks(self) -> None: ) self.reactor.lookups["target.com"] = "1.2.3.4" - test_d = self._make_get_request(b"matrix://testserv/foo/bar") + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") # Nothing happened yet self.assertNoResult(test_d) From 8cfad3d9517b16c8465e2cdd21e364afd003ed82 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 13 Jun 2023 01:38:23 -0500 Subject: [PATCH 11/40] Fix lints --- synapse/http/matrixfederationclient.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 335c8958c217..724316f2afc3 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -398,8 +398,7 @@ def __init__( hs.config.worker.outbound_federation_restricted_to ) if hs.get_instance_name() in outbound_federation_restricted_to: - logger.info("asdf") - federation_agent = MatrixFederationAgent( + federation_agent: IAgent = MatrixFederationAgent( self.reactor, tls_client_options_factory, user_agent.encode("ascii"), @@ -408,7 +407,7 @@ def __init__( ) else: federation_proxies = outbound_federation_restricted_to.locations - federation_agent: IAgent = ProxyAgent( + federation_agent = ProxyAgent( self.reactor, self.reactor, tls_client_options_factory, From 9eec61474a91c60a5aeac83f755ca7727ab4dd52 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 May 2023 14:04:42 +0100 Subject: [PATCH 12/40] WORKER PROXY WIP Conflicts: synapse/http/matrixfederationclient.py --- synapse/app/_base.py | 2 + synapse/app/generic_worker.py | 1 + synapse/app/homeserver.py | 1 + synapse/http/matrixfederationclient.py | 2 +- synapse/http/proxy.py | 150 +++++++++++++++++++++++++ synapse/http/server.py | 54 +++++---- synapse/http/site.py | 26 +++-- 7 files changed, 203 insertions(+), 33 deletions(-) create mode 100644 synapse/http/proxy.py diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 936b1b043037..938ab40f278e 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -386,6 +386,7 @@ def listen_unix( def listen_http( + hs: "HomeServer", listener_config: ListenerConfig, root_resource: Resource, version_string: str, @@ -406,6 +407,7 @@ def listen_http( version_string, max_request_body_size=max_request_body_size, reactor=reactor, + federation_agent=hs.get_federation_http_client().agent, ) if isinstance(listener_config, TCPListenerConfig): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 909ebccf78cb..8688c573e18e 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -223,6 +223,7 @@ def _listen_http(self, listener_config: ListenerConfig) -> None: root_resource = create_resource_tree(resources, OptionsResource()) _base.listen_http( + self, listener_config, root_resource, self.version_string, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 84236ac299e0..f188c7265a50 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -139,6 +139,7 @@ def _listener_http( root_resource = OptionsResource() ports = listen_http( + self, listener_config, create_resource_tree(resources, root_resource), self.version_string, diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 724316f2afc3..35b9736fb4c8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -416,7 +416,7 @@ def __init__( # Use a BlocklistingAgentWrapper to prevent circumventing the IP # blocking via IP literals in server names - self.agent = BlocklistingAgentWrapper( + self.agent: IAgent = BlocklistingAgentWrapper( federation_agent, ip_blocklist=hs.config.server.federation_ip_range_blocklist, ) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py new file mode 100644 index 000000000000..58a5f7f0c9a8 --- /dev/null +++ b/synapse/http/proxy.py @@ -0,0 +1,150 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import urllib.parse +from typing import TYPE_CHECKING, Any, Optional, Tuple, cast + +from twisted.internet import protocol +from twisted.internet.interfaces import ITCPTransport +from twisted.internet.protocol import connectionDone +from twisted.python import failure +from twisted.python.failure import Failure +from twisted.web.client import ResponseDone +from twisted.web.http import PotentialDataLoss +from twisted.web.http_headers import Headers +from twisted.web.iweb import IAgent, IResponse +from twisted.web.resource import IResource +from twisted.web.server import Site + +from synapse.http import QuieterFileBodyProducer +from synapse.http.server import _AsyncResource +from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.types import ISynapseReactor +from synapse.util.async_helpers import timeout_deferred + +if TYPE_CHECKING: + from synapse.http.site import SynapseRequest + +logger = logging.getLogger(__name__) + + +class ProxyResource(_AsyncResource): + isLeaf = True + + def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent): + super().__init__(True) + + self.reactor = reactor + self.agent = federation_agent + + async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: + assert request.uri.startswith(b"matrix://") + + logger.info("Got proxy request %s", request.uri) + + headers = Headers() + for header_name in (b"User-Agent", b"Authorization", b"Content-Type"): + header_value = request.getHeader(header_name) + if header_value: + headers.addRawHeader(header_name, header_value) + + request_deferred = run_in_background( + self.agent.request, + request.method, + request.uri, + headers=headers, + bodyProducer=QuieterFileBodyProducer(request.content), + ) + request_deferred = timeout_deferred( + request_deferred, + timeout=90, + reactor=self.reactor, + ) + + response = await make_deferred_yieldable(request_deferred) + + logger.info("Got proxy response %s", response.code) + + return response.code, response + + def _send_response( + self, + request: "SynapseRequest", + code: int, + response_object: Any, + ) -> None: + response = cast(IResponse, response_object) + + request.setResponseCode(code) + + # Copy headers. + for k, v in response.headers.getAllRawHeaders(): + request.responseHeaders.setRawHeaders(k, v) + + response.deliverBody(_ProxyResponseBody(request)) + + def _send_error_response( + self, + f: failure.Failure, + request: "SynapseRequest", + ) -> None: + request.setResponseCode(502) + request.finish() + + +class _ProxyResponseBody(protocol.Protocol): + transport: Optional[ITCPTransport] = None + + def __init__(self, request: "SynapseRequest") -> None: + self._request = request + + def dataReceived(self, data: bytes) -> None: + if self._request._disconnected and self.transport is not None: + self.transport.abortConnection() + return + + self._request.write(data) + + def connectionLost(self, reason: Failure = connectionDone) -> None: + if self._request.finished: + return + + if reason.check(ResponseDone): + self._request.finish() + elif reason.check(PotentialDataLoss): + # TODO: ARGH + self._request.finish() + else: + self._request.transport.abortConnection() + + +class ProxySite(Site): + def __init__( + self, + resource: IResource, + reactor: ISynapseReactor, + federation_agent: IAgent, + ): + super().__init__(resource, reactor=reactor) + + self._proxy_resource = ProxyResource(reactor, federation_agent) + + def getResourceFor(self, request: "SynapseRequest") -> IResource: + uri = urllib.parse.urlparse(request.uri) + if uri.scheme == b"matrix": + return self._proxy_resource + + return super().getResourceFor(request) diff --git a/synapse/http/server.py b/synapse/http/server.py index 933172c87327..27c5324078b1 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,6 +18,7 @@ import logging import types import urllib +import urllib.parse from http import HTTPStatus from http.client import FOUND from inspect import isawaitable @@ -65,7 +66,6 @@ UnrecognizedRequestError, ) from synapse.config.homeserver import HomeServerConfig -from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.logging.opentracing import active_span, start_active_span, trace_servlet from synapse.util import json_encoder @@ -76,6 +76,7 @@ if TYPE_CHECKING: import opentracing + from synapse.http.site import SynapseRequest from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -102,7 +103,7 @@ def return_json_error( - f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig] + f: failure.Failure, request: "SynapseRequest", config: Optional[HomeServerConfig] ) -> None: """Sends a JSON error response to clients.""" @@ -220,8 +221,8 @@ def return_html_error( def wrap_async_request_handler( - h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]] -) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]: + h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]] +) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]: """Wraps an async request handler so that it calls request.processing. This helps ensure that work done by the request handler after the request is completed @@ -235,7 +236,7 @@ def wrap_async_request_handler( """ async def wrapped_async_request_handler( - self: "_AsyncResource", request: SynapseRequest + self: "_AsyncResource", request: "SynapseRequest" ) -> None: with request.processing(): await h(self, request) @@ -300,7 +301,7 @@ def __init__(self, extract_context: bool = False): self._extract_context = extract_context - def render(self, request: SynapseRequest) -> int: + def render(self, request: "SynapseRequest") -> int: """This gets called by twisted every time someone sends us a request.""" request.render_deferred = defer.ensureDeferred( self._async_render_wrapper(request) @@ -308,7 +309,7 @@ def render(self, request: SynapseRequest) -> int: return NOT_DONE_YET @wrap_async_request_handler - async def _async_render_wrapper(self, request: SynapseRequest) -> None: + async def _async_render_wrapper(self, request: "SynapseRequest") -> None: """This is a wrapper that delegates to `_async_render` and handles exceptions, return values, metrics, etc. """ @@ -326,9 +327,14 @@ async def _async_render_wrapper(self, request: SynapseRequest) -> None: # of our stack, and thus gives us a sensible stack # trace. f = failure.Failure() + logger.exception( + "Error handling request", exc_info=(f.type, f.value, f.getTracebackObject()) # type: ignore[arg-type] + ) self._send_error_response(f, request) - async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]: + async def _async_render( + self, request: "SynapseRequest" + ) -> Optional[Tuple[int, Any]]: """Delegates to `_async_render_` methods, or returns a 400 if no appropriate method exists. Can be overridden in sub classes for different routing. @@ -358,7 +364,7 @@ async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, An @abc.abstractmethod def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -368,7 +374,7 @@ def _send_response( def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: raise NotImplementedError() @@ -384,7 +390,7 @@ def __init__(self, canonical_json: bool = False, extract_context: bool = False): def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -401,7 +407,7 @@ def _send_response( def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_json_error(f, request, None) @@ -473,7 +479,7 @@ def register_paths( ) def _get_handler_for_request( - self, request: SynapseRequest + self, request: "SynapseRequest" ) -> Tuple[ServletCallback, str, Dict[str, str]]: """Finds a callback method to handle the given request. @@ -503,7 +509,7 @@ def _get_handler_for_request( # Huh. No one wanted to handle that? Fiiiiiine. raise UnrecognizedRequestError(code=404) - async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]: + async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: callback, servlet_classname, group_dict = self._get_handler_for_request(request) request.is_render_cancellable = is_function_cancellable(callback) @@ -535,7 +541,7 @@ async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]: def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_json_error(f, request, self.hs.config) @@ -551,7 +557,7 @@ class DirectServeHtmlResource(_AsyncResource): def _send_response( self, - request: SynapseRequest, + request: "SynapseRequest", code: int, response_object: Any, ) -> None: @@ -565,7 +571,7 @@ def _send_response( def _send_error_response( self, f: failure.Failure, - request: SynapseRequest, + request: "SynapseRequest", ) -> None: """Implements _AsyncResource._send_error_response""" return_html_error(f, request, self.ERROR_TEMPLATE) @@ -592,7 +598,7 @@ class UnrecognizedRequestResource(resource.Resource): errcode of M_UNRECOGNIZED. """ - def render(self, request: SynapseRequest) -> int: + def render(self, request: "SynapseRequest") -> int: f = failure.Failure(UnrecognizedRequestError(code=404)) return_json_error(f, request, None) # A response has already been sent but Twisted requires either NOT_DONE_YET @@ -622,7 +628,7 @@ def getChild(self, name: str, request: Request) -> resource.Resource: class OptionsResource(resource.Resource): """Responds to OPTION requests for itself and all children.""" - def render_OPTIONS(self, request: SynapseRequest) -> bytes: + def render_OPTIONS(self, request: "SynapseRequest") -> bytes: request.setResponseCode(204) request.setHeader(b"Content-Length", b"0") @@ -737,7 +743,7 @@ def _encode_json_bytes(json_object: object) -> bytes: def respond_with_json( - request: SynapseRequest, + request: "SynapseRequest", code: int, json_object: Any, send_cors: bool = False, @@ -787,7 +793,7 @@ def respond_with_json( def respond_with_json_bytes( - request: SynapseRequest, + request: "SynapseRequest", code: int, json_bytes: bytes, send_cors: bool = False, @@ -825,7 +831,7 @@ def respond_with_json_bytes( async def _async_write_json_to_request_in_thread( - request: SynapseRequest, + request: "SynapseRequest", json_encoder: Callable[[Any], bytes], json_object: Any, ) -> None: @@ -883,7 +889,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None: _ByteProducer(request, bytes_generator) -def set_cors_headers(request: SynapseRequest) -> None: +def set_cors_headers(request: "SynapseRequest") -> None: """Set the CORS headers so that javascript running in a web browsers can use this API @@ -981,7 +987,7 @@ def set_clickjacking_protection_headers(request: Request) -> None: def respond_with_redirect( - request: SynapseRequest, url: bytes, statusCode: int = FOUND, cors: bool = False + request: "SynapseRequest", url: bytes, statusCode: int = FOUND, cors: bool = False ) -> None: """ Write a 302 (or other specified status code) response to the request, if it is still alive. diff --git a/synapse/http/site.py b/synapse/http/site.py index c530966ef336..879436177d3c 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -21,25 +21,28 @@ from twisted.internet.address import UNIXAddress from twisted.internet.defer import Deferred -from twisted.internet.interfaces import IAddress, IReactorTime +from twisted.internet.interfaces import IAddress from twisted.python.failure import Failure from twisted.web.http import HTTPChannel +from twisted.web.iweb import IAgent from twisted.web.resource import IResource, Resource -from twisted.web.server import Request, Site +from twisted.web.server import Request from synapse.config.server import ListenerConfig from synapse.http import get_request_user_agent, redact_uri +from synapse.http.proxy import ProxySite from synapse.http.request_metrics import RequestMetrics, requests_counter from synapse.logging.context import ( ContextRequest, LoggingContext, PreserveLoggingContext, ) -from synapse.types import Requester +from synapse.types import ISynapseReactor, Requester if TYPE_CHECKING: import opentracing + logger = logging.getLogger(__name__) _next_request_seq = 0 @@ -102,7 +105,7 @@ def __init__( # A boolean indicating whether `render_deferred` should be cancelled if the # client disconnects early. Expected to be set by the coroutine started by # `Resource.render`, if rendering is asynchronous. - self.is_render_cancellable = False + self.is_render_cancellable: bool = False global _next_request_seq self.request_seq = _next_request_seq @@ -596,7 +599,7 @@ class _XForwardedForAddress: host: str -class SynapseSite(Site): +class SynapseSite(ProxySite): """ Synapse-specific twisted http Site @@ -618,7 +621,8 @@ def __init__( resource: IResource, server_version_string: str, max_request_body_size: int, - reactor: IReactorTime, + reactor: ISynapseReactor, + federation_agent: IAgent, ): """ @@ -633,7 +637,11 @@ def __init__( dropping the connection reactor: reactor to be used to manage connection timeouts """ - Site.__init__(self, resource, reactor=reactor) + super().__init__( + resource=resource, + reactor=reactor, + federation_agent=federation_agent, + ) self.site_tag = site_tag self.reactor = reactor @@ -644,7 +652,9 @@ def __init__( request_id_header = config.http_options.request_id_header - self.experimental_cors_msc3886 = config.http_options.experimental_cors_msc3886 + self.experimental_cors_msc3886: bool = ( + config.http_options.experimental_cors_msc3886 + ) def request_factory(channel: HTTPChannel, queued: bool) -> Request: return request_class( From e9e900fd3ca694e3f9ce3a0776d5a3d28d805437 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Jun 2023 00:55:21 -0500 Subject: [PATCH 13/40] Align scheme checking --- synapse/http/proxy.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 58a5f7f0c9a8..7ec22fa1823a 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -51,7 +51,8 @@ def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent): self.agent = federation_agent async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: - assert request.uri.startswith(b"matrix://") + uri = urllib.parse.urlparse(request.uri) + assert uri.scheme == b"matrix-federation" logger.info("Got proxy request %s", request.uri) @@ -144,7 +145,7 @@ def __init__( def getResourceFor(self, request: "SynapseRequest") -> IResource: uri = urllib.parse.urlparse(request.uri) - if uri.scheme == b"matrix": + if uri.scheme == b"matrix-federation": return self._proxy_resource return super().getResourceFor(request) From dcb4105f96ac4f7fa3495f9c50e27294403afeb1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Jun 2023 01:12:39 -0500 Subject: [PATCH 14/40] Fix lints --- synapse/http/server.py | 3 ++- tests/replication/_base.py | 1 + tests/test_server.py | 7 +++++++ tests/unittest.py | 1 + 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 27c5324078b1..ff3153a9d949 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -328,7 +328,8 @@ async def _async_render_wrapper(self, request: "SynapseRequest") -> None: # trace. f = failure.Failure() logger.exception( - "Error handling request", exc_info=(f.type, f.value, f.getTracebackObject()) # type: ignore[arg-type] + "Error handling request", + exc_info=(f.type, f.value, f.getTracebackObject()), ) self._send_error_response(f, request) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index eb9b1f1cd9be..aad0028bfba7 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -380,6 +380,7 @@ def make_worker_hs( server_version_string="1", max_request_body_size=8192, reactor=self.reactor, + federation_agent=worker_hs.get_federation_http_client().agent, ) worker_hs.get_replication_command_handler().start_replication(worker_hs) diff --git a/tests/test_server.py b/tests/test_server.py index e266c06a2cbf..ec9a08f31140 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -210,6 +210,12 @@ def _callback( class OptionsResourceTests(unittest.TestCase): def setUp(self) -> None: self.reactor = ThreadedMemoryReactorClock() + self.hs_clock = Clock(self.reactor) + self.homeserver = setup_test_homeserver( + self.addCleanup, + clock=self.hs_clock, + reactor=self.reactor, + ) class DummyResource(Resource): isLeaf = True @@ -242,6 +248,7 @@ def _make_request( "1.0", max_request_body_size=4096, reactor=self.reactor, + federation_agent=self.homeserver.get_federation_http_client().agent, ) # render the request and return the channel diff --git a/tests/unittest.py b/tests/unittest.py index c73195b32b00..334a95a9175d 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -358,6 +358,7 @@ def setUp(self) -> None: server_version_string="1", max_request_body_size=4096, reactor=self.reactor, + federation_agent=self.hs.get_federation_http_client().agent, ) from tests.rest.client.utils import RestHelper From c6dcd5ecf13e27c37c8b7868685260a6f002e03b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Jun 2023 01:18:50 -0500 Subject: [PATCH 15/40] Refactor tests to use `get_clock()` --- tests/test_server.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/tests/test_server.py b/tests/test_server.py index ec9a08f31140..14fe189fb44c 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -38,7 +38,7 @@ from tests.server import ( FakeChannel, FakeSite, - ThreadedMemoryReactorClock, + get_clock, make_request, setup_test_homeserver, ) @@ -46,12 +46,12 @@ class JsonResourceTests(unittest.TestCase): def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() - self.hs_clock = Clock(self.reactor) + reactor, clock = get_clock() + self.reactor = reactor self.homeserver = setup_test_homeserver( self.addCleanup, federation_http_client=None, - clock=self.hs_clock, + clock=clock, reactor=self.reactor, ) @@ -209,11 +209,11 @@ def _callback( class OptionsResourceTests(unittest.TestCase): def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() - self.hs_clock = Clock(self.reactor) + reactor, clock = get_clock() + self.reactor = reactor self.homeserver = setup_test_homeserver( self.addCleanup, - clock=self.hs_clock, + clock=clock, reactor=self.reactor, ) @@ -351,7 +351,8 @@ async def _async_render_GET(self, request: SynapseRequest) -> None: await self.callback(request) def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() + reactor, _ = get_clock() + self.reactor = reactor def test_good_response(self) -> None: async def callback(request: SynapseRequest) -> None: @@ -469,9 +470,9 @@ class DirectServeJsonResourceCancellationTests(unittest.TestCase): """Tests for `DirectServeJsonResource` cancellation.""" def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() - self.clock = Clock(self.reactor) - self.resource = CancellableDirectServeJsonResource(self.clock) + reactor, clock = get_clock() + self.reactor = reactor + self.resource = CancellableDirectServeJsonResource(clock) self.site = FakeSite(self.resource, self.reactor) def test_cancellable_disconnect(self) -> None: @@ -503,9 +504,9 @@ class DirectServeHtmlResourceCancellationTests(unittest.TestCase): """Tests for `DirectServeHtmlResource` cancellation.""" def setUp(self) -> None: - self.reactor = ThreadedMemoryReactorClock() - self.clock = Clock(self.reactor) - self.resource = CancellableDirectServeHtmlResource(self.clock) + reactor, clock = get_clock() + self.reactor = reactor + self.resource = CancellableDirectServeHtmlResource(clock) self.site = FakeSite(self.resource, self.reactor) def test_cancellable_disconnect(self) -> None: From 8f9f478b2c26184453fc1e36a86a02efaa7ac8f1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Jun 2023 12:54:39 -0500 Subject: [PATCH 16/40] Fix tests (make sure `federation_http_client` is defined) --- tests/app/test_openid_listener.py | 8 ++------ tests/handlers/test_device.py | 3 +-- tests/handlers/test_federation.py | 2 +- tests/handlers/test_presence.py | 1 - tests/replication/_base.py | 1 - tests/rest/client/test_presence.py | 1 - tests/rest/client/test_rooms.py | 2 -- tests/storage/test_e2e_room_keys.py | 2 +- tests/storage/test_purge.py | 2 +- tests/storage/test_rollback_worker.py | 4 +--- tests/test_server.py | 1 - 11 files changed, 7 insertions(+), 20 deletions(-) diff --git a/tests/app/test_openid_listener.py b/tests/app/test_openid_listener.py index 5a965f233b05..21c530974083 100644 --- a/tests/app/test_openid_listener.py +++ b/tests/app/test_openid_listener.py @@ -31,9 +31,7 @@ class FederationReaderOpenIDListenerTests(HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=GenericWorkerServer - ) + hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer) return hs def default_config(self) -> JsonDict: @@ -91,9 +89,7 @@ def test_openid_listener(self, names: List[str], expectation: str) -> None: @patch("synapse.app.homeserver.KeyResource", new=Mock()) class SynapseHomeserverOpenIDListenerTests(HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=SynapseHomeServer - ) + hs = self.setup_test_homeserver(homeserver_to_use=SynapseHomeServer) return hs @parameterized.expand( diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index ee48f9e546e8..66215af2b890 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -41,7 +41,6 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: self.appservice_api = mock.Mock() hs = self.setup_test_homeserver( "server", - federation_http_client=None, application_service_api=self.appservice_api, ) handler = hs.get_device_handler() @@ -401,7 +400,7 @@ def test_on_federation_query_user_devices_appservice(self) -> None: class DehydrationTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver("server", federation_http_client=None) + hs = self.setup_test_homeserver("server") handler = hs.get_device_handler() assert isinstance(handler, DeviceHandler) self.handler = handler diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index bf0862ed541c..5f11d5df11ad 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -57,7 +57,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): ] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver(federation_http_client=None) + hs = self.setup_test_homeserver() self.handler = hs.get_federation_handler() self.store = hs.get_datastores().main return hs diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 19f5322317a1..fd66d573d221 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -993,7 +993,6 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver( "server", - federation_http_client=None, federation_sender=Mock(spec=FederationSender), ) return hs diff --git a/tests/replication/_base.py b/tests/replication/_base.py index aad0028bfba7..1215f593362a 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -69,7 +69,6 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: # Make a new HomeServer object for the worker self.reactor.lookups["testserv"] = "1.2.3.4" self.worker_hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=GenericWorkerServer, config=self._get_worker_hs_config(), reactor=self.reactor, diff --git a/tests/rest/client/test_presence.py b/tests/rest/client/test_presence.py index dcbb125a3bba..e12098102b96 100644 --- a/tests/rest/client/test_presence.py +++ b/tests/rest/client/test_presence.py @@ -40,7 +40,6 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver( "red", - federation_http_client=None, federation_client=Mock(), presence_handler=self.presence_handler, ) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index f1b4e1ad2fc1..d013e75d55d7 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -67,8 +67,6 @@ class RoomBase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: self.hs = self.setup_test_homeserver( "red", - federation_http_client=None, - federation_client=Mock(), ) self.hs.get_federation_handler = Mock() # type: ignore[assignment] diff --git a/tests/storage/test_e2e_room_keys.py b/tests/storage/test_e2e_room_keys.py index 9cb326d90a87..f6df31aba4ca 100644 --- a/tests/storage/test_e2e_room_keys.py +++ b/tests/storage/test_e2e_room_keys.py @@ -31,7 +31,7 @@ class E2eRoomKeysHandlerTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver("server", federation_http_client=None) + hs = self.setup_test_homeserver("server") self.store = hs.get_datastores().main return hs diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py index 857e2caf2e24..02826731679e 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py @@ -27,7 +27,7 @@ class PurgeTests(HomeserverTestCase): servlets = [room.register_servlets] def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver("server", federation_http_client=None) + hs = self.setup_test_homeserver("server") return hs def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py index 6861d3a6c9e5..809c9f175d42 100644 --- a/tests/storage/test_rollback_worker.py +++ b/tests/storage/test_rollback_worker.py @@ -45,9 +45,7 @@ def fake_listdir(filepath: str) -> List[str]: class WorkerSchemaTests(HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: - hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=GenericWorkerServer - ) + hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer) return hs def default_config(self) -> JsonDict: diff --git a/tests/test_server.py b/tests/test_server.py index 14fe189fb44c..fe5afebdcda4 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -50,7 +50,6 @@ def setUp(self) -> None: self.reactor = reactor self.homeserver = setup_test_homeserver( self.addCleanup, - federation_http_client=None, clock=clock, reactor=self.reactor, ) From e789c64c6c8fa44217750ab70681b52359ee04fd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Jun 2023 14:28:17 -0500 Subject: [PATCH 17/40] Fix tests --- tests/handlers/test_typing.py | 9 ++++++++ .../test_federation_sender_shard.py | 23 ++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 94518a7196b4..454d9bcd1dda 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -15,6 +15,7 @@ import json from typing import Dict, List, Set +from netaddr import IPSet from unittest.mock import ANY, Mock, call from twisted.test.proto_helpers import MemoryReactor @@ -24,6 +25,7 @@ from synapse.api.errors import AuthError from synapse.federation.transport.server import TransportLayerServer from synapse.handlers.typing import TypingWriterHandler +from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.server import HomeServer from synapse.types import JsonDict, Requester, UserID, create_requester from synapse.util import Clock @@ -76,6 +78,13 @@ def make_homeserver( # we mock out the federation client too self.mock_federation_client = Mock(spec=["put_json"]) self.mock_federation_client.put_json.return_value = make_awaitable((200, "OK")) + self.mock_federation_client.agent = MatrixFederationAgent( + reactor, + tls_client_options_factory=None, + user_agent=b"SynapseInTrialTest/0.0.0", + ip_allowlist=None, + ip_blocklist=IPSet(), + ) # the tests assume that we are starting at unix time 1000 reactor.pump((1000,)) diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index 08703206a9c9..e1a6895a6e8d 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -12,17 +12,22 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from netaddr import IPSet from unittest.mock import Mock from synapse.api.constants import EventTypes, Membership from synapse.events.builder import EventBuilderFactory from synapse.handlers.typing import TypingWriterHandler +from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.rest.admin import register_servlets_for_client_rest_resource from synapse.rest.client import login, room from synapse.types import UserID, create_requester from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.test_utils import make_awaitable +from tests.server import ( + get_clock, +) logger = logging.getLogger(__name__) @@ -41,13 +46,25 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): room.register_servlets, ] + def setUp(self) -> None: + super().setUp() + + reactor, _ = get_clock() + self.matrix_federation_agent = MatrixFederationAgent( + reactor, + tls_client_options_factory=None, + user_agent=b"SynapseInTrialTest/0.0.0", + ip_allowlist=None, + ip_blocklist=IPSet(), + ) + def test_send_event_single_sender(self) -> None: """Test that using a single federation sender worker correctly sends a new event. """ mock_client = Mock(spec=["put_json"]) mock_client.put_json.return_value = make_awaitable({}) - + mock_client.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { @@ -78,6 +95,7 @@ def test_send_event_sharded(self) -> None: """ mock_client1 = Mock(spec=["put_json"]) mock_client1.put_json.return_value = make_awaitable({}) + mock_client1.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { @@ -92,6 +110,7 @@ def test_send_event_sharded(self) -> None: mock_client2 = Mock(spec=["put_json"]) mock_client2.put_json.return_value = make_awaitable({}) + mock_client2.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { @@ -145,6 +164,7 @@ def test_send_typing_sharded(self) -> None: """ mock_client1 = Mock(spec=["put_json"]) mock_client1.put_json.return_value = make_awaitable({}) + mock_client1.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { @@ -159,6 +179,7 @@ def test_send_typing_sharded(self) -> None: mock_client2 = Mock(spec=["put_json"]) mock_client2.put_json.return_value = make_awaitable({}) + mock_client2.agent = self.matrix_federation_agent self.make_worker_hs( "synapse.app.generic_worker", { From 0cead401bd660ee46e0f55e45cadac910ab8ec66 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Jun 2023 14:32:43 -0500 Subject: [PATCH 18/40] Fix lints --- tests/handlers/test_typing.py | 3 ++- tests/replication/test_federation_sender_shard.py | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 454d9bcd1dda..5da1d95f0b22 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -15,9 +15,10 @@ import json from typing import Dict, List, Set -from netaddr import IPSet from unittest.mock import ANY, Mock, call +from netaddr import IPSet + from twisted.test.proto_helpers import MemoryReactor from twisted.web.resource import Resource diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index e1a6895a6e8d..a324b4d31dde 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from netaddr import IPSet from unittest.mock import Mock +from netaddr import IPSet + from synapse.api.constants import EventTypes, Membership from synapse.events.builder import EventBuilderFactory from synapse.handlers.typing import TypingWriterHandler @@ -24,10 +25,8 @@ from synapse.types import UserID, create_requester from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.server import get_clock from tests.test_utils import make_awaitable -from tests.server import ( - get_clock, -) logger = logging.getLogger(__name__) From 11bf041cdf3b84505e03a455c18222c2b549920f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Jun 2023 15:11:01 -0500 Subject: [PATCH 19/40] Maybe fix more replication tests Maybe fix https://github.com/matrix-org/synapse/pull/15773#discussion_r1230109565 ``` Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean. ``` --- tests/replication/_base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 1215f593362a..96badc46b04d 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -72,6 +72,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: homeserver_to_use=GenericWorkerServer, config=self._get_worker_hs_config(), reactor=self.reactor, + federation_http_client=None, ) # Since we use sqlite in memory databases we need to make sure the From d847564e28db5d3efc05eaeeb015e576eecd5799 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 15 Jun 2023 14:17:59 -0500 Subject: [PATCH 20/40] Mark out spots to add docs --- docs/usage/configuration/config_documentation.md | 15 ++++++++------- synapse/http/proxyagent.py | 2 ++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 8426de04179b..03a53550867e 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3930,13 +3930,14 @@ federation_sender_instances: --- ### `instance_map` -When using workers this should be a map from [`worker_name`](#worker_name) to the -HTTP replication listener of the worker, if configured, and to the main process. -Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs -a HTTP replication listener, and that listener should be included in the `instance_map`. -The main process also needs an entry on the `instance_map`, and it should be listed under -`main` **if even one other worker exists**. Ensure the port matches with what is declared -inside the `listener` block for a `replication` listener. +When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP +replication listener of the worker, if configured, and to the main process. Each worker +declared under [`stream_writers`](../../workers.md#stream-writers) and +[`outbound_federation_restricted_to`](#TODO) needs a HTTP replication listener, and that +listener should be included in the `instance_map`. The main process also needs an entry +on the `instance_map`, and it should be listed under `main` **if even one other worker +exists**. Ensure the port matches with what is declared inside the `listener` block for +a `replication` listener. Example configuration: diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 06e4d4e70284..4cd79c9d7399 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -82,6 +82,8 @@ class ProxyAgent(_AgentBase): use_proxy: Whether proxy settings should be discovered and used from conventional environment variables. + federation_proxies: TODO + Raises: ValueError if use_proxy is set and the environment variables contain an invalid proxy specification. From 74988e29e6c1fefb55e14e8591b131a69e0b0ac4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 16 Jun 2023 03:46:16 -0500 Subject: [PATCH 21/40] WIP: Very rough worker test --- synapse/http/matrixfederationclient.py | 14 ++++ synapse/http/proxy.py | 3 + tests/http/test_matrixfederationclient.py | 93 ++++++++++++++++++++++- tests/replication/_base.py | 6 ++ 4 files changed, 113 insertions(+), 3 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 35b9736fb4c8..8cc6a1c8ce2d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -398,6 +398,9 @@ def __init__( hs.config.worker.outbound_federation_restricted_to ) if hs.get_instance_name() in outbound_federation_restricted_to: + logger.info( + "asdf currently=%s asking federation directly", hs.get_instance_name() + ) federation_agent: IAgent = MatrixFederationAgent( self.reactor, tls_client_options_factory, @@ -406,6 +409,11 @@ def __init__( hs.config.server.federation_ip_range_blocklist, ) else: + logger.info( + "asdf currently=%s proxying over to %s", + hs.get_instance_name(), + outbound_federation_restricted_to, + ) federation_proxies = outbound_federation_restricted_to.locations federation_agent = ProxyAgent( self.reactor, @@ -661,6 +669,12 @@ async def _send_request( # * The `Deferred` that joins the forks back together is # wrapped in `make_deferred_yieldable` to restore the # logging context regardless of the path taken. + logger.info( + "asdf %s matrixfederationclient._send_request %s %s", + self.hs.get_instance_name(), + method_bytes, + url_bytes, + ) request_deferred = run_in_background( self.agent.request, method_bytes, diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 7ec22fa1823a..b4aaf63cc4c8 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -48,6 +48,7 @@ def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent): super().__init__(True) self.reactor = reactor + logger.info("asdf proxy set agent") self.agent = federation_agent async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: @@ -55,6 +56,7 @@ async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: assert uri.scheme == b"matrix-federation" logger.info("Got proxy request %s", request.uri) + logger.info("asdf proxy self.agent.request=%s", self.agent.request) headers = Headers() for header_name in (b"User-Agent", b"Authorization", b"Content-Type"): @@ -145,6 +147,7 @@ def __init__( def getResourceFor(self, request: "SynapseRequest") -> IResource: uri = urllib.parse.urlparse(request.uri) + logger.info("asdf getResourceFor request=%s", request.uri) if uri.scheme == b"matrix-federation": return self._proxy_resource diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index 8565f8ac64ad..0a7c2907b4bc 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -11,8 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Generator -from unittest.mock import Mock +from typing import Any, Dict, Generator +from unittest.mock import ANY, Mock, create_autospec from netaddr import IPSet from parameterized import parameterized @@ -21,8 +21,9 @@ from twisted.internet.defer import Deferred, TimeoutError from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.test.proto_helpers import MemoryReactor, StringTransport -from twisted.web.client import ResponseNeverReceived +from twisted.web.client import Agent, ResponseNeverReceived from twisted.web.http import HTTPChannel +from twisted.web.iweb import IResponse from synapse.api.errors import RequestSendFailed from synapse.http.matrixfederationclient import ( @@ -39,7 +40,9 @@ from synapse.server import HomeServer from synapse.util import Clock +from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.server import FakeTransport +from tests.test_utils import FakeResponse from tests.unittest import HomeserverTestCase, override_config @@ -658,3 +661,87 @@ def test_configurable_retry_and_delay_values(self) -> None: self.assertEqual(self.cl.max_short_retry_delay, 7) self.assertEqual(self.cl.max_long_retries, 20) self.assertEqual(self.cl.max_short_retries, 5) + + +class FederationClientProxyTests(BaseMultiWorkerStreamTestCase): + def default_config(self) -> Dict[str, Any]: + conf = super().default_config() + conf["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, + "federation_sender": {"host": "testserv", "port": 1001}, + } + return conf + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_asdf(self) -> None: + """ + TODO + """ + import logging + + logger = logging.getLogger(__name__) + + self.reactor.lookups["remoteserv"] = "1.2.3.4" + + mock_client = Mock() + # mock out the Agent used by the federation client, which is easier than + # catching the HTTPS connection and do the TLS stuff. + self._mock_agent = create_autospec(Agent, spec_set=True) + mock_client.agent = self._mock_agent + + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client, + ) + + # TODO: Mock HTTP of the `federation_sender` instance + # self.federation_sender.get_federation_http_client().post_json = Mock(side_effect=post_json) # type: ignore[assignment] + # self.federation_sender.get_federation_http_client().agent.request = Mock(side_effect=request) + + def _request_stub( + method: bytes, + uri: bytes, + # headers: Optional[Headers] = None, + # bodyProducer: Optional[IBodyProducer] = None, + ) -> IResponse: + logger.info("asdfasdf request(%s, %s)", method, uri) + return FakeResponse.json( + payload={ + "foo": "bar", + } + ) + + # mock up the response, and have the agent return it + # self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed( + # FakeResponse.json( + # payload={ + # "foo": "bar", + # } + # ) + # ) + self._mock_agent.request.side_effect = _request_stub + + # self.get_success( + # self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") + # ) + + # This request should go through the `federation_sender` worker off to the remote + # server + test_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") + ) + + self.pump(5.0) + + self._mock_agent.request.assert_called_once_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=None, + ) + + res = self.successResultOf(test_d) + + # check the response is as expected + self.assertEqual(res, {"a": 1}) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 96badc46b04d..7bbf11470ac4 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -354,6 +354,12 @@ def make_worker_hs( # not going to reset `instance_loc` to `None` under its feet. See # https://mypy.readthedocs.io/en/latest/common_issues.html#narrowing-and-inner-functions port = instance_loc.port + logger.info( + "asdf make_worker_hs %s -> host=%s port=%s", + instance_name, + instance_loc.host, + instance_loc.port, + ) self.reactor.add_tcp_client_callback( self.reactor.lookups[instance_loc.host], From 6b44e66ce2b07b8e88e86635ba041abfd0116b28 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 16 Jun 2023 03:57:49 -0500 Subject: [PATCH 22/40] Cleaned up test --- tests/http/test_matrixfederationclient.py | 74 ++++++++--------------- 1 file changed, 25 insertions(+), 49 deletions(-) diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index 0a7c2907b4bc..c8925ae23992 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -677,71 +677,47 @@ def test_asdf(self) -> None: """ TODO """ - import logging - - logger = logging.getLogger(__name__) - - self.reactor.lookups["remoteserv"] = "1.2.3.4" - - mock_client = Mock() - # mock out the Agent used by the federation client, which is easier than - # catching the HTTPS connection and do the TLS stuff. - self._mock_agent = create_autospec(Agent, spec_set=True) - mock_client.agent = self._mock_agent + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + # Make the `federation_sender` worker self.federation_sender = self.make_worker_hs( "synapse.app.generic_worker", {"worker_name": "federation_sender"}, - federation_http_client=mock_client, + federation_http_client=mock_client_on_federation_sender, ) - # TODO: Mock HTTP of the `federation_sender` instance - # self.federation_sender.get_federation_http_client().post_json = Mock(side_effect=post_json) # type: ignore[assignment] - # self.federation_sender.get_federation_http_client().agent.request = Mock(side_effect=request) - - def _request_stub( - method: bytes, - uri: bytes, - # headers: Optional[Headers] = None, - # bodyProducer: Optional[IBodyProducer] = None, - ) -> IResponse: - logger.info("asdfasdf request(%s, %s)", method, uri) - return FakeResponse.json( - payload={ - "foo": "bar", - } + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = ( + lambda *args, **kwargs: defer.succeed( + FakeResponse.json( + payload={ + "foo": "bar", + } + ) ) + ) - # mock up the response, and have the agent return it - # self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed( - # FakeResponse.json( - # payload={ - # "foo": "bar", - # } - # ) - # ) - self._mock_agent.request.side_effect = _request_stub - - # self.get_success( - # self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") - # ) - - # This request should go through the `federation_sender` worker off to the remote - # server + # This federation request from the main worker should be proxied through the + # `federation_sender` worker off to the remote server test_d = defer.ensureDeferred( self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") ) - self.pump(5.0) + # Pump the reactor so our deferred goes through the motions + self.pump() - self._mock_agent.request.assert_called_once_with( + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_once_with( b"GET", b"matrix-federation://remoteserv:8008/foo/bar", headers=ANY, - bodyProducer=None, + bodyProducer=ANY, ) + # Make sure the response is as expected back on the main worker res = self.successResultOf(test_d) - - # check the response is as expected - self.assertEqual(res, {"a": 1}) + self.assertEqual(res, {"foo": "bar"}) From 1abd3b1e626a5644b7412d515a0c2cb4f9764728 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 19 Jun 2023 21:53:05 -0500 Subject: [PATCH 23/40] Clean up test --- synapse/http/matrixfederationclient.py | 14 -------------- synapse/http/proxy.py | 3 --- tests/http/test_matrixfederationclient.py | 13 +++++++------ tests/replication/_base.py | 6 ------ 4 files changed, 7 insertions(+), 29 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 1f39f5a97255..2684bb18b8e9 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -400,9 +400,6 @@ def __init__( hs.config.worker.outbound_federation_restricted_to ) if hs.get_instance_name() in outbound_federation_restricted_to: - logger.info( - "asdf currently=%s asking federation directly", hs.get_instance_name() - ) federation_agent: IAgent = MatrixFederationAgent( self.reactor, tls_client_options_factory, @@ -411,11 +408,6 @@ def __init__( hs.config.server.federation_ip_range_blocklist, ) else: - logger.info( - "asdf currently=%s proxying over to %s", - hs.get_instance_name(), - outbound_federation_restricted_to, - ) federation_proxies = outbound_federation_restricted_to.locations federation_agent = ProxyAgent( self.reactor, @@ -666,12 +658,6 @@ async def _send_request( # * The `Deferred` that joins the forks back together is # wrapped in `make_deferred_yieldable` to restore the # logging context regardless of the path taken. - logger.info( - "asdf %s matrixfederationclient._send_request %s %s", - self.hs.get_instance_name(), - method_bytes, - url_bytes, - ) request_deferred = run_in_background( self.agent.request, method_bytes, diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index b4aaf63cc4c8..7ec22fa1823a 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -48,7 +48,6 @@ def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent): super().__init__(True) self.reactor = reactor - logger.info("asdf proxy set agent") self.agent = federation_agent async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: @@ -56,7 +55,6 @@ async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: assert uri.scheme == b"matrix-federation" logger.info("Got proxy request %s", request.uri) - logger.info("asdf proxy self.agent.request=%s", self.agent.request) headers = Headers() for header_name in (b"User-Agent", b"Authorization", b"Content-Type"): @@ -147,7 +145,6 @@ def __init__( def getResourceFor(self, request: "SynapseRequest") -> IResource: uri = urllib.parse.urlparse(request.uri) - logger.info("asdf getResourceFor request=%s", request.uri) if uri.scheme == b"matrix-federation": return self._proxy_resource diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index 49d1a0b938e4..2cc3a51e5afb 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -654,9 +654,10 @@ def default_config(self) -> Dict[str, Any]: return conf @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) - def test_asdf(self) -> None: + def test_proxy_requests_through_federation_sender_worker(self) -> None: """ - TODO + Test that all outbound federation requests go through the `federation_sender` + worker """ # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance # so we can act like some remote server responding to requests @@ -664,7 +665,7 @@ def test_asdf(self) -> None: mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) mock_client_on_federation_sender.agent = mock_agent_on_federation_sender - # Make the `federation_sender` worker + # Create the `federation_sender` worker self.federation_sender = self.make_worker_hs( "synapse.app.generic_worker", {"worker_name": "federation_sender"}, @@ -682,9 +683,9 @@ def test_asdf(self) -> None: ) ) - # This federation request from the main worker should be proxied through the + # This federation request from the main process should be proxied through the # `federation_sender` worker off to the remote server - test_d = defer.ensureDeferred( + test_request_from_main_process_d = defer.ensureDeferred( self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") ) @@ -700,5 +701,5 @@ def test_asdf(self) -> None: ) # Make sure the response is as expected back on the main worker - res = self.successResultOf(test_d) + res = self.successResultOf(test_request_from_main_process_d) self.assertEqual(res, {"foo": "bar"}) diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 7bbf11470ac4..96badc46b04d 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -354,12 +354,6 @@ def make_worker_hs( # not going to reset `instance_loc` to `None` under its feet. See # https://mypy.readthedocs.io/en/latest/common_issues.html#narrowing-and-inner-functions port = instance_loc.port - logger.info( - "asdf make_worker_hs %s -> host=%s port=%s", - instance_name, - instance_loc.host, - instance_loc.port, - ) self.reactor.add_tcp_client_callback( self.reactor.lookups[instance_loc.host], From 477844cae644871ed7a7edb9e1ed9fa546d56d58 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jun 2023 00:03:56 -0500 Subject: [PATCH 24/40] Explain why we care about catching `PotentialDataLoss` See https://github.com/matrix-org/synapse/pull/15773#discussion_r1234738682 --- synapse/http/client.py | 7 ++++++- synapse/http/proxy.py | 7 ++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index 09ea93e10d0b..ca2cdbc6e243 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1037,7 +1037,12 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: if reason.check(ResponseDone): self.deferred.callback(self.length) elif reason.check(PotentialDataLoss): - # stolen from https://github.com/twisted/treq/pull/49/files + # This applies to requests which don't set `Content-Length` or a + # `Transfer-Encoding` in the response because in this case the end of the + # response is indicated by the connection being closed, an event which may + # also be due to a transient network problem or other error. But since this + # behavior is expected of some servers (like YouTube), let's ignore it. + # Stolen from https://github.com/twisted/treq/pull/49/files # http://twistedmatrix.com/trac/ticket/4840 self.deferred.callback(self.length) else: diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 7ec22fa1823a..ea47fe9c4320 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -107,6 +107,10 @@ def _send_error_response( class _ProxyResponseBody(protocol.Protocol): + """ + A protocol that passes the data back out through the given request. + """ + transport: Optional[ITCPTransport] = None def __init__(self, request: "SynapseRequest") -> None: @@ -125,9 +129,6 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: if reason.check(ResponseDone): self._request.finish() - elif reason.check(PotentialDataLoss): - # TODO: ARGH - self._request.finish() else: self._request.transport.abortConnection() From dac553252bad5066d5a78e373f8625a4c2b0273b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jun 2023 00:23:07 -0500 Subject: [PATCH 25/40] Add some more context --- synapse/http/proxy.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index ea47fe9c4320..b49beebf7847 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -108,7 +108,8 @@ def _send_error_response( class _ProxyResponseBody(protocol.Protocol): """ - A protocol that passes the data back out through the given request. + A protocol that proxies the given response data back out to the given original + request. """ transport: Optional[ITCPTransport] = None @@ -117,19 +118,27 @@ def __init__(self, request: "SynapseRequest") -> None: self._request = request def dataReceived(self, data: bytes) -> None: + # Avoid sending response data to a request that already disconnected if self._request._disconnected and self.transport is not None: + # Close the connection (forcefully) since all the data will get + # discarded anyway. self.transport.abortConnection() return self._request.write(data) def connectionLost(self, reason: Failure = connectionDone) -> None: + # If the underlying request is already finished (successfully or failed), don't + # worry about sending anything back. if self._request.finished: return if reason.check(ResponseDone): self._request.finish() else: + # TODO: Should we also log something? + # + # Abort the underlying request since our remote request also failed. self._request.transport.abortConnection() From cf208d2f370eb5e713410e01d807bdae9802f15a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jun 2023 01:22:09 -0500 Subject: [PATCH 26/40] Test error case --- synapse/http/matrixfederationclient.py | 2 +- synapse/http/proxy.py | 21 ++++++--- tests/http/test_matrixfederationclient.py | 52 ++++++++++++++++++++++- 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 2684bb18b8e9..d9eb8d6b7737 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -758,7 +758,7 @@ async def _send_request( delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) - logger.debug( + logger.info( "{%s} [%s] Waiting %ss before re-sending...", request.txn_id, request.destination, diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index b49beebf7847..9b33807d21e0 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -29,6 +29,7 @@ from twisted.web.resource import IResource from twisted.web.server import Site +from synapse.api.errors import Codes from synapse.http import QuieterFileBodyProducer from synapse.http.server import _AsyncResource from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -103,12 +104,24 @@ def _send_error_response( request: "SynapseRequest", ) -> None: request.setResponseCode(502) + request.setHeader(b"Content-Type", b"application/json") + request.write( + ( + '{"errcode": "%s","err":"ProxyResource: Error when proxying request: %s %s -> %s"}' + % ( + Codes.UNKNOWN, + request.method.decode("ascii"), + request.uri.decode("ascii"), + f, + ) + ).encode() + ) request.finish() class _ProxyResponseBody(protocol.Protocol): """ - A protocol that proxies the given response data back out to the given original + A protocol that proxies the given remote response data back out to the given local request. """ @@ -118,7 +131,7 @@ def __init__(self, request: "SynapseRequest") -> None: self._request = request def dataReceived(self, data: bytes) -> None: - # Avoid sending response data to a request that already disconnected + # Avoid sending response data to the local request that already disconnected if self._request._disconnected and self.transport is not None: # Close the connection (forcefully) since all the data will get # discarded anyway. @@ -128,7 +141,7 @@ def dataReceived(self, data: bytes) -> None: self._request.write(data) def connectionLost(self, reason: Failure = connectionDone) -> None: - # If the underlying request is already finished (successfully or failed), don't + # If the local request is already finished (successfully or failed), don't # worry about sending anything back. if self._request.finished: return @@ -136,8 +149,6 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: if reason.check(ResponseDone): self._request.finish() else: - # TODO: Should we also log something? - # # Abort the underlying request since our remote request also failed. self._request.transport.abortConnection() diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index 2cc3a51e5afb..0b4aad7029e0 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -24,7 +24,7 @@ from twisted.web.client import Agent, ResponseNeverReceived from twisted.web.http import HTTPChannel -from synapse.api.errors import RequestSendFailed +from synapse.api.errors import RequestSendFailed, HttpResponseException from synapse.http.matrixfederationclient import ( ByteParser, MatrixFederationHttpClient, @@ -703,3 +703,53 @@ def test_proxy_requests_through_federation_sender_worker(self) -> None: # Make sure the response is as expected back on the main worker res = self.successResultOf(test_request_from_main_process_d) self.assertEqual(res, {"foo": "bar"}) + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_proxy_request_with_network_error_through_federation_sender_worker( + self, + ) -> None: + """ + Test that when the outbound federation request fails with a network related + error, a sensible error makes its way back to the main process. + """ + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + + # Create the `federation_sender` worker + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client_on_federation_sender, + ) + + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = ( + lambda *args, **kwargs: defer.fail(ResponseNeverReceived("fake error")) + ) + + # This federation request from the main process should be proxied through the + # `federation_sender` worker off to the remote server + test_request_from_main_process_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") + ) + + # Pump the reactor so our deferred goes through the motions. We pump with 10 + # seconds (0.1 * 100) so the `MatrixFederationHttpClient` runs out of retries + # and finally passes along the error response. + self.pump(0.1) + + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=ANY, + ) + + # Make sure we get some sort of error back on the main worker + failure_res = self.failureResultOf(test_request_from_main_process_d) + self.assertIsInstance(failure_res.value, RequestSendFailed) + self.assertIsInstance(failure_res.value.inner_exception, HttpResponseException) From e665fa8b02529f816f1a744682377494a5a3ccb1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jun 2023 01:46:31 -0500 Subject: [PATCH 27/40] Flesh out docstrings and comments --- synapse/config/workers.py | 4 ++++ synapse/http/matrixfederationclient.py | 5 +++++ synapse/http/proxy.py | 14 ++++++++++++++ synapse/http/proxyagent.py | 3 ++- 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 1a677825d44b..d44b7e2d1d67 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -162,6 +162,10 @@ class OutboundFederationRestrictedTo: locations: List[InstanceLocationConfig] = attr.Factory(list) def __contains__(self, instance: str) -> bool: + # It feels a bit dirty to return `True` if `instances` is `None`, but it makes + # sense in downstream usage in the sense that if + # `outbound_federation_restricted_to` is not configured, then any instance can + # talk to federation (no restrictions so always return `True`). return self.instances is None or instance in self.instances diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index d9eb8d6b7737..3bb6464a8df7 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -400,6 +400,7 @@ def __init__( hs.config.worker.outbound_federation_restricted_to ) if hs.get_instance_name() in outbound_federation_restricted_to: + # Talk to federation directly federation_agent: IAgent = MatrixFederationAgent( self.reactor, tls_client_options_factory, @@ -408,6 +409,8 @@ def __init__( hs.config.server.federation_ip_range_blocklist, ) else: + # We need to talk to federation via the proxy via one of the configured + # locations federation_proxies = outbound_federation_restricted_to.locations federation_agent = ProxyAgent( self.reactor, @@ -426,6 +429,8 @@ def __init__( self.clock = hs.get_clock() self._store = hs.get_datastores().main self.version_string_bytes = hs.version_string.encode("ascii") + # This is an arbitrary magic value timeout but make sure that if this is + # changed, the timeout in `ProxyResource` is set to something higher. self.default_timeout = 60 self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor)) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 9b33807d21e0..3e519e6a8c2e 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -43,6 +43,12 @@ class ProxyResource(_AsyncResource): + """ + A stub resource that proxies any requests with a `matrix-federation://` scheme + through the given `federation_agent` to the remote homeserver and ferries back the + info. + """ + isLeaf = True def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent): @@ -72,6 +78,9 @@ async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: ) request_deferred = timeout_deferred( request_deferred, + # This should be set longer than the timeout in `MatrixFederationHttpClient` + # so that it has enough time to complete and pass us the data before we give + # up. timeout=90, reactor=self.reactor, ) @@ -154,6 +163,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: class ProxySite(Site): + """ + Proxies any requests with a `matrix-federation://` scheme through the given + `federation_agent`. Otherwise, behaves like a normal `Site`. + """ + def __init__( self, resource: IResource, diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 4cd79c9d7399..bb0c7a88f46f 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -82,7 +82,8 @@ class ProxyAgent(_AgentBase): use_proxy: Whether proxy settings should be discovered and used from conventional environment variables. - federation_proxies: TODO + federation_proxies: An optional list of locations to proxy outbound federation + traffic through (only requests that use the `matrix-federation://` scheme). Raises: ValueError if use_proxy is set and the environment variables From 2ce202516ffbb3ee1c6a400622e061f99f078c4d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jun 2023 01:55:39 -0500 Subject: [PATCH 28/40] Update docs --- .../configuration/config_documentation.md | 18 ++++++++++++++++- docs/workers.md | 20 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 2ee7a2387f82..82ea4f0a31e0 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3907,7 +3907,7 @@ federation_sender_instances: When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP replication listener of the worker, if configured, and to the main process. Each worker declared under [`stream_writers`](../../workers.md#stream-writers) and -[`outbound_federation_restricted_to`](#TODO) needs a HTTP replication listener, and that +[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP replication listener, and that listener should be included in the `instance_map`. The main process also needs an entry on the `instance_map`, and it should be listed under `main` **if even one other worker exists**. Ensure the port matches with what is declared inside the `listener` block for @@ -3941,6 +3941,22 @@ stream_writers: typing: worker1 ``` --- +### `outbound_federation_restricted_to` + +When using workers, you can restrict outbound federation traffic to only go through a +specific subset of workers. Any worker specified here must also be in the +[`instance_map`](#instance_map). + +```yaml +outbound_federation_restricted_to: + - federation_sender1 + - federation_sender2 +``` + +Also see the [worker +documentation](../../workers.md#restrict-outbound-federation-traffic-to-a-specific-set-of-workers) +for more info. +--- ### `run_background_tasks_on` The [worker](../../workers.md#background-tasks) that is used to run diff --git a/docs/workers.md b/docs/workers.md index 735128762a47..da2ad5ccfd89 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -531,6 +531,26 @@ the stream writer for the `presence` stream: ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/ +#### Restrict outbound federation traffic to a specific set of workers + +The `outbound_federation_restricted_to` configuration is useful to make sure outbound +federation traffic only goes through those instances. This allows you to set more strict +access controls (like a firewall) for all workers and only allow the +`federation_sender`'s to contact the outside world. + +```yaml +instance_map: + main: + host: localhost + port: 8030 + federation_sender1: + host: localhost + port: 8034 + +outbound_federation_restricted_to: + - federation_sender1 +``` + #### Background tasks There is also support for moving background tasks to a separate From 632544ab48653823ad36e75e66508e7542a1bacd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jun 2023 02:00:56 -0500 Subject: [PATCH 29/40] Add some background behind `matrix-federation://` --- synapse/http/federation/matrix_federation_agent.py | 6 ++++-- synapse/http/proxy.py | 1 - synapse/http/proxyagent.py | 3 ++- tests/http/test_matrixfederationclient.py | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 50a70457d646..91a24efcd019 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -51,8 +51,10 @@ @implementer(IAgent) class MatrixFederationAgent: """An Agent-like thing which provides a `request` method which correctly - handles resolving matrix server names when using matrix-federation://. Handles standard - https URIs as normal. + handles resolving matrix server names when using `matrix-federation://`. Handles + standard https URIs as normal. The `matrix-federation://` scheme is internal to + Synapse and we purposely want to avoid colliding with the `matrix://` URL scheme + which is now specced. Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 3e519e6a8c2e..7b1a10cafa17 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -23,7 +23,6 @@ from twisted.python import failure from twisted.python.failure import Failure from twisted.web.client import ResponseDone -from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent, IResponse from twisted.web.resource import IResource diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index bb0c7a88f46f..1fa3adbef20c 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -83,7 +83,8 @@ class ProxyAgent(_AgentBase): from conventional environment variables. federation_proxies: An optional list of locations to proxy outbound federation - traffic through (only requests that use the `matrix-federation://` scheme). + traffic through (only requests that use the `matrix-federation://` scheme + will be proxied). Raises: ValueError if use_proxy is set and the environment variables diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index 0b4aad7029e0..3ab3f207e9cb 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -24,7 +24,7 @@ from twisted.web.client import Agent, ResponseNeverReceived from twisted.web.http import HTTPChannel -from synapse.api.errors import RequestSendFailed, HttpResponseException +from synapse.api.errors import HttpResponseException, RequestSendFailed from synapse.http.matrixfederationclient import ( ByteParser, MatrixFederationHttpClient, From 033e18a5929731591717fa518233dafc29f8b881 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jun 2023 02:05:18 -0500 Subject: [PATCH 30/40] Align language --- docs/workers.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/workers.md b/docs/workers.md index da2ad5ccfd89..303e0f0e7a5a 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -534,8 +534,8 @@ the stream writer for the `presence` stream: #### Restrict outbound federation traffic to a specific set of workers The `outbound_federation_restricted_to` configuration is useful to make sure outbound -federation traffic only goes through those instances. This allows you to set more strict -access controls (like a firewall) for all workers and only allow the +federation traffic only goes through a specified subset of workers. This allows you to +set more strict access controls (like a firewall) for all workers and only allow the `federation_sender`'s to contact the outside world. ```yaml From b5e916e04325dc30c6c7182ab42c69f00347ae04 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 20 Jun 2023 02:08:09 -0500 Subject: [PATCH 31/40] Revert back to debug level --- synapse/http/matrixfederationclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 3bb6464a8df7..cd88d620548a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -763,7 +763,7 @@ async def _send_request( delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) - logger.info( + logger.debug( "{%s} [%s] Waiting %ss before re-sending...", request.txn_id, request.destination, From 2032ea613bcd98325bc933b7afe9fd89c94e9334 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Jun 2023 00:44:31 -0500 Subject: [PATCH 32/40] `master`/`main` is in the `instance_map` so no need to skip checking for it See https://github.com/matrix-org/synapse/pull/15773#discussion_r1234977594 --- synapse/config/workers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index d44b7e2d1d67..0b9789160cc0 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -386,7 +386,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: ) if outbound_federation_restricted_to: for instance in outbound_federation_restricted_to: - if instance != "master" and instance not in self.instance_map: + if instance not in self.instance_map: raise ConfigError( "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config." % (instance,) From 926e3e0b0fbb95fb62521be4ab2f3c797167c5f7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Jun 2023 00:45:39 -0500 Subject: [PATCH 33/40] Remove extra proxy logging --- synapse/http/proxy.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 7b1a10cafa17..9910a98b10af 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -60,8 +60,6 @@ async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: uri = urllib.parse.urlparse(request.uri) assert uri.scheme == b"matrix-federation" - logger.info("Got proxy request %s", request.uri) - headers = Headers() for header_name in (b"User-Agent", b"Authorization", b"Content-Type"): header_value = request.getHeader(header_name) @@ -86,8 +84,6 @@ async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: response = await make_deferred_yieldable(request_deferred) - logger.info("Got proxy response %s", response.code) - return response.code, response def _send_response( From 0a2a9cf5636f633037a9e7941cd871faca86b467 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Jun 2023 01:58:30 -0500 Subject: [PATCH 34/40] Do not copy over hop-by-hop headers See https://github.com/matrix-org/synapse/pull/15773#discussion_r1234983843 --- synapse/http/proxy.py | 69 +++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 9910a98b10af..21e1acae2afe 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -15,7 +15,7 @@ import logging import urllib.parse -from typing import TYPE_CHECKING, Any, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast from twisted.internet import protocol from twisted.internet.interfaces import ITCPTransport @@ -40,6 +40,55 @@ logger = logging.getLogger(__name__) +# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616 +# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be +# consumed by the immediate recipient and not be forwarded on. +HOP_BY_HOP_HEADERS = { + "Connection", + "Keep-Alive", + "Proxy-Authenticate", + "Proxy-Authorization", + "TE", + "Trailers", + "Transfer-Encoding", + "Upgrade", +} + + +def parse_connection_header_value( + connection_header_value: Optional[bytes], +) -> Tuple[Optional[str], Set[str]]: + """ + Parse the `Connection` header to determine which headers we should not be copied + over from the remote response. + + As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1 + + Example: `Connection: close, X-Foo, X-Bar` will return `{"X-Foo", "X-Bar"}` + + Args: + connection_header_value: The value of the `Connection` header. + + Returns: + A tuple containing the connection name and the set of header names that should + not be copied over from the remote response. The keys are capitalized in + canonical capitalization. + """ + headers = Headers() + connection = None + extra_headers_to_remove: Set[str] = set() + if connection_header_value: + connection_options = [ + headers._canonicalNameCaps(connection_option.strip()).decode("ascii") + for connection_option in connection_header_value.split(b",") + ] + # This is probably `close` or `keep-alive + connection = connection_options[0] + # The rest is a list of headers that should not be copied over. + extra_headers_to_remove = Set(connection_options[1:]) + + return connection, extra_headers_to_remove + class ProxyResource(_AsyncResource): """ @@ -93,11 +142,27 @@ def _send_response( response_object: Any, ) -> None: response = cast(IResponse, response_object) + response_headers = cast(Headers, response.headers) request.setResponseCode(code) + # The `Connection` header also defines which headers should not be copied over. + connection_header = response_headers.getRawHeaders(b"connection") + _, extra_headers_to_remove = parse_connection_header_value( + connection_header[0] if connection_header else None + ) + # Copy headers. - for k, v in response.headers.getAllRawHeaders(): + for k, v in response_headers.getAllRawHeaders(): + # Do not copy over any hop-by-hop headers. These are meant to only be + # consumed by the immediate recipient and not be forwarded on. + header_key = k.decode("ascii") + if ( + header_key in HOP_BY_HOP_HEADERS + or header_key in extra_headers_to_remove + ): + continue + request.responseHeaders.setRawHeaders(k, v) response.deliverBody(_ProxyResponseBody(request)) From c757a38be71db99e8864e6e05209f57596f92c5c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Jun 2023 02:20:09 -0500 Subject: [PATCH 35/40] Add tests for `parse_connection_header_value` --- synapse/http/proxy.py | 11 +++++--- tests/http/test_proxy.py | 56 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) create mode 100644 tests/http/test_proxy.py diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 21e1acae2afe..f24af221e9dd 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -79,13 +79,18 @@ def parse_connection_header_value( extra_headers_to_remove: Set[str] = set() if connection_header_value: connection_options = [ - headers._canonicalNameCaps(connection_option.strip()).decode("ascii") + connection_option.strip() for connection_option in connection_header_value.split(b",") ] # This is probably `close` or `keep-alive - connection = connection_options[0] + connection = connection_options[0].decode("ascii") # The rest is a list of headers that should not be copied over. - extra_headers_to_remove = Set(connection_options[1:]) + extra_headers_to_remove = set( + [ + headers._canonicalNameCaps(x).decode("ascii") + for x in connection_options[1:] + ] + ) return connection, extra_headers_to_remove diff --git a/tests/http/test_proxy.py b/tests/http/test_proxy.py new file mode 100644 index 000000000000..803ea10ea801 --- /dev/null +++ b/tests/http/test_proxy.py @@ -0,0 +1,56 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import base64 +import logging +import os +from typing import List, Optional, Set +from unittest.mock import patch + +from parameterized import parameterized + +from synapse.http.proxy import parse_connection_header_value + +from tests.unittest import TestCase + + +class ProxyTests(TestCase): + @parameterized.expand( + [ + [b"close, X-Foo, X-Bar", "close", set(["X-Foo", "X-Bar"])], + # No whitespace + [b"close,X-Foo,X-Bar", "close", set(["X-Foo", "X-Bar"])], + # More whitespace + [b"close, X-Foo, X-Bar", "close", set(["X-Foo", "X-Bar"])], + # Keeps connection captilization and normalizes headers + [b"kEep-AliVe, x-foo, x-bar", "kEep-AliVe", set(["X-Foo", "X-Bar"])], + # Handles header names with whitespace + [b"keep-alive, x foo, x bar", "keep-alive", set(["X foo", "X bar"])], + ] + ) + def test_parse_connection_header_value( + self, + connection_header_value: str, + expected_connection: Optional[str], + expected_extra_headers_to_remove: Set[str], + ) -> None: + """ + Tests that the connection header value is parsed correctly + """ + self.assertEqual( + ( + expected_connection, + expected_extra_headers_to_remove, + ), + parse_connection_header_value(connection_header_value), + ) From be12f211b03f9934db9e12cef95692247fe542bb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Jun 2023 03:02:09 -0500 Subject: [PATCH 36/40] Add tests to make sure headers are removed --- synapse/http/matrixfederationclient.py | 71 ++++++++++++++++++++++- synapse/http/proxy.py | 31 ++++------ tests/http/test_matrixfederationclient.py | 68 ++++++++++++++++++++++ tests/http/test_proxy.py | 30 +++++----- 4 files changed, 166 insertions(+), 34 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cd88d620548a..59a4e0d26ca3 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1132,6 +1132,73 @@ async def get_json( Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. + """ + json_dict, _ = await self.get_json_with_headers( + destination=destination, + path=path, + args=args, + retry_on_dns_fail=retry_on_dns_fail, + timeout=timeout, + ignore_backoff=ignore_backoff, + try_trailing_slash_on_400=try_trailing_slash_on_400, + parser=parser, + ) + return json_dict + + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Optional[ByteParser[T]] = None, + ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]: + """GETs some json from the given host homeserver and path + + Args: + destination: The remote server to send the HTTP request to. + + path: The HTTP path. + + args: A dictionary used to create query strings, defaults to + None. + + retry_on_dns_fail: true if the request should be retried on DNS failures + + timeout: number of milliseconds to wait for the response. + self._default_timeout (60s) by default. + + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + + ignore_backoff: true to ignore the historical backoff data + and try the request anyway. + + try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED + response we should try appending a trailing slash to the end of + the request. Workaround for #3622 in Synapse <= v0.99.3. + + parser: The parser to use to decode the response. Defaults to + parsing as JSON. + + Returns: + Succeeds when we get a 2xx HTTP response. The result will be a tuple of the + decoded JSON body and a dict of the response headers. + Raises: HttpResponseException: If we get an HTTP response code >= 300 (except 429). @@ -1157,6 +1224,8 @@ async def get_json( timeout=timeout, ) + headers = dict(response.headers.getAllRawHeaders()) + if timeout is not None: _sec_timeout = timeout / 1000 else: @@ -1174,7 +1243,7 @@ async def get_json( parser=parser, ) - return body + return body, headers async def delete_json( self, diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index f24af221e9dd..539909cc4d8f 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -57,42 +57,35 @@ def parse_connection_header_value( connection_header_value: Optional[bytes], -) -> Tuple[Optional[str], Set[str]]: +) -> Tuple[bool, Set[str]]: """ Parse the `Connection` header to determine which headers we should not be copied over from the remote response. As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1 - Example: `Connection: close, X-Foo, X-Bar` will return `{"X-Foo", "X-Bar"}` + Example: `Connection: close, X-Foo, X-Bar` will return `[True, {"X-Foo", "X-Bar"}]` Args: connection_header_value: The value of the `Connection` header. Returns: - A tuple containing the connection name and the set of header names that should - not be copied over from the remote response. The keys are capitalized in - canonical capitalization. + A tuple indicating whether the connection should be closed and the set of header + names that should not be copied over from the remote response. The keys are + capitalized in canonical capitalization. """ headers = Headers() - connection = None + close = False extra_headers_to_remove: Set[str] = set() if connection_header_value: - connection_options = [ - connection_option.strip() + connection_options = { + headers._canonicalNameCaps(connection_option.strip()).decode("ascii") for connection_option in connection_header_value.split(b",") - ] - # This is probably `close` or `keep-alive - connection = connection_options[0].decode("ascii") - # The rest is a list of headers that should not be copied over. - extra_headers_to_remove = set( - [ - headers._canonicalNameCaps(x).decode("ascii") - for x in connection_options[1:] - ] - ) + } + close = "Close" in connection_options + extra_headers_to_remove = connection_options - return connection, extra_headers_to_remove + return close, extra_headers_to_remove class ProxyResource(_AsyncResource): diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index 3ab3f207e9cb..b896fb6ab8a9 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -23,6 +23,7 @@ from twisted.test.proto_helpers import MemoryReactor, StringTransport from twisted.web.client import Agent, ResponseNeverReceived from twisted.web.http import HTTPChannel +from twisted.web.http_headers import Headers from synapse.api.errors import HttpResponseException, RequestSendFailed from synapse.http.matrixfederationclient import ( @@ -753,3 +754,70 @@ def test_proxy_request_with_network_error_through_federation_sender_worker( failure_res = self.failureResultOf(test_request_from_main_process_d) self.assertIsInstance(failure_res.value, RequestSendFailed) self.assertIsInstance(failure_res.value.inner_exception, HttpResponseException) + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_proxy_requests_and_discards_hop_by_hop_headers(self) -> None: + """ + Test to make sure hop-by-hop headers and addional headers defined in the + `Connection` header are discarded when proxying requests + """ + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + + # Create the `federation_sender` worker + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client_on_federation_sender, + ) + + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = lambda *args, **kwargs: defer.succeed( + FakeResponse( + code=200, + body=b'{"foo": "bar"}', + headers=Headers( + { + "Content-Type": ["application/json"], + "Connection": ["close, X-Foo, X-Bar"], + # Should be removed because it's defined in the `Connection` header + "X-Foo": ["foo"], + "X-Bar": ["bar"], + # Should be removed because it's a hop-by-hop header + "Proxy-Authorization": "abcdef", + } + ), + ) + ) + + # This federation request from the main process should be proxied through the + # `federation_sender` worker off to the remote server + test_request_from_main_process_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json_with_headers( + "remoteserv:8008", "foo/bar" + ) + ) + + # Pump the reactor so our deferred goes through the motions + self.pump() + + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_once_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=ANY, + ) + + res, headers = self.successResultOf(test_request_from_main_process_d) + header_names = set(headers.keys()) + + # Make sure the response does not include the hop-by-hop headers + self.assertNotIn(b"X-Foo", header_names) + self.assertNotIn(b"X-Bar", header_names) + self.assertNotIn(b"Proxy-Authorization", header_names) + # Make sure the response is as expected back on the main worker + self.assertEqual(res, {"foo": "bar"}) diff --git a/tests/http/test_proxy.py b/tests/http/test_proxy.py index 803ea10ea801..0fd527375033 100644 --- a/tests/http/test_proxy.py +++ b/tests/http/test_proxy.py @@ -11,11 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import base64 -import logging -import os -from typing import List, Optional, Set -from unittest.mock import patch +from typing import Set from parameterized import parameterized @@ -27,21 +23,27 @@ class ProxyTests(TestCase): @parameterized.expand( [ - [b"close, X-Foo, X-Bar", "close", set(["X-Foo", "X-Bar"])], + [b"close, X-Foo, X-Bar", True, {"Close", "X-Foo", "X-Bar"}], # No whitespace - [b"close,X-Foo,X-Bar", "close", set(["X-Foo", "X-Bar"])], + [b"close,X-Foo,X-Bar", True, {"Close", "X-Foo", "X-Bar"}], # More whitespace - [b"close, X-Foo, X-Bar", "close", set(["X-Foo", "X-Bar"])], - # Keeps connection captilization and normalizes headers - [b"kEep-AliVe, x-foo, x-bar", "kEep-AliVe", set(["X-Foo", "X-Bar"])], + [b"close, X-Foo, X-Bar", True, {"Close", "X-Foo", "X-Bar"}], + # "close" directive in not the first position + [b"X-Foo, X-Bar, close", True, {"X-Foo", "X-Bar", "Close"}], + # Normalizes header capitalization + [b"keep-alive, x-fOo, x-bAr", False, {"Keep-Alive", "X-Foo", "X-Bar"}], # Handles header names with whitespace - [b"keep-alive, x foo, x bar", "keep-alive", set(["X foo", "X bar"])], + [ + b"keep-alive, x foo, x bar", + False, + {"Keep-Alive", "X foo", "X bar"}, + ], ] ) def test_parse_connection_header_value( self, - connection_header_value: str, - expected_connection: Optional[str], + connection_header_value: bytes, + expected_close: bool, expected_extra_headers_to_remove: Set[str], ) -> None: """ @@ -49,7 +51,7 @@ def test_parse_connection_header_value( """ self.assertEqual( ( - expected_connection, + expected_close, expected_extra_headers_to_remove, ), parse_connection_header_value(connection_header_value), From 735203e7df8b820c123b14a1226ef9b6f6762100 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Jun 2023 03:06:19 -0500 Subject: [PATCH 37/40] Ignore lint --- synapse/http/matrixfederationclient.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 59a4e0d26ca3..d595bf43ffd5 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1150,7 +1150,8 @@ async def get_json( timeout=timeout, ignore_backoff=ignore_backoff, try_trailing_slash_on_400=try_trailing_slash_on_400, - parser=parser, + # TODO: type-ignore: can we resolve this? Argument "parser" to "get_json_with_headers" of "MatrixFederationHttpClient" has incompatible type "Optional[ByteParser[T]]"; expected "Optional[ByteParser[Union[Dict[str, Any], T]]]" + parser=parser, # type: ignore[arg-type] ) return json_dict From c1ec014a6fe0524c612f21ba01841140537bc2d5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Jun 2023 18:09:37 -0500 Subject: [PATCH 38/40] Fix `arg-type` lint See https://github.com/matrix-org/synapse/pull/15773#discussion_r1236577478 --- synapse/http/matrixfederationclient.py | 31 ++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7d227c6b1b75..b00396fdc71b 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1164,11 +1164,38 @@ async def get_json( timeout=timeout, ignore_backoff=ignore_backoff, try_trailing_slash_on_400=try_trailing_slash_on_400, - # TODO: type-ignore: can we resolve this? Argument "parser" to "get_json_with_headers" of "MatrixFederationHttpClient" has incompatible type "Optional[ByteParser[T]]"; expected "Optional[ByteParser[Union[Dict[str, Any], T]]]" - parser=parser, # type: ignore[arg-type] + parser=parser, ) return json_dict + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Literal[None] = None, + ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]: + ... + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = ..., + retry_on_dns_fail: bool = ..., + timeout: Optional[int] = ..., + ignore_backoff: bool = ..., + try_trailing_slash_on_400: bool = ..., + parser: ByteParser[T] = ..., + ) -> Tuple[T, Dict[bytes, List[bytes]]]: + ... + async def get_json_with_headers( self, destination: str, From d400b506924818e9f5a1eae6d7d89d94f9d6303a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 21 Jun 2023 18:56:43 -0500 Subject: [PATCH 39/40] Simplify `parse_connection_header_value` --- synapse/http/proxy.py | 22 +++++++++++----------- tests/http/test_proxy.py | 17 ++++++----------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 539909cc4d8f..9d3aafb1b8a8 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -57,35 +57,35 @@ def parse_connection_header_value( connection_header_value: Optional[bytes], -) -> Tuple[bool, Set[str]]: +) -> Set[str]: """ Parse the `Connection` header to determine which headers we should not be copied over from the remote response. As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1 - Example: `Connection: close, X-Foo, X-Bar` will return `[True, {"X-Foo", "X-Bar"}]` + Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}` + + Even though "close" is a special directive, let's just treat it as just another + header for simplicity. If people want to check for this directive, they can simply + check for `"Close" in headers`. Args: connection_header_value: The value of the `Connection` header. Returns: - A tuple indicating whether the connection should be closed and the set of header - names that should not be copied over from the remote response. The keys are - capitalized in canonical capitalization. + The set of header names that should not be copied over from the remote response. + The keys are capitalized in canonical capitalization. """ headers = Headers() - close = False extra_headers_to_remove: Set[str] = set() if connection_header_value: - connection_options = { + extra_headers_to_remove = { headers._canonicalNameCaps(connection_option.strip()).decode("ascii") for connection_option in connection_header_value.split(b",") } - close = "Close" in connection_options - extra_headers_to_remove = connection_options - return close, extra_headers_to_remove + return extra_headers_to_remove class ProxyResource(_AsyncResource): @@ -146,7 +146,7 @@ def _send_response( # The `Connection` header also defines which headers should not be copied over. connection_header = response_headers.getRawHeaders(b"connection") - _, extra_headers_to_remove = parse_connection_header_value( + extra_headers_to_remove = parse_connection_header_value( connection_header[0] if connection_header else None ) diff --git a/tests/http/test_proxy.py b/tests/http/test_proxy.py index 0fd527375033..0dc9ba8e0582 100644 --- a/tests/http/test_proxy.py +++ b/tests/http/test_proxy.py @@ -23,19 +23,18 @@ class ProxyTests(TestCase): @parameterized.expand( [ - [b"close, X-Foo, X-Bar", True, {"Close", "X-Foo", "X-Bar"}], + [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}], # No whitespace - [b"close,X-Foo,X-Bar", True, {"Close", "X-Foo", "X-Bar"}], + [b"close,X-Foo,X-Bar", {"Close", "X-Foo", "X-Bar"}], # More whitespace - [b"close, X-Foo, X-Bar", True, {"Close", "X-Foo", "X-Bar"}], + [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}], # "close" directive in not the first position - [b"X-Foo, X-Bar, close", True, {"X-Foo", "X-Bar", "Close"}], + [b"X-Foo, X-Bar, close", {"X-Foo", "X-Bar", "Close"}], # Normalizes header capitalization - [b"keep-alive, x-fOo, x-bAr", False, {"Keep-Alive", "X-Foo", "X-Bar"}], + [b"keep-alive, x-fOo, x-bAr", {"Keep-Alive", "X-Foo", "X-Bar"}], # Handles header names with whitespace [ b"keep-alive, x foo, x bar", - False, {"Keep-Alive", "X foo", "X bar"}, ], ] @@ -43,16 +42,12 @@ class ProxyTests(TestCase): def test_parse_connection_header_value( self, connection_header_value: bytes, - expected_close: bool, expected_extra_headers_to_remove: Set[str], ) -> None: """ Tests that the connection header value is parsed correctly """ self.assertEqual( - ( - expected_close, - expected_extra_headers_to_remove, - ), + expected_extra_headers_to_remove, parse_connection_header_value(connection_header_value), ) From e99a5e94a97f79c6e61abd7b78755f62499ea452 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 27 Jun 2023 17:26:51 -0500 Subject: [PATCH 40/40] Use safe `json.dumps` for JSON response See https://github.com/matrix-org/synapse/pull/15773#discussion_r1243424105 --- synapse/http/proxy.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py index 9d3aafb1b8a8..0874d67760ae 100644 --- a/synapse/http/proxy.py +++ b/synapse/http/proxy.py @@ -13,6 +13,7 @@ # limitations under the License. # +import json import logging import urllib.parse from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast @@ -174,12 +175,16 @@ def _send_error_response( request.setHeader(b"Content-Type", b"application/json") request.write( ( - '{"errcode": "%s","err":"ProxyResource: Error when proxying request: %s %s -> %s"}' - % ( - Codes.UNKNOWN, - request.method.decode("ascii"), - request.uri.decode("ascii"), - f, + json.dumps( + { + "errcode": Codes.UNKNOWN, + "err": "ProxyResource: Error when proxying request: %s %s -> %s" + % ( + request.method.decode("ascii"), + request.uri.decode("ascii"), + f, + ), + } ) ).encode() )