Add a wrapper to convert file streams to multipart #17327

Closed · wants to merge 5 commits
1 change: 1 addition & 0 deletions changelog.d/17327.bugfix
@@ -0,0 +1 @@
+Add a file consumer to convert file streams to multipart.
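For reference, a minimal sketch (not code from this PR) of the multipart/mixed body the new consumer emits, reconstructed from the write() and unregisterProducer() logic in the media_storage.py diff below; the boundary value and the image/png file part are illustrative:

    # Sketch of the MSC3916-style body: a JSON part followed by the file part.
    CRLF = b"\r\n"
    boundary = b"aaaabbbbccccddddeeeeffff00001111"  # the PR derives this from uuid4().hex

    body = (
        CRLF + b"--" + boundary + CRLF
        + b"Content-Type: application/json" + CRLF
        + CRLF + b"{}"  # first part: a JSON object
        + CRLF + b"--" + boundary + CRLF
        + b"Content-Type: image/png" + CRLF + CRLF
        + b"<file bytes>"  # second part: the requested file itself
        + CRLF + b"--" + boundary + b"--" + CRLF  # closing boundary
    )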
16 changes: 12 additions & 4 deletions synapse/media/_base.py
@@ -46,10 +46,10 @@
from synapse.http.server import finish_request, respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
+from synapse.util import Clock
from synapse.util.stringutils import is_ascii

if TYPE_CHECKING:
-    from synapse.media.media_storage import MultipartResponder
    from synapse.storage.databases.main.media_repository import LocalMedia


@@ -275,8 +275,9 @@ def _can_encode_filename_as_token(x: str) -> bool:


async def respond_with_multipart_responder(
+    clock: Clock,
    request: SynapseRequest,
-    responder: "Optional[MultipartResponder]",
+    responder: "Optional[Responder]",
    media_info: "LocalMedia",
) -> None:
    """
@@ -299,15 +300,22 @@ def respond_with_multipart_responder(
            )
            return

+        from synapse.media.media_storage import MultipartFileConsumer
+
+        multipart_consumer = MultipartFileConsumer(
+            clock, request, media_info.media_type, {}
+        )
+
        logger.debug("Responding to media request with responder %s", responder)
        if media_info.media_length is not None:
            request.setHeader(b"Content-Length", b"%d" % (media_info.media_length,))
        request.setHeader(
-            b"Content-Type", b"multipart/mixed; boundary=%s" % responder.boundary
+            b"Content-Type",
+            b"multipart/mixed; boundary=%s" % multipart_consumer.boundary,
        )

        try:
-            await responder.write_to_consumer(request)
+            await responder.write_to_consumer(multipart_consumer)
        except Exception as e:
            # The majority of the time this will be due to the client having gone
            # away. Unfortunately, Twisted simply throws a generic exception at us
7 changes: 4 additions & 3 deletions synapse/media/media_repository.py
@@ -58,7 +58,7 @@
    respond_with_responder,
)
from synapse.media.filepath import MediaFilePaths
-from synapse.media.media_storage import MediaStorage, MultipartResponder
+from synapse.media.media_storage import MediaStorage
from synapse.media.storage_provider import StorageProviderWrapper
from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
from synapse.media.url_previewer import UrlPreviewer
@@ -467,8 +467,9 @@ async def get_local_media(
        )
        if federation:
            # this really should be a Multipart responder but just in case
-            assert isinstance(responder, MultipartResponder)
-            await respond_with_multipart_responder(request, responder, media_info)
+            await respond_with_multipart_responder(
+                self.clock, request, responder, media_info
+            )
        else:
            await respond_with_responder(
                request, responder, media_type, media_length, upload_name
256 changes: 141 additions & 115 deletions synapse/media/media_storage.py
@@ -39,19 +39,24 @@
    Tuple,
    Type,
    Union,
+    cast,
)
from uuid import uuid4

import attr
from zope.interface import implementer

-from twisted.internet import defer, interfaces
+from twisted.internet import interfaces
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IConsumer
from twisted.protocols.basic import FileSender

from synapse.api.errors import NotFoundError
-from synapse.logging.context import defer_to_thread, make_deferred_yieldable
+from synapse.logging.context import (
+    defer_to_thread,
+    make_deferred_yieldable,
+    run_in_background,
+)
from synapse.logging.opentracing import start_active_span, trace, trace_with_opname
from synapse.util import Clock
from synapse.util.file_consumer import BackgroundFileConsumer
@@ -217,14 +222,7 @@ async def fetch_media(
            local_path = os.path.join(self.local_media_directory, path)
            if os.path.exists(local_path):
                logger.debug("responding with local file %s", local_path)
-                if federation:
-                    assert media_info is not None
-                    boundary = uuid4().hex.encode("ascii")
-                    return MultipartResponder(
-                        open(local_path, "rb"), media_info, boundary
-                    )
-                else:
-                    return FileResponder(open(local_path, "rb"))
+                return FileResponder(open(local_path, "rb"))
            logger.debug("local file %s did not exist", local_path)

        for provider in self.storage_providers:
@@ -364,38 +362,6 @@ def __exit__(
        self.open_file.close()


-class MultipartResponder(Responder):
-    """Wraps an open file, formats the response according to MSC3916 and sends it to a
-    federation request.
-
-    Args:
-        open_file: A file like object to be streamed to the client,
-            is closed when finished streaming.
-        media_info: metadata about the media item
-        boundary: bytes to use for the multipart response boundary
-    """
-
-    def __init__(self, open_file: IO, media_info: LocalMedia, boundary: bytes) -> None:
-        self.open_file = open_file
-        self.media_info = media_info
-        self.boundary = boundary
-
-    def write_to_consumer(self, consumer: IConsumer) -> Deferred:
-        return make_deferred_yieldable(
-            MultipartFileSender().beginFileTransfer(
-                self.open_file, consumer, self.media_info.media_type, {}, self.boundary
-            )
-        )
-
-    def __exit__(
-        self,
-        exc_type: Optional[Type[BaseException]],
-        exc_val: Optional[BaseException],
-        exc_tb: Optional[TracebackType],
-    ) -> None:
-        self.open_file.close()
-
-
class SpamMediaException(NotFoundError):
    """The media was blocked by a spam checker, so we simply 404 the request (in
    the same way as if it was quarantined).
@@ -431,105 +397,165 @@ async def write_chunks_to(self, callback: Callable[[bytes], object]) -> None:
            await self.clock.sleep(0)


-@implementer(interfaces.IProducer)
-class MultipartFileSender:
-    """
-    A producer that sends the contents of a file to a federation request in the format
-    outlined in MSC3916 - a multipart/format-data response where the first field is a
-    JSON object and the second is the requested file.
-
-    This is a slight re-writing of twisted.protocols.basic.FileSender to achieve the format
-    outlined above.
-    """
-
-    CHUNK_SIZE = 2**14
-
-    lastSent = ""
-    deferred: Optional[defer.Deferred] = None
-
-    def beginFileTransfer(
-        self,
-        file: IO,
-        consumer: IConsumer,
-        file_content_type: str,
-        json_object: JsonDict,
-        boundary: bytes,
-    ) -> Deferred:
-        """
-        Begin transferring a file
-
-        Args:
-            file: The file object to read data from
-            consumer: The synapse request to write the data to
-            file_content_type: The content-type of the file
-            json_object: The JSON object to write to the first field of the response
-            boundary: bytes to be used as the multipart/form-data boundary
-
-        Returns: A deferred whose callback will be invoked when the file has
-        been completely written to the consumer. The last byte written to the
-        consumer is passed to the callback.
-        """
-        self.file: Optional[IO] = file
-        self.consumer = consumer
-        self.json_field = json_object
-        self.json_field_written = False
-        self.content_type_written = False
-        self.file_content_type = file_content_type
-        self.boundary = boundary
-        self.deferred: Deferred = defer.Deferred()
-        self.consumer.registerProducer(self, False)
-        # while it's not entirely clear why this assignment is necessary, it mirrors
-        # the behavior in FileSender.beginFileTransfer and thus is preserved here
-        deferred = self.deferred
-        return deferred
+@implementer(interfaces.IConsumer)
+@implementer(interfaces.IPushProducer)
+class MultipartFileConsumer:
+    """Wraps a given consumer so that any data that gets written to it gets
+    converted to a multipart format.
+    """
+
+    def __init__(
+        self,
+        clock: Clock,
+        wrapped_consumer: interfaces.IConsumer,
+        file_content_type: str,
+        json_object: JsonDict,
+    ) -> None:
+        self.clock = clock
+        self.wrapped_consumer = wrapped_consumer
+        self.json_field = json_object
+        self.json_field_written = False
+        self.content_type_written = False
+        self.file_content_type = file_content_type
+        self.boundary = uuid4().hex.encode("ascii")
+
+        # The producer that registered with us, and if its a push or pull
+        # producer.
+        self.producer: Optional["interfaces.IProducer"] = None
+        self.streaming: Optional[bool] = None
+
+        # Whether the wrapped consumer has asked us to pause.
+        self.paused = False
+
+    ### IConsumer APIs ###
+
+    def registerProducer(
+        self, producer: "interfaces.IProducer", streaming: bool
+    ) -> None:
+        """
+        Register to receive data from a producer.
+
+        This sets self to be a consumer for a producer. When this object runs
+        out of data (as when a send(2) call on a socket succeeds in moving the
+        last data from a userspace buffer into a kernelspace buffer), it will
+        ask the producer to resumeProducing().
+
+        For L{IPullProducer} providers, C{resumeProducing} will be called once
+        each time data is required.
+
+        For L{IPushProducer} providers, C{pauseProducing} will be called
+        whenever the write buffer fills up and C{resumeProducing} will only be
+        called when it empties. The consumer will only call C{resumeProducing}
+        to balance a previous C{pauseProducing} call; the producer is assumed
+        to start in an un-paused state.
+
+        @param streaming: C{True} if C{producer} provides L{IPushProducer},
+            C{False} if C{producer} provides L{IPullProducer}.
+
+        @raise RuntimeError: If a producer is already registered.
+        """
+        self.producer = producer
+        self.streaming = streaming
+
+        self.wrapped_consumer.registerProducer(self, streaming)
Review comment (Member):

Suggested change:
-        self.wrapped_consumer.registerProducer(self, streaming)
+        self.wrapped_consumer.registerProducer(self, True)

Would be good to know why you changed it to this. My understanding is that MultipartFileConsumer operates as an IPushProducer irrespective of what producer is, and so we should always set the streaming flag in registerProducer.

Reply (Contributor, author):

So this was a change that I made after I ran an integration test and found that the file streaming response was hanging indefinitely, causing the original request to time out. Changing it to this worked and so I didn't think much about it, but it sounds like maybe I need to investigate further...
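(For context on the thread above, a minimal sketch, not part of this PR, of what the streaming flag means to a Twisted consumer. OneShotPullProducer is a hypothetical name; FileSender, which the Responder classes use in write_to_consumer, registers itself in exactly this pull mode.)

    from typing import List

    from zope.interface import implementer

    from twisted.internet.interfaces import IConsumer, IPullProducer


    @implementer(IPullProducer)
    class OneShotPullProducer:
        """Writes one chunk each time the consumer asks for more data."""

        def __init__(self, consumer: IConsumer, chunks: List[bytes]) -> None:
            self.consumer = consumer
            self.chunks = list(chunks)

        def resumeProducing(self) -> None:
            # With registerProducer(producer, streaming=False), the consumer
            # must call this once per chunk; if nothing calls it, the
            # transfer stalls.
            if self.chunks:
                self.consumer.write(self.chunks.pop(0))
            else:
                self.consumer.unregisterProducer()

        def stopProducing(self) -> None:
            self.chunks = []

With streaming=True the consumer instead waits for the producer to push data on its own, calling pauseProducing/resumeProducing only for backpressure, so the choice of flag determines who is responsible for driving the transfer.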


+    def unregisterProducer(self) -> None:
+        """
+        Stop consuming data from a producer, without disconnecting.
+        """
+        self.wrapped_consumer.write(CRLF + b"--" + self.boundary + b"--" + CRLF)
+        self.wrapped_consumer.unregisterProducer()
+        self.paused = True
+
-    def resumeProducing(self) -> None:
-        # write the first field, which will always be a json field
+    def write(self, data: bytes) -> None:
+        """
+        The producer will write data by calling this method.
+
+        The implementation must be non-blocking and perform whatever
+        buffering is necessary. If the producer has provided enough data
+        for now and it is a L{IPushProducer}, the consumer may call its
+        C{pauseProducing} method.
+        """
        if not self.json_field_written:
-            self.consumer.write(CRLF + b"--" + self.boundary + CRLF)
+            self.wrapped_consumer.write(CRLF + b"--" + self.boundary + CRLF)

            content_type = Header(b"Content-Type", b"application/json")
-            self.consumer.write(bytes(content_type) + CRLF)
+            self.wrapped_consumer.write(bytes(content_type) + CRLF)

            json_field = json.dumps(self.json_field)
            json_bytes = json_field.encode("utf-8")
-            self.consumer.write(json_bytes)
-            self.consumer.write(CRLF + b"--" + self.boundary + CRLF)
+            self.wrapped_consumer.write(CRLF + json_bytes)
+            self.wrapped_consumer.write(CRLF + b"--" + self.boundary + CRLF)

            self.json_field_written = True

-        chunk: Any = ""
-        if self.file:
-            # if we haven't written the content type yet, do so
-            if not self.content_type_written:
-                type = self.file_content_type.encode("utf-8")
-                content_type = Header(b"Content-Type", type)
-                self.consumer.write(bytes(content_type) + CRLF)
-                self.content_type_written = True
-
-            chunk = self.file.read(self.CHUNK_SIZE)
-
-        if not chunk:
-            # we've reached the end of the file
-            self.consumer.write(CRLF + b"--" + self.boundary + b"--" + CRLF)
-            self.file = None
-            self.consumer.unregisterProducer()
-
-            if self.deferred:
-                self.deferred.callback(self.lastSent)
-                self.deferred = None
-            return
-
-        self.consumer.write(chunk)
-        self.lastSent = chunk[-1:]
+        # if we haven't written the content type yet, do so
+        if not self.content_type_written:
+            type = self.file_content_type.encode("utf-8")
+            content_type = Header(b"Content-Type", type)
+            self.wrapped_consumer.write(bytes(content_type) + CRLF + CRLF)
+            self.content_type_written = True
+
+        self.wrapped_consumer.write(data)
+
+    ### IPushProducer APIs ###
+
+    def stopProducing(self) -> None:
+        """
+        Stop producing data.
+
+        This tells a producer that its consumer has died, so it must stop
+        producing data for good.
+        """
+        assert self.producer is not None
+
+        self.paused = True
+        self.producer.stopProducing()

    def pauseProducing(self) -> None:
-        pass
+        """
+        Pause producing data.
+
+        Tells a producer that it has produced too much data to process for
+        the time being, and to stop until C{resumeProducing()} is called.
+        """
+        assert self.producer is not None
+
+        self.paused = True
+
+        if self.streaming:
+            cast("interfaces.IPushProducer", self.producer).pauseProducing()
+        else:
+            self.paused = True

-    def stopProducing(self) -> None:
-        if self.deferred:
-            self.deferred.errback(Exception("Consumer asked us to stop producing"))
-            self.deferred = None
+    def resumeProducing(self) -> None:
+        """
+        Resume producing data.
+
+        This tells a producer to re-add itself to the main loop and produce
+        more data for its consumer.
+        """
+        assert self.producer is not None
+
+        if self.streaming:
+            cast("interfaces.IPushProducer", self.producer).resumeProducing()
+        else:
+            # If the producer is not a streaming producer we need to start
+            # repeatedly calling `resumeProducing` in a loop.
+            run_in_background(self._resumeProducingRepeatedly)
+
+    ### Internal APIs. ###
+
+    async def _resumeProducingRepeatedly(self) -> None:
+        assert self.producer is not None
+        assert not self.streaming
+
+        producer = cast("interfaces.IPullProducer", self.producer)
+
+        self.paused = False
+        while not self.paused:
+            producer.resumeProducing()
+            await self.clock.sleep(0)


class Header:
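Putting the pieces together, a hedged end-to-end sketch of how the new consumer gets wired up, mirroring the _base.py changes above (open_file and request stand in for a real media file and federation request; the FileSender call is what FileResponder.write_to_consumer does internally):

    multipart_consumer = MultipartFileConsumer(
        clock, request, "image/png", {}  # the request is the wrapped consumer
    )
    request.setHeader(
        b"Content-Type",
        b"multipart/mixed; boundary=%s" % multipart_consumer.boundary,
    )
    # FileSender registers itself with streaming=False (a pull producer), so
    # the consumer pulls chunks in _resumeProducingRepeatedly, emitting the
    # JSON part and file headers before forwarding the first file bytes.
    d = FileSender().beginFileTransfer(open_file, multipart_consumer)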