diff --git a/examples/tutorials/streaming_api_tutorial.py b/examples/tutorials/streaming_api_tutorial.py index 2a68b5e02f8..f9e29aa064b 100644 --- a/examples/tutorials/streaming_api_tutorial.py +++ b/examples/tutorials/streaming_api_tutorial.py @@ -250,21 +250,24 @@ # When the StreamReader buffered this number of chunks and is asked to pull # more frames, StreamReader drops the old frames/chunks. # - ``stream_index``: The index of the source stream. +# - ``decoder``: If provided, override the decoder. Useful if it fails to detect +# the codec. +# - ``decoder_option``: The option for the decoder. # # For audio output stream, you can provide the following additional # parameters to change the audio properties. # -# - ``sample_rate``: When provided, StreamReader resamples the audio on-the-fly. -# - ``dtype``: By default the StreamReader returns tensor of `float32` dtype, -# with sample values ranging `[-1, 1]`. By providing ``dtype`` argument +# - ``format``: By default the StreamReader returns tensor of `float32` dtype, +# with sample values ranging `[-1, 1]`. By providing ``format`` argument # the resulting dtype and value range is changed. +# - ``sample_rate``: When provided, StreamReader resamples the audio on-the-fly. # # For video output stream, the following parameters are available. # +# - ``format``: Change the image format. # - ``frame_rate``: Change the frame rate by dropping or duplicating # frames. No interpolation is performed. # - ``width``, ``height``: Change the image size. -# - ``format``: Change the image format. # ###################################################################### @@ -298,7 +301,7 @@ # streamer.add_basic_video_stream( # frames_per_chunk=10, # frame_rate=30, -# format="RGB" +# format="rgb24" # ) # # # Stream video from source stream `j`, @@ -310,7 +313,7 @@ # frame_rate=30, # width=128, # height=128, -# format="BGR" +# format="bgr24" # ) # @@ -428,7 +431,7 @@ frame_rate=1, width=960, height=540, - format="RGB", + format="rgb24", ) # Video stream with 320x320 (stretched) at 3 FPS, grayscale @@ -437,7 +440,7 @@ frame_rate=3, width=320, height=320, - format="GRAY", + format="gray", ) # fmt: on diff --git a/test/torchaudio_unittest/io/stream_reader_test.py b/test/torchaudio_unittest/io/stream_reader_test.py index f14c003c56d..0d7713cb577 100644 --- a/test/torchaudio_unittest/io/stream_reader_test.py +++ b/test/torchaudio_unittest/io/stream_reader_test.py @@ -1,5 +1,5 @@ import torch -from parameterized import parameterized +from parameterized import parameterized, parameterized_class from torchaudio_unittest.common_utils import ( get_asset_path, get_image, @@ -22,12 +22,49 @@ ) -def get_video_asset(file="nasa_13013.mp4"): - return get_asset_path(file) +################################################################################ +# Helper decorator and Mixin to duplicate the tests for fileobj +_TEST_FILEOBJ = "src_is_fileobj" + + +def _class_name(cls, _, params): + return f'{cls.__name__}{"_fileobj" if params[_TEST_FILEOBJ] else "_path"}' + + +_media_source = parameterized_class((_TEST_FILEOBJ,), [(False,), (True,)], class_name_func=_class_name) + + +class _MediaSourceMixin: + def setUp(self): + super().setUp() + self.src = None + + @property + def test_fileobj(self): + return getattr(self, _TEST_FILEOBJ) + + def get_video_asset(self, file="nasa_13013.mp4"): + if self.src is not None: + raise ValueError("get_video_asset can be called only once.") + + path = get_asset_path(file) + if self.test_fileobj: + self.src = open(path, "rb") + return self.src + 
return path + + def tearDown(self): + if self.src is not None: + self.src.close() + super().tearDown() + + +################################################################################ @skipIfNoFFmpeg -class StreamReaderInterfaceTest(TempDirMixin, TorchaudioTestCase): +@_media_source +class StreamReaderInterfaceTest(_MediaSourceMixin, TempDirMixin, TorchaudioTestCase): """Test suite for interface behaviors around StreamReader""" def test_streamer_invalid_input(self): @@ -48,14 +85,13 @@ def test_streamer_invalid_input(self): def test_streamer_invalide_option(self, invalid_keys, options): """When invalid options are given, StreamReader raises an exception with these keys""" options.update({k: k for k in invalid_keys}) - src = get_video_asset() with self.assertRaises(RuntimeError) as ctx: - StreamReader(src, option=options) + StreamReader(self.get_video_asset(), option=options) assert all(f'"{k}"' in str(ctx.exception) for k in invalid_keys) def test_src_info(self): """`get_src_stream_info` properly fetches information""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) assert s.num_src_streams == 6 expected = [ @@ -112,35 +148,35 @@ def test_src_info(self): bit_rate=None, ), ] - for i, exp in enumerate(expected): - assert exp == s.get_src_stream_info(i) + output = [s.get_src_stream_info(i) for i in range(6)] + assert expected == output def test_src_info_invalid_index(self): """`get_src_stream_info` does not segfault but raise an exception when input is invalid""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) for i in [-1, 6, 7, 8]: - with self.assertRaises(IndexError): + with self.assertRaises(RuntimeError): s.get_src_stream_info(i) def test_default_streams(self): """default stream is not None""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) assert s.default_audio_stream is not None assert s.default_video_stream is not None def test_default_audio_stream_none(self): """default audio stream is None for video without audio""" - s = StreamReader(get_video_asset("nasa_13013_no_audio.mp4")) + s = StreamReader(self.get_video_asset("nasa_13013_no_audio.mp4")) assert s.default_audio_stream is None def test_default_video_stream_none(self): """default video stream is None for video with only audio""" - s = StreamReader(get_video_asset("nasa_13013_no_video.mp4")) + s = StreamReader(self.get_video_asset("nasa_13013_no_video.mp4")) assert s.default_video_stream is None def test_num_out_stream(self): """num_out_streams gives the correct count of output streams""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) n, m = 6, 4 for i in range(n): assert s.num_out_streams == i @@ -158,10 +194,10 @@ def test_num_out_stream(self): def test_basic_audio_stream(self): """`add_basic_audio_stream` constructs a correct filter.""" - s = StreamReader(get_video_asset()) - s.add_basic_audio_stream(frames_per_chunk=-1, dtype=None) + s = StreamReader(self.get_video_asset()) + s.add_basic_audio_stream(frames_per_chunk=-1, format=None) s.add_basic_audio_stream(frames_per_chunk=-1, sample_rate=8000) - s.add_basic_audio_stream(frames_per_chunk=-1, dtype=torch.int16) + s.add_basic_audio_stream(frames_per_chunk=-1, format="s16p") sinfo = s.get_out_stream_info(0) assert sinfo.source_index == s.default_audio_stream @@ -177,11 +213,11 @@ def test_basic_audio_stream(self): def test_basic_video_stream(self): """`add_basic_video_stream` constructs a correct filter.""" - s = 
StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) s.add_basic_video_stream(frames_per_chunk=-1, format=None) s.add_basic_video_stream(frames_per_chunk=-1, width=3, height=5) s.add_basic_video_stream(frames_per_chunk=-1, frame_rate=7) - s.add_basic_video_stream(frames_per_chunk=-1, format="BGR") + s.add_basic_video_stream(frames_per_chunk=-1, format="bgr24") sinfo = s.get_out_stream_info(0) assert sinfo.source_index == s.default_video_stream @@ -201,7 +237,7 @@ def test_basic_video_stream(self): def test_remove_streams(self): """`remove_stream` removes the correct output stream""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) s.add_basic_audio_stream(frames_per_chunk=-1, sample_rate=24000) s.add_basic_video_stream(frames_per_chunk=-1, width=16, height=16) s.add_basic_audio_stream(frames_per_chunk=-1, sample_rate=8000) @@ -221,21 +257,21 @@ def test_remove_streams(self): def test_remove_stream_invalid(self): """Attempt to remove invalid output streams raises IndexError""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) for i in range(-3, 3): - with self.assertRaises(IndexError): + with self.assertRaises(RuntimeError): s.remove_stream(i) s.add_audio_stream(frames_per_chunk=-1) for i in range(-3, 3): if i == 0: continue - with self.assertRaises(IndexError): + with self.assertRaises(RuntimeError): s.remove_stream(i) def test_process_packet(self): """`process_packet` method returns 0 while there is a packet in source stream""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) # nasa_1013.mp3 contains 1023 packets. for _ in range(1023): code = s.process_packet() @@ -246,19 +282,19 @@ def test_process_packet(self): def test_pop_chunks_no_output_stream(self): """`pop_chunks` method returns empty list when there is no output stream""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) assert s.pop_chunks() == [] def test_pop_chunks_empty_buffer(self): """`pop_chunks` method returns None when a buffer is empty""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) s.add_basic_audio_stream(frames_per_chunk=-1) s.add_basic_video_stream(frames_per_chunk=-1) assert s.pop_chunks() == [None, None] def test_pop_chunks_exhausted_stream(self): """`pop_chunks` method returns None when the source stream is exhausted""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) # video is 16.57 seconds. 
# audio streams per 10 second chunk # video streams per 20 second chunk @@ -284,14 +320,14 @@ def test_pop_chunks_exhausted_stream(self): def test_stream_empty(self): """`stream` fails when no output stream is configured""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) with self.assertRaises(RuntimeError): next(s.stream()) def test_stream_smoke_test(self): """`stream` streams chunks fine""" w, h = 256, 198 - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) s.add_basic_audio_stream(frames_per_chunk=2000, sample_rate=8000) s.add_basic_video_stream(frames_per_chunk=15, frame_rate=60, width=w, height=h) for i, (achunk, vchunk) in enumerate(s.stream()): @@ -302,7 +338,7 @@ def test_stream_smoke_test(self): def test_seek(self): """Calling `seek` multiple times should not segfault""" - s = StreamReader(get_video_asset()) + s = StreamReader(self.get_video_asset()) for i in range(10): s.seek(i) for _ in range(0): @@ -312,8 +348,8 @@ def test_seek(self): def test_seek_negative(self): """Calling `seek` with negative value should raise an exception""" - s = StreamReader(get_video_asset()) - with self.assertRaises(ValueError): + s = StreamReader(self.get_video_asset()) + with self.assertRaises(RuntimeError): s.seek(-1.0) @@ -327,9 +363,9 @@ def _get_reference_wav(self, sample_rate, channels_first=False, **kwargs): save_wav(path, data, sample_rate, channels_first=channels_first) return path, data - def _test_wav(self, path, original, dtype): + def _test_wav(self, path, original, format): s = StreamReader(path) - s.add_basic_audio_stream(frames_per_chunk=-1, dtype=dtype) + s.add_basic_audio_stream(frames_per_chunk=-1, format=format) s.process_all_packets() (output,) = s.pop_chunks() self.assertEqual(original, output) @@ -342,10 +378,16 @@ def test_basic_audio_stream(self, dtype, num_channels): """`basic_audio_stream` can load WAV file properly.""" path, original = self._get_reference_wav(8000, dtype=dtype, num_channels=num_channels) + format = { + "uint8": "u8p", + "int16": "s16p", + "int32": "s32p", + }[dtype] + # provide the matching dtype - self._test_wav(path, original, getattr(torch, dtype)) + self._test_wav(path, original, format=format) # use the internal dtype ffmpeg picks - self._test_wav(path, original, None) + self._test_wav(path, original, format=None) @nested_params( ["int16", "uint8", "int32"], # "float", "double", "int64"] diff --git a/tools/setup_helpers/extension.py b/tools/setup_helpers/extension.py index ad9529d96f8..ed038e22172 100644 --- a/tools/setup_helpers/extension.py +++ b/tools/setup_helpers/extension.py @@ -57,7 +57,12 @@ def get_ext_modules(): ] ) if _USE_FFMPEG: - modules.append(Extension(name="torchaudio.lib.libtorchaudio_ffmpeg", sources=[])) + modules.extend( + [ + Extension(name="torchaudio.lib.libtorchaudio_ffmpeg", sources=[]), + Extension(name="torchaudio._torchaudio_ffmpeg", sources=[]), + ] + ) return modules diff --git a/torchaudio/csrc/CMakeLists.txt b/torchaudio/csrc/CMakeLists.txt index 0b2a0ad33c6..ac81ffd63a3 100644 --- a/torchaudio/csrc/CMakeLists.txt +++ b/torchaudio/csrc/CMakeLists.txt @@ -1,3 +1,9 @@ +# the following line is added in order to export symbols when building on Windows +# this approach has some limitations as documented in https://github.com/pytorch/pytorch/pull/3650 +if (MSVC) + set(CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS ON) +endif() + ################################################################################ # libtorchaudio 
################################################################################ @@ -204,11 +210,11 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION) find_package(Python3 ${PYTHON_VERSION} EXACT COMPONENTS Development) set(ADDITIONAL_ITEMS Python3::Python) endif() - function(define_extension name sources libraries definitions) + function(define_extension name sources include_dirs libraries definitions) add_library(${name} SHARED ${sources}) target_compile_definitions(${name} PRIVATE "${definitions}") target_include_directories( - ${name} PRIVATE ${PROJECT_SOURCE_DIR} ${Python_INCLUDE_DIR}) + ${name} PRIVATE ${PROJECT_SOURCE_DIR} ${Python_INCLUDE_DIR} ${include_dirs}) target_link_libraries( ${name} ${libraries} @@ -254,6 +260,7 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION) define_extension( _torchaudio "${EXTENSION_SOURCES}" + "" libtorchaudio "${LIBTORCHAUDIO_COMPILE_DEFINITIONS}" ) @@ -265,8 +272,23 @@ if (BUILD_TORCHAUDIO_PYTHON_EXTENSION) define_extension( _torchaudio_decoder "${DECODER_EXTENSION_SOURCES}" + "" "libtorchaudio_decoder" "${LIBTORCHAUDIO_DECODER_DEFINITIONS}" ) endif() + if(USE_FFMPEG) + set( + FFMPEG_EXTENSION_SOURCES + ffmpeg/pybind/pybind.cpp + ffmpeg/pybind/stream_reader.cpp + ) + define_extension( + _torchaudio_ffmpeg + "${FFMPEG_EXTENSION_SOURCES}" + "${FFMPEG_INCLUDE_DIRS}" + "libtorchaudio_ffmpeg" + "${LIBTORCHAUDIO_DECODER_DEFINITIONS}" + ) + endif() endif() diff --git a/torchaudio/csrc/ffmpeg/ffmpeg.cpp b/torchaudio/csrc/ffmpeg/ffmpeg.cpp index 648d787e890..d89b4763269 100644 --- a/torchaudio/csrc/ffmpeg/ffmpeg.cpp +++ b/torchaudio/csrc/ffmpeg/ffmpeg.cpp @@ -66,10 +66,17 @@ std::string join(std::vector vars) { AVFormatContextPtr get_input_format_context( const std::string& src, const c10::optional& device, - const OptionDict& option) { - AVFormatContext* pFormat = NULL; + const OptionDict& option, + AVIOContext* io_ctx) { + AVFormatContext* pFormat = avformat_alloc_context(); + if (!pFormat) { + throw std::runtime_error("Failed to allocate AVFormatContext."); + } + if (io_ctx) { + pFormat->pb = io_ctx; + } - AVINPUT_FORMAT_CONST AVInputFormat* pInput = [&]() -> AVInputFormat* { + auto* pInput = [&]() -> AVINPUT_FORMAT_CONST AVInputFormat* { if (device.has_value()) { std::string device_str = device.value(); AVINPUT_FORMAT_CONST AVInputFormat* p = @@ -103,6 +110,17 @@ AVFormatContextPtr get_input_format_context( AVFormatContextPtr::AVFormatContextPtr(AVFormatContext* p) : Wrapper(p) {} +//////////////////////////////////////////////////////////////////////////////// +// AVIO +//////////////////////////////////////////////////////////////////////////////// +void AVIOContextDeleter::operator()(AVIOContext* p) { + av_freep(&p->buffer); + av_freep(&p); +}; + +AVIOContextPtr::AVIOContextPtr(AVIOContext* p) + : Wrapper(p) {} + //////////////////////////////////////////////////////////////////////////////// // AVPacket //////////////////////////////////////////////////////////////////////////////// diff --git a/torchaudio/csrc/ffmpeg/ffmpeg.h b/torchaudio/csrc/ffmpeg/ffmpeg.h index a45a26d2580..9d03b5ff906 100644 --- a/torchaudio/csrc/ffmpeg/ffmpeg.h +++ b/torchaudio/csrc/ffmpeg/ffmpeg.h @@ -13,6 +13,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -74,7 +75,19 @@ struct AVFormatContextPtr AVFormatContextPtr get_input_format_context( const std::string& src, const c10::optional& device, - const OptionDict& option); + const OptionDict& option, + AVIOContext* io_ctx = nullptr); + 
+//////////////////////////////////////////////////////////////////////////////// +// AVIO +//////////////////////////////////////////////////////////////////////////////// +struct AVIOContextDeleter { + void operator()(AVIOContext* p); +}; + +struct AVIOContextPtr : public Wrapper { + explicit AVIOContextPtr(AVIOContext* p); +}; //////////////////////////////////////////////////////////////////////////////// // AVPacket diff --git a/torchaudio/csrc/ffmpeg/prototype.cpp b/torchaudio/csrc/ffmpeg/prototype.cpp index 06ee300e338..0ccf498a1e5 100644 --- a/torchaudio/csrc/ffmpeg/prototype.cpp +++ b/torchaudio/csrc/ffmpeg/prototype.cpp @@ -46,84 +46,70 @@ TORCH_LIBRARY_FRAGMENT(torchaudio, m) { av_log_set_level(AV_LOG_ERROR); }); m.def("torchaudio::ffmpeg_load", load); - m.class_("ffmpeg_Streamer"); - m.def("torchaudio::ffmpeg_streamer_init", init); - m.def("torchaudio::ffmpeg_streamer_num_src_streams", [](S s) { - return s->num_src_streams(); - }); - m.def("torchaudio::ffmpeg_streamer_num_out_streams", [](S s) { - return s->num_out_streams(); - }); - m.def("torchaudio::ffmpeg_streamer_get_src_stream_info", [](S s, int64_t i) { - return s->get_src_stream_info(i); - }); - m.def("torchaudio::ffmpeg_streamer_get_out_stream_info", [](S s, int64_t i) { - return s->get_out_stream_info(i); - }); - m.def("torchaudio::ffmpeg_streamer_find_best_audio_stream", [](S s) { - return s->find_best_audio_stream(); - }); - m.def("torchaudio::ffmpeg_streamer_find_best_video_stream", [](S s) { - return s->find_best_video_stream(); - }); - m.def("torchaudio::ffmpeg_streamer_seek", [](S s, double t) { - return s->seek(t); - }); - m.def( - "torchaudio::ffmpeg_streamer_add_audio_stream", - [](S s, - int64_t i, - int64_t frames_per_chunk, - int64_t num_chunks, - const c10::optional& filter_desc, - const c10::optional& decoder, - const c10::optional>& - decoder_options) { - s->add_audio_stream( - i, - frames_per_chunk, - num_chunks, - filter_desc, - decoder, - map(decoder_options)); - }); - m.def( - "torchaudio::ffmpeg_streamer_add_video_stream", - [](S s, - int64_t i, - int64_t frames_per_chunk, - int64_t num_chunks, - const c10::optional& filter_desc, - const c10::optional& decoder, - const c10::optional>& - decoder_options, - const c10::optional& hw_accel) { - s->add_video_stream( - i, - frames_per_chunk, - num_chunks, - filter_desc, - decoder, - map(decoder_options), - hw_accel); - }); - m.def("torchaudio::ffmpeg_streamer_remove_stream", [](S s, int64_t i) { - s->remove_stream(i); - }); - m.def( - "torchaudio::ffmpeg_streamer_process_packet", - [](S s, const c10::optional& timeout, const double backoff) { - return s->process_packet(timeout, backoff); - }); - m.def("torchaudio::ffmpeg_streamer_process_all_packets", [](S s) { - s->process_all_packets(); - }); - m.def("torchaudio::ffmpeg_streamer_is_buffer_ready", [](S s) { - return s->is_buffer_ready(); - }); - m.def("torchaudio::ffmpeg_streamer_pop_chunks", [](S s) { - return s->pop_chunks(); - }); + m.class_("ffmpeg_Streamer") + .def(torch::init<>(init)) + .def("num_src_streams", [](S self) { return self->num_src_streams(); }) + .def("num_out_streams", [](S self) { return self->num_out_streams(); }) + .def( + "get_src_stream_info", + [](S s, int64_t i) { return s->get_src_stream_info(i); }) + .def( + "get_out_stream_info", + [](S s, int64_t i) { return s->get_out_stream_info(i); }) + .def( + "find_best_audio_stream", + [](S s) { return s->find_best_audio_stream(); }) + .def( + "find_best_video_stream", + [](S s) { return s->find_best_video_stream(); }) + .def("seek", 
[](S s, double t) { return s->seek(t); }) + .def( + "add_audio_stream", + [](S s, + int64_t i, + int64_t frames_per_chunk, + int64_t num_chunks, + const c10::optional& filter_desc, + const c10::optional& decoder, + const c10::optional>& + decoder_options) { + s->add_audio_stream( + i, + frames_per_chunk, + num_chunks, + filter_desc, + decoder, + map(decoder_options)); + }) + .def( + "add_video_stream", + [](S s, + int64_t i, + int64_t frames_per_chunk, + int64_t num_chunks, + const c10::optional& filter_desc, + const c10::optional& decoder, + const c10::optional>& + decoder_options, + const c10::optional& hw_accel) { + s->add_video_stream( + i, + frames_per_chunk, + num_chunks, + filter_desc, + decoder, + map(decoder_options), + hw_accel); + }) + .def("remove_stream", [](S s, int64_t i) { s->remove_stream(i); }) + .def( + "process_packet", + [](S s, const c10::optional& timeout, const double backoff) { + return s->process_packet(timeout, backoff); + }) + .def("process_all_packets", [](S s) { s->process_all_packets(); }) + .def("is_buffer_ready", [](S s) { return s->is_buffer_ready(); }) + .def("pop_chunks", [](S s) { return s->pop_chunks(); }); } } // namespace diff --git a/torchaudio/csrc/ffmpeg/pybind/pybind.cpp b/torchaudio/csrc/ffmpeg/pybind/pybind.cpp new file mode 100644 index 00000000000..46e633262c1 --- /dev/null +++ b/torchaudio/csrc/ffmpeg/pybind/pybind.cpp @@ -0,0 +1,39 @@ +#include +#include +#include + +namespace torchaudio { +namespace ffmpeg { +namespace { + +PYBIND11_MODULE(_torchaudio_ffmpeg, m) { + py::class_>( + m, "StreamReaderFileObj") + .def(py::init< + py::object, + const c10::optional&, + const c10::optional&, + int64_t>()) + .def("num_src_streams", &StreamReaderFileObj::num_src_streams) + .def("num_out_streams", &StreamReaderFileObj::num_out_streams) + .def( + "find_best_audio_stream", + &StreamReaderFileObj::find_best_audio_stream) + .def( + "find_best_video_stream", + &StreamReaderFileObj::find_best_video_stream) + .def("get_src_stream_info", &StreamReaderFileObj::get_src_stream_info) + .def("get_out_stream_info", &StreamReaderFileObj::get_out_stream_info) + .def("seek", &StreamReaderFileObj::seek) + .def("add_audio_stream", &StreamReaderFileObj::add_audio_stream) + .def("add_video_stream", &StreamReaderFileObj::add_video_stream) + .def("remove_stream", &StreamReaderFileObj::remove_stream) + .def("process_packet", &StreamReaderFileObj::process_packet) + .def("process_all_packets", &StreamReaderFileObj::process_all_packets) + .def("is_buffer_ready", &StreamReaderFileObj::is_buffer_ready) + .def("pop_chunks", &StreamReaderFileObj::pop_chunks); +} + +} // namespace +} // namespace ffmpeg +} // namespace torchaudio diff --git a/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp b/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp new file mode 100644 index 00000000000..67687ace373 --- /dev/null +++ b/torchaudio/csrc/ffmpeg/pybind/stream_reader.cpp @@ -0,0 +1,89 @@ +#include +#include + +namespace torchaudio { +namespace ffmpeg { +namespace { + +static int read_function(void* opaque, uint8_t* buf, int buf_size) { + FileObj* fileobj = static_cast(opaque); + buf_size = FFMIN(buf_size, fileobj->buffer_size); + + int num_read = 0; + while (num_read < buf_size) { + int request = buf_size - num_read; + auto chunk = static_cast( + static_cast(fileobj->fileobj.attr("read")(request))); + auto chunk_len = chunk.length(); + if (chunk_len == 0) { + break; + } + if (chunk_len > request) { + std::ostringstream message; + message + << "Requested up to " << request << " bytes but, " + << 
"received " << chunk_len << " bytes. " + << "The given object does not confirm to read protocol of file object."; + throw std::runtime_error(message.str()); + } + memcpy(buf, chunk.data(), chunk_len); + buf += chunk_len; + num_read += chunk_len; + } + return num_read == 0 ? AVERROR_EOF : num_read; +} + +static int64_t seek_function(void* opaque, int64_t offset, int whence) { + // We do not know the file size. + if (whence == AVSEEK_SIZE) { + return AVERROR(EIO); + } + FileObj* fileobj = static_cast(opaque); + return py::cast(fileobj->fileobj.attr("seek")(offset, whence)); +} + +AVIOContextPtr get_io_context(FileObj* opaque, int buffer_size) { + uint8_t* buffer = static_cast(av_malloc(buffer_size)); + if (!buffer) { + throw std::runtime_error("Failed to allocate buffer."); + } + + // If avio_alloc_context succeeds, then buffer will be cleaned up by + // AVIOContextPtr destructor. + // If avio_alloc_context fails, we need to clean up by ourselves. + AVIOContext* av_io_ctx = avio_alloc_context( + buffer, + buffer_size, + 0, + static_cast(opaque), + &read_function, + nullptr, + py::hasattr(opaque->fileobj, "seek") ? &seek_function : nullptr); + + if (!av_io_ctx) { + av_freep(&buffer); + throw std::runtime_error("Failed to allocate AVIO context."); + } + return AVIOContextPtr{av_io_ctx}; +} +} // namespace + +FileObj::FileObj(py::object fileobj_, int buffer_size) + : fileobj(fileobj_), + buffer_size(buffer_size), + pAVIO(get_io_context(this, buffer_size)) {} + +StreamReaderFileObj::StreamReaderFileObj( + py::object fileobj_, + const c10::optional& format, + const c10::optional& option, + int64_t buffer_size) + : FileObj(fileobj_, static_cast(buffer_size)), + StreamReaderBinding(get_input_format_context( + "", + format, + option.value_or(OptionDict{}), + pAVIO)) {} + +} // namespace ffmpeg +} // namespace torchaudio diff --git a/torchaudio/csrc/ffmpeg/pybind/stream_reader.h b/torchaudio/csrc/ffmpeg/pybind/stream_reader.h new file mode 100644 index 00000000000..75044afa8d9 --- /dev/null +++ b/torchaudio/csrc/ffmpeg/pybind/stream_reader.h @@ -0,0 +1,28 @@ +#pragma once +#include +#include + +namespace torchaudio { +namespace ffmpeg { + +struct FileObj { + py::object fileobj; + int buffer_size; + AVIOContextPtr pAVIO; + FileObj(py::object fileobj, int buffer_size); +}; + +// The reason we inherit FileObj instead of making it an attribute +// is so that FileObj is instantiated first. +// AVIOContext must be initialized before AVFormat, and outlive AVFormat. 
+struct StreamReaderFileObj : public FileObj, public StreamReaderBinding { + public: + StreamReaderFileObj( + py::object fileobj, + const c10::optional& format, + const c10::optional& option, + int64_t buffer_size); +}; + +} // namespace ffmpeg +} // namespace torchaudio diff --git a/torchaudio/csrc/ffmpeg/streamer.cpp b/torchaudio/csrc/ffmpeg/streamer.cpp index 6a7f050beba..93c13b3f8ab 100644 --- a/torchaudio/csrc/ffmpeg/streamer.cpp +++ b/torchaudio/csrc/ffmpeg/streamer.cpp @@ -81,19 +81,25 @@ SrcStreamInfo Streamer::get_src_stream_info(int i) const { ret.codec_long_name = desc->long_name; } switch (codecpar->codec_type) { - case AVMEDIA_TYPE_AUDIO: - ret.fmt_name = - av_get_sample_fmt_name(static_cast(codecpar->format)); + case AVMEDIA_TYPE_AUDIO: { + AVSampleFormat smp_fmt = static_cast(codecpar->format); + if (smp_fmt != AV_SAMPLE_FMT_NONE) { + ret.fmt_name = av_get_sample_fmt_name(smp_fmt); + } ret.sample_rate = static_cast(codecpar->sample_rate); ret.num_channels = codecpar->channels; break; - case AVMEDIA_TYPE_VIDEO: - ret.fmt_name = - av_get_pix_fmt_name(static_cast(codecpar->format)); + } + case AVMEDIA_TYPE_VIDEO: { + AVPixelFormat pix_fmt = static_cast(codecpar->format); + if (pix_fmt != AV_PIX_FMT_NONE) { + ret.fmt_name = av_get_pix_fmt_name(pix_fmt); + } ret.width = codecpar->width; ret.height = codecpar->height; ret.frame_rate = av_q2d(stream->r_frame_rate); break; + } default:; } return ret; @@ -220,6 +226,13 @@ void Streamer::add_stream( validate_src_stream_type(i, media_type); AVStream* stream = pFormatContext->streams[i]; + // When media source is file-like object, it is possible that source codec is + // not detected properly. + if (stream->codecpar->format == -1) { + throw std::runtime_error( + "Failed to detect the source stream format. Please provide the decoder to use."); + } + stream->discard = AVDISCARD_DEFAULT; if (!processors[i]) processors[i] = std::make_unique( diff --git a/torchaudio/io/__init__.py b/torchaudio/io/__init__.py index bb1dcf90acc..65f01e1a957 100644 --- a/torchaudio/io/__init__.py +++ b/torchaudio/io/__init__.py @@ -14,6 +14,7 @@ def _init_extension(): try: torchaudio._extension._load_lib("libtorchaudio_ffmpeg") + import torchaudio._torchaudio_ffmpeg except OSError as err: raise ImportError( "Stream API requires FFmpeg libraries (libavformat and such). Please install FFmpeg 4." 
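The Python-level effect of the pybind extension above is that StreamReader can now consume a file-like object instead of a path or URL. The following is a minimal usage sketch, not part of the patch: it assumes torchaudio is built with the FFmpeg extension, and "sample.mp4" is a placeholder for any local media file. The keyword arguments mirror the updated signatures in torchaudio/io/_stream_reader.py below, and `stream()` is the iterator exercised by the test suite.

import io

from torchaudio.io import StreamReader

# Wrap raw bytes in BytesIO; it provides both read(size) and seek(offset, whence),
# so the seek-based codec detection path is available during header parsing.
with open("sample.mp4", "rb") as f:  # hypothetical local file
    src = io.BytesIO(f.read())

reader = StreamReader(src, buffer_size=4096)
reader.add_basic_audio_stream(frames_per_chunk=8000, sample_rate=8000, format="fltp")
reader.add_basic_video_stream(frames_per_chunk=10, frame_rate=10, format="rgb24")

for audio_chunk, video_chunk in reader.stream():
    # Each element is a Tensor chunk, or None when that buffer has no frames yet.
    break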
diff --git a/torchaudio/io/_stream_reader.py b/torchaudio/io/_stream_reader.py
index e3eb00fd3d9..535937dbe7e 100644
--- a/torchaudio/io/_stream_reader.py
+++ b/torchaudio/io/_stream_reader.py
@@ -154,24 +154,16 @@ def _parse_oi(i):
     return StreamReaderOutputStream(i[0], i[1])


-def _get_afilter_desc(sample_rate: Optional[int], dtype: torch.dtype):
+def _get_afilter_desc(sample_rate: Optional[int], fmt: Optional[str]):
     descs = []
     if sample_rate is not None:
         descs.append(f"aresample={sample_rate}")
-    if dtype is not None:
-        fmt = {
-            torch.uint8: "u8p",
-            torch.int16: "s16p",
-            torch.int32: "s32p",
-            torch.long: "s64p",
-            torch.float32: "fltp",
-            torch.float64: "dblp",
-        }[dtype]
+    if fmt is not None:
         descs.append(f"aformat=sample_fmts={fmt}")
     return ",".join(descs) if descs else None


-def _get_vfilter_desc(frame_rate: Optional[float], width: Optional[int], height: Optional[int], format: Optional[str]):
+def _get_vfilter_desc(frame_rate: Optional[float], width: Optional[int], height: Optional[int], fmt: Optional[str]):
     descs = []
     if frame_rate is not None:
         descs.append(f"fps={frame_rate}")
@@ -182,13 +174,7 @@ def _get_vfilter_desc(frame_rate: Optional[float], width: Optional[int], height:
         scales.append(f"height={height}")
     if scales:
         descs.append(f"scale={':'.join(scales)}")
-    if format is not None:
-        fmt = {
-            "RGB": "rgb24",
-            "BGR": "bgr24",
-            "YUV": "yuv420p",
-            "GRAY": "gray",
-        }[format]
+    if fmt is not None:
         descs.append(f"format=pix_fmts={fmt}")
     return ",".join(descs) if descs else None

@@ -199,7 +185,19 @@ class StreamReader:
     For the detailed usage of this class, please refer to the tutorial.

     Args:
-        src (str): Source. Can be a file path, URL, device identifier or filter expression.
+        src (str or file-like object): The media source.
+            If string-type, it must be a resource indicator that FFmpeg can
+            handle. This includes a file path, URL, device identifier or
+            filter expression. The supported value depends on the FFmpeg found
+            in the system.
+
+            If file-like object, it must support the `read` method with the
+            signature `read(size: int) -> bytes`.
+            Additionally, if the file-like object has a `seek` method, it is used
+            when parsing media metadata, which improves the reliability
+            of codec detection. The signature of the `seek` method must be
+            `seek(offset, whence) -> int`.
+
         format (str or None, optional):
             Override the input format, or specify the source sound device.
             Default: ``None`` (no override nor device input).
@@ -232,6 +230,11 @@ class StreamReader:
             You can use this argument to change the input source before
             it is passed to decoder.

             Default: ``None``.
+
+        buffer_size (int):
+            The internal buffer size, used only when `src` is a file-like object.
+
+            Default: `4096`.
""" def __init__( @@ -239,12 +242,19 @@ def __init__( src: str, format: Optional[str] = None, option: Optional[Dict[str, str]] = None, + buffer_size: int = 4096, ): - self._s = torch.ops.torchaudio.ffmpeg_streamer_init(src, format, option) - i = torch.ops.torchaudio.ffmpeg_streamer_find_best_audio_stream(self._s) - self._i_audio = None if i < 0 else i - i = torch.ops.torchaudio.ffmpeg_streamer_find_best_video_stream(self._s) - self._i_video = None if i < 0 else i + if isinstance(src, str): + self._be = torch.classes.torchaudio.ffmpeg_Streamer(src, format, option) + elif hasattr(src, "read"): + self._be = torchaudio._torchaudio_ffmpeg.StreamReaderFileObj(src, format, option, buffer_size) + else: + raise ValueError("`src` must be either string or file-like object.") + + i = self._be.find_best_audio_stream() + self._default_audio_stream = None if i < 0 else i + i = self._be.find_best_video_stream() + self._default_video_stream = None if i < 0 else i @property def num_src_streams(self): @@ -252,7 +262,7 @@ def num_src_streams(self): :type: int """ - return torch.ops.torchaudio.ffmpeg_streamer_num_src_streams(self._s) + return self._be.num_src_streams() @property def num_out_streams(self): @@ -260,7 +270,7 @@ def num_out_streams(self): :type: int """ - return torch.ops.torchaudio.ffmpeg_streamer_num_out_streams(self._s) + return self._be.num_out_streams() @property def default_audio_stream(self): @@ -268,7 +278,7 @@ def default_audio_stream(self): :type: Optional[int] """ - return self._i_audio + return self._default_audio_stream @property def default_video_stream(self): @@ -276,7 +286,7 @@ def default_video_stream(self): :type: Optional[int] """ - return self._i_video + return self._default_video_stream def get_src_stream_info(self, i: int) -> torchaudio.io.StreamReaderSourceStream: """Get the metadata of source stream @@ -286,7 +296,7 @@ def get_src_stream_info(self, i: int) -> torchaudio.io.StreamReaderSourceStream: Returns: SourceStream """ - return _parse_si(torch.ops.torchaudio.ffmpeg_streamer_get_src_stream_info(self._s, i)) + return _parse_si(self._be.get_src_stream_info(i)) def get_out_stream_info(self, i: int) -> torchaudio.io.StreamReaderOutputStream: """Get the metadata of output stream @@ -296,7 +306,7 @@ def get_out_stream_info(self, i: int) -> torchaudio.io.StreamReaderOutputStream: Returns: OutputStream """ - return _parse_oi(torch.ops.torchaudio.ffmpeg_streamer_get_out_stream_info(self._s, i)) + return _parse_oi(self._be.get_out_stream_info(i)) def seek(self, timestamp: float): """Seek the stream to the given timestamp [second] @@ -304,15 +314,17 @@ def seek(self, timestamp: float): Args: timestamp (float): Target time in second. """ - torch.ops.torchaudio.ffmpeg_streamer_seek(self._s, timestamp) + self._be.seek(timestamp) def add_basic_audio_stream( self, frames_per_chunk: int, buffer_chunk_size: int = 3, stream_index: Optional[int] = None, + decoder: Optional[str] = None, + decoder_option: Optional[Dict[str, str]] = None, + format: Optional[str] = "fltp", sample_rate: Optional[int] = None, - dtype: torch.dtype = torch.float32, ): """Add output audio stream @@ -329,21 +341,41 @@ def add_basic_audio_stream( stream_index (int or None, optional): The source audio stream index. If omitted, :py:attr:`default_audio_stream` is used. - sample_rate (int or None, optional): If provided, resample the audio. + decoder (str or None, optional): The name of the decoder to be used. + When provided, use the specified decoder instead of the default one. 
+
+                To list the available decoders, you can use `ffmpeg -decoders` command.
+
+            decoder_option (dict or None, optional): Options passed to the decoder.
+                Mapping from str to str.
+
+                To list decoder options for a decoder, you can use
+                `ffmpeg -h decoder=<decoder>` command.
+
+            format (str, optional): Output sample format (precision).
+
+                If ``None``, the output chunk has dtype corresponding to
+                the precision of the source audio.
-            dtype (torch.dtype, optional): If not ``None``, change the output sample precision.
-                If floating point, then the sample value range is
-                `[-1, 1]`.
+
+                Otherwise, the sample is converted and the output dtype is changed
+                as follows.
+
+                - `"u8p"`: The output is `torch.uint8` type.
+                - `"s16p"`: The output is `torch.int16` type.
+                - `"s32p"`: The output is `torch.int32` type.
+                - `"s64p"`: The output is `torch.int64` type.
+                - `"fltp"`: The output is `torch.float32` type.
+                - `"dblp"`: The output is `torch.float64` type.
+
+            sample_rate (int or None, optional): If provided, resample the audio.
         """
-        i = self.default_audio_stream if stream_index is None else stream_index
-        torch.ops.torchaudio.ffmpeg_streamer_add_audio_stream(
-            self._s,
-            i,
+        self.add_audio_stream(
             frames_per_chunk,
             buffer_chunk_size,
-            _get_afilter_desc(sample_rate, dtype),
-            None,
-            None,
+            stream_index,
+            decoder,
+            decoder_option,
+            _get_afilter_desc(sample_rate, format),
         )

     def add_basic_video_stream(
@@ -351,10 +383,13 @@
         frames_per_chunk: int,
         buffer_chunk_size: int = 3,
         stream_index: Optional[int] = None,
+        decoder: Optional[str] = None,
+        decoder_option: Optional[Dict[str, str]] = None,
+        hw_accel: Optional[str] = None,
+        format: Optional[str] = "rgb24",
         frame_rate: Optional[int] = None,
         width: Optional[int] = None,
         height: Optional[int] = None,
-        format: str = "RGB",
     ):
         """Add output video stream

@@ -371,27 +406,38 @@
             stream_index (int or None, optional): The source video stream index.
                 If omitted, :py:attr:`default_video_stream` is used.

+            decoder (str or None, optional): The name of the decoder to be used.
+                When provided, use the specified decoder instead of the default one.
+
+                To list the available decoders, you can use `ffmpeg -decoders` command.
+
+            decoder_option (dict or None, optional): Options passed to the decoder.
+                Mapping from str to str.
+
+                To list decoder options for a decoder, you can use
+                `ffmpeg -h decoder=<decoder>` command.
+
+            format (str, optional): Change the format of image channels. Valid values are,
+
+                - `rgb24`: 8 bits * 3 channels (R, G, B)
+                - `bgr24`: 8 bits * 3 channels (B, G, R)
+                - `yuv420p`: 8 bits * 3 channels (Y, U, V)
+                - `gray`: 8 bits * 1 channel
+
             frame_rate (int or None, optional): If provided, change the frame rate.

             width (int or None, optional): If provided, change the image width. Unit: Pixel.
-
-            height (int or None, optional): If provided, change the image height. Unit: Pixel.
-            format (str, optional): Change the format of image channels. Valid values are,
-                - `RGB`: 8 bits * 3 channels
-                - `BGR`: 8 bits * 3 channels
-                - `YUV`: 8 bits * 3 channels
-                - `GRAY`: 8 bits * 1 channels
+            height (int or None, optional): If provided, change the image height. Unit: Pixel.
""" - i = self.default_video_stream if stream_index is None else stream_index - torch.ops.torchaudio.ffmpeg_streamer_add_video_stream( - self._s, - i, + self.add_video_stream( frames_per_chunk, buffer_chunk_size, + stream_index, + decoder, + decoder_option, + hw_accel, _get_vfilter_desc(frame_rate, width, height, format), - None, - None, - None, ) def add_audio_stream( @@ -399,9 +445,9 @@ def add_audio_stream( frames_per_chunk: int, buffer_chunk_size: int = 3, stream_index: Optional[int] = None, - filter_desc: Optional[str] = None, decoder: Optional[str] = None, decoder_options: Optional[Dict[str, str]] = None, + filter_desc: Optional[str] = None, ): """Add output audio stream @@ -418,26 +464,33 @@ def add_audio_stream( stream_index (int or None, optional): The source audio stream index. If omitted, :py:attr:`default_audio_stream` is used. - filter_desc (str or None, optional): Filter description. - The list of available filters can be found at - https://ffmpeg.org/ffmpeg-filters.html - Note that complex filters are not supported. - decoder (str or None, optional): The name of the decoder to be used. When provided, use the specified decoder instead of the default one. + To list the available decoders, you can use `ffmpeg -decoders` command. + decoder_options (dict or None, optional): Options passed to decoder. Mapping from str to str. + + To list decoder options for a decoder, you can use + `ffmpeg -h decoder=` command. + + filter_desc (str or None, optional): Filter description. + The list of available filters can be found at + https://ffmpeg.org/ffmpeg-filters.html + Note that complex filters are not supported. + """ i = self.default_audio_stream if stream_index is None else stream_index - torch.ops.torchaudio.ffmpeg_streamer_add_audio_stream( - self._s, + if i is None: + raise RuntimeError("There is no audio stream.") + self._be.add_audio_stream( i, frames_per_chunk, buffer_chunk_size, filter_desc, decoder, - decoder_options, + decoder_options or {}, ) def add_video_stream( @@ -445,10 +498,10 @@ def add_video_stream( frames_per_chunk: int, buffer_chunk_size: int = 3, stream_index: Optional[int] = None, - filter_desc: Optional[str] = None, decoder: Optional[str] = None, decoder_options: Optional[Dict[str, str]] = None, hw_accel: Optional[str] = None, + filter_desc: Optional[str] = None, ): """Add output video stream @@ -465,28 +518,31 @@ def add_video_stream( stream_index (int or None, optional): The source video stream index. If omitted, :py:attr:`default_video_stream` is used. - filter_desc (str or None, optional): Filter description. - The list of available filters can be found at - https://ffmpeg.org/ffmpeg-filters.html - Note that complex filters are not supported. - decoder (str or None, optional): The name of the decoder to be used. When provided, use the specified decoder instead of the default one. + To list the available decoders, you can use `ffmpeg -decoders` command. + decoder_options (dict or None, optional): Options passed to decoder. Mapping from str to str. + To list decoder options for a decoder, you can use + `ffmpeg -h decoder=` command. + hw_accel (str or None, optional): Enable hardware acceleration. - The valid choice is "cuda" or ``None``. - Default: ``None``. (No hardware acceleration.) + When video is decoded on CUDA hardware, for example + `decode="h264_cuvid"`, passing CUDA device indicator to `hw_accel` + (i.e. `hw_accel="cuda:0"`) will place the resulting frames + directly on the specifiec CUDA device. 
- When the following conditions are met, providing `hw_accel="cuda"` - will create Tensor directly from CUDA HW decoder. + If `None`, the frame will be moved to CPU memory. + Default: ``None``. - 1. TorchAudio is compiled with CUDA support. - 2. FFmpeg libraries linked dynamically are compiled with NVDEC support. - 3. The codec is supported NVDEC by. (Currently, `"h264_cuvid"` is supported) + filter_desc (str or None, optional): Filter description. + The list of available filters can be found at + https://ffmpeg.org/ffmpeg-filters.html + Note that complex filters are not supported. Example - HW decoding:: @@ -517,14 +573,15 @@ def add_video_stream( ... cuda:1 """ i = self.default_video_stream if stream_index is None else stream_index - torch.ops.torchaudio.ffmpeg_streamer_add_video_stream( - self._s, + if i is None: + raise RuntimeError("There is no video stream.") + self._be.add_video_stream( i, frames_per_chunk, buffer_chunk_size, filter_desc, decoder, - decoder_options, + decoder_options or {}, hw_accel, ) @@ -534,7 +591,7 @@ def remove_stream(self, i: int): Args: i (int): Index of the output stream to be removed. """ - torch.ops.torchaudio.ffmpeg_streamer_remove_stream(self._s, i) + self._be.remove_stream(i) def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.0) -> int: """Read the source media and process one packet. @@ -593,15 +650,15 @@ def process_packet(self, timeout: Optional[float] = None, backoff: float = 10.0) flushed the pending frames. The caller should stop calling this method. """ - return torch.ops.torchaudio.ffmpeg_streamer_process_packet(self._s, timeout, backoff) + return self._be.process_packet(timeout, backoff) def process_all_packets(self): """Process packets until it reaches EOF.""" - torch.ops.torchaudio.ffmpeg_streamer_process_all_packets(self._s) + self._be.process_all_packets() def is_buffer_ready(self) -> bool: """Returns true if all the output streams have at least one chunk filled.""" - return torch.ops.torchaudio.ffmpeg_streamer_is_buffer_ready(self._s) + return self._be.is_buffer_ready() def pop_chunks(self) -> Tuple[Optional[torch.Tensor]]: """Pop one chunk from all the output stream buffers. @@ -611,7 +668,7 @@ def pop_chunks(self) -> Tuple[Optional[torch.Tensor]]: Buffer contents. If a buffer does not contain any frame, then `None` is returned instead. """ - return torch.ops.torchaudio.ffmpeg_streamer_pop_chunks(self._s) + return self._be.pop_chunks() def _fill_buffer(self, timeout: Optional[float], backoff: float) -> int: """Keep processing packets until all buffers have at least one chunk