Skip to content

Commit

Permalink
Use a larger, dedicated threadpool for media sending (#17564)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Aug 13, 2024
1 parent 6a11bdf commit a9fc1fd
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 17 deletions.
1 change: 1 addition & 0 deletions changelog.d/17564.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up responding to media requests.
19 changes: 13 additions & 6 deletions synapse/media/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@
from synapse.http.server import finish_request, respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import (
defer_to_thread,
defer_to_threadpool,
make_deferred_yieldable,
run_in_background,
)
from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.stringutils import is_ascii

if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.databases.main.media_repository import LocalMedia


Expand Down Expand Up @@ -132,6 +132,7 @@ def respond_404(request: SynapseRequest) -> None:


async def respond_with_file(
hs: "HomeServer",
request: SynapseRequest,
media_type: str,
file_path: str,
Expand All @@ -148,7 +149,7 @@ async def respond_with_file(
add_file_headers(request, media_type, file_size, upload_name)

with open(file_path, "rb") as f:
await ThreadedFileSender(request.reactor).beginFileTransfer(f, request)
await ThreadedFileSender(hs).beginFileTransfer(f, request)

finish_request(request)
else:
Expand Down Expand Up @@ -632,8 +633,9 @@ class ThreadedFileSender:
# read.
TIMEOUT_SECONDS = 90.0

def __init__(self, reactor: ISynapseReactor) -> None:
self.reactor = reactor
def __init__(self, hs: "HomeServer") -> None:
self.reactor = hs.get_reactor()
self.thread_pool = hs.get_media_sender_thread_pool()

self.file: Optional[BinaryIO] = None
self.deferred: "Deferred[None]" = Deferred()
Expand Down Expand Up @@ -661,7 +663,12 @@ def beginFileTransfer(

# We set the wakeup signal as we should start producing immediately.
self.wakeup_event.set()
run_in_background(defer_to_thread, self.reactor, self._on_thread_read_loop)
run_in_background(
defer_to_threadpool,
self.reactor,
self.thread_pool,
self._on_thread_read_loop,
)

return make_deferred_yieldable(self.deferred)

Expand Down
12 changes: 5 additions & 7 deletions synapse/media/media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer

from ..types import ISynapseReactor, JsonDict
from ..types import JsonDict
from ._base import FileInfo, Responder
from .filepath import MediaFilePaths

Expand Down Expand Up @@ -209,7 +209,7 @@ async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
local_path = os.path.join(self.local_media_directory, path)
if os.path.exists(local_path):
logger.debug("responding with local file %s", local_path)
return FileResponder(self.reactor, open(local_path, "rb"))
return FileResponder(self.hs, open(local_path, "rb"))
logger.debug("local file %s did not exist", local_path)

for provider in self.storage_providers:
Expand Down Expand Up @@ -332,14 +332,12 @@ class FileResponder(Responder):
is closed when finished streaming.
"""

def __init__(self, reactor: ISynapseReactor, open_file: BinaryIO):
self.reactor = reactor
def __init__(self, hs: "HomeServer", open_file: BinaryIO):
self.hs = hs
self.open_file = open_file

def write_to_consumer(self, consumer: IConsumer) -> Deferred:
return ThreadedFileSender(self.reactor).beginFileTransfer(
self.open_file, consumer
)
return ThreadedFileSender(self.hs).beginFileTransfer(self.open_file, consumer)

def __exit__(
self,
Expand Down
2 changes: 1 addition & 1 deletion synapse/media/storage_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:

backup_fname = os.path.join(self.base_directory, path)
if os.path.isfile(backup_fname):
return FileResponder(self.reactor, open(backup_fname, "rb"))
return FileResponder(self.hs, open(backup_fname, "rb"))

return None

Expand Down
6 changes: 3 additions & 3 deletions synapse/media/thumbnailer.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,11 @@ async def select_or_generate_local_thumbnail(
await respond_with_multipart_responder(
self.hs.get_clock(),
request,
FileResponder(self.reactor, open(file_path, "rb")),
FileResponder(self.hs, open(file_path, "rb")),
media_info,
)
else:
await respond_with_file(request, desired_type, file_path)
await respond_with_file(self.hs, request, desired_type, file_path)
else:
logger.warning("Failed to generate thumbnail")
raise SynapseError(400, "Failed to generate thumbnail.")
Expand Down Expand Up @@ -456,7 +456,7 @@ async def select_or_generate_remote_thumbnail(
)

if file_path:
await respond_with_file(request, desired_type, file_path)
await respond_with_file(self.hs, request, desired_type, file_path)
else:
logger.warning("Failed to generate thumbnail")
raise SynapseError(400, "Failed to generate thumbnail.")
Expand Down
19 changes: 19 additions & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from twisted.internet.interfaces import IOpenSSLContextFactory
from twisted.internet.tcp import Port
from twisted.python.threadpool import ThreadPool
from twisted.web.iweb import IPolicyForHTTPS
from twisted.web.resource import Resource

Expand Down Expand Up @@ -941,3 +942,21 @@ def get_worker_locks_handler(self) -> WorkerLocksHandler:
@cache_in_self
def get_task_scheduler(self) -> TaskScheduler:
return TaskScheduler(self)

@cache_in_self
def get_media_sender_thread_pool(self) -> ThreadPool:
    """Create, start and return the thread pool used for sending media.

    The pool runs the file reads performed when responding to media
    download requests. It is started before being returned, and a reactor
    "shutdown" trigger is registered so that it is stopped on shutdown.
    """
    # The worker threads spend most of their time on IO rather than CPU
    # work, so a generous upper bound on the pool size is acceptable.
    pool = ThreadPool(minthreads=1, maxthreads=50, name="media_threadpool")
    pool.start()

    # Stop the pool's threads when the reactor shuts down.
    reactor = self.get_reactor()
    reactor.addSystemEventTrigger("during", "shutdown", pool.stop)

    return pool
6 changes: 6 additions & 0 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,12 @@ async def validate_hash(p: str, h: str) -> bool:

hs.get_auth_handler().validate_hash = validate_hash # type: ignore[assignment]

# We need to replace the media threadpool with the fake test threadpool.
def thread_pool() -> threadpool.ThreadPool:
    """Return the thread pool owned by `reactor` (the fake test threadpool).

    Installed on the homeserver in place of `get_media_sender_thread_pool`
    so that media sending in tests uses the reactor's pool.
    """
    return reactor.getThreadPool()

hs.get_media_sender_thread_pool = thread_pool # type: ignore[method-assign]

# Load any configured modules into the homeserver
module_api = hs.get_module_api()
for module, module_config in hs.config.modules.loaded_modules:
Expand Down

0 comments on commit a9fc1fd

Please sign in to comment.