diff --git a/test/torchaudio_unittest/io/stream_reader_test.py b/test/torchaudio_unittest/io/stream_reader_test.py
index 852778bf8fe..b006bd48268 100644
--- a/test/torchaudio_unittest/io/stream_reader_test.py
+++ b/test/torchaudio_unittest/io/stream_reader_test.py
@@ -27,7 +27,7 @@ _TEST_FILEOBJ = "src_is_fileobj"
 
 
 def _class_name(cls, _, params):
-    return f'{cls.__name__}{"_fileobj" if params[_TEST_FILEOBJ] else ""}'
+    return f'{cls.__name__}{"_fileobj" if params[_TEST_FILEOBJ] else "_path"}'
 
 
 _media_source = parameterized_class(
@@ -42,12 +42,16 @@
     def setUp(self):
         super().setUp()
         self.src = None
 
+    @property
+    def test_fileobj(self):
+        return getattr(self, _TEST_FILEOBJ)
+
     def get_video_asset(self, file="nasa_13013.mp4"):
         if self.src is not None:
             raise ValueError("get_video_asset can be called only once.")
         path = get_asset_path(file)
-        if getattr(self, _TEST_FILEOBJ):
+        if self.test_fileobj:
             self.src = open(path, "rb")
             return self.src
         return path
@@ -145,8 +149,8 @@ def test_src_info(self):
                 bit_rate=None,
             ),
         ]
-        for i, exp in enumerate(expected):
-            assert exp == s.get_src_stream_info(i)
+        output = [s.get_src_stream_info(i) for i in range(6)]
+        assert expected == output
 
     def test_src_info_invalid_index(self):
         """`get_src_stream_info` does not segfault but raise an exception when input is invalid"""
@@ -326,7 +330,7 @@ def test_stream_smoke_test(self):
         w, h = 256, 198
         s = StreamReader(self.get_video_asset())
         s.add_basic_audio_stream(frames_per_chunk=2000, sample_rate=8000)
-        s.add_basic_video_stream(frames_per_chunk=15, frame_rate=60, width=w, height=h)
+        s.add_basic_video_stream(frames_per_chunk=15, frame_rate=60, width=w, height=h, format="YUV" if self.test_fileobj else None)
         for i, (achunk, vchunk) in enumerate(s.stream()):
             assert achunk.shape == torch.Size([2000, 2])
             assert vchunk.shape == torch.Size([15, 3, h, w])
diff --git a/tools/setup_helpers/extension.py b/tools/setup_helpers/extension.py
index ad9529d96f8..ed038e22172 100644
--- a/tools/setup_helpers/extension.py
+++ b/tools/setup_helpers/extension.py
@@ -57,7 +57,12 @@ def get_ext_modules():
         ]
     )
     if _USE_FFMPEG:
-        modules.append(Extension(name="torchaudio.lib.libtorchaudio_ffmpeg", sources=[]))
+        modules.extend(
+            [
+                Extension(name="torchaudio.lib.libtorchaudio_ffmpeg", sources=[]),
+                Extension(name="torchaudio._torchaudio_ffmpeg", sources=[]),
+            ]
+        )
     return modules
 
 
diff --git a/torchaudio/csrc/CMakeLists.txt b/torchaudio/csrc/CMakeLists.txt
index 0b2a0ad33c6..96c533d0cee 100644
--- a/torchaudio/csrc/CMakeLists.txt
+++ b/torchaudio/csrc/CMakeLists.txt
@@ -204,11 +204,11 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION)
     find_package(Python3 ${PYTHON_VERSION} EXACT COMPONENTS Development)
     set(ADDITIONAL_ITEMS Python3::Python)
   endif()
-  function(define_extension name sources libraries definitions)
+  function(define_extension name sources include_dirs libraries definitions)
     add_library(${name} SHARED ${sources})
     target_compile_definitions(${name} PRIVATE "${definitions}")
     target_include_directories(
-      ${name} PRIVATE ${PROJECT_SOURCE_DIR} ${Python_INCLUDE_DIR})
+      ${name} PRIVATE ${PROJECT_SOURCE_DIR} ${Python_INCLUDE_DIR} ${include_dirs})
     target_link_libraries(
       ${name}
       ${libraries}
@@ -254,6 +254,7 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION)
   define_extension(
     _torchaudio
     "${EXTENSION_SOURCES}"
+    ""
     libtorchaudio
     "${LIBTORCHAUDIO_COMPILE_DEFINITIONS}"
     )
@@ -265,8 +266,23 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION)
     define_extension(
       _torchaudio_decoder
       "${DECODER_EXTENSION_SOURCES}"
+      ""
      "libtorchaudio_decoder"
"${LIBTORCHAUDIO_DECODER_DEFINITIONS}" ) endif() + if(USE_FFMPEG) + set( + FFMPEG_EXTENSION_SOURCES + ffmpeg/pybind/pybind.cpp + ffmpeg/pybind/stream_reader.cpp + ) + define_extension( + _torchaudio_ffmpeg + "${FFMPEG_EXTENSION_SOURCES}" + "${FFMPEG_INCLUDE_DIRS}" + "libtorchaudio_ffmpeg" + "${LIBTORCHAUDIO_DECODER_DEFINITIONS}" + ) + endif() endif() diff --git a/torchaudio/csrc/ffmpeg/ffmpeg.cpp b/torchaudio/csrc/ffmpeg/ffmpeg.cpp index f2a98112c89..7e038013802 100644 --- a/torchaudio/csrc/ffmpeg/ffmpeg.cpp +++ b/torchaudio/csrc/ffmpeg/ffmpeg.cpp @@ -68,8 +68,15 @@ std::string join(std::vector vars) { AVFormatContextPtr get_input_format_context( const std::string& src, const c10::optional& device, - const c10::optional& option) { - AVFormatContext* pFormat = NULL; + const c10::optional& option, + AVIOContext* io_ctx) { + AVFormatContext* pFormat = avformat_alloc_context(); + if (!pFormat) { + throw std::runtime_error("Failed to allocate AVFormatContext."); + } + if (io_ctx) { + pFormat->pb = io_ctx; + } AVINPUT_FORMAT_CONST AVInputFormat* pInput = [&]() -> AVInputFormat* { if (device.has_value()) { @@ -105,6 +112,17 @@ AVFormatContextPtr get_input_format_context( AVFormatContextPtr::AVFormatContextPtr(AVFormatContext* p) : Wrapper(p) {} +//////////////////////////////////////////////////////////////////////////////// +// AVIO +//////////////////////////////////////////////////////////////////////////////// +void AVIOContextDeleter::operator()(AVIOContext* p) { + av_freep(&p->buffer); + av_freep(&p); +}; + +AVIOContextPtr::AVIOContextPtr(AVIOContext* p) + : Wrapper(p) {} + //////////////////////////////////////////////////////////////////////////////// // AVPacket //////////////////////////////////////////////////////////////////////////////// diff --git a/torchaudio/csrc/ffmpeg/ffmpeg.h b/torchaudio/csrc/ffmpeg/ffmpeg.h index 1e77b0bc04f..40bc154ed09 100644 --- a/torchaudio/csrc/ffmpeg/ffmpeg.h +++ b/torchaudio/csrc/ffmpeg/ffmpeg.h @@ -13,6 +13,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -74,7 +75,19 @@ struct AVFormatContextPtr AVFormatContextPtr get_input_format_context( const std::string& src, const c10::optional& device, - const c10::optional& option); + const c10::optional& option, + AVIOContext* io_ctx = nullptr); + +//////////////////////////////////////////////////////////////////////////////// +// AVIO +//////////////////////////////////////////////////////////////////////////////// +struct AVIOContextDeleter { + void operator()(AVIOContext* p); +}; + +struct AVIOContextPtr : public Wrapper { + explicit AVIOContextPtr(AVIOContext* p); +}; //////////////////////////////////////////////////////////////////////////////// // AVPacket diff --git a/torchaudio/csrc/ffmpeg/prototype.cpp b/torchaudio/csrc/ffmpeg/prototype.cpp index 22fddd0992e..c8bfe2d2acd 100644 --- a/torchaudio/csrc/ffmpeg/prototype.cpp +++ b/torchaudio/csrc/ffmpeg/prototype.cpp @@ -20,7 +20,7 @@ std::tuple, int64_t> load(const std::string& src) { int i = s.find_best_audio_stream(); auto sinfo = s.Streamer::get_src_stream_info(i); int64_t sample_rate = static_cast(sinfo.sample_rate); - s.add_audio_stream(i, -1, -1, {}, {}, {}); + s.add_audio_stream(i, -1, -1, {}, {}, {}, {}); s.process_all_packets(); auto tensors = s.pop_chunks(); return std::make_tuple<>(tensors[0], sample_rate); @@ -66,14 +66,16 @@ TORCH_LIBRARY_FRAGMENT(torchaudio, m) { int64_t num_chunks, const c10::optional& filter_desc, const c10::optional& decoder, - const c10::optional& decoder_options) { + const 
+        const c10::optional& decoder_options,
+        const c10::optional& src_format) {
        s->add_audio_stream(
            i,
            frames_per_chunk,
            num_chunks,
            filter_desc,
            decoder,
-           decoder_options);
+           decoder_options,
+           src_format);
      });
  m.def(
      "torchaudio::ffmpeg_streamer_add_video_stream",
@@ -84,6 +86,7 @@ TORCH_LIBRARY_FRAGMENT(torchaudio, m) {
         const c10::optional& filter_desc,
         const c10::optional& decoder,
         const c10::optional& decoder_options,
+        const c10::optional& src_format,
         const c10::optional& hw_accel) {
        s->add_video_stream(
            i,
@@ -92,6 +95,7 @@ TORCH_LIBRARY_FRAGMENT(torchaudio, m) {
            filter_desc,
            decoder,
            decoder_options,
+           src_format,
            hw_accel);
      });
  m.def("torchaudio::ffmpeg_streamer_remove_stream", [](S s, int64_t i) {
diff --git a/torchaudio/csrc/ffmpeg/pybind/pybind.cpp b/torchaudio/csrc/ffmpeg/pybind/pybind.cpp
new file mode 100644
index 00000000000..d751ff0fdbd
--- /dev/null
+++ b/torchaudio/csrc/ffmpeg/pybind/pybind.cpp
@@ -0,0 +1,35 @@
+#include
+#include
+#include
+
+namespace torchaudio {
+namespace ffmpeg {
+namespace {
+
+PYBIND11_MODULE(_torchaudio_ffmpeg, m) {
+  py::class_>(
+      m, "StreamReaderFileObj")
+      .def(py::init())
+      .def("num_src_streams", &StreamReaderFileObj::num_src_streams)
+      .def("num_out_streams", &StreamReaderFileObj::num_out_streams)
+      .def(
+          "find_best_audio_stream",
+          &StreamReaderFileObj::find_best_audio_stream)
+      .def(
+          "find_best_video_stream",
+          &StreamReaderFileObj::find_best_video_stream)
+      .def("get_src_stream_info", &StreamReaderFileObj::get_src_stream_info)
+      .def("get_out_stream_info", &StreamReaderFileObj::get_out_stream_info)
+      .def("seek", &StreamReaderFileObj::seek)
+      .def("add_audio_stream", &StreamReaderFileObj::add_audio_stream)
+      .def("add_video_stream", &StreamReaderFileObj::add_video_stream)
+      .def("remove_stream", &StreamReaderFileObj::remove_stream)
+      .def("process_packet", &StreamReaderFileObj::process_packet)
+      .def("process_all_packets", &StreamReaderFileObj::process_all_packets)
+      .def("is_buffer_ready", &StreamReaderFileObj::is_buffer_ready)
+      .def("pop_chunks", &StreamReaderFileObj::pop_chunks);
+}
+
+} // namespace
+} // namespace ffmpeg
+} // namespace torchaudio
diff --git a/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp b/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp
new file mode 100644
index 00000000000..7fd7c8b4a5f
--- /dev/null
+++ b/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp
@@ -0,0 +1,108 @@
+#include
+#include
+
+namespace torchaudio {
+namespace ffmpeg {
+namespace {
+
+static int read_function(void* opaque, uint8_t* buf, int buf_size) {
+  FileObj* fileobj = static_cast(opaque);
+  buf_size = FFMIN(buf_size, fileobj->buffer_size);
+
+  int num_read = 0;
+  while (num_read < buf_size) {
+    int request = buf_size - num_read;
+    auto chunk = static_cast(
+        static_cast(fileobj->fileobj.attr("read")(request)));
+    auto chunk_len = chunk.length();
+    if (chunk_len == 0) {
+      break;
+    }
+    if (chunk_len > request) {
+      std::ostringstream message;
+      message
+          << "Requested up to " << request << " bytes, but "
+          << "received " << chunk_len << " bytes. "
+          << "The given object does not conform to the read protocol of file objects.";
+      throw std::runtime_error(message.str());
+    }
+    memcpy(buf, chunk.data(), chunk_len);
+    buf += chunk_len;
+    num_read += chunk_len;
+  }
+  return num_read == 0 ? AVERROR_EOF : num_read;
+}
+
+static int64_t seek_function(void* opaque, int64_t offset, int whence) {
+  // We do not know the file size.
+  if (whence == AVSEEK_SIZE) {
+    return AVERROR(EIO);
+  }
+  FileObj* fileobj = static_cast(opaque);
+  return py::cast(fileobj->fileobj.attr("seek")(offset, whence));
+}
+
+AVIOContextPtr get_io_context(FileObj* opaque, int buffer_size) {
+  uint8_t* buffer = static_cast(av_malloc(buffer_size));
+  if (!buffer) {
+    throw std::runtime_error("Failed to allocate buffer.");
+  }
+
+  // If avio_alloc_context succeeds, then buffer will be cleaned up by
+  // AVIOContextPtr destructor.
+  // If avio_alloc_context fails, we need to clean up by ourselves.
+  AVIOContext* av_io_ctx = avio_alloc_context(
+      buffer,
+      buffer_size,
+      0,
+      static_cast(opaque),
+      &read_function,
+      nullptr,
+      py::hasattr(opaque->fileobj, "seek") ? &seek_function : nullptr);
+
+  if (!av_io_ctx) {
+    av_freep(&buffer);
+    throw std::runtime_error("Failed to allocate AVIO context.");
+  }
+  return AVIOContextPtr{av_io_ctx};
+}
+
+c10::optional convert_dict(py::object dict) {
+  if (dict.is_none())
+    return c10::optional{};
+
+  c10::Dict out;
+  for (std::pair item : py::cast(dict)) {
+    out.insert(item.first.cast(), item.second.cast());
+  }
+  return c10::optional(out);
+}
+
+c10::optional convert_str(py::object s) {
+  if (s.is_none())
+    return c10::optional{};
+  return c10::optional{
+      static_cast(py::cast(s))};
+}
+
+} // namespace
+
+FileObj::FileObj(py::object fileobj_, int buffer_size)
+    : fileobj(fileobj_),
+      buffer_size(buffer_size),
+      pAVIO(get_io_context(this, buffer_size)) {}
+
+StreamReaderFileObj::StreamReaderFileObj(
+    py::object fileobj_,
+    py::object format,
+    py::object option,
+    int64_t buffer_size)
+    : FileObj(fileobj_, static_cast(buffer_size)),
+      StreamReaderBinding(get_input_format_context(
+          "",
+          convert_str(format),
+          convert_dict(option),
+          pAVIO)) {}
+
+} // namespace ffmpeg
+} // namespace torchaudio
diff --git a/torchaudio/csrc/ffmpeg/pybind/stream_reader.h b/torchaudio/csrc/ffmpeg/pybind/stream_reader.h
new file mode 100644
index 00000000000..9ebd83d33f9
--- /dev/null
+++ b/torchaudio/csrc/ffmpeg/pybind/stream_reader.h
@@ -0,0 +1,33 @@
+#pragma once
+#include
+#include
+
+namespace torchaudio {
+namespace ffmpeg {
+
+// The purpose of the FileObj class is to ensure that
+// StreamReaderFileObj can inherit from Streamer while
+// AVIOContext is initialized before AVFormat (Streamer).
+struct FileObj {
+  py::object fileobj;
+  int buffer_size;
+  AVIOContextPtr pAVIO;
+  FileObj(py::object fileobj, int buffer_size);
+};
+
+struct StreamReaderFileObj : public FileObj, public StreamReaderBinding {
+ public:
+  StreamReaderFileObj(
+      py::object fileobj,
+      // Note:
+      // Should use `py::str` or `c10::optional` for `format`,
+      // and `py::dict` or `c10::optional` for `option`, but
+      // I could not resolve a TypeError related to optionality.
+      // So using generic `py::object`.
+      py::object format,
+      py::object option,
+      int64_t buffer_size);
+};
+
+} // namespace ffmpeg
+} // namespace torchaudio
diff --git a/torchaudio/csrc/ffmpeg/streamer.cpp b/torchaudio/csrc/ffmpeg/streamer.cpp
index 59fd946dc83..a517b8e7b1c 100644
--- a/torchaudio/csrc/ffmpeg/streamer.cpp
+++ b/torchaudio/csrc/ffmpeg/streamer.cpp
@@ -81,19 +81,24 @@ SrcStreamInfo Streamer::get_src_stream_info(int i) const {
     ret.codec_long_name = desc->long_name;
   }
   switch (codecpar->codec_type) {
-    case AVMEDIA_TYPE_AUDIO:
-      ret.fmt_name =
-          av_get_sample_fmt_name(static_cast(codecpar->format));
+    case AVMEDIA_TYPE_AUDIO: {
+      AVSampleFormat smp_fmt = static_cast(codecpar->format);
+      ret.fmt_name = smp_fmt == AV_SAMPLE_FMT_NONE
"N/A" + : av_get_sample_fmt_name(smp_fmt); ret.sample_rate = static_cast(codecpar->sample_rate); ret.num_channels = codecpar->channels; break; - case AVMEDIA_TYPE_VIDEO: + } + case AVMEDIA_TYPE_VIDEO: { + AVPixelFormat pix_fmt = static_cast(codecpar->format); ret.fmt_name = - av_get_pix_fmt_name(static_cast(codecpar->format)); + pix_fmt == AV_PIX_FMT_NONE ? "N/A" : av_get_pix_fmt_name(pix_fmt); ret.width = codecpar->width; ret.height = codecpar->height; ret.frame_rate = av_q2d(stream->r_frame_rate); break; + } default:; } return ret; @@ -158,7 +163,8 @@ void Streamer::add_audio_stream( int64_t num_chunks, const c10::optional& filter_desc, const c10::optional& decoder, - const c10::optional& decoder_option) { + const c10::optional& decoder_option, + const c10::optional& src_format) { add_stream( i, AVMEDIA_TYPE_AUDIO, @@ -167,6 +173,7 @@ void Streamer::add_audio_stream( filter_desc, decoder, decoder_option, + src_format, torch::Device(torch::DeviceType::CPU)); } @@ -177,6 +184,7 @@ void Streamer::add_video_stream( const c10::optional& filter_desc, const c10::optional& decoder, const c10::optional& decoder_option, + const c10::optional& src_format, const c10::optional& hw_accel) { const torch::Device device = [&]() { if (!hw_accel) { @@ -205,6 +213,7 @@ void Streamer::add_video_stream( filter_desc, decoder, decoder_option, + src_format, device); } @@ -216,10 +225,32 @@ void Streamer::add_stream( const c10::optional& filter_desc, const c10::optional& decoder, const c10::optional& decoder_option, + const c10::optional& src_format, const torch::Device& device) { validate_src_stream_type(i, media_type); AVStream* stream = pFormatContext->streams[i]; + // When media source is file-like object, it is possible that source codec is + // not detected properly. + if (stream->codecpar->format == -1) { + if (!src_format) { + throw std::runtime_error( + "Failed to detect the source stream format. 
+          "Failed to detect the source stream format. Please provide the source format.");
+    }
+    switch (media_type) {
+      case AVMEDIA_TYPE_AUDIO:
+        stream->codecpar->format =
+            static_cast(av_get_sample_fmt(src_format.value().c_str()));
+        break;
+      case AVMEDIA_TYPE_VIDEO:
+        stream->codecpar->format =
+            static_cast(av_get_pix_fmt(src_format.value().c_str()));
+        break;
+      default:;
+        // unreachable
+    }
+  }
+
   stream->discard = AVDISCARD_DEFAULT;
   if (!processors[i])
     processors[i] = std::make_unique(
diff --git a/torchaudio/csrc/ffmpeg/streamer.h b/torchaudio/csrc/ffmpeg/streamer.h
index 939ebf34112..fad89b3adf3 100644
--- a/torchaudio/csrc/ffmpeg/streamer.h
+++ b/torchaudio/csrc/ffmpeg/streamer.h
@@ -64,14 +64,16 @@ class Streamer {
       int64_t num_chunks,
       const c10::optional& filter_desc,
       const c10::optional& decoder,
-      const c10::optional& decoder_option);
+      const c10::optional& decoder_options,
+      const c10::optional& src_format);
   void add_video_stream(
       int64_t i,
       int64_t frames_per_chunk,
       int64_t num_chunks,
       const c10::optional& filter_desc,
       const c10::optional& decoder,
-      const c10::optional& decoder_option,
+      const c10::optional& decoder_options,
+      const c10::optional& src_format,
       const c10::optional& hw_accel);
   void remove_stream(int64_t i);
 
@@ -83,7 +85,8 @@ class Streamer {
       int num_chunks,
       const c10::optional& filter_desc,
       const c10::optional& decoder,
-      const c10::optional& decoder_option,
+      const c10::optional& decoder_options,
+      const c10::optional& src_format,
       const torch::Device& device);
 
  public:
diff --git a/torchaudio/io/__init__.py b/torchaudio/io/__init__.py
index bb1dcf90acc..65f01e1a957 100644
--- a/torchaudio/io/__init__.py
+++ b/torchaudio/io/__init__.py
@@ -14,6 +14,7 @@ def _init_extension():
     try:
         torchaudio._extension._load_lib("libtorchaudio_ffmpeg")
+        import torchaudio._torchaudio_ffmpeg
     except OSError as err:
         raise ImportError(
             "Stream API requires FFmpeg libraries (libavformat and such). Please install FFmpeg 4."
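Editorial aside (not part of the patch): the AVIO glue above drives decoding purely through the source object's read protocol — `read_function` calls the object's `read(n)` and requires at most `n` bytes back, with an empty result signalling EOF, while `seek_function` is registered only when the object exposes `seek`. Below is a minimal sketch of an object satisfying that contract; the wrapper name and the WAV path are illustrative, and `StreamReader` is assumed to be importable from `torchaudio.io` as the module layout in this patch suggests.

    import torchaudio.io


    class ReadOnlySource:
        """Expose only ``read`` so the AVIO bridge skips the seek callback."""

        def __init__(self, fileobj):
            self._fileobj = fileobj

        def read(self, n: int) -> bytes:
            # Must return at most ``n`` bytes; b"" signals EOF to read_function.
            return self._fileobj.read(n)


    with open("example.wav", "rb") as f:
        reader = torchaudio.io.StreamReader(ReadOnlySource(f), format="wav")
        reader.add_basic_audio_stream(frames_per_chunk=4096)
        for (chunk,) in reader.stream():
            ...  # process the decoded audio chunk

Because the wrapper has no `seek`, containers that require random access may fail to open; passing `format=` explicitly, as above, helps FFmpeg probe such streams.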
diff --git a/torchaudio/io/_stream_reader.py b/torchaudio/io/_stream_reader.py
index 0ed18fc9b3b..b61005ea250 100644
--- a/torchaudio/io/_stream_reader.py
+++ b/torchaudio/io/_stream_reader.py
@@ -229,6 +229,7 @@ def add_basic_audio_stream(
         frames_per_chunk: int,
         buffer_chunk_size: int,
         stream_index: Optional[int],
+        src_format: Optional[str],
         sample_rate: Optional[int],
         dtype: torch.dtype,
     ):
@@ -243,6 +244,7 @@
             _get_afilter_desc(sample_rate, dtype),
             None,
             None,
+            src_format,
         )
 
     def add_basic_video_stream(
@@ -250,6 +252,7 @@
         frames_per_chunk: int,
         buffer_chunk_size: int,
         stream_index: Optional[int],
+        src_format: Optional[str],
         frame_rate: Optional[int],
         width: Optional[int],
         height: Optional[int],
@@ -266,6 +269,7 @@
             _get_vfilter_desc(frame_rate, width, height, format),
             None,
             None,
+            src_format,
             None,
         )
 
@@ -277,6 +281,7 @@ def add_audio_stream(
         filter_desc: Optional[str],
         decoder: Optional[str],
         decoder_options: Optional[Dict[str, str]],
+        src_format: Optional[str],
     ):
         i = self.default_audio_stream if stream_index is None else stream_index
         if i is None:
@@ -289,6 +294,7 @@
             filter_desc,
             decoder,
             decoder_options,
+            src_format,
         )
 
     def add_video_stream(
@@ -299,6 +305,7 @@
         filter_desc: Optional[str],
         decoder: Optional[str],
         decoder_options: Optional[Dict[str, str]],
+        src_format: Optional[str],
         hw_accel: Optional[str],
     ):
         i = self.default_video_stream if stream_index is None else stream_index
@@ -312,6 +319,7 @@
             filter_desc,
             decoder,
             decoder_options,
+            src_format,
             hw_accel,
         )
 
@@ -331,13 +339,154 @@ def pop_chunks(self) -> Tuple[Optional[torch.Tensor]]:
         return torch.ops.torchaudio.ffmpeg_streamer_pop_chunks(self._s)
 
 
+class _PyBindBackend:
+    def __init__(
+        self,
+        src,
+        format: Optional[str],
+        option: Optional[Dict[str, str]],
+        buffer_size: int,
+    ):
+        self._s = torchaudio._torchaudio_ffmpeg.StreamReaderFileObj(src, format, option, buffer_size)
+        i = self._s.find_best_audio_stream()
+        self.default_audio_stream = None if i < 0 else i
+        i = self._s.find_best_video_stream()
+        self.default_video_stream = None if i < 0 else i
+
+    @property
+    def num_src_streams(self) -> int:
+        return self._s.num_src_streams()
+
+    @property
+    def num_out_streams(self) -> int:
+        return self._s.num_out_streams()
+
+    def get_src_stream_info(self, i: int) -> torchaudio.io.StreamReaderSourceStream:
+        return self._s.get_src_stream_info(i)
+
+    def get_out_stream_info(self, i: int) -> torchaudio.io.StreamReaderOutputStream:
+        return self._s.get_out_stream_info(i)
+
+    def seek(self, timestamp: float):
+        self._s.seek(timestamp)
+
+    def add_basic_audio_stream(
+        self,
+        frames_per_chunk: int,
+        buffer_chunk_size: int,
+        stream_index: Optional[int],
+        src_format: Optional[str],
+        sample_rate: Optional[int],
+        dtype: Optional[torch.dtype],
+    ):
+        i = self.default_audio_stream if stream_index is None else stream_index
+        if i is None:
+            raise RuntimeError("There is no audio stream.")
+        self._s.add_audio_stream(
+            i,
+            frames_per_chunk,
+            buffer_chunk_size,
+            _get_afilter_desc(sample_rate, dtype),
+            None,
+            None,
+            src_format,
+        )
+
+    def add_basic_video_stream(
+        self,
+        frames_per_chunk: int,
+        buffer_chunk_size: int,
+        stream_index: Optional[int],
+        src_format: Optional[str],
+        frame_rate: Optional[int],
+        width: Optional[int],
+        height: Optional[int],
+        format: str = "RGB",
+    ):
+        i = self.default_video_stream if stream_index is None else stream_index
+        if i is None:
RuntimeError("There is no video stream.") + self._s.add_video_stream( + i, + frames_per_chunk, + buffer_chunk_size, + _get_vfilter_desc(frame_rate, width, height, format), + None, + None, + src_format, + None) + + def add_audio_stream( + self, + frames_per_chunk: int, + buffer_chunk_size: int, + stream_index: Optional[int], + filter_desc: Optional[str], + decoder: Optional[str], + decoder_options: Optional[Dict[str, str]], + src_format: Optional[str], + ): + i = self.default_audio_stream if stream_index is None else stream_index + if i is None: + raise RuntimeError("There is no audio stream.") + self._s.add_audio_stream( + i, + frames_per_chunk, + buffer_chunk_size, + filter_desc, + decoder, + decoder_options, + src_format, + ) + + def add_video_stream( + self, + frames_per_chunk: int, + buffer_chunk_size: int, + stream_index: Optional[int], + filter_desc: Optional[str], + decoder: Optional[str], + decoder_options: Optional[Dict[str, str]], + src_format: Optional[str], + hw_accel: Optional[str], + ): + i = self.default_video_stream if stream_index is None else stream_index + if i is None: + raise RuntimeError("There is no video stream.") + self._s.add_video_stream( + i, + frames_per_chunk, + buffer_chunk_size, + filter_desc, + decoder, + decoder_options, + src_format, + hw_accel, + ) + + def remove_stream(self, i: int): + self._s.remove_stream(i) + + def process_packet(self, timeout: Optional[float], backoff: float): + return self._s.process_packet(timeout, backoff) + + def process_all_packets(self): + self._s.process_all_packets() + + def is_buffer_ready(self) -> bool: + return self._s.is_buffer_ready() + + def pop_chunks(self) -> Tuple[Optional[torch.Tensor]]: + return self._s.pop_chunks() + + class StreamReader: """Fetch and decode audio/video streams chunk by chunk. For the detailed usage of this class, please refer to the tutorial. Args: - src (str): Source. Can be a file path, URL, device identifier or filter expression. + src (str or file-like object): Source. Can be a file path, URL, device identifier or filter expression. format (str or None, optional): Override the input format, or specify the source sound device. Default: ``None`` (no override nor device input). @@ -370,6 +519,11 @@ class StreamReader: You can use this argument to change the input source before it is passed to decoder. Default: ``None``. + + buffer_size (int): + The internal buffer size unsed only when `src` is file-like object. + + Default: `4096`. """ def __init__( @@ -377,8 +531,14 @@ def __init__( src: str, format: Optional[str] = None, option: Optional[Dict[str, str]] = None, + buffer_size: int = 4096, ): - self._be = _TorchBindBackend(src, format, option) + if isinstance(src, str): + self._be = _TorchBindBackend(src, format, option) + elif hasattr(src, 'read'): + self._be = _PyBindBackend(src, format, option, buffer_size) + else: + raise ValueError("`src` must be either string or file-like object.") @property def num_src_streams(self): @@ -445,6 +605,7 @@ def add_basic_audio_stream( frames_per_chunk: int, buffer_chunk_size: int = 3, stream_index: Optional[int] = None, + src_format: Optional[str] = None, sample_rate: Optional[int] = None, dtype: torch.dtype = torch.float32, ): @@ -463,19 +624,29 @@ def add_basic_audio_stream( stream_index (int or None, optional): The source audio stream index. If omitted, :py:attr:`default_audio_stream` is used. + src_format (str or None, optional): In case the media format of the source + stream is not detected, this argument overwrites the format. 
+
             sample_rate (int or None, optional): If provided, resample the audio.
 
             dtype (torch.dtype, optional): If not ``None``, change the output sample precision.
                 If floating point, then the sample value range is `[-1, 1]`.
         """
-        self._be.add_basic_audio_stream(frames_per_chunk, buffer_chunk_size, stream_index, sample_rate, dtype)
+        self._be.add_basic_audio_stream(
+            frames_per_chunk,
+            buffer_chunk_size,
+            stream_index,
+            src_format,
+            sample_rate,
+            dtype)
 
     def add_basic_video_stream(
         self,
         frames_per_chunk: int,
         buffer_chunk_size: int = 3,
         stream_index: Optional[int] = None,
+        src_format: Optional[str] = None,
         frame_rate: Optional[int] = None,
         width: Optional[int] = None,
         height: Optional[int] = None,
@@ -496,6 +667,9 @@
             stream_index (int or None, optional): The source video stream index.
                 If omitted, :py:attr:`default_video_stream` is used.
+            src_format (str or None, optional): In case the media format of the source
+                stream is not detected, this argument overrides the format.
+
             frame_rate (int or None, optional): If provided, change the frame rate.
 
             width (int or None, optional): If provided, change the image width. Unit: Pixel.
@@ -511,6 +685,7 @@
             frames_per_chunk,
             buffer_chunk_size,
             stream_index,
+            src_format,
             frame_rate,
             width,
             height,
@@ -524,6 +699,7 @@ def add_audio_stream(
         filter_desc: Optional[str] = None,
         decoder: Optional[str] = None,
         decoder_options: Optional[Dict[str, str]] = None,
+        src_format: Optional[str] = None,
     ):
         """Add output audio stream
 
@@ -550,6 +726,9 @@
             decoder_options (dict or None, optional):
                 Options passed to decoder. Mapping from str to str.
+
+            src_format (str or None, optional): In case the media format of the source
+                stream is not detected, this argument overrides the format.
         """
         self._be.add_audio_stream(
             frames_per_chunk,
@@ -558,6 +737,7 @@
             filter_desc,
             decoder,
             decoder_options,
+            src_format,
         )
 
     def add_video_stream(
@@ -568,6 +748,7 @@
         filter_desc: Optional[str] = None,
         decoder: Optional[str] = None,
         decoder_options: Optional[Dict[str, str]] = None,
+        src_format: Optional[str] = None,
         hw_accel: Optional[str] = None,
     ):
         """Add output video stream
 
@@ -608,6 +789,9 @@
                 2. FFmpeg libraries linked dynamically are compiled with NVDEC support.
                 3. The codec is supported NVDEC by. (Currently, `"h264_cuvid"` is supported)
 
+            src_format (str or None, optional): In case the media format of the source
+                stream is not detected, this argument overrides the format.
+
             Example - HW decoding::
 
                 >>> # Decode video with NVDEC, create Tensor on CPU.
@@ -643,6 +827,7 @@
             filter_desc,
             decoder,
             decoder_options,
+            src_format,
             hw_accel,
         )
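Usage sketch (editorial, not part of the patch): the pieces above combine as follows — a file-like `src` routes `StreamReader` construction through `_PyBindBackend` and the pybind11 `StreamReaderFileObj`, `buffer_size` sizes the AVIO read buffer, and `src_format` supplies the sample/pixel format when probing a file-like source fails. The media file name and the "yuv420p" string are placeholders; the import path is assumed from the module layout in this patch.

    from torchaudio.io import StreamReader

    with open("nasa_13013.mp4", "rb") as src:
        # buffer_size is used only for file-like sources; 4096 is the default.
        reader = StreamReader(src, buffer_size=4096)

        reader.add_basic_audio_stream(frames_per_chunk=2000, sample_rate=8000)
        # If FFmpeg cannot detect the source pixel format from the file-like
        # object, adding the stream raises unless src_format names it explicitly.
        reader.add_basic_video_stream(
            frames_per_chunk=15, src_format="yuv420p", width=256, height=198)

        for audio_chunk, video_chunk in reader.stream():
            ...  # audio_chunk: (2000, num_channels), video_chunk: (15, 3, 198, 256)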