diff --git a/src/torchaudio/io/_stream_reader.py b/src/torchaudio/io/_stream_reader.py deleted file mode 100644 index a138e3f0ec..0000000000 --- a/src/torchaudio/io/_stream_reader.py +++ /dev/null @@ -1,978 +0,0 @@ -from __future__ import annotations - -import os -from dataclasses import dataclass -from pathlib import Path -from typing import BinaryIO, Dict, Iterator, Optional, Tuple, TypeVar, Union - -import torch -import torchaudio -from torch.utils._pytree import tree_map - -ffmpeg_ext = torchaudio._extension.lazy_import_ffmpeg_ext() - -__all__ = [ - "StreamReader", -] - - -@dataclass -class SourceStream: - """The metadata of a source stream, returned by :meth:`~torchaudio.io.StreamReader.get_src_stream_info`. - - This class is used when representing streams of media type other than `audio` or `video`. - - When source stream is `audio` or `video` type, :class:`SourceAudioStream` and - :class:`SourceVideoStream`, which reports additional media-specific attributes, - are used respectively. - """ - - media_type: str - """The type of the stream. - One of ``"audio"``, ``"video"``, ``"data"``, ``"subtitle"``, ``"attachment"`` and empty string. - - .. note:: - Only audio and video streams are supported for output. - .. note:: - Still images, such as PNG and JPEG formats are reported as video. - """ - codec: str - """Short name of the codec. Such as ``"pcm_s16le"`` and ``"h264"``.""" - codec_long_name: str - """Detailed name of the codec. - - Such as "`PCM signed 16-bit little-endian`" and "`H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10`". - """ - format: Optional[str] - """Media format. Such as ``"s16"`` and ``"yuv420p"``. - - Commonly found audio values are; - - - ``"u8"``, ``"u8p"``: Unsigned 8-bit unsigned interger. - - ``"s16"``, ``"s16p"``: 16-bit signed integer. - - ``"s32"``, ``"s32p"``: 32-bit signed integer. - - ``"flt"``, ``"fltp"``: 32-bit floating-point. - - .. note:: - - `p` at the end indicates the format is `planar`. - Channels are grouped together instead of interspersed in memory. - """ - bit_rate: Optional[int] - """Bit rate of the stream in bits-per-second. - This is an estimated values based on the initial few frames of the stream. - For container formats and variable bit rate, it can be 0. - """ - num_frames: Optional[int] - """The number of frames in the stream""" - bits_per_sample: Optional[int] - """This is the number of valid bits in each output sample. - For compressed format, it can be 0. - """ - metadata: Dict[str, str] - """Metadata attached to the source stream.""" - - -@dataclass -class SourceAudioStream(SourceStream): - """The metadata of an audio source stream, returned by :meth:`~torchaudio.io.StreamReader.get_src_stream_info`. - - This class is used when representing audio stream. - - In addition to the attributes reported by :class:`SourceStream`, - the following attributes are reported. - """ - - sample_rate: float - """Sample rate of the audio.""" - num_channels: int - """Number of channels.""" - - -@dataclass -class SourceVideoStream(SourceStream): - """The metadata of a video source stream, returned by :meth:`~torchaudio.io.StreamReader.get_src_stream_info`. - - This class is used when representing video stream. - - In addition to the attributes reported by :class:`SourceStream`, - the following attributes are reported. - """ - - width: int - """Width of the video frame in pixel.""" - height: int - """Height of the video frame in pixel.""" - frame_rate: float - """Frame rate.""" - - -def _parse_si(i): - media_type = i.media_type - if media_type == "audio": - return SourceAudioStream( - media_type=i.media_type, - codec=i.codec_name, - codec_long_name=i.codec_long_name, - format=i.format, - bit_rate=i.bit_rate, - num_frames=i.num_frames, - bits_per_sample=i.bits_per_sample, - metadata=i.metadata, - sample_rate=i.sample_rate, - num_channels=i.num_channels, - ) - if media_type == "video": - return SourceVideoStream( - media_type=i.media_type, - codec=i.codec_name, - codec_long_name=i.codec_long_name, - format=i.format, - bit_rate=i.bit_rate, - num_frames=i.num_frames, - bits_per_sample=i.bits_per_sample, - metadata=i.metadata, - width=i.width, - height=i.height, - frame_rate=i.frame_rate, - ) - return SourceStream( - media_type=i.media_type, - codec=i.codec_name, - codec_long_name=i.codec_long_name, - format=None, - bit_rate=None, - num_frames=None, - bits_per_sample=None, - metadata=i.metadata, - ) - - -@dataclass -class OutputStream: - """Output stream configured on :class:`StreamReader`, - returned by :meth:`~torchaudio.io.StreamReader.get_out_stream_info`. - """ - - source_index: int - """Index of the source stream that this output stream is connected.""" - filter_description: str - """Description of filter graph applied to the source stream.""" - media_type: str - """The type of the stream. ``"audio"`` or ``"video"``.""" - format: str - """Media format. Such as ``"s16"`` and ``"yuv420p"``. - - Commonly found audio values are; - - - ``"u8"``, ``"u8p"``: Unsigned 8-bit unsigned interger. - - ``"s16"``, ``"s16p"``: 16-bit signed integer. - - ``"s32"``, ``"s32p"``: 32-bit signed integer. - - ``"flt"``, ``"fltp"``: 32-bit floating-point. - - .. note:: - - `p` at the end indicates the format is `planar`. - Channels are grouped together instead of interspersed in memory.""" - - -@dataclass -class OutputAudioStream(OutputStream): - """Information about an audio output stream configured with - :meth:`~torchaudio.io.StreamReader.add_audio_stream` or - :meth:`~torchaudio.io.StreamReader.add_basic_audio_stream`. - - In addition to the attributes reported by :class:`OutputStream`, - the following attributes are reported. - """ - - sample_rate: float - """Sample rate of the audio.""" - num_channels: int - """Number of channels.""" - - -@dataclass -class OutputVideoStream(OutputStream): - """Information about a video output stream configured with - :meth:`~torchaudio.io.StreamReader.add_video_stream` or - :meth:`~torchaudio.io.StreamReader.add_basic_video_stream`. - - In addition to the attributes reported by :class:`OutputStream`, - the following attributes are reported. - """ - - width: int - """Width of the video frame in pixel.""" - height: int - """Height of the video frame in pixel.""" - frame_rate: float - """Frame rate.""" - - -def _parse_oi(i): - media_type = i.media_type - if media_type == "audio": - return OutputAudioStream( - source_index=i.source_index, - filter_description=i.filter_description, - media_type=i.media_type, - format=i.format, - sample_rate=i.sample_rate, - num_channels=i.num_channels, - ) - if media_type == "video": - return OutputVideoStream( - source_index=i.source_index, - filter_description=i.filter_description, - media_type=i.media_type, - format=i.format, - width=i.width, - height=i.height, - frame_rate=i.frame_rate, - ) - raise ValueError(f"Unexpected media_type: {i.media_type}({i})") - - -def _get_afilter_desc(sample_rate: Optional[int], fmt: Optional[str], num_channels: Optional[int]): - descs = [] - if sample_rate is not None: - descs.append(f"aresample={sample_rate}") - if fmt is not None or num_channels is not None: - parts = [] - if fmt is not None: - parts.append(f"sample_fmts={fmt}") - if num_channels is not None: - parts.append(f"channel_layouts={num_channels}c") - descs.append(f"aformat={':'.join(parts)}") - return ",".join(descs) if descs else None - - -def _get_vfilter_desc(frame_rate: Optional[float], width: Optional[int], height: Optional[int], fmt: Optional[str]): - descs = [] - if frame_rate is not None: - descs.append(f"fps={frame_rate}") - scales = [] - if width is not None: - scales.append(f"width={width}") - if height is not None: - scales.append(f"height={height}") - if scales: - descs.append(f"scale={':'.join(scales)}") - if fmt is not None: - descs.append(f"format=pix_fmts={fmt}") - return ",".join(descs) if descs else None - - -# Base class for ChunkTensor -# Based off of TrivialTensorViaComposition -# https://github.com/albanD/subclass_zoo/blob/0eeb1d68fb59879029c610bc407f2997ae43ba0a/trivial_tensors.py#L83 -class ChunkTensorBase(torch.Tensor): - __torch_function__ = torch._C._disabled_torch_function_impl - - @staticmethod - def __new__(cls, _elem, *_): - return super().__new__(cls, _elem) - - @classmethod - def __torch_dispatch__(cls, func, _, args=(), kwargs=None): - def unwrap(t): - return t._elem if isinstance(t, cls) else t - - return func(*tree_map(unwrap, args), **tree_map(unwrap, kwargs)) - - -@dataclass -class ChunkTensor(ChunkTensorBase): - """Decoded media frames with metadata. - - The instance of this class represents the decoded video/audio frames with - metadata, and the instance itself behave like :py:class:`~torch.Tensor`. - - Client codes can pass instance of this class as-if it's - :py:class:`~torch.Tensor` class, or call the methods defined on - :py:class:`~torch.Tensor` class. - - Example: - >>> # Define input streams - >>> reader = StreamReader(...) - >>> reader.add_audio_stream(frames_per_chunk=4000, sample_rate=8000) - >>> reader.add_video_stream(frames_per_chunk=7, frame_rate=28) - >>> # Decode the streams and fetch frames - >>> reader.fill_buffer() - >>> audio_chunk, video_chunk = reader.pop_chunks() - - >>> # Access metadata - >>> (audio_chunk.pts, video_chunks.pts) - (0.0, 0.0) - >>> - >>> # The second time the PTS is different - >>> reader.fill_buffer() - >>> audio_chunk, video_chunk = reader.pop_chunks() - >>> (audio_chunk.pts, video_chunks.pts) - (0.5, 0.25) - - >>> # Call PyTorch ops on chunk - >>> audio_chunk.shape - torch.Size([4000, 2] - >>> power = torch.pow(video_chunk, 2) - >>> - >>> # the result is a plain torch.Tensor class - >>> type(power) - - >>> - >>> # Metadata is not available on the result - >>> power.pts - AttributeError: 'Tensor' object has no attribute 'pts' - """ - - # Keep it private for now - _elem: torch.Tensor - - pts: float - """Presentation time stamp of the first frame in the chunk. - - Unit: second. - """ - - -def _format_doc(**kwargs): - def decorator(obj): - obj.__doc__ = obj.__doc__.format(**kwargs) - return obj - - return decorator - - -_frames_per_chunk = """Number of frames returned as one chunk. - If the source stream is exhausted before enough frames are buffered, - then the chunk is returned as-is. - - Providing ``-1`` disables chunking and :py:func:`pop_chunks` method - will concatenate all the buffered frames and return it.""" - -_buffer_chunk_size = """Internal buffer size. - When the number of chunks buffered exceeds this number, old frames are - dropped. For example, if ``frames_per_chunk`` is 5 and ``buffer_chunk_size`` is - 3, then frames older than ``15`` are dropped. - Providing ``-1`` disables this behavior. - - Default: ``3``.""" - -_audio_stream_index = """The source audio stream index. - If omitted, :py:attr:`default_audio_stream` is used.""" - - -_video_stream_index = """The source video stream index. - If omitted, :py:attr:`default_video_stream` is used.""" - -_decoder = """The name of the decoder to be used. - When provided, use the specified decoder instead of the default one. - - To list the available decoders, please use - :py:func:`~torchaudio.utils.ffmpeg_utils.get_audio_decoders` for audio, and - :py:func:`~torchaudio.utils.ffmpeg_utils.get_video_decoders` for video. - - Default: ``None``.""" - -_decoder_option = """Options passed to decoder. - Mapping from str to str. (Default: ``None``) - - To list decoder options for a decoder, you can use - ``ffmpeg -h decoder=`` command. - - | - - In addition to decoder-specific options, you can also pass options related - to multithreading. They are effective only if the decoder support them. - If neither of them are provided, StreamReader defaults to single thread. - - ``"threads"``: The number of threads (in str). - Providing the value ``"0"`` will let FFmpeg decides based on its heuristics. - - ``"thread_type"``: Which multithreading method to use. - The valid values are ``"frame"`` or ``"slice"``. - Note that each decoder supports different set of methods. - If not provided, a default value is used. - - - ``"frame"``: Decode more than one frame at once. - Each thread handles one frame. - This will increase decoding delay by one frame per thread - - ``"slice"``: Decode more than one part of a single frame at once. - - | - """ - - -_hw_accel = """Enable hardware acceleration. - - When video is decoded on CUDA hardware, for example - `decoder="h264_cuvid"`, passing CUDA device indicator to `hw_accel` - (i.e. `hw_accel="cuda:0"`) will make StreamReader place the resulting - frames directly on the specified CUDA device as CUDA tensor. - - If `None`, the frame will be moved to CPU memory. - Default: ``None``.""" - - -_format_audio_args = _format_doc( - frames_per_chunk=_frames_per_chunk, - buffer_chunk_size=_buffer_chunk_size, - stream_index=_audio_stream_index, - decoder=_decoder, - decoder_option=_decoder_option, -) - - -_format_video_args = _format_doc( - frames_per_chunk=_frames_per_chunk, - buffer_chunk_size=_buffer_chunk_size, - stream_index=_video_stream_index, - decoder=_decoder, - decoder_option=_decoder_option, - hw_accel=_hw_accel, -) - - -InputStreamTypes = TypeVar("InputStream", bound=SourceStream) -OutputStreamTypes = TypeVar("OutputStream", bound=OutputStream) - - -class StreamReader: - """Fetch and decode audio/video streams chunk by chunk. - - For the detailed usage of this class, please refer to the tutorial. - - Args: - src (str, path-like, bytes or file-like object): The media source. - If string-type, it must be a resource indicator that FFmpeg can - handle. This includes a file path, URL, device identifier or - filter expression. The supported value depends on the FFmpeg found - in the system. - - If bytes, it must be an encoded media data in contiguous memory. - - If file-like object, it must support `read` method with the signature - `read(size: int) -> bytes`. - Additionally, if the file-like object has `seek` method, it uses - the method when parsing media metadata. This improves the reliability - of codec detection. The signagure of `seek` method must be - `seek(offset: int, whence: int) -> int`. - - Please refer to the following for the expected signature and behavior - of `read` and `seek` method. - - - https://docs.python.org/3/library/io.html#io.BufferedIOBase.read - - https://docs.python.org/3/library/io.html#io.IOBase.seek - - format (str or None, optional): - Override the input format, or specify the source sound device. - Default: ``None`` (no override nor device input). - - This argument serves two different usecases. - - 1) Override the source format. - This is useful when the input data do not contain a header. - - 2) Specify the input source device. - This allows to load media stream from hardware devices, - such as microphone, camera and screen, or a virtual device. - - - .. note:: - - This option roughly corresponds to ``-f`` option of ``ffmpeg`` command. - Please refer to the ffmpeg documentations for the possible values. - - https://ffmpeg.org/ffmpeg-formats.html#Demuxers - - Please use :py:func:`~torchaudio.utils.ffmpeg_utils.get_demuxers` to list the - demultiplexers available in the current environment. - - For device access, the available values vary based on hardware (AV device) and - software configuration (ffmpeg build). - - https://ffmpeg.org/ffmpeg-devices.html#Input-Devices - - Please use :py:func:`~torchaudio.utils.ffmpeg_utils.get_input_devices` to list - the input devices available in the current environment. - - option (dict of str to str, optional): - Custom option passed when initializing format context (opening source). - - You can use this argument to change the input source before it is passed to decoder. - - Default: ``None``. - - buffer_size (int): - The internal buffer size in byte. Used only when `src` is file-like object. - - Default: `4096`. - """ - - def __init__( - self, - src: Union[str, Path, BinaryIO], - format: Optional[str] = None, - option: Optional[Dict[str, str]] = None, - buffer_size: int = 4096, - ): - self.src = src - if isinstance(src, bytes): - self._be = ffmpeg_ext.StreamReaderBytes(src, format, option, buffer_size) - elif hasattr(src, "read"): - self._be = ffmpeg_ext.StreamReaderFileObj(src, format, option, buffer_size) - else: - self._be = ffmpeg_ext.StreamReader(os.path.normpath(src), format, option) - - i = self._be.find_best_audio_stream() - self._default_audio_stream = None if i < 0 else i - i = self._be.find_best_video_stream() - self._default_video_stream = None if i < 0 else i - - @property - def num_src_streams(self): - """Number of streams found in the provided media source. - - :type: int - """ - return self._be.num_src_streams() - - @property - def num_out_streams(self): - """Number of output streams configured by client code. - - :type: int - """ - return self._be.num_out_streams() - - @property - def default_audio_stream(self): - """The index of default audio stream. ``None`` if there is no audio stream - - :type: Optional[int] - """ - return self._default_audio_stream - - @property - def default_video_stream(self): - """The index of default video stream. ``None`` if there is no video stream - - :type: Optional[int] - """ - return self._default_video_stream - - def get_metadata(self) -> Dict[str, str]: - """Get the metadata of the source media. - - Returns: - dict - """ - return self._be.get_metadata() - - def get_src_stream_info(self, i: int) -> InputStreamTypes: - """Get the metadata of source stream - - Args: - i (int): Stream index. - Returns: - InputStreamTypes: - Information about the source stream. - If the source stream is audio type, then - :class:`~torchaudio.io._stream_reader.SourceAudioStream` is returned. - If it is video type, then - :class:`~torchaudio.io._stream_reader.SourceVideoStream` is returned. - Otherwise :class:`~torchaudio.io._stream_reader.SourceStream` class is returned. - """ - return _parse_si(self._be.get_src_stream_info(i)) - - def get_out_stream_info(self, i: int) -> OutputStreamTypes: - """Get the metadata of output stream - - Args: - i (int): Stream index. - Returns: - OutputStreamTypes - Information about the output stream. - If the output stream is audio type, then - :class:`~torchaudio.io._stream_reader.OutputAudioStream` is returned. - If it is video type, then - :class:`~torchaudio.io._stream_reader.OutputVideoStream` is returned. - """ - info = self._be.get_out_stream_info(i) - return _parse_oi(info) - - def seek(self, timestamp: float, mode: str = "precise"): - """Seek the stream to the given timestamp [second] - - Args: - timestamp (float): Target time in second. - mode (str): Controls how seek is done. - Valid choices are; - - * "key": Seek into the nearest key frame before the given timestamp. - * "any": Seek into any frame (including non-key frames) before the given timestamp. - * "precise": First seek into the nearest key frame before the given timestamp, then - decode frames until it reaches the closes frame to the given timestamp. - - Note: - All the modes invalidate and reset the internal state of decoder. - When using "any" mode and if it ends up seeking into non-key frame, - the image decoded may be invalid due to lack of key frame. - Using "precise" will workaround this issue by decoding frames from previous - key frame, but will be slower. - """ - modes = { - "key": 0, - "any": 1, - "precise": 2, - } - if mode not in modes: - raise ValueError(f"The value of mode must be one of {list(modes.keys())}. Found: {mode}") - self._be.seek(timestamp, modes[mode]) - - @_format_audio_args - def add_basic_audio_stream( - self, - frames_per_chunk: int, - buffer_chunk_size: int = 3, - *, - stream_index: Optional[int] = None, - decoder: Optional[str] = None, - decoder_option: Optional[Dict[str, str]] = None, - format: Optional[str] = "fltp", - sample_rate: Optional[int] = None, - num_channels: Optional[int] = None, - ): - """Add output audio stream - - Args: - frames_per_chunk (int): {frames_per_chunk} - - buffer_chunk_size (int, optional): {buffer_chunk_size} - - stream_index (int or None, optional): {stream_index} - - decoder (str or None, optional): {decoder} - - decoder_option (dict or None, optional): {decoder_option} - - format (str, optional): Output sample format (precision). - - If ``None``, the output chunk has dtype corresponding to - the precision of the source audio. - - Otherwise, the sample is converted and the output dtype is changed - as following. - - - ``"u8p"``: The output is ``torch.uint8`` type. - - ``"s16p"``: The output is ``torch.int16`` type. - - ``"s32p"``: The output is ``torch.int32`` type. - - ``"s64p"``: The output is ``torch.int64`` type. - - ``"fltp"``: The output is ``torch.float32`` type. - - ``"dblp"``: The output is ``torch.float64`` type. - - Default: ``"fltp"``. - - sample_rate (int or None, optional): If provided, resample the audio. - - num_channels (int, or None, optional): If provided, change the number of channels. - """ - self.add_audio_stream( - frames_per_chunk, - buffer_chunk_size, - stream_index=stream_index, - decoder=decoder, - decoder_option=decoder_option, - filter_desc=_get_afilter_desc(sample_rate, format, num_channels), - ) - - @_format_video_args - def add_basic_video_stream( - self, - frames_per_chunk: int, - buffer_chunk_size: int = 3, - *, - stream_index: Optional[int] = None, - decoder: Optional[str] = None, - decoder_option: Optional[Dict[str, str]] = None, - format: Optional[str] = "rgb24", - frame_rate: Optional[int] = None, - width: Optional[int] = None, - height: Optional[int] = None, - hw_accel: Optional[str] = None, - ): - """Add output video stream - - Args: - frames_per_chunk (int): {frames_per_chunk} - - buffer_chunk_size (int, optional): {buffer_chunk_size} - - stream_index (int or None, optional): {stream_index} - - decoder (str or None, optional): {decoder} - - decoder_option (dict or None, optional): {decoder_option} - - format (str, optional): Change the format of image channels. Valid values are, - - - ``"rgb24"``: 8 bits * 3 channels (R, G, B) - - ``"bgr24"``: 8 bits * 3 channels (B, G, R) - - ``"yuv420p"``: 8 bits * 3 channels (Y, U, V) - - ``"gray"``: 8 bits * 1 channels - - Default: ``"rgb24"``. - - frame_rate (int or None, optional): If provided, change the frame rate. - - width (int or None, optional): If provided, change the image width. Unit: Pixel. - - height (int or None, optional): If provided, change the image height. Unit: Pixel. - - hw_accel (str or None, optional): {hw_accel} - """ - self.add_video_stream( - frames_per_chunk, - buffer_chunk_size, - stream_index=stream_index, - decoder=decoder, - decoder_option=decoder_option, - filter_desc=_get_vfilter_desc(frame_rate, width, height, format), - hw_accel=hw_accel, - ) - - @_format_audio_args - def add_audio_stream( - self, - frames_per_chunk: int, - buffer_chunk_size: int = 3, - *, - stream_index: Optional[int] = None, - decoder: Optional[str] = None, - decoder_option: Optional[Dict[str, str]] = None, - filter_desc: Optional[str] = None, - ): - """Add output audio stream - - Args: - frames_per_chunk (int): {frames_per_chunk} - - buffer_chunk_size (int, optional): {buffer_chunk_size} - - stream_index (int or None, optional): {stream_index} - - decoder (str or None, optional): {decoder} - - decoder_option (dict or None, optional): {decoder_option} - - filter_desc (str or None, optional): Filter description. - The list of available filters can be found at - https://ffmpeg.org/ffmpeg-filters.html - Note that complex filters are not supported. - - """ - i = self.default_audio_stream if stream_index is None else stream_index - if i is None: - raise RuntimeError("There is no audio stream.") - self._be.add_audio_stream( - i, - frames_per_chunk, - buffer_chunk_size, - filter_desc, - decoder, - decoder_option or {}, - ) - - @_format_video_args - def add_video_stream( - self, - frames_per_chunk: int, - buffer_chunk_size: int = 3, - *, - stream_index: Optional[int] = None, - decoder: Optional[str] = None, - decoder_option: Optional[Dict[str, str]] = None, - filter_desc: Optional[str] = None, - hw_accel: Optional[str] = None, - ): - """Add output video stream - - Args: - frames_per_chunk (int): {frames_per_chunk} - - buffer_chunk_size (int, optional): {buffer_chunk_size} - - stream_index (int or None, optional): {stream_index} - - decoder (str or None, optional): {decoder} - - decoder_option (dict or None, optional): {decoder_option} - - hw_accel (str or None, optional): {hw_accel} - - filter_desc (str or None, optional): Filter description. - The list of available filters can be found at - https://ffmpeg.org/ffmpeg-filters.html - Note that complex filters are not supported. - """ - i = self.default_video_stream if stream_index is None else stream_index - if i is None: - raise RuntimeError("There is no video stream.") - self._be.add_video_stream( - i, - frames_per_chunk, - buffer_chunk_size, - filter_desc, - decoder, - decoder_option or {}, - hw_accel, - ) - - def remove_stream(self, i: int): - """Remove an output stream. - - Args: - i (int): Index of the output stream to be removed. - """ - self._be.remove_stream(i) - - def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.0) -> int: - """Read the source media and process one packet. - - If a packet is read successfully, then the data in the packet will - be decoded and passed to corresponding output stream processors. - - If the packet belongs to a source stream that is not connected to - an output stream, then the data are discarded. - - When the source reaches EOF, then it triggers all the output stream - processors to enter drain mode. All the output stream processors - flush the pending frames. - - Args: - timeout (float or None, optional): Timeout in milli seconds. - - This argument changes the retry behavior when it failed to - process a packet due to the underlying media resource being - temporarily unavailable. - - When using a media device such as a microphone, there are cases - where the underlying buffer is not ready. - Calling this function in such case would cause the system to report - `EAGAIN (resource temporarily unavailable)`. - - * ``>=0``: Keep retrying until the given time passes. - - * ``0<``: Keep retrying forever. - - * ``None`` : No retrying and raise an exception immediately. - - Default: ``None``. - - Note: - - The retry behavior is applicable only when the reason is the - unavailable resource. It is not invoked if the reason of failure is - other. - - backoff (float, optional): Time to wait before retrying in milli seconds. - - This option is effective only when `timeout` is effective. (not ``None``) - - When `timeout` is effective, this `backoff` controls how long the function - should wait before retrying. Default: ``10.0``. - - Returns: - int: - ``0`` - A packet was processed properly. The caller can keep - calling this function to buffer more frames. - - ``1`` - The streamer reached EOF. All the output stream processors - flushed the pending frames. The caller should stop calling - this method. - """ - return self._be.process_packet(timeout, backoff) - - def process_all_packets(self): - """Process packets until it reaches EOF.""" - self._be.process_all_packets() - - def is_buffer_ready(self) -> bool: - """Returns true if all the output streams have at least one chunk filled.""" - return self._be.is_buffer_ready() - - def pop_chunks(self) -> Tuple[Optional[ChunkTensor]]: - """Pop one chunk from all the output stream buffers. - - Returns: - Tuple[Optional[ChunkTensor]]: - Buffer contents. - If a buffer does not contain any frame, then `None` is returned instead. - """ - ret = [] - for chunk in self._be.pop_chunks(): - if chunk is None: - ret.append(None) - else: - ret.append(ChunkTensor(chunk.frames, chunk.pts)) - return ret - - def fill_buffer(self, timeout: Optional[float] = None, backoff: float = 10.0) -> int: - """Keep processing packets until all buffers have at least one chunk - - Arguments: - timeout (float or None, optional): See - :py:func:`~StreamReader.process_packet`. (Default: ``None``) - - backoff (float, optional): See - :py:func:`~StreamReader.process_packet`. (Default: ``10.0``) - - Returns: - int: - ``0`` - Packets are processed properly and buffers are - ready to be popped once. - - ``1`` - The streamer reached EOF. All the output stream processors - flushed the pending frames. The caller should stop calling - this method. - """ - return self._be.fill_buffer(timeout, backoff) - - def stream( - self, timeout: Optional[float] = None, backoff: float = 10.0 - ) -> Iterator[Tuple[Optional[ChunkTensor], ...]]: - """Return an iterator that generates output tensors - - Arguments: - timeout (float or None, optional): See - :py:func:`~StreamReader.process_packet`. (Default: ``None``) - - backoff (float, optional): See - :py:func:`~StreamReader.process_packet`. (Default: ``10.0``) - - Returns: - Iterator[Tuple[Optional[ChunkTensor], ...]]: - Iterator that yields a tuple of chunks that correspond to the output - streams defined by client code. - If an output stream is exhausted, then the chunk Tensor is substituted - with ``None``. - The iterator stops if all the output streams are exhausted. - """ - if self.num_out_streams == 0: - raise RuntimeError("No output stream is configured.") - - while True: - if self.fill_buffer(timeout, backoff): - break - yield self.pop_chunks() - - while True: - chunks = self.pop_chunks() - if all(c is None for c in chunks): - return - yield chunks diff --git a/src/torchaudio/io/_stream_writer.py b/src/torchaudio/io/_stream_writer.py deleted file mode 100644 index 2f67b25a3e..0000000000 --- a/src/torchaudio/io/_stream_writer.py +++ /dev/null @@ -1,502 +0,0 @@ -from dataclasses import dataclass -from pathlib import Path -from typing import BinaryIO, Dict, Optional, Union - -import torch -import torchaudio - -ffmpeg_ext = torchaudio._extension.lazy_import_ffmpeg_ext() - - -@dataclass -class CodecConfig: - """Codec configuration.""" - - bit_rate: int = -1 - """Bit rate""" - - compression_level: int = -1 - """Compression level""" - - qscale: Optional[int] = None - """Global quality factor. Enables variable bit rate. Valid values depend on encoder. - - For example: MP3 takes ``0`` - ``9`` (https://trac.ffmpeg.org/wiki/Encode/MP3) while - libvorbis takes ``-1`` - ``10``. - """ - - gop_size: int = -1 - """The number of pictures in a group of pictures, or 0 for intra_only""" - - max_b_frames: int = -1 - """maximum number of B-frames between non-B-frames.""" - - -def _convert_config(cfg: CodecConfig): - if cfg is None: - return None - # Convert the codecconfig to C++ compatible type. - # omitting the return type annotation so as not to access ffmpeg_ext here. - return ffmpeg_ext.CodecConfig( - cfg.bit_rate, - cfg.compression_level, - cfg.qscale, - cfg.gop_size, - cfg.max_b_frames, - ) - - -def _format_doc(**kwargs): - def decorator(obj): - obj.__doc__ = obj.__doc__.format(**kwargs) - return obj - - return decorator - - -_encoder = """The name of the encoder to be used. - When provided, use the specified encoder instead of the default one. - - To list the available encoders, please use - :py:func:`~torchaudio.utils.ffmpeg_utils.get_audio_encoders` for audio, and - :py:func:`~torchaudio.utils.ffmpeg_utils.get_video_encoders` for video. - - Default: ``None``.""" - - -_encoder_option = """Options passed to encoder. - Mapping from str to str. - - To list encoder options for a encoder, you can use - ``ffmpeg -h encoder=`` command. - - Default: ``None``. - - | - - In addition to encoder-specific options, you can also pass options related - to multithreading. They are effective only if the encoder support them. - If neither of them are provided, StreamReader defaults to single thread. - - ``"threads"``: The number of threads (in str). - Providing the value ``"0"`` will let FFmpeg decides based on its heuristics. - - ``"thread_type"``: Which multithreading method to use. - The valid values are ``"frame"`` or ``"slice"``. - Note that each encoder supports different set of methods. - If not provided, a default value is used. - - - ``"frame"``: Encode more than one frame at once. - Each thread handles one frame. - This will increase decoding delay by one frame per thread - - ``"slice"``: Encode more than one part of a single frame at once. - - | - """ - - -_encoder_format = """Format used to encode media. - When encoder supports multiple formats, passing this argument will override - the format used for encoding. - - To list supported formats for the encoder, you can use - ``ffmpeg -h encoder=`` command. - - Default: ``None``. - - Note: - When ``encoder_format`` option is not provided, encoder uses its default format. - - For example, when encoding audio into wav format, 16-bit signed integer is used, - and when encoding video into mp4 format (h264 encoder), one of YUV format is used. - - This is because typically, 32-bit or 16-bit floating point is used in audio models but - they are not commonly used in audio formats. Similarly, RGB24 is commonly used in vision - models, but video formats usually (and better) support YUV formats. - """ - -_codec_config = """Codec configuration. Please refer to :py:class:`CodecConfig` for - configuration options. - - Default: ``None``.""" - - -_filter_desc = """Additional processing to apply before encoding the input media. - """ - -_format_common_args = _format_doc( - encoder=_encoder, - encoder_option=_encoder_option, - encoder_format=_encoder_format, - codec_config=_codec_config, - filter_desc=_filter_desc, -) - - -class StreamWriter: - """Encode and write audio/video streams chunk by chunk - - Args: - dst (str, path-like or file-like object): The destination where the encoded data are written. - If string-type, it must be a resource indicator that FFmpeg can - handle. The supported value depends on the FFmpeg found in the system. - - If file-like object, it must support `write` method with the signature - `write(data: bytes) -> int`. - - Please refer to the following for the expected signature and behavior of - `write` method. - - - https://docs.python.org/3/library/io.html#io.BufferedIOBase.write - - format (str or None, optional): - Override the output format, or specify the output media device. - Default: ``None`` (no override nor device output). - - This argument serves two different use cases. - - 1) Override the output format. - This is useful when writing raw data or in a format different from the extension. - - 2) Specify the output device. - This allows to output media streams to hardware devices, - such as speaker and video screen. - - .. note:: - - This option roughly corresponds to ``-f`` option of ``ffmpeg`` command. - Please refer to the ffmpeg documentations for possible values. - - https://ffmpeg.org/ffmpeg-formats.html#Muxers - - Please use :py:func:`~torchaudio.utils.ffmpeg_utils.get_muxers` to list the - multiplexers available in the current environment. - - For device access, the available values vary based on hardware (AV device) and - software configuration (ffmpeg build). - Please refer to the ffmpeg documentations for possible values. - - https://ffmpeg.org/ffmpeg-devices.html#Output-Devices - - Please use :py:func:`~torchaudio.utils.ffmpeg_utils.get_output_devices` to list - the output devices available in the current environment. - - buffer_size (int): - The internal buffer size in byte. Used only when `dst` is a file-like object. - - Default: `4096`. - """ - - def __init__( - self, - dst: Union[str, Path, BinaryIO], - format: Optional[str] = None, - buffer_size: int = 4096, - ): - if hasattr(dst, "write"): - self._s = ffmpeg_ext.StreamWriterFileObj(dst, format, buffer_size) - else: - self._s = ffmpeg_ext.StreamWriter(str(dst), format) - self._is_open = False - - @_format_common_args - def add_audio_stream( - self, - sample_rate: int, - num_channels: int, - format: str = "flt", - *, - encoder: Optional[str] = None, - encoder_option: Optional[Dict[str, str]] = None, - encoder_sample_rate: Optional[int] = None, - encoder_num_channels: Optional[int] = None, - encoder_format: Optional[str] = None, - codec_config: Optional[CodecConfig] = None, - filter_desc: Optional[str] = None, - ): - """Add an output audio stream. - - Args: - sample_rate (int): The sample rate. - - num_channels (int): The number of channels. - - format (str, optional): Input sample format, which determines the dtype - of the input tensor. - - - ``"u8"``: The input tensor must be ``torch.uint8`` type. - - ``"s16"``: The input tensor must be ``torch.int16`` type. - - ``"s32"``: The input tensor must be ``torch.int32`` type. - - ``"s64"``: The input tensor must be ``torch.int64`` type. - - ``"flt"``: The input tensor must be ``torch.float32`` type. - - ``"dbl"``: The input tensor must be ``torch.float64`` type. - - Default: ``"flt"``. - - encoder (str or None, optional): {encoder} - - encoder_option (dict or None, optional): {encoder_option} - - encoder_sample_rate (int or None, optional): Override the sample rate used for encoding time. - Some encoders pose restriction on the sample rate used for encoding. - If the source sample rate is not supported by the encoder, the source sample rate is used, - otherwise a default one is picked. - - For example, ``"opus"`` encoder only supports 48k Hz, so, when encoding a - waveform with ``"opus"`` encoder, it is always encoded as 48k Hz. - Meanwhile ``"mp3"`` (``"libmp3lame"``) supports 44.1k, 48k, 32k, 22.05k, - 24k, 16k, 11.025k, 12k and 8k Hz. - If the original sample rate is one of these, then the original sample rate - is used, otherwise it will be resampled to a default one (44.1k). - When encoding into WAV format, there is no restriction on sample rate, - so the original sample rate will be used. - - Providing ``encoder_sample_rate`` will override this behavior and - make encoder attempt to use the provided sample rate. - The provided value must be one support by the encoder. - - encoder_num_channels (int or None, optional): Override the number of channels used for encoding. - - Similar to sample rate, some encoders (such as ``"opus"``, - ``"vorbis"`` and ``"g722"``) pose restriction on - the numbe of channels that can be used for encoding. - - If the original number of channels is supported by encoder, - then it will be used, otherwise, the encoder attempts to - remix the channel to one of the supported ones. - - Providing ``encoder_num_channels`` will override this behavior and - make encoder attempt to use the provided number of channels. - The provided value must be one support by the encoder. - - encoder_format (str or None, optional): {encoder_format} - - codec_config (CodecConfig or None, optional): {codec_config} - - filter_desc (str or None, optional): {filter_desc} - """ - self._s.add_audio_stream( - sample_rate, - num_channels, - format, - encoder, - encoder_option, - encoder_format, - encoder_sample_rate, - encoder_num_channels, - _convert_config(codec_config), - filter_desc, - ) - - @_format_common_args - def add_video_stream( - self, - frame_rate: float, - width: int, - height: int, - format: str = "rgb24", - *, - encoder: Optional[str] = None, - encoder_option: Optional[Dict[str, str]] = None, - encoder_frame_rate: Optional[float] = None, - encoder_width: Optional[int] = None, - encoder_height: Optional[int] = None, - encoder_format: Optional[str] = None, - codec_config: Optional[CodecConfig] = None, - filter_desc: Optional[str] = None, - hw_accel: Optional[str] = None, - ): - """Add an output video stream. - - This method has to be called before `open` is called. - - Args: - frame_rate (float): Frame rate of the video. - - width (int): Width of the video frame. - - height (int): Height of the video frame. - - format (str, optional): Input pixel format, which determines the - color channel order of the input tensor. - - - ``"gray8"``: One channel, grayscale. - - ``"rgb24"``: Three channels in the order of RGB. - - ``"bgr24"``: Three channels in the order of BGR. - - ``"yuv444p"``: Three channels in the order of YUV. - - Default: ``"rgb24"``. - - In either case, the input tensor has to be ``torch.uint8`` type and - the shape must be (frame, channel, height, width). - - encoder (str or None, optional): {encoder} - - encoder_option (dict or None, optional): {encoder_option} - - encoder_frame_rate (float or None, optional): Override the frame rate used for encoding. - - Some encoders, (such as ``"mpeg1"`` and ``"mpeg2"``) pose restriction on the - frame rate that can be used for encoding. - If such case, if the source frame rate (provided as ``frame_rate``) is not - one of the supported frame rate, then a default one is picked, and the frame rate - is changed on-the-fly. Otherwise the source frame rate is used. - - Providing ``encoder_frame_rate`` will override this behavior and - make encoder attempts to use the provided sample rate. - The provided value must be one support by the encoder. - - encoder_width (int or None, optional): Width of the image used for encoding. - This allows to change the image size during encoding. - - encoder_height (int or None, optional): Height of the image used for encoding. - This allows to change the image size during encoding. - - encoder_format (str or None, optional): {encoder_format} - - codec_config (CodecConfig or None, optional): {codec_config} - - filter_desc (str or None, optional): {filter_desc} - - hw_accel (str or None, optional): Enable hardware acceleration. - - When video is encoded on CUDA hardware, for example - `encoder="h264_nvenc"`, passing CUDA device indicator to `hw_accel` - (i.e. `hw_accel="cuda:0"`) will make StreamWriter expect video - chunk to be CUDA Tensor. Passing CPU Tensor will result in an error. - - If `None`, the video chunk Tensor has to be CPU Tensor. - Default: ``None``. - """ - self._s.add_video_stream( - frame_rate, - width, - height, - format, - encoder, - encoder_option, - encoder_format, - encoder_frame_rate, - encoder_width, - encoder_height, - hw_accel, - _convert_config(codec_config), - filter_desc, - ) - - def set_metadata(self, metadata: Dict[str, str]): - """Set file-level metadata - - Args: - metadata (dict or None, optional): File-level metadata. - """ - self._s.set_metadata(metadata) - - def _print_output_stream(self, i: int): - """[debug] Print the registered stream information to stdout.""" - self._s.dump_format(i) - - def open(self, option: Optional[Dict[str, str]] = None) -> "StreamWriter": - """Open the output file / device and write the header. - - :py:class:`StreamWriter` is also a context manager and therefore supports the - ``with`` statement. - This method returns the instance on which the method is called (i.e. `self`), - so that it can be used in `with` statement. - It is recommended to use context manager, as the file is closed automatically - when exiting from ``with`` clause. - - Args: - option (dict or None, optional): Private options for protocol, device and muxer. See example. - - Example - Protocol option - >>> s = StreamWriter(dst="rtmp://localhost:1234/live/app", format="flv") - >>> s.add_video_stream(...) - >>> # Passing protocol option `listen=1` makes StreamWriter act as RTMP server. - >>> with s.open(option={"listen": "1"}) as f: - >>> f.write_video_chunk(...) - - Example - Device option - >>> s = StreamWriter("-", format="sdl") - >>> s.add_video_stream(..., encoder_format="rgb24") - >>> # Open SDL video player with fullscreen - >>> with s.open(option={"window_fullscreen": "1"}): - >>> f.write_video_chunk(...) - - Example - Muxer option - >>> s = StreamWriter("foo.flac") - >>> s.add_audio_stream(...) - >>> s.set_metadata({"artist": "torchaudio contributors"}) - >>> # FLAC muxer has a private option to not write the header. - >>> # The resulting file does not contain the above metadata. - >>> with s.open(option={"write_header": "false"}) as f: - >>> f.write_audio_chunk(...) - """ - if not self._is_open: - self._s.open(option) - self._is_open = True - return self - - def close(self): - """Close the output - - :py:class:`StreamWriter` is also a context manager and therefore supports the - ``with`` statement. - It is recommended to use context manager, as the file is closed automatically - when exiting from ``with`` clause. - - See :py:meth:`StreamWriter.open` for more detail. - """ - if self._is_open: - self._s.close() - self._is_open = False - - def write_audio_chunk(self, i: int, chunk: torch.Tensor, pts: Optional[float] = None): - """Write audio data - - Args: - i (int): Stream index. - chunk (Tensor): Waveform tensor. Shape: `(frame, channel)`. - The ``dtype`` must match what was passed to :py:meth:`add_audio_stream` method. - pts (float, optional, or None): If provided, overwrite the presentation timestamp. - - .. note:: - - The provided value is converted to integer value expressed in basis of - sample rate. Therefore, it is truncated to the nearest value of - ``n / sample_rate``. - """ - self._s.write_audio_chunk(i, chunk, pts) - - def write_video_chunk(self, i: int, chunk: torch.Tensor, pts: Optional[float] = None): - """Write video/image data - - Args: - i (int): Stream index. - chunk (Tensor): Video/image tensor. - Shape: `(time, channel, height, width)`. - The ``dtype`` must be ``torch.uint8``. - The shape (height, width and the number of channels) must match - what was configured when calling :py:meth:`add_video_stream` - pts (float, optional or None): If provided, overwrite the presentation timestamp. - - .. note:: - - The provided value is converted to integer value expressed in basis of - frame rate. Therefore, it is truncated to the nearest value of - ``n / frame_rate``. - """ - self._s.write_video_chunk(i, chunk, pts) - - def flush(self): - """Flush the frames from encoders and write the frames to the destination.""" - self._s.flush() - - def __enter__(self): - """Context manager so that the destination is closed and data are flushed automatically.""" - return self - - def __exit__(self, exception_type, exception_value, traceback): - """Context manager so that the destination is closed and data are flushed automatically.""" - self.flush() - self.close() diff --git a/src/torchaudio/utils/ffmpeg_utils.py b/src/torchaudio/utils/ffmpeg_utils.py index e17b27e582..385596edc1 100644 --- a/src/torchaudio/utils/ffmpeg_utils.py +++ b/src/torchaudio/utils/ffmpeg_utils.py @@ -1,3 +1,10 @@ +"""Module to change the configuration of FFmpeg libraries (such as libavformat). + +It affects functionalities in :py:mod:`torchaudio.io` (and indirectly :py:func:`torchaudio.load`). +""" + + +# This file is just for BC. def __getattr__(item): from torio.utils import ffmpeg_utils diff --git a/src/torio/utils/ffmpeg_utils.py b/src/torio/utils/ffmpeg_utils.py index 8b4106f84e..06f20bd251 100644 --- a/src/torio/utils/ffmpeg_utils.py +++ b/src/torio/utils/ffmpeg_utils.py @@ -1,4 +1,6 @@ """Module to change the configuration of FFmpeg libraries (such as libavformat). + +It affects functionalities in :py:mod:`torio.io`. """ from typing import Dict, List, Tuple