Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Allow listening on UNIX sockets for HTTP listeners #8103

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8103.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for listening on a named UNIX domain socket for HTTP interfaces. Contributed by David Vo.
30 changes: 23 additions & 7 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import synapse
from synapse.app import check_bind_error
from synapse.config.server import ListenerConfig
from synapse.config.server import ListenerConfig, TcpListenerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
from synapse.util.async_helpers import Linearizer
Expand Down Expand Up @@ -142,24 +142,38 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)


def listen_metrics(bind_addresses, port):
def listen_metrics(socket_options):
"""
Start Prometheus metrics server.
"""
if not isinstance(socket_options, TcpListenerConfig):
logger.warning(
"Metrics listener only supports TCP, use an HTTP listener instead"
)
return

from synapse.metrics import RegistryProxy, start_http_server

for host in bind_addresses:
if socket_options.tls:
logger.warning("Ignoring 'tls' option for metrics listener")

port = socket_options.port
for host in socket_options.bind_addresses:
logger.info("Starting metrics listener on %s:%d", host, port)
start_http_server(port, addr=host, registry=RegistryProxy)


def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
def listen_tcp(socket_options, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses

Returns:
list[twisted.internet.tcp.Port]: listening for TCP connections
"""
assert not socket_options.tls
bind_addresses = socket_options.bind_addresses
port = socket_options.port

r = []
for address in bind_addresses:
try:
Expand All @@ -170,15 +184,17 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
return r


def listen_ssl(
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
def listen_ssl(socket_options, factory, context_factory, reactor=reactor, backlog=50):
"""
Create an TLS-over-TCP socket for a port and several addresses

Returns:
list of twisted.internet.tcp.Port listening for TLS connections
"""
assert socket_options.tls
bind_addresses = socket_options.bind_addresses
port = socket_options.port

r = []
for address in bind_addresses:
try:
Expand Down
75 changes: 43 additions & 32 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# limitations under the License.
import contextlib
import logging
import os
import sys
from typing import Dict, Iterable, Optional, Set

Expand All @@ -37,7 +38,7 @@
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.config.server import ListenerConfig
from synapse.config.server import ListenerConfig, TcpListenerConfig, UnixListenerConfig
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import (
Expand Down Expand Up @@ -486,15 +487,8 @@ class GenericWorkerServer(HomeServer):
DATASTORE_CLASS = GenericWorkerSlavedStore

def _listen_http(self, listener_config: ListenerConfig):
port = listener_config.port
bind_addresses = listener_config.bind_addresses

assert listener_config.http_options is not None

site_tag = listener_config.http_options.tag
if site_tag is None:
site_tag = port

# We always include a health resource.
resources = {"/health": HealthResource()}

Expand Down Expand Up @@ -590,43 +584,60 @@ def _listen_http(self, listener_config: ListenerConfig):

root_resource = create_resource_tree(resources, OptionsResource())

_base.listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
reactor=self.get_reactor(),
socket_options = listener_config.socket_options
site_tag = listener_config.http_options.tag

if isinstance(socket_options, TcpListenerConfig):
port = socket_options.port
if site_tag is None:
site_tag = port
site_type = "http"
else:
assert isinstance(socket_options, UnixListenerConfig)
port = None
socket_path = socket_options.path
if site_tag is None:
site_tag = os.path.basename(socket_path)
site_type = "unix"

site = SynapseSite(
"synapse.access.%s.%s" % (site_type, site_tag),
site_tag,
listener_config,
root_resource,
self.version_string,
)

logger.info("Synapse worker now listening on port %d", port)
if port is not None:
_base.listen_tcp(socket_options, site, reactor=self.get_reactor())
logger.info("Synapse worker now listening on port %d", port)
else:
self.get_reactor().listenUNIX(socket_path, site)
logger.info("Synapse worker now listening on socket %s", socket_path)

def start_listening(self, listeners: Iterable[ListenerConfig]):
for listener in listeners:
if listener.type == "http":
self._listen_http(listener)
elif listener.type == "manhole":
_base.listen_tcp(
listener.bind_addresses,
listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
if not isinstance(listener.socket_options, TcpListenerConfig):
logger.warning("Manhole listener currently only supports TCP")
else:
_base.listen_tcp(
listener.socket_options,
manhole(
username="matrix",
password="rabbithole",
globals={"hs": self},
),
)
elif listener.type == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
"Metrics listener configured, but "
"enable_metrics is not True!"
)
"Metrics listener configured, but enable_metrics is not True!"
)
else:
_base.listen_metrics(listener.bind_addresses, listener.port)
_base.listen_metrics(listener.socket_options)
else:
logger.warning("Unsupported listener type: %s", listener.type)

Expand Down
111 changes: 61 additions & 50 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.server import ListenerConfig
from synapse.config.server import ListenerConfig, TcpListenerConfig, UnixListenerConfig
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import (
Expand Down Expand Up @@ -92,12 +92,7 @@ class SynapseHomeServer(HomeServer):
DATASTORE_CLASS = DataStore

def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
port = listener_config.port
bind_addresses = listener_config.bind_addresses
tls = listener_config.tls
site_tag = listener_config.http_options.tag
if site_tag is None:
site_tag = port
assert listener_config.http_options is not None

# We always include a health resource.
resources = {"/health": HealthResource()}
Expand Down Expand Up @@ -137,36 +132,46 @@ def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConf

root_resource = create_resource_tree(resources, root_resource)

if tls:
ports = listen_ssl(
bind_addresses,
port,
SynapseSite(
"synapse.access.https.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
self.tls_server_context_factory,
reactor=self.get_reactor(),
)
logger.info("Synapse now listening on TCP port %d (TLS)", port)
socket_options = listener_config.socket_options
site_tag = listener_config.http_options.tag

if isinstance(socket_options, TcpListenerConfig):
port = socket_options.port
if site_tag is None:
site_tag = port
site_type = "https" if socket_options.tls else "http"
else:
ports = listen_tcp(
bind_addresses,
port,
SynapseSite(
"synapse.access.http.%s" % (site_tag,),
site_tag,
listener_config,
root_resource,
self.version_string,
),
reactor=self.get_reactor(),
)
logger.info("Synapse now listening on TCP port %d", port)
assert isinstance(socket_options, UnixListenerConfig)
port = None
if site_tag is None:
site_tag = os.path.basename(socket_options.path)
site_type = "unix"

site = SynapseSite(
"synapse.access.%s.%s" % (site_type, site_tag),
site_tag,
listener_config,
root_resource,
self.version_string,
)

if port is not None:
if socket_options.tls:
ports = listen_ssl(
socket_options,
site,
self.tls_server_context_factory,
reactor=self.get_reactor(),
)
logger.info("Synapse now listening on TCP port %d (TLS)", port)

else:
ports = listen_tcp(socket_options, site, reactor=self.get_reactor())
logger.info("Synapse now listening on TCP port %d", port)

else:
ports = [self.get_reactor().listenUNIX(socket_options.path, site)]
logger.info("Synapse now listening on UNIX socket %s", socket_options.path)

return ports

Expand Down Expand Up @@ -295,31 +300,37 @@ def start_listening(self, listeners: Iterable[ListenerConfig]):
if listener.type == "http":
self._listening_services.extend(self._listener_http(config, listener))
elif listener.type == "manhole":
listen_tcp(
listener.bind_addresses,
listener.port,
manhole(
username="matrix", password="rabbithole", globals={"hs": self}
),
)
if not isinstance(listener.socket_options, TcpListenerConfig):
logger.warning("Manhole listener currently only supports TCP")
else:
listen_tcp(
listener.socket_options,
manhole(
username="matrix",
password="rabbithole",
globals={"hs": self},
),
)
elif listener.type == "replication":
if not isinstance(listener.socket_options, TcpListenerConfig):
logger.error(
"Replication configured to listen on a UNIX socket,"
" but only TCP is supported"
)
# XXX: should we straight up bail here?
continue
services = listen_tcp(
listener.bind_addresses,
listener.port,
ReplicationStreamProtocolFactory(self),
listener.socket_options, ReplicationStreamProtocolFactory(self),
)
for s in services:
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
elif listener.type == "metrics":
if not self.get_config().enable_metrics:
logger.warning(
(
"Metrics listener configured, but "
"enable_metrics is not True!"
)
"Metrics listener configured, but enable_metrics is not True!"
)
else:
_base.listen_metrics(listener.bind_addresses, listener.port)
_base.listen_metrics(listener.socket_options)
else:
# this shouldn't happen, as the listener type should have been checked
# during parsing
Expand Down
Loading