From 56efa9b1672abcdf1f42bebcc3c7dd0b4fa40067 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Mon, 3 Apr 2023 04:27:51 -0500 Subject: [PATCH] Experimental Unix socket support (#15353) * Add IReactorUNIX to ISynapseReactor type hint. * Create listen_unix(). Two options, 'path' to the file and 'mode' of permissions (not umask; recommend 666 as default, as nginx/other reverse proxies write to it and it's set up as user www-data). For the moment, leave the option to always create a PID lockfile turned on by default. * Create UnixListenerConfig and wire it up. Rename ListenerConfig to TCPListenerConfig, then Union them together into ListenerConfig. This spidered around a bit, but I think I got it all. Metrics and manhole have been placed behind a conditional in case of accidentally putting them onto a unix socket. Use new helpers to check whether a listener is configured for TLS, and to help create a site tag for logging. There are 2 TODO things in parse_listener_def() to finish up at a later point. * Refactor SynapseRequest to handle logging correctly when using a unix socket. This prevents an exception when an IP address cannot be retrieved for a request. * Make the 'Synapse now listening on Unix socket' log line a little prettier. * No silent failures on generic workers when trying to use a unix socket with metrics or manhole. * Inline variables in app/_base.py * Update docstring for listen_unix() to remove reference to a hardcoded permission of 0o666 and add a few comments saying where the default IS declared. 
* Disallow both a unix socket and an IP/port combo on the same listener resource * Linting * Changelog * review: simplify how listen_unix returns (and get rid of a type: ignore) * review: fix typo from ConfigError in app/homeserver.py * review: roll conditional for http_options.tag into get_site_tag() helper (and add docstring) * review: enhance the conditionals for checking if a port or path is valid, remove a TODO line * review: Try updating comment in get_client_ip_if_available to clarify what is being retrieved and why * Pretty up how 'Synapse now listening on Unix Socket' looks by decoding the byte string. * review: In parse_listener_def(), raise ConfigError if neither socket_path nor port is declared (and fix a typo) --- changelog.d/15353.misc | 1 + synapse/app/_base.py | 92 ++++++++++++++++++-------- synapse/app/generic_worker.py | 34 ++++++---- synapse/app/homeserver.py | 42 +++++++----- synapse/config/server.py | 118 ++++++++++++++++++++++++++-------- synapse/config/workers.py | 13 ++-- synapse/http/site.py | 27 +++++++- synapse/types/__init__.py | 2 + 8 files changed, 239 insertions(+), 90 deletions(-) create mode 100644 changelog.d/15353.misc diff --git a/changelog.d/15353.misc b/changelog.d/15353.misc new file mode 100644 index 000000000000..23927fea8ff2 --- /dev/null +++ b/changelog.d/15353.misc @@ -0,0 +1 @@ +Add experimental support for Unix sockets. Contributed by Jason Little. 
diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 28062dd69d3a..f7b866978cca 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -41,7 +41,12 @@ import twisted from twisted.internet import defer, error, reactor as _reactor -from twisted.internet.interfaces import IOpenSSLContextFactory, IReactorSSL, IReactorTCP +from twisted.internet.interfaces import ( + IOpenSSLContextFactory, + IReactorSSL, + IReactorTCP, + IReactorUNIX, +) from twisted.internet.protocol import ServerFactory from twisted.internet.tcp import Port from twisted.logger import LoggingFile, LogLevel @@ -56,7 +61,7 @@ from synapse.config import ConfigError from synapse.config._base import format_config_error from synapse.config.homeserver import HomeServerConfig -from synapse.config.server import ListenerConfig, ManholeConfig +from synapse.config.server import ListenerConfig, ManholeConfig, TCPListenerConfig from synapse.crypto import context_factory from synapse.events.presence_router import load_legacy_presence_router from synapse.events.spamcheck import load_legacy_spam_checkers @@ -351,6 +356,28 @@ def listen_tcp( return r # type: ignore[return-value] +def listen_unix( + path: str, + mode: int, + factory: ServerFactory, + reactor: IReactorUNIX = reactor, + backlog: int = 50, +) -> List[Port]: + """ + Create a UNIX socket for a given path and 'mode' permission + + Returns: + list of twisted.internet.tcp.Port listening for TCP connections + """ + wantPID = True + + return [ + # IReactorUNIX returns an object implementing IListeningPort from listenUNIX, + # but we know it will be a Port instance. 
+ cast(Port, reactor.listenUNIX(path, factory, backlog, mode, wantPID)) + ] + + def listen_http( listener_config: ListenerConfig, root_resource: Resource, @@ -359,18 +386,13 @@ def listen_http( context_factory: Optional[IOpenSSLContextFactory], reactor: ISynapseReactor = reactor, ) -> List[Port]: - port = listener_config.port - bind_addresses = listener_config.bind_addresses - tls = listener_config.tls - assert listener_config.http_options is not None - site_tag = listener_config.http_options.tag - if site_tag is None: - site_tag = str(port) + site_tag = listener_config.get_site_tag() site = SynapseSite( - "synapse.access.%s.%s" % ("https" if tls else "http", site_tag), + "synapse.access.%s.%s" + % ("https" if listener_config.is_tls() else "http", site_tag), site_tag, listener_config, root_resource, @@ -378,25 +400,41 @@ def listen_http( max_request_body_size=max_request_body_size, reactor=reactor, ) - if tls: - # refresh_certificate should have been called before this. - assert context_factory is not None - ports = listen_ssl( - bind_addresses, - port, - site, - context_factory, - reactor=reactor, - ) - logger.info("Synapse now listening on TCP port %d (TLS)", port) + + if isinstance(listener_config, TCPListenerConfig): + if listener_config.is_tls(): + # refresh_certificate should have been called before this. 
+ assert context_factory is not None + ports = listen_ssl( + listener_config.bind_addresses, + listener_config.port, + site, + context_factory, + reactor=reactor, + ) + logger.info( + "Synapse now listening on TCP port %d (TLS)", listener_config.port + ) + else: + ports = listen_tcp( + listener_config.bind_addresses, + listener_config.port, + site, + reactor=reactor, + ) + logger.info("Synapse now listening on TCP port %d", listener_config.port) + else: - ports = listen_tcp( - bind_addresses, - port, - site, - reactor=reactor, + ports = listen_unix( + listener_config.path, listener_config.mode, site, reactor=reactor ) - logger.info("Synapse now listening on TCP port %d", port) + # getHost() returns a UNIXAddress which contains an instance variable of 'name' + # encoded as a byte string. Decode as utf-8 so pretty. + logger.info( + "Synapse now listening on Unix Socket at: " + f"{ports[0].getHost().name.decode('utf-8')}" + ) + return ports diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 0dec24369a49..e17ce35b8e29 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -38,7 +38,7 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.config.server import ListenerConfig +from synapse.config.server import ListenerConfig, TCPListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.server import JsonResource, OptionsResource from synapse.logging.context import LoggingContext @@ -236,12 +236,18 @@ def start_listening(self) -> None: if listener.type == "http": self._listen_http(listener) elif listener.type == "manhole": - _base.listen_manhole( - listener.bind_addresses, - listener.port, - manhole_settings=self.config.server.manhole_settings, - manhole_globals={"hs": self}, - ) + if isinstance(listener, TCPListenerConfig): + _base.listen_manhole( + 
listener.bind_addresses, + listener.port, + manhole_settings=self.config.server.manhole_settings, + manhole_globals={"hs": self}, + ) + else: + raise ConfigError( + "Can not using a unix socket for manhole at this time." + ) + elif listener.type == "metrics": if not self.config.metrics.enable_metrics: logger.warning( @@ -249,10 +255,16 @@ def start_listening(self) -> None: "enable_metrics is not True!" ) else: - _base.listen_metrics( - listener.bind_addresses, - listener.port, - ) + if isinstance(listener, TCPListenerConfig): + _base.listen_metrics( + listener.bind_addresses, + listener.port, + ) + else: + raise ConfigError( + "Can not use a unix socket for metrics at this time." + ) + else: logger.warning("Unsupported listener type: %s", listener.type) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b8830b1a9c16..84236ac299e0 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -44,7 +44,7 @@ ) from synapse.config._base import ConfigError, format_config_error from synapse.config.homeserver import HomeServerConfig -from synapse.config.server import ListenerConfig +from synapse.config.server import ListenerConfig, TCPListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.additional_resource import AdditionalResource from synapse.http.server import ( @@ -78,14 +78,13 @@ class SynapseHomeServer(HomeServer): DATASTORE_CLASS = DataStore # type: ignore def _listener_http( - self, config: HomeServerConfig, listener_config: ListenerConfig + self, + config: HomeServerConfig, + listener_config: ListenerConfig, ) -> Iterable[Port]: - port = listener_config.port # Must exist since this is an HTTP listener. assert listener_config.http_options is not None - site_tag = listener_config.http_options.tag - if site_tag is None: - site_tag = str(port) + site_tag = listener_config.get_site_tag() # We always include a health resource. 
resources: Dict[str, Resource] = {"/health": HealthResource()} @@ -252,12 +251,17 @@ def start_listening(self) -> None: self._listener_http(self.config, listener) ) elif listener.type == "manhole": - _base.listen_manhole( - listener.bind_addresses, - listener.port, - manhole_settings=self.config.server.manhole_settings, - manhole_globals={"hs": self}, - ) + if isinstance(listener, TCPListenerConfig): + _base.listen_manhole( + listener.bind_addresses, + listener.port, + manhole_settings=self.config.server.manhole_settings, + manhole_globals={"hs": self}, + ) + else: + raise ConfigError( + "Can not use a unix socket for manhole at this time." + ) elif listener.type == "metrics": if not self.config.metrics.enable_metrics: logger.warning( @@ -265,10 +269,16 @@ def start_listening(self) -> None: "enable_metrics is not True!" ) else: - _base.listen_metrics( - listener.bind_addresses, - listener.port, - ) + if isinstance(listener, TCPListenerConfig): + _base.listen_metrics( + listener.bind_addresses, + listener.port, + ) + else: + raise ConfigError( + "Can not use a unix socket for metrics at this time." 
+ ) + else: # this shouldn't happen, as the listener type should have been checked # during parsing diff --git a/synapse/config/server.py b/synapse/config/server.py index 0e46b849cf06..386c3194b85a 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -214,17 +214,52 @@ class HttpListenerConfig: @attr.s(slots=True, frozen=True, auto_attribs=True) -class ListenerConfig: - """Object describing the configuration of a single listener.""" +class TCPListenerConfig: + """Object describing the configuration of a single TCP listener.""" port: int = attr.ib(validator=attr.validators.instance_of(int)) - bind_addresses: List[str] + bind_addresses: List[str] = attr.ib(validator=attr.validators.instance_of(List)) type: str = attr.ib(validator=attr.validators.in_(KNOWN_LISTENER_TYPES)) tls: bool = False # http_options is only populated if type=http http_options: Optional[HttpListenerConfig] = None + def get_site_tag(self) -> str: + """Retrieves http_options.tag if it exists, otherwise the port number.""" + if self.http_options and self.http_options.tag is not None: + return self.http_options.tag + else: + return str(self.port) + + def is_tls(self) -> bool: + return self.tls + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class UnixListenerConfig: + """Object describing the configuration of a single Unix socket listener.""" + + # Note: unix sockets can not be tls encrypted, so HAVE to be behind a tls-handling + # reverse proxy + path: str = attr.ib() + # A default(0o666) for this is set in parse_listener_def() below + mode: int + type: str = attr.ib(validator=attr.validators.in_(KNOWN_LISTENER_TYPES)) + + # http_options is only populated if type=http + http_options: Optional[HttpListenerConfig] = None + + def get_site_tag(self) -> str: + return "unix" + + def is_tls(self) -> bool: + """Unix sockets can't have TLS""" + return False + + +ListenerConfig = Union[TCPListenerConfig, UnixListenerConfig] + @attr.s(slots=True, frozen=True, auto_attribs=True) 
class ManholeConfig: @@ -531,12 +566,12 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.listeners = [parse_listener_def(i, x) for i, x in enumerate(listeners)] - # no_tls is not really supported any more, but let's grandfather it in - # here. + # no_tls is not really supported anymore, but let's grandfather it in here. if config.get("no_tls", False): l2 = [] for listener in self.listeners: - if listener.tls: + if isinstance(listener, TCPListenerConfig) and listener.tls: + # Use isinstance() as the assertion this *has* a listener.port logger.info( "Ignoring TLS-enabled listener on port %i due to no_tls", listener.port, @@ -577,7 +612,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: ) self.listeners.append( - ListenerConfig( + TCPListenerConfig( port=bind_port, bind_addresses=[bind_host], tls=True, @@ -589,7 +624,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: unsecure_port = config.get("unsecure_port", bind_port - 400) if unsecure_port: self.listeners.append( - ListenerConfig( + TCPListenerConfig( port=unsecure_port, bind_addresses=[bind_host], tls=False, @@ -601,7 +636,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: manhole = config.get("manhole") if manhole: self.listeners.append( - ListenerConfig( + TCPListenerConfig( port=manhole, bind_addresses=["127.0.0.1"], type="manhole", @@ -648,7 +683,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: logger.warning(METRICS_PORT_WARNING) self.listeners.append( - ListenerConfig( + TCPListenerConfig( port=metrics_port, bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")], type="http", @@ -724,7 +759,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: self.delete_stale_devices_after = None def has_tls_listener(self) -> bool: - return any(listener.tls for listener in self.listeners) + return any(listener.is_tls() for listener in self.listeners) def generate_config_section( self, @@ -904,25 +939,25 
@@ def parse_listener_def(num: int, listener: Any) -> ListenerConfig: raise ConfigError(DIRECT_TCP_ERROR, ("listeners", str(num), "type")) port = listener.get("port") - if type(port) is not int: + socket_path = listener.get("path") + # Either a port or a path should be declared at a minimum. Using both would be bad. + if port is not None and not isinstance(port, int): raise ConfigError("Listener configuration is lacking a valid 'port' option") + if socket_path is not None and not isinstance(socket_path, str): + raise ConfigError("Listener configuration is lacking a valid 'path' option") + if port and socket_path: + raise ConfigError( + "Can not have both a UNIX socket and an IP/port declared for the same " + "resource!" + ) + if port is None and socket_path is None: + raise ConfigError( + "Must have either a UNIX socket or an IP/port declared for a given " + "resource!" + ) tls = listener.get("tls", False) - bind_addresses = listener.get("bind_addresses", []) - bind_address = listener.get("bind_address") - # if bind_address was specified, add it to the list of addresses - if bind_address: - bind_addresses.append(bind_address) - - # if we still have an empty list of addresses, use the default list - if not bind_addresses: - if listener_type == "metrics": - # the metrics listener doesn't support IPv6 - bind_addresses.append("0.0.0.0") - else: - bind_addresses.extend(DEFAULT_BIND_ADDRESSES) - http_config = None if listener_type == "http": try: @@ -932,8 +967,12 @@ def parse_listener_def(num: int, listener: Any) -> ListenerConfig: except ValueError as e: raise ConfigError("Unknown listener resource") from e + # For a unix socket, default x_forwarded to True, as this is the only way of + # getting a client IP. + # Note: a reverse proxy is required anyway, as there is no way of exposing a + # unix socket to the internet. 
http_config = HttpListenerConfig( - x_forwarded=listener.get("x_forwarded", False), + x_forwarded=listener.get("x_forwarded", (True if socket_path else False)), resources=resources, additional_resources=listener.get("additional_resources", {}), tag=listener.get("tag"), @@ -941,7 +980,30 @@ def parse_listener_def(num: int, listener: Any) -> ListenerConfig: experimental_cors_msc3886=listener.get("experimental_cors_msc3886", False), ) - return ListenerConfig(port, bind_addresses, listener_type, tls, http_config) + if socket_path: + # TODO: Add in path validation, like if the directory exists and is writable? + # Set a default for the permission, in case it's left out + socket_mode = listener.get("mode", 0o666) + + return UnixListenerConfig(socket_path, socket_mode, listener_type, http_config) + + else: + assert port is not None + bind_addresses = listener.get("bind_addresses", []) + bind_address = listener.get("bind_address") + # if bind_address was specified, add it to the list of addresses + if bind_address: + bind_addresses.append(bind_address) + + # if we still have an empty list of addresses, use the default list + if not bind_addresses: + if listener_type == "metrics": + # the metrics listener doesn't support IPv6 + bind_addresses.append("0.0.0.0") + else: + bind_addresses.extend(DEFAULT_BIND_ADDRESSES) + + return TCPListenerConfig(port, bind_addresses, listener_type, tls, http_config) _MANHOLE_SETTINGS_SCHEMA = { diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 2580660b6c27..1dfbe27e89a2 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -19,15 +19,18 @@ import attr -from synapse.types import JsonDict - -from ._base import ( +from synapse.config._base import ( Config, ConfigError, RoutableShardedWorkerHandlingConfig, ShardedWorkerHandlingConfig, ) -from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def +from synapse.config.server import ( + DIRECT_TCP_ERROR, + TCPListenerConfig, + 
parse_listener_def, +) +from synapse.types import JsonDict _DEPRECATED_WORKER_DUTY_OPTION_USED = """ The '%s' configuration option is deprecated and will be removed in a future @@ -161,7 +164,7 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None: manhole = config.get("worker_manhole") if manhole: self.worker_listeners.append( - ListenerConfig( + TCPListenerConfig( port=manhole, bind_addresses=["127.0.0.1"], type="manhole", diff --git a/synapse/http/site.py b/synapse/http/site.py index 6a1dbf7f33b6..c530966ef336 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -19,6 +19,7 @@ import attr from zope.interface import implementer +from twisted.internet.address import UNIXAddress from twisted.internet.defer import Deferred from twisted.internet.interfaces import IAddress, IReactorTime from twisted.python.failure import Failure @@ -257,7 +258,7 @@ def render(self, resrc: Resource) -> None: request_id, request=ContextRequest( request_id=request_id, - ip_address=self.getClientAddress().host, + ip_address=self.get_client_ip_if_available(), site_tag=self.synapse_site.site_tag, # The requester is going to be unknown at this point. requester=None, @@ -414,7 +415,7 @@ def _started_processing(self, servlet_name: str) -> None: self.synapse_site.access_logger.debug( "%s - %s - Received request: %s %s", - self.getClientAddress().host, + self.get_client_ip_if_available(), self.synapse_site.site_tag, self.get_method(), self.get_redacted_uri(), @@ -462,7 +463,7 @@ def _finished_processing(self) -> None: "%s - %s - {%s}" " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" ' %sB %s "%s %s %s" "%s" [%d dbevts]', - self.getClientAddress().host, + self.get_client_ip_if_available(), self.synapse_site.site_tag, requester, processing_time, @@ -500,6 +501,26 @@ def _should_log_request(self) -> bool: return True + def get_client_ip_if_available(self) -> str: + """Logging helper. 
Return something useful when a client IP is not retrievable + from a unix socket. + + In practice, this returns the socket file path on a SynapseRequest if using a + unix socket and the normal IP address for TCP sockets. + + """ + # getClientAddress().host returns a proper IP address for a TCP socket. But + # unix sockets have no concept of IP addresses or ports and return a + # UNIXAddress containing a 'None' value. In order to get something usable for + # logs(where this is used) get the unix socket file. getHost() returns a + # UNIXAddress containing a value of the socket file and has an instance + # variable of 'name' encoded as a byte string containing the path we want. + # Decode to utf-8 so it looks nice. + if isinstance(self.getClientAddress(), UNIXAddress): + return self.getHost().name.decode("utf-8") + else: + return self.getClientAddress().host + class XForwardedForRequest(SynapseRequest): """Request object which honours proxy headers diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index c09b9cf87ddc..5cee9c3194f0 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -50,6 +50,7 @@ IReactorTCP, IReactorThreads, IReactorTime, + IReactorUNIX, ) from synapse.api.errors import Codes, SynapseError @@ -91,6 +92,7 @@ class ISynapseReactor( IReactorTCP, IReactorSSL, + IReactorUNIX, IReactorPluggableNameResolver, IReactorTime, IReactorCore,